Turi Create  4.0
groupby_aggregate_operators.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP
7 #define TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP
8 #include <core/storage/sframe_data/group_aggregate_value.hpp>
9 #include <ml/sketches/streaming_quantile_sketch.hpp>
10 namespace turi {
11 
12 
13 /**
14  * \ingroup sframe_physical
15  * \addtogroup groupby_aggregate Groupby Aggregation
16  * \{
17  */
18 
19 /**
20  * Groupby Aggregation Operators
21  */
22 namespace groupby_operators {
23 /**
24  * Implements a vector sum aggregator
25  */
27  public:
28  /// Returns a new empty instance of sum with the same type
30  vector_sum* ret = new vector_sum;
31  ret->set_input_type(value.get_type());
32  return ret;
33  }
34 
35  /// Adds a new element to be summed
36  void add_element_simple(const flexible_type& flex) {
37  // add everything except undefined
38  if (!failure && flex.get_type() != flex_type_enum::UNDEFINED) {
39  DASSERT_EQ((int)flex.get_type(), (int)value.get_type());
40  // If this is first element being added, just copy since intialized vector
41  // is size 0 and you cannot add vectors of different size.
42  if (!init){
43  value = flex;
44  init = true;
45  } else{
46  if (flex.size() != value.size()){
47  failure = true;
48  } else {
49  value += flex;
50  }
51  }
52  }
53  }
54 
55  /// combines two partial sums
56  void combine(const group_aggregate_value& other) {
57  const vector_sum& other_casted = dynamic_cast<const vector_sum&>(other);
58  // If you add vectors of different lengths, make result be undefined.
59  if (!other_casted.failure && !failure){
60  if (!init) {
61  // I am not initialized.
62  (*this) = other_casted;
63  } else if (other_casted.init) {
64  if (value.size() != other_casted.value.size()) {
65  failure = true;
66  } else {
67  value += other_casted.value;
68  }
69  }
70  }
71  }
72 
73 
74  /// Emits the sum result
75  flexible_type emit() const {
76  if (failure) {
78  }
79  return value;
80  }
81 
82  /// The types supported by the sum
83  bool support_type(flex_type_enum type) const {
84  return type == flex_type_enum::VECTOR || type == flex_type_enum::ND_VECTOR;
85  }
86 
87  /// The input type to be summed
89  value.reset(type);
90  return type;
91  }
92 
93  /// Name of the class
94  std::string name() const {
95  return "Vector Sum";
96  }
97 
98  /// Serializer
99  void save(oarchive& oarc) const {
100  oarc << value << init << failure;
101  }
102 
103  /// Deserializer
104  void load(iarchive& iarc) {
105  iarc >> value >> init >>failure;
106  }
107 
108  private:
110  bool failure = false;
111  bool init = false;
112 };
113 
114 
115 /**
116  * Implements a sum aggregator
117  */
118 class sum : public group_aggregate_value {
119  public:
120  /// Returns a new empty instance of sum with the same type
122  sum* ret = new sum;
123  ret->set_input_type(value.get_type());
124  return ret;
125  }
126 
127  /// Adds a new element to be summed
128  void add_element_simple(const flexible_type& flex) {
129  // add everything except undefined
130  if (flex.get_type() != flex_type_enum::UNDEFINED){
131  DASSERT_EQ((int)flex.get_type(), (int)value.get_type());
132  value += flex;
133  }
134  }
135 
136  /// combines two partial sums
137  void combine(const group_aggregate_value& other) {
138  value += dynamic_cast<const sum&>(other).value;
139  }
140 
141 
142  /// Emits the sum result
143  flexible_type emit() const {
144  return value;
145  }
146 
147  /// The types supported by the sum
148  bool support_type(flex_type_enum type) const {
149  return type == flex_type_enum::INTEGER ||
150  type == flex_type_enum::FLOAT;
151  }
152 
153  /// The input type to be summed
155  value.reset(type);
156  return type;
157  }
158 
159  /// Name of the class
160  std::string name() const {
161  return "Sum";
162  }
163 
164  /// Serializer
165  void save(oarchive& oarc) const {
166  oarc << value;
167  }
168 
169  /// Deserializer
170  void load(iarchive& iarc) {
171  iarc >> value;
172  }
173 
174  private:
175  flexible_type value;
176 };
177 
178 
179 /**
180  * Implements a min aggregator
181  */
182 class min: public group_aggregate_value {
183  public:
184  /// Returns a new empty instance of min with the same type
186  min* ret = new min;
187  ret->set_input_type(value.get_type());
188  return ret;
189  }
190 
191  /// Adds a new element
192  void add_element_simple(const flexible_type& flex) {
193  // add everything except undefined
194  if (flex.get_type() != flex_type_enum::UNDEFINED) {
195  DASSERT_EQ((int)flex.get_type(), (int)value.get_type());
196  if (!init) {
197  init = true;
198  value = flex;
199  } else {
200  if (value > flex)
201  value = flex;
202  }
203  }
204  }
205 
206  /// combines two partial mins
207  void combine(const group_aggregate_value& other) {
208  const min& _other = dynamic_cast<const min&>(other);
209  if (_other.init) {
210  if (!init) {
211  init = true;
212  value = _other.value;
213  } else {
214  if (value > _other.value)
215  value = _other.value;
216  }
217  }
218  }
219 
220  /// Emits the min result
221  flexible_type emit() const {
222  if (!init) return FLEX_UNDEFINED;
223  else return value;
224  }
225 
226  /// The types supported by the min
227  bool support_type(flex_type_enum type) const {
228  return type == flex_type_enum::INTEGER ||
229  type == flex_type_enum::FLOAT ||
230  type == flex_type_enum::DATETIME;
231  }
232 
233  /// The input type
235  value.reset(type);
236  return type;
237  }
238 
239  /// Name of the class
240  std::string name() const {
241  return "Min";
242  }
243 
244  /// Serializer
245  void save(oarchive& oarc) const {
246  oarc << value << init;
247  }
248 
249  /// Deserializer
250  void load(iarchive& iarc) {
251  iarc >> value >> init;
252  }
253 
254  private:
255  flexible_type value;
256  bool init = false;
257 };
258 
259 /**
260  *
261  * Implements an argmin aggregator which is used with arg_func
262  *
263  */
264 
266  public:
267  /// Returns a new empty instance of argmin with the same type
269  argmin* ret = new argmin;
270  return ret;
271  }
272 
273 
274  void add_element(const std::vector<flexible_type>& values) {
275  // add everything except undefined
276  DASSERT_TRUE(values.size() > 0);
277  if (values[0].get_type() != flex_type_enum::UNDEFINED) {
278  if (!init) {
279  vec_value = values;
280  init = true;
281  } else {
282  if (vec_value[0] > values[0]) vec_value = values;
283  }
284  }
285  }
286 
287  /// Adds a new element
288  void add_element_simple(const flexible_type& flex) {
289  throw "argmin does not support add_element_simple with one value";
290  }
291 
292  /// combines two partial argmins
293  void combine(const group_aggregate_value& other) {
294  const argmin& _other = dynamic_cast<const argmin&>(other);
295  if (_other.init) {
296  if (!init) {
297  vec_value = _other.vec_value;
298  init = true;
299  } else {
300  if (vec_value[0] > _other.vec_value[0]) vec_value = _other.vec_value;
301  }
302  }
303  }
304 
305  /// Emits the argmin result
306  flexible_type emit() const {
307  if (vec_value.empty()) return FLEX_UNDEFINED;
308  else return flexible_type(vec_value[1]);
309  }
310 
311  /// The types supported by the argmin
312  bool support_type(flex_type_enum type) const {
313  return type == flex_type_enum::INTEGER ||
314  type == flex_type_enum::FLOAT ||
315  type == flex_type_enum::DATETIME;
316  }
317 
318  /// The input type to be argmined
319  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
320  DASSERT_TRUE(types.size() == 2);
321  // return type of the supporting column.¬
322  return types[1];
323  }
324 
325 
327  throw ("set_input_type is not supported for argmin");
328  }
329 
330  /// Name of the class
331  std::string name() const {
332  return "argmin";
333  }
334 
335  /// Serializer
336  void save(oarchive& oarc) const {
337  oarc << vec_value << init;
338  }
339 
340  /// Deserializer
341  void load(iarchive& iarc) {
342  iarc >> vec_value >> init;
343  }
344 
345  private:
346  // The input type to be maxed.
347  std::vector<flexible_type> vec_value;
348  bool init = false;
349 };
350 
351 /**
352  *
353  * Implements an argmax aggregator which is used with arg_func
354  *
355  */
356 
358  public:
359  /// Returns a new empty instance of argmax with the same type
361  argmax* ret = new argmax;
362  return ret;
363  }
364 
365 
366  void add_element(const std::vector<flexible_type>& values) {
367  // add everything except undefined
368  DASSERT_TRUE(values.size() > 0);
369  if (values[0].get_type() != flex_type_enum::UNDEFINED) {
370  if (!init) {
371  vec_value = values;
372  init = true;
373  } else {
374  if (vec_value[0] < values[0]) vec_value = values;
375  }
376  }
377  }
378 
379  /// Adds a new element
380  void add_element_simple(const flexible_type& flex) {
381  throw "argmax does not support add_element_simple with one value";
382  }
383 
384  /// combines two partial argmaxes
385  void combine(const group_aggregate_value& other) {
386  const argmax& _other = dynamic_cast<const argmax&>(other);
387  if (_other.init) {
388  if (!init) {
389  vec_value = _other.vec_value;
390  init = true;
391  } else {
392  if (vec_value[0] < _other.vec_value[0]) vec_value = _other.vec_value;
393  }
394  }
395  }
396 
397  /// Emits the argmax result
398  flexible_type emit() const {
399  if (vec_value.empty()) return FLEX_UNDEFINED;
400  else return flexible_type(vec_value[1]);
401  }
402 
403  /// The types supported by the argmax
404  bool support_type(flex_type_enum type) const {
405  return type == flex_type_enum::INTEGER ||
406  type == flex_type_enum::FLOAT ||
407  type == flex_type_enum::DATETIME;
408  }
409 
410  /// The input type to be argmaxed
411  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
412  DASSERT_TRUE(types.size() == 2);
413  // return type of the supporting column.
414  return types[1];
415  }
416 
418  throw ("set_input_type is not supported for argmax");
419  }
420 
421  /// Name of the class
422  std::string name() const {
423  return "argmax";
424  }
425 
426  /// Serializer
427  void save(oarchive& oarc) const {
428  oarc << vec_value << init;
429  }
430 
431  /// Deserializer
432  void load(iarchive& iarc) {
433  iarc >> vec_value >> init;
434  }
435 
436  private:
437  // The input type to be maxed.
438  std::vector<flexible_type> vec_value;
439  bool init = false;
440 };
441 
442 /**
443  * Implements a max aggregator
444  */
445 class max: public group_aggregate_value {
446  public:
447  /// Returns a new empty instance of max with the same type
449  max* ret = new max;
450  ret->set_input_type(value.get_type());
451  return ret;
452  }
453 
454  /// Adds a new element
455  void add_element_simple(const flexible_type& flex) {
456  // add everything except undefined
457  if (flex.get_type() != flex_type_enum::UNDEFINED) {
458  DASSERT_EQ((int)flex.get_type(), (int)value.get_type());
459  if (!init) {
460  value = flex;
461  init = true;
462  } else {
463  if (value < flex) value = flex;
464  }
465  }
466  }
467 
468  /// combines two partial maxes
469  void combine(const group_aggregate_value& other) {
470  const max& _other = dynamic_cast<const max&>(other);
471  if (_other.init) {
472  if (!init) {
473  value = _other.value;
474  init = true;
475  } else {
476  if (value < _other.value) value = _other.value;
477  }
478  }
479  }
480 
481  /// Emits the max result
482  flexible_type emit() const {
483  if (!init) return FLEX_UNDEFINED;
484  else return value;
485  }
486 
487  /// The types supported by the max
488  bool support_type(flex_type_enum type) const {
489  return type == flex_type_enum::INTEGER ||
490  type == flex_type_enum::FLOAT ||
491  type == flex_type_enum::DATETIME;
492  }
493 
494  /// The input type to be maxed
496  value.reset(type);
497  return type;
498  }
499 
500  /// Name of the class
501  std::string name() const {
502  return "Max";
503  }
504 
505  /// Serializer
506  void save(oarchive& oarc) const {
507  oarc << value << init;
508  }
509 
510  /// Deserializer
511  void load(iarchive& iarc) {
512  iarc >> value >> init;
513  }
514 
515  private:
516  flexible_type value;
517  bool init = false;
518 };
519 
520 /**
521  * Implements a count aggregator
522  */
524  public:
525  /// Returns a new empty instance of count
527  count* ret = new count;
528  return ret;
529  }
530 
531  void add_element_simple(const flexible_type& flex) {
532  ++value;
533  }
534 
535  /// combines two partial counts
536  void combine(const group_aggregate_value& other) {
537  value += dynamic_cast<const count&>(other).value;
538  }
539 
540  /// Emits the count result
541  flexible_type emit() const {
542  return flexible_type(value);
543  }
544 
545  /// The types supported by the count. (everything)
546  bool support_type(flex_type_enum type) const {
547  return true;
548  }
549 
550  /// The input type
551  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
552  DASSERT_TRUE(types.size() == 0);
554  }
555 
557  throw ("set_input_type is not supported for count");
558  }
559 
560  /// Name of the class
561  std::string name() const {
562  return "Count";
563  }
564 
565  /// Serializer
566  void save(oarchive& oarc) const {
567  oarc << value;
568  }
569 
570  /// Deserializer
571  void load(iarchive& iarc) {
572  iarc >> value;
573  }
574 
575  private:
576  size_t value = 0;
577 };
578 
579 /**
580  * Implements a count non-null aggregator
581  */
583  public:
584  /// Returns a new empty instance of count
586  non_null_count* ret = new non_null_count;
587  return ret;
588  }
589 
590  void add_element_simple(const flexible_type& flex) {
591  if(flex.get_type() != flex_type_enum::UNDEFINED)
592  ++value;
593  }
594 
595  /// combines two partial counts
596  void combine(const group_aggregate_value& other) {
597  value += dynamic_cast<const non_null_count&>(other).value;
598  }
599 
600  /// Emits the count result
601  flexible_type emit() const {
602  return flexible_type(value);
603  }
604 
605  /// The types supported by the count. (everything)
606  bool support_type(flex_type_enum type) const {
607  return true;
608  }
609 
610  /// The input type
611  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
612  DASSERT_TRUE(types.size() == 0);
614  }
615 
618  }
619 
620  /// Name of the class
621  std::string name() const {
622  return "Count";
623  }
624 
625  /// Serializer
626  void save(oarchive& oarc) const {
627  oarc << value;
628  }
629 
630  /// Deserializer
631  void load(iarchive& iarc) {
632  iarc >> value;
633  }
634 
635  private:
636  size_t value = 0;
637 };
638 
639 /**
640  * Implements a vector average aggregator
641  */
643  public:
644  /// Returns a new empty instance of count
646  vector_average* ret = new vector_average;
647  ret->set_input_type(value.get_type());
648  return ret;
649  }
650 
651  /// Adds a new element to be counted
652  void add_element_simple(const flexible_type& flex) {
653  if (!failure && flex.get_type() != flex_type_enum::UNDEFINED) {
654  // Copy if not initialized.
655  if (!init){
656  ++count;
657  value = flex;
658  init = true;
659  } else{
660  if (flex.size() != value.size()){
661  failure = true;
662  } else {
663  ++count;
664  // Use recurrence relation of mean to prevent overflow
665  value += (flex - value)/double(count);
666  }
667  }
668  }
669  }
670 
671  /// combines two partial counts
672  void combine(const group_aggregate_value& other) {
673  const vector_average& other_casted = dynamic_cast<const vector_average&>(other);
674  // Set to UNDEFINED if there are different vec sizes.
675  if (!other_casted.failure && !failure){
676  if (!init){
677  (*this) = other_casted;
678  } else if (other_casted.init){
679  if (value.size() != other_casted.value.size()) {
680  failure = true;
681  } else {
682  //weighted mean
683  value = ((value * count) + (other_casted.value
684  * other_casted.count)) / (count + other_casted.count);
685  count += other_casted.count;
686  }
687  }
688  }
689  }
690 
691  /// Emits the count result
692  flexible_type emit() const {
693  if (failure) {
695  }
696  return value;
697  }
698 
699  /// The types supported by the count. (everything)
700  bool support_type(flex_type_enum type) const {
701  return type == flex_type_enum::VECTOR || type == flex_type_enum::ND_VECTOR;
702  }
703 
704  /// The input type
706  value.reset(type);
707  return type;
708 
709  }
710 
711  /// Name of the class
712  std::string name() const {
713  return "Vector Avg";
714  }
715 
716  /// Serializer
717  void save(oarchive& oarc) const {
718  oarc << value << count << init << failure;
719  }
720 
721  /// Deserializer
722  void load(iarchive& iarc) {
723  iarc >> value >> count >> init >> failure;
724  }
725 
726  private:
728  bool init = false;
729  bool failure = false;
730  size_t count = 0;
731 
732 };
733 
734 /**
735  * Implements an average aggregator
736  */
738  public:
739  /// Returns a new empty instance of count
741  average* ret = new average;
742  return ret;
743  }
744 
745  /// Adds a new element to be counted
746  void add_element_simple(const flexible_type& flex) {
747  if (flex != FLEX_UNDEFINED) {
748  ++count;
749  // Use recurrence relation of mean to prevent overflow
750  value += ((double)flex - value)/double(count);
751  }
752  }
753 
754  /// combines two partial counts
755  void combine(const group_aggregate_value& other) {
756  const average& other_casted = dynamic_cast<const average&>(other);
757  //weighted mean
758  if (count + other_casted.count > 0){
759  value = ((value * count) + (other_casted.value
760  * other_casted.count)) / (count + other_casted.count);
761  count += other_casted.count;
762  }
763  }
764 
765  /// Emits the count result
766  flexible_type emit() const {
767  if (count == 0) return FLEX_UNDEFINED;
768  else return value;
769  }
770 
771  /// The types supported by the count. (everything)
772  bool support_type(flex_type_enum type) const {
773  return (type == flex_type_enum::INTEGER ||
774  type == flex_type_enum::FLOAT);
775  }
776 
777  /// The input type
779  return flex_type_enum::FLOAT;
780  }
781 
782  /// Name of the class
783  std::string name() const {
784  return "Avg";
785  }
786 
787  /// Serializer
788  void save(oarchive& oarc) const {
789  oarc << value << count;
790  }
791 
792  /// Deserializer
793  void load(iarchive& iarc) {
794  iarc >> value >> count;
795  }
796 
797  private:
798  double value = 0;
799  size_t count = 0;
800 };
801 
802 
803 /**
804  * Implements the variance operator. Algorithm adapted from
805  * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
806  */
808  public:
809  /// Returns a new empty instance of count
811  variance* ret = new variance;
812  return ret;
813  }
814 
815  /// Adds a new element to be counted
816  void add_element_simple(const flexible_type& flex) {
817  if (flex != FLEX_UNDEFINED) {
818  ++count;
819  double delta = (double)flex - mean;
820  mean += delta / count;
821  M2 += delta * ((double)flex - mean);
822  }
823  }
824 
825  /// combines two partial counts
826  void combine(const group_aggregate_value& other) {
827  const variance& _other = dynamic_cast<const variance&>(other);
828  if (_other.count == 0) {
829  return;
830  } else if (count == 0) {
831  mean = _other.mean;
832  count = _other.count;
833  M2 = _other.M2;
834  } else {
835  double delta = _other.mean - mean;
836  mean = ((mean * count) + (_other.mean * _other.count)) / (count + _other.count);
837  M2 += _other.M2 + delta * delta * _other.count * count / (count + _other.count);
838  count += _other.count;
839  }
840  }
841 
842  /// Emits the count result
843  virtual flexible_type emit() const {
844  return count <= 1 ? flexible_type(0.0) : flexible_type(M2 / (count));
845  }
846 
847  /// The types supported by the count.
848  bool support_type(flex_type_enum type) const {
849  return (type == flex_type_enum::INTEGER ||
850  type == flex_type_enum::FLOAT);
851  }
852 
853  /// The input type
855  return flex_type_enum::FLOAT;
856  }
857 
858  /// Name of the class
859  virtual std::string name() const {
860  return "Var";
861  }
862 
863  /// Serializer
864  void save(oarchive& oarc) const {
865  oarc << count << mean << M2;
866  }
867 
868  /// Deserializer
869  void load(iarchive& iarc) {
870  iarc >> count >> mean >> M2;
871  }
872 
873  virtual void print(std::ostream& os) const {
874  os << this->name() << "("
875  << "value = " << this->emit() << ", "
876  << "count = " << this->count << ", "
877  << "mean = " << this->mean << ", "
878  << "M2 = " << this->M2
879  << ")";
880  }
881 
882  protected:
883  size_t count = 0;
884  double mean = 0;
885  double M2 = 0;
886 };
887 
888 
889 class stdv : public variance {
890  public:
891  group_aggregate_value* new_instance() const override {
892  variance* ret = new stdv;
893  return ret;
894  }
895 
896  /// Name of the class
897  virtual std::string name() const override {
898  return "Stdv";
899  }
900 
901  /// Emits the count result
902  virtual flexible_type emit() const override {
903  return flexible_type(std::sqrt((double)(variance::emit())));
904  }
905 };
906 
907 
908 
909 /**
910  * Implements the quantile operator.
911  */
913  public:
914 
915  /**
916  * Used to initialize the operator. To set the configuration for
917  * what quantiles to query.
918  */
919  void init(const std::vector<double>& quantiles_to_query) {
920  m_quantiles = quantiles_to_query;
921  }
922 
923  /** Returns a new empty instance of quantile with the same set
924  * of quantiles to be queried
925  */
927  quantile* ret = new quantile;
928  ret->m_quantiles = m_quantiles;
929  return ret;
930  }
931 
932  /// Adds a new element
933  void add_element_simple(const flexible_type& flex) {
934  if (flex != FLEX_UNDEFINED) {
935  m_sketch.add((double)(flex));
936  }
937  }
938 
939  /// Done adding elements
941  m_sketch.substream_finalize();
942  }
943 
944  /// combines two partial quantile sketches
945  void combine(const group_aggregate_value& other) {
946  const quantile& other_quantile = dynamic_cast<const quantile&>(other);
947  m_sketch.combine(other_quantile.m_sketch);
948  }
949 
950  /// Emits the desired quantiles
951  virtual flexible_type emit() const {
952  m_sketch.combine_finalize();
954  for (size_t i = 0; i < m_quantiles.size(); ++i) {
955  ret.push_back(m_sketch.query_quantile(m_quantiles[i]));
956  }
957  return ret;
958  }
959 
960  /// The types supported by the quantile sketch (int, float)
961  bool support_type(flex_type_enum type) const {
962  return (type == flex_type_enum::INTEGER ||
963  type == flex_type_enum::FLOAT);
964  }
965 
966  /// The input type
968  return flex_type_enum::VECTOR;
969  }
970 
971  /// Name of the class
972  virtual std::string name() const {
973  return "Quantiles";
974  }
975 
976  /// Serializer
977  void save(oarchive& oarc) const {
978  oarc << m_quantiles << m_sketch;
979  }
980 
981  /// Deserializer
982  void load(iarchive& iarc) {
983  iarc >> m_quantiles >> m_sketch;
984  }
985 
986  private:
987  std::vector<double> m_quantiles;
989 };
990 
991 
992 /**
993  * Implements an aggregator that converts the values of two columns
994  * into key/value pairs
995  * inside a dictionary.
996  */
998  public:
999  /// Returns a new empty instance of sum with the same type
1001  zip_dict* ret = new zip_dict;
1002  return ret;
1003  }
1004 
1005  void add_element(const std::vector<flexible_type>& values) {
1006  DASSERT_TRUE(values.size() == 2);
1007 
1008  if (values[0].get_type() != flex_type_enum::UNDEFINED) {
1009  m_value.insert(std::make_pair(values[0], values[1]));
1010  } else {
1011  m_missing_value = true;
1012  }
1013  }
1014 
1015  void add_element_simple(const flexible_type& flex) {
1016  throw "zip_dict does not support add_element_simple with one value";
1017  }
1018 
1019  /// combines two partial zip
1020  void combine(const group_aggregate_value& other) {
1021  auto v = dynamic_cast<const zip_dict&>(other);
1022  m_missing_value |= v.m_missing_value;
1023 
1024  if (!m_missing_value) {
1025  m_value.insert(v.m_value.begin(), v.m_value.end());
1026  }
1027  }
1028 
1029  /// Emits the zip result
1031  // emit None if all we got is missing key
1032  if (m_missing_value && m_value.size() == 0) {
1033  return flex_dict();
1034  } else {
1035  flex_dict ret;
1036  ret.insert(ret.end(), m_value.begin(), m_value.end());
1037  return ret;
1038  }
1039  }
1040 
1041  /// The types supported by the zip
1042  bool support_type(flex_type_enum type) const {
1043  return true;
1044  }
1045 
1046  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
1047  DASSERT_TRUE(types.size() == 2);
1048  return flex_type_enum::DICT;
1049  }
1050 
1052  throw ("set_input_type is not supported for zip_dict");
1053  }
1054 
1055  /// Name of the class
1056  std::string name() const {
1057  return "Dict";
1058  }
1059 
1060  /// Serializer
1061  void save(oarchive& oarc) const {
1062  oarc << m_missing_value << m_value;
1063  }
1064 
1065  /// Deserializer
1066  void load(iarchive& iarc) {
1067  iarc >> m_missing_value >> m_value;
1068  }
1069 
1070  private:
1071  std::map<flexible_type, flexible_type> m_value;
1072  bool m_missing_value = false;
1073 };
1074 
1075 
1076 /**
1077  * Implements an aggregator that combines values from multiple rows into one list value
1078  */
1080  public:
1081  /// Returns a new empty instance of sum with the same type
1083  zip_list* ret = new zip_list;
1084  ret->m_is_float = m_is_float;
1085  return ret;
1086  }
1087 
1088  void add_element_simple(const flexible_type& flex) {
1089  if (flex == FLEX_UNDEFINED) {
1090  m_missing_value = true;
1091  } else {
1092  m_value.push_back(flex);
1093  }
1094  }
1095 
1096  /// combines two partial zip
1097  void combine(const group_aggregate_value& other) {
1098  auto v = dynamic_cast<const zip_list&>(other);
1099  m_missing_value |= v.m_missing_value;
1100  std::copy(v.m_value.begin(), v.m_value.end(), back_inserter(m_value));
1101  }
1102 
1103  /// Emits the zip result
1105  if (m_missing_value && m_value.size() == 0) {
1106  if (m_is_float) return flex_vec();
1107  else return flex_list();
1108  } else {
1109  if (m_is_float) {
1110  return flex_vec(m_value.begin(), m_value.end());
1111  } else {
1112  return flex_list(std::move(m_value));
1113  }
1114  }
1115  }
1116 
1117  /// The types supported by the zip
1118  bool support_type(flex_type_enum type) const {
1119  return true;
1120  }
1121 
1122  flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
1123  if (types[0] == flex_type_enum::FLOAT) {
1124  m_is_float = true;
1125  return flex_type_enum::VECTOR;
1126  } else {
1127  m_is_float = false;
1128  return flex_type_enum::LIST;
1129  }
1130  }
1131 
1133  throw ("set_input_type is not supported for zip_list");
1134  }
1135 
1136  /// Name of the class
1137  std::string name() const {
1138  return "List";
1139  }
1140 
1141  /// Serializer
1142  void save(oarchive& oarc) const {
1143  oarc << m_missing_value << m_is_float << m_value;
1144  }
1145 
1146  /// Deserializer
1147  void load(iarchive& iarc) {
1148  iarc >> m_missing_value >> m_is_float >> m_value;
1149  }
1150 
1151  private:
1152  std::vector<flexible_type> m_value;
1153  bool m_missing_value = false;
1154  bool m_is_float;
1155 };
1156 
1157 /**
1158  * Implements a select one aggregator.
1159  *
1160  * This selects one occurrence of the given column. There is no guarantee
1161  * which one is selected; it depends on how the groupby is implemented.
1162  * Within a single partial aggregate, the first value added is kept.
1163  */
1165  public:
1166 
1167  /// Returns a new empty instance of select_nth with the same type
1169  return new select_one;
1170  }
1171 
1172  /// Adds a new element to be summed
1173  void add_element_simple(const flexible_type& flex) {
1174  if (!m_has_value) {
1175  m_value = flex;
1176  m_has_value = true;
1177  }
1178  }
1179 
1180  /// combines two partial sums
1181  void combine(const group_aggregate_value& other) { }
1182 
1183  /// Emits the sum result
1185  if (m_has_value == false) return FLEX_UNDEFINED;
1186  else return m_value;
1187  }
1188 
1189  /// The types supported by the sum
1190  bool support_type(flex_type_enum type) const {
1191  return true;
1192  }
1193 
1194  /// The input type to be summed
1196  return type;
1197  }
1198 
1199  /// Name of the class
1200  std::string name() const {
1201  return "Select One";
1202  }
1203 
1204  /// Serializer
1205  void save(oarchive& oarc) const {
1206  oarc << m_has_value << m_value;
1207  }
1208 
1209  /// Deserializer
1210  void load(iarchive& iarc) {
1211  iarc >> m_has_value >> m_value;
1212  }
1213 
1214  private:
1215  flexible_type m_value;
1216  bool m_has_value = false;
1217 };
1218 
1219 
1220 /**
1221  * Implements an aggregator that computes the exact number of unique elements
1222  */
1224  public:
1225  /// Returns a new empty instance of sum with the same type
1227  count_distinct* ret = new count_distinct;
1228  return ret;
1229  }
1230 
1231  void add_element_simple(const flexible_type& flex) {
1232  m_values.insert(flex);
1233  }
1234 
1235  /// combines two partial zip
1236  void combine(const group_aggregate_value& other) {
1237  auto& v = dynamic_cast<const count_distinct&>(other);
1238  m_values.insert(v.m_values.begin(), v.m_values.end());
1239  }
1240 
1241  /// Emits the zip result
1243  return m_values.size();
1244  }
1245 
1246  /// The types supported by the zip
1247  bool support_type(flex_type_enum type) const {
1248  return true;
1249  }
1250 
1252  return flex_type_enum::INTEGER;
1253  }
1254 
1255  /// Name of the class
1256  std::string name() const {
1257  return "Count Distinct";
1258  }
1259 
1260  /// Serializer
1261  void save(oarchive& oarc) const {
1262  oarc << m_values;
1263  }
1264 
1265  /// Deserializer
1266  void load(iarchive& iarc) {
1267  iarc >> m_values;
1268  }
1269 
1270  protected:
1271  std::unordered_set<flexible_type> m_values;
1272 };
1273 
1274 /**
1275  * Implements an aggregator that keeps track of the unique elements
1276  */
1277 class distinct: public count_distinct {
1278  public:
1279 
1281  distinct* ret = new distinct;
1282  return ret;
1283  }
1284 
1285  /// Emits the zip result
1286  flexible_type emit() const override {
1287  flex_list ret(m_values.size());
1288  size_t i = 0;
1289  for (const auto& k: m_values) {
1290  ret[i] = k;
1291  i++;
1292  }
1293  return ret;
1294  }
1295 
1297  return flex_type_enum::LIST;
1298  }
1299 
1300  /// Name of the class
1301  std::string name() const override {
1302  return "Distinct";
1303  }
1304 
1305 };
1306 
1307 /**
1308  * Implements an aggregator that computes frequencies for each unique value.
1309  */
1311  public:
1312  /// Returns a new empty instance of sum with the same type
1314  frequency_count* ret = new frequency_count;
1315  return ret;
1316  }
1317 
1318  void add_element_simple(const flexible_type& flex) {
1319  m_values[flex] += 1;
1320  }
1321 
1322  /// combines two partial zip
1323  void combine(const group_aggregate_value& other) {
1324  auto& v = dynamic_cast<const frequency_count&>(other);
1325  for (const auto& kvp : v.m_values) {
1326  const auto& key = kvp.first;
1327  const auto& value = kvp.second;
1328  if (m_values.find(key) != m_values.end()) {
1329  // combine both counts
1330  m_values[key] += value;
1331  } else {
1332  m_values.insert(kvp);
1333  }
1334  }
1335  }
1336 
1337  /// Emits the zip result
1339  flex_dict ret(m_values.size());
1340  size_t i = 0;
1341  for (const auto& kvp: m_values) {
1342  ret[i] = {kvp.first, flex_int(kvp.second)};
1343  i++;
1344  }
1345  return ret;
1346  }
1347 
1348  bool support_type(flex_type_enum type) const {
1349  return (type == flex_type_enum::INTEGER||
1350  type == flex_type_enum::STRING);
1351  }
1352 
1354  return flex_type_enum::DICT;
1355  }
1356 
1357  /// Name of the class
1358  std::string name() const {
1359  return "Frequency Count";
1360  }
1361 
1362  /// Serializer
1363  void save(oarchive& oarc) const {
1364  oarc << m_values;
1365  }
1366 
1367  /// Deserializer
1368  void load(iarchive& iarc) {
1369  iarc >> m_values;
1370  }
1371 
1372  protected:
1373  std::unordered_map<flexible_type, size_t> m_values;
1374 };
1375 
1376 } // namespace groupby_operators
1377 
1378 /// \}
1379 } // namespace turi
1380 #endif //TURI_SFRAME_GROUPBY_AGGREGATE_OPERATORS_HPP
virtual std::string name() const
Name of the class.
flexible_type emit() const
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element.
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
flexible_type emit() const
Emits the zip result.
std::vector< double > flex_vec
void combine(const group_aggregate_value &other)
combines two partial zip
flex_type_enum set_input_type(flex_type_enum type)
The input type.
std::string name() const
Name of the class.
group_aggregate_value * new_instance() const
Returns a new empty instance of min with the same type.
bool support_type(flex_type_enum type) const
The types supported by the quantile sketch (int, float)
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
Adds a new element.
flexible_type emit() const
Emits the count result.
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::string name() const
Name of the class.
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial argmins
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
void combine(const group_aggregate_value &other)
combines two partial zip
std::string name() const
Name of the class.
virtual group_aggregate_value * new_instance() const
Returns a new empty instance of count.
void add_element_simple(const flexible_type &flex)
Adds a new element.
group_aggregate_value * new_instance() const
Returns a new empty instance of argmin with the same type.
bool support_type(flex_type_enum type) const
The types supported by the sum.
bool support_type(flex_type_enum type) const
The types supported by the argmax.
void combine(const group_aggregate_value &other)
combines two partial quantile sketches
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
group_aggregate_value * new_instance() const
Returns a new empty instance of argmax with the same type.
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
bool support_type(flex_type_enum type) const
The types supported by the zip.
bool support_type(flex_type_enum type) const
The types supported by the min.
void add_element_simple(const flexible_type &flex)
void load(iarchive &iarc)
Deserializer.
void combine(const group_aggregate_value &other)
combines two partial counts
void load(iarchive &iarc)
Deserializer.
void load(iarchive &iarc)
Deserializer.
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
flexible_type emit() const
Emits the sum result.
void combine(const group_aggregate_value &other)
combines two partial sums
group_aggregate_value * new_instance() const override
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
flexible_type emit() const
Emits the argmin result.
void save(oarchive &oarc) const
Serializer.
void save(oarchive &oarc) const
Serializer.
flexible_type emit() const
Emits the zip result.
flexible_type emit() const
Emits the sum result.
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial argmaxes
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
group_aggregate_value * new_instance() const
void add_element_simple(const flexible_type &flex)
void save(oarchive &oarc) const
Serializer.
void load(iarchive &iarc)
Deserializer.
void combine(const group_aggregate_value &other)
combines two partial sums
void combine(const group_aggregate_value &other)
combines two partial maxes
std::string name() const
Name of the class.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
std::string name() const override
Name of the class.
void add_element(const std::vector< flexible_type > &values)
bool support_type(flex_type_enum type) const
The types supported by the max.
void add_element_simple(const flexible_type &flex)
Adds a new element.
void combine(const group_aggregate_value &other)
combines two partial counts
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type to be argmaxed.
bool support_type(flex_type_enum type) const
The types supported by the zip.
void combine(const group_aggregate_value &other)
combines two partial zip
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
void push_back(flex_float i)
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
bool support_type(flex_type_enum type) const
The types supported by the count. (everything)
flexible_type emit() const override
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element.
void load(iarchive &iarc)
Deserializer.
flexible_type emit() const
Emits the count result.
void save(oarchive &oarc) const
Serializer.
flexible_type emit() const
Emits the max result.
void copy(const std::string src, const std::string dest)
void load(iarchive &iarc)
Deserializer.
std::string name() const
Name of the class.
std::string name() const
Name of the class.
flex_type_enum get_type() const
void save(oarchive &oarc) const
Serializer.
void save(oarchive &oarc) const
Serializer.
void add_element(const std::vector< flexible_type > &values)
void load(iarchive &iarc)
Deserializer.
void load(iarchive &iarc)
Deserializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of select_nth with the same type.
group_aggregate_value * new_instance() const
Returns a new empty instance of max with the same type.
std::string name() const
Name of the class.
void init(const std::vector< double > &quantiles_to_query)
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
virtual flexible_type emit() const
Emits the count result.
void load(iarchive &iarc)
Deserializer.
bool support_type(flex_type_enum type) const
The types supported by the zip.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial counts
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be summed.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
void combine(const group_aggregate_value &other)
combines two partial mins
void load(iarchive &iarc)
Deserializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
flexible_type emit() const
Emits the argmax result.
void save(oarchive &oarc) const
Serializer.
void load(iarchive &iarc)
Deserializer.
bool support_type(flex_type_enum type) const
The types supported by the sum.
flexible_type emit() const
Emits the min result.
bool support_type(flex_type_enum type) const
The types supported by the count.
bool support_type(flex_type_enum type) const
The types supported by the sum.
flexible_type emit() const
Emits the count result.
std::set< T > values(const std::map< Key, T > &map)
Definition: stl_util.hpp:386
void combine(const group_aggregate_value &other)
combines two partial counts
flexible_type emit() const
Emits the count result.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
void combine(const group_aggregate_value &other)
combines two partial sums
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type to be argmined.
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
void save(oarchive &oarc) const
Serializer.
void add_element_simple(const flexible_type &flex)
Adds a new element to be counted.
flexible_type emit() const
Emits the zip result.
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type.
std::string name() const
Name of the class.
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
std::string name() const
Name of the class.
virtual flexible_type emit() const
Emits the desired quantiles.
std::vector< std::pair< flexible_type, flexible_type > > flex_dict
void load(iarchive &iarc)
Deserializer.
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
flexible_type emit() const
Emits the sum result.
void save(oarchive &oarc) const
Serializer.
flex_type_enum set_input_type(flex_type_enum type)
The input type to be maxed.
group_aggregate_value * new_instance() const
Returns a new empty instance of count.
void combine(const group_aggregate_value &other)
combines two partial zip
bool support_type(flex_type_enum type) const
The types supported by the argmin.
static flexible_type FLEX_UNDEFINED
virtual std::string name() const
Name of the class.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
virtual void print(std::ostream &os) const
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
void combine(const group_aggregate_value &other)
combines two partial counts
void save(oarchive &oarc) const
Serializer.
group_aggregate_value * new_instance() const
Returns a new empty instance of sum with the same type.
void add_element(const std::vector< flexible_type > &values)
void reset(flex_type_enum target_type)
flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
The input type.
void add_element_simple(const flexible_type &flex)
Adds a new element to be summed.
std::string name() const
Name of the class.