6 #ifndef TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP 7 #define TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP 8 #include <core/storage/sframe_data/group_aggregate_value.hpp> 9 #include <ml/sketches/streaming_quantile_sketch.hpp> 22 namespace groupby_operators {
59 if (!other_casted.failure && !failure){
62 (*this) = other_casted;
63 }
else if (other_casted.init) {
64 if (value.
size() != other_casted.value.
size()) {
67 value += other_casted.value;
100 oarc << value << init << failure;
105 iarc >> value >> init >>failure;
110 bool failure =
false;
131 DASSERT_EQ((
int)flex.
get_type(), (int)value.get_type());
138 value +=
dynamic_cast<const sum&
>(other).value;
195 DASSERT_EQ((
int)flex.
get_type(), (int)value.get_type());
208 const min& _other =
dynamic_cast<const min&
>(other);
212 value = _other.value;
214 if (value > _other.value)
215 value = _other.value;
246 oarc << value << init;
251 iarc >> value >> init;
282 if (vec_value[0] > values[0]) vec_value =
values;
289 throw "argmin does not support add_element_simple with one value";
294 const argmin& _other =
dynamic_cast<const argmin&
>(other);
297 vec_value = _other.vec_value;
300 if (vec_value[0] > _other.vec_value[0]) vec_value = _other.vec_value;
327 throw (
"set_input_type is not supported for argmin");
337 oarc << vec_value << init;
342 iarc >> vec_value >> init;
347 std::vector<flexible_type> vec_value;
374 if (vec_value[0] < values[0]) vec_value =
values;
381 throw "argmax does not support add_element_simple with one value";
386 const argmax& _other =
dynamic_cast<const argmax&
>(other);
389 vec_value = _other.vec_value;
392 if (vec_value[0] < _other.vec_value[0]) vec_value = _other.vec_value;
418 throw (
"set_input_type is not supported for argmax");
428 oarc << vec_value << init;
433 iarc >> vec_value >> init;
438 std::vector<flexible_type> vec_value;
458 DASSERT_EQ((
int)flex.
get_type(), (int)value.get_type());
463 if (value < flex) value = flex;
470 const max& _other =
dynamic_cast<const max&
>(other);
473 value = _other.value;
476 if (value < _other.value) value = _other.value;
507 oarc << value << init;
512 iarc >> value >> init;
537 value +=
dynamic_cast<const count&
>(other).value;
557 throw (
"set_input_type is not supported for count");
660 if (flex.
size() != value.size()){
665 value += (flex - value)/
double(
count);
675 if (!other_casted.failure && !failure){
677 (*this) = other_casted;
678 }
else if (other_casted.init){
679 if (value.size() != other_casted.value.
size()) {
683 value = ((value *
count) + (other_casted.value
684 * other_casted.count)) / (
count + other_casted.count);
685 count += other_casted.count;
718 oarc << value <<
count << init << failure;
723 iarc >> value >>
count >> init >> failure;
729 bool failure =
false;
750 value += ((double)flex - value)/double(
count);
756 const average& other_casted =
dynamic_cast<const average&
>(other);
758 if (
count + other_casted.count > 0){
759 value = ((value *
count) + (other_casted.value
760 * other_casted.count)) / (
count + other_casted.count);
761 count += other_casted.count;
789 oarc << value <<
count;
794 iarc >> value >>
count;
819 double delta = (double)flex - mean;
820 mean += delta /
count;
821 M2 += delta * ((double)flex - mean);
828 if (_other.count == 0) {
830 }
else if (
count == 0) {
832 count = _other.count;
835 double delta = _other.mean - mean;
836 mean = ((mean *
count) + (_other.mean * _other.count)) / (
count + _other.count);
837 M2 += _other.M2 + delta * delta * _other.count *
count / (
count + _other.count);
838 count += _other.count;
859 virtual std::string
name()
const {
865 oarc <<
count << mean << M2;
870 iarc >>
count >> mean >> M2;
873 virtual void print(std::ostream& os)
const {
874 os << this->
name() <<
"(" 875 <<
"value = " << this->
emit() <<
", " 876 <<
"count = " << this->
count <<
", " 877 <<
"mean = " << this->mean <<
", " 878 <<
"M2 = " << this->M2
897 virtual std::string
name()
const override {
919 void init(
const std::vector<double>& quantiles_to_query) {
920 m_quantiles = quantiles_to_query;
928 ret->m_quantiles = m_quantiles;
935 m_sketch.add((
double)(flex));
941 m_sketch.substream_finalize();
947 m_sketch.
combine(other_quantile.m_sketch);
952 m_sketch.combine_finalize();
954 for (
size_t i = 0; i < m_quantiles.size(); ++i) {
955 ret.
push_back(m_sketch.query_quantile(m_quantiles[i]));
972 virtual std::string
name()
const {
978 oarc << m_quantiles << m_sketch;
983 iarc >> m_quantiles >> m_sketch;
987 std::vector<double> m_quantiles;
1009 m_value.insert(std::make_pair(values[0], values[1]));
1011 m_missing_value =
true;
1016 throw "zip_dict does not support add_element_simple with one value";
1021 auto v =
dynamic_cast<const zip_dict&
>(other);
1022 m_missing_value |= v.m_missing_value;
1024 if (!m_missing_value) {
1025 m_value.insert(v.m_value.begin(), v.m_value.end());
1032 if (m_missing_value && m_value.size() == 0) {
1036 ret.insert(ret.end(), m_value.begin(), m_value.end());
1052 throw (
"set_input_type is not supported for zip_dict");
1062 oarc << m_missing_value << m_value;
1067 iarc >> m_missing_value >> m_value;
1071 std::map<flexible_type, flexible_type> m_value;
1072 bool m_missing_value =
false;
1084 ret->m_is_float = m_is_float;
1090 m_missing_value =
true;
1092 m_value.push_back(flex);
1098 auto v =
dynamic_cast<const zip_list&
>(other);
1099 m_missing_value |= v.m_missing_value;
1100 std::copy(v.m_value.begin(), v.m_value.end(), back_inserter(m_value));
1105 if (m_missing_value && m_value.size() == 0) {
1110 return flex_vec(m_value.begin(), m_value.end());
1133 throw (
"set_input_type is not supported for zip_list");
1143 oarc << m_missing_value << m_is_float << m_value;
1148 iarc >> m_missing_value >> m_is_float >> m_value;
1152 std::vector<flexible_type> m_value;
1153 bool m_missing_value =
false;
1186 else return m_value;
1201 return "Select One";
1206 oarc << m_has_value << m_value;
1211 iarc >> m_has_value >> m_value;
1216 bool m_has_value =
false;
1232 m_values.insert(flex);
1238 m_values.insert(v.m_values.begin(), v.m_values.end());
1243 return m_values.
size();
1257 return "Count Distinct";
1271 std::unordered_set<flexible_type> m_values;
1289 for (
const auto& k: m_values) {
1301 std::string
name()
const override {
1319 m_values[flex] += 1;
1325 for (
const auto& kvp : v.m_values) {
1326 const auto& key = kvp.first;
1327 const auto& value = kvp.second;
1328 if (m_values.find(key) != m_values.end()) {
1330 m_values[key] += value;
1332 m_values.insert(kvp);
1341 for (
const auto& kvp: m_values) {
1342 ret[i] = {kvp.first,
flex_int(kvp.second)};
1359 return "Frequency Count";
1373 std::unordered_map<flexible_type, size_t> m_values;
1380 #endif //TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP
virtual std::string name() const
Name of the class.
flexible_type emit() const
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element.
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
flexible_type emit() const
Emits the zip result.
std::vector< double > flex_vec
void combine(const group_aggregate_value &other)
combines two partial zip
flex_type_enum set_input_type(flex_type_enum type)
The input type.
std::string name() const
Name of the class.
group_aggregate_value * new_instance() const
Returns a new empty instance of min with the same type.
bool support_type(flex_type_enum type) const
The types supported by the quantile sketch (int, float)
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
Adds a new element.
void partial_finalize()
Done adding elements.
flexible_type emit() const
Emits the count result.
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
std::string name() const
Name of the class.
void add_element_simple(const flexible_type &flex)
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial argmins
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
void combine(const group_aggregate_value &other)
combines two partial zip
std::string name() const
Name of the class.
virtual group_aggregate_value * new_instance() const
Returns a new empty instance of count.
void add_element_simple(const flexible_type &flex)
Adds a new element.
group_aggregate_value * new_instance() const
Returns a new empty instance of argmin with the same type.
bool support_type(flex_type_enum type) const
The types supported by the sum.
bool support_type(flex_type_enum type) const
The types supported by the argmax.
void combine(const group_aggregate_value &other)
combines two partial quantile sketches
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
void add_element_simple(const flexible_type &flex)
group_aggregate_value * new_instance() const
Returns a new empty instance of argmax with the same type.
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
bool support_type(flex_type_enum type) const
The types supported by the zip.
bool support_type(flex_type_enum type) const
The types supported by the min.
void add_element_simple(const flexible_type &flex)
void load(iarchive &iarc)
Deserializer.
void combine(const group_aggregate_value &other)
combines two partial counts
void load(iarchive &iarc)
Deserializer.
void load(iarchive &iarc)
Deserializer.
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
flexible_type emit() const
Emits the sum result.
void combine(const group_aggregate_value &other)
combines two partial sums
group_aggregate_value * new_instance() const override
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
flexible_type emit() const
Emits the argmin result.
void save(oarchive &oarc) const
Serializer.
void save(oarchive &oarc) const
Serializer.
flexible_type emit() const
Emits the zip result.
flexible_type emit() const
Emits the sum result.
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial argmaxes
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
group_aggregate_value * new_instance() const
void add_element_simple(const flexible_type &flex)
void save(oarchive &oarc) const
Serializer.
void load(iarchive &iarc)
Deserializer.
void combine(const group_aggregate_value &other)
combines two partial sums
void combine(const group_aggregate_value &other)
combines two partial maxes
void add_element_simple(const flexible_type &flex)
std::string name() const
Name of the class.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
std::string name() const override
Name of the class.
void add_element(const std::vector< flexible_type > &values)
bool support_type(flex_type_enum type) const
The types supported by the max.
void add_element_simple(const flexible_type &flex)
Adds a new element.
void combine(const group_aggregate_value &other)
combines two partial counts
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type to be argmaxed.
void load(iarchive &iarc)
Deserializer.
bool support_type(flex_type_enum type) const
The types supported by the zip.
void combine(const group_aggregate_value &other)
combines two partial zip
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
void push_back(flex_float i)
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
flexible_type emit() const override
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element.
void load(iarchive &iarc)
Deserializer.
flexible_type emit() const
Emits the count result.
void save(oarchive &oarc) const
Serializer.
flexible_type emit() const
Emits the max result.
void copy(const std::string src, const std::string dest)
void load(iarchive &iarc)
Deserializer.
std::string name() const
Name of the class.
std::string name() const
Name of the class.
flex_type_enum get_type() const
void save(oarchive &oarc) const
Serializer.
void save(oarchive &oarc) const
Serializer.
void add_element(const std::vector< flexible_type > &values)
void load(iarchive &iarc)
Deserializer.
void load(iarchive &iarc)
Deserializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of select_nth with the same type.
group_aggregate_value * new_instance() const
Returns a new empty instance of max with the same type.
std::string name() const
Name of the class.
void init(const std::vector< double > &quantiles_to_query)
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
virtual flexible_type emit() const
Emits the count result.
void load(iarchive &iarc)
Deserializer.
bool support_type(flex_type_enum type) const
The types supported by the zip.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial counts
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
void combine(const group_aggregate_value &other)
combines two partial mins
void load(iarchive &iarc)
Deserializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
flexible_type emit() const
Emits the argmax result.
void save(oarchive &oarc) const
Serializer.
void load(iarchive &iarc)
Deserializer.
bool support_type(flex_type_enum type) const
The types supported by the sum.
flexible_type emit() const
Emits the min result.
bool support_type(flex_type_enum type) const
The types supported by the count.
bool support_type(flex_type_enum type) const
The types supported by the sum.
flexible_type emit() const
Emits the count result.
void load(iarchive &iarc)
Deserializer.
std::set< T > values(const std::map< Key, T > &map)
void combine(const group_aggregate_value &other)
combines two partial counts
flexible_type emit() const
Emits the count result.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial sums
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type to be argmined.
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
flexible_type emit() const
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
std::string name() const
Name of the class.
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
virtual flexible_type emit() const
Emits the desired quantiles.
std::vector< std::pair< flexible_type, flexible_type > > flex_dict
void load(iarchive &iarc)
Deserializer.
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
flexible_type emit() const
Emits the sum result.
void save(oarchive &oarc) const
Serializer.
void load(iarchive &iarc)
Deserializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be maxed.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
void combine(const group_aggregate_value &other)
combines two partial zip
bool support_type(flex_type_enum type) const
bool support_type(flex_type_enum type) const
The types supported by the argmin.
static flexible_type FLEX_UNDEFINED
virtual std::string name() const
Name of the class.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
virtual void print(std::ostream &os) const
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
void combine(const group_aggregate_value &other)
combines two partial counts
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
void load(iarchive &iarc)
Deserializer.
void add_element(const std::vector< flexible_type > &values)
void reset(flex_type_enum target_type)
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type.
void load(iarchive &iarc)
Deserializer.
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
std::string name() const
Name of the class.