Turi Create  4.0
group_aggregate_value.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_GROUP_AGGREGATE_VALUE_HPP
7 #define TURI_SFRAME_GROUP_AGGREGATE_VALUE_HPP
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 
11 namespace turi {
12 
13 
14 /**
15  * \ingroup sframe_physical
16  * \addtogroup groupby_aggregate Groupby Aggregation
17  * \{
18  */
19 
20 /**
21  * The base class specification for representing the intermediate state as well
22  * as the computation (aggregation, combining and output) for an aggregation
23  * operation.
24  *
25  * Essentially, the group_aggregate_value must describe a parallel reduction in
26  * the following form:
27  *
28  * \code
29  * for streamid in data_stream:
30  *   for data in stream[streamid]:
31  *     value[streamid].add_element(data)
32  *   value[streamid].partial_finalize()
33  *
34  * for streamid in data_stream:
35  *   final_value.combine(value[streamid])
36  *
37  * output = final_value.emit()
38  * \endcode
39  *
40  * Each value could have an arbitrary number of elements inserted into it.
41  * When all insertions are complete, partial_finalize() is called on the value.
42  * Values can be combined with each other to form a final value, which then
43  * emits a response.
44  */
46  public:
47  /**
48  * Creates a new instance of the aggregator. The new instance must
49  * remember the input type (see set_input_type()) but have a new,
50  * empty value.
51  */
52  virtual group_aggregate_value* new_instance() const = 0;  // pure virtual; returns a raw pointer. NOTE(review): ownership presumably passes to the caller, confirm
53 
54  /**
55  * Adds an element to the aggregate. Elements to be added will be either
56  * the input_type (as set by set_input_type()) or UNDEFINED.
57  *
58  * Operators that expect more than one input value need to override this function.
59  */
60  virtual void add_element(const std::vector<flexible_type>& values) {
61  DASSERT_TRUE(values.size() == 1);  // the default handles exactly one input value
62  add_element_simple(values.front());
63  }
64 
65  /**
66  * Adds an element to the aggregate. Simple version of add_element
67  * where there is only one input value for the operator
68  */
69  virtual void add_element_simple(const flexible_type& flex) = 0;  // pure virtual; the default add_element() forwards its single value here
70 
71  /**
72  * No more elements will be added to this value. However, this value
73  * may still be combined with other values.
74  */
75  virtual void partial_finalize() {}  // default: nothing to do
76 
77  /**
78  * Combines two partial aggregates
79  */
80  virtual void combine(const group_aggregate_value& other) = 0;  // pure virtual; `other` is taken by const reference
81 
82  /**
83  * Emits the result of a groupby operation
84  */
85  virtual flexible_type emit() const = 0;  // pure virtual; const member that returns the result by value (print() also calls it)
86 
87  /**
88  * Returns true if the aggregate_value can consume a column of this type,
89  * and false otherwise. (For instance, a sum aggregator can consume integers
90  * and floats, and not anything else).
91  */
92  virtual bool support_type(flex_type_enum type) const = 0;  // pure virtual; const query about a single type
93 
94  /**
95  * Sets the input types and returns the output type. For instance,
96  * a sum aggregator when summing integers will return an integer, and when
97  * summing doubles will return doubles.
98  *
99  * The default implementation assumes there is only one input, and the output
100  * type is the same as the input type.
101  */
102  virtual flex_type_enum set_input_types(const std::vector<flex_type_enum>& types) {
103  DASSERT_TRUE(types.size() == 1);  // the default handles exactly one input type
104  return set_input_type(types.front());
105  }
106 
107  /**
108  * Returns a printable name of the operation.
109  */
110  virtual std::string name() const = 0;  // pure virtual; print() writes this first, before "(value = ...)"
111 
112  /**
113  * Saves the state of the aggregation to an archive
114  */
115  virtual void save(oarchive& oarc) const = 0;  // pure virtual; const member that receives the output archive by reference
116 
117  /**
118  * Loads the state of the aggregation from an archive
119  */
120  virtual void load(iarchive& iarc) = 0;  // pure virtual; non-const member that receives the input archive by reference
121 
122  /**
123  * Destruction
124  */
125  virtual ~group_aggregate_value() = default;  // virtual so that deleting through a base-class pointer is well-defined
126 
127  /**
128  * Override this function for allowing the operator to be easily printed.
129  *
130  * \code
131  * std::cout << aggregator << std::endl;
132  * \endcode
133  */
134  virtual void print(std::ostream& os) const {
135  os << name() << "(value = " << emit() << ")";  // operation name followed by the currently emitted value
136  }
137 
138  /// Single-type counterpart of set_input_types(): takes the input type and
139  /// returns the output type, which by default is the input type unchanged.
140  virtual flex_type_enum set_input_type(flex_type_enum type) { return type; }
141 };
142 
143 /// Stream insertion for aggregators: formatting is delegated to the virtual print().
144 inline std::ostream& operator<<(std::ostream& os, const group_aggregate_value& dt) {
145  dt.print(os); return os;
146 }
147 
148 
149 /**
150  * Helper function to convert string aggregator name into builtin aggregator value.
151  *
152  * Implementation is in groupby_operators.hpp
153  */
154 std::shared_ptr<group_aggregate_value> get_builtin_group_aggregator(const std::string&);  // declaration only; the argument is the aggregator name
155 
156 
157 /// \}
158 } // end of turi
159 #endif // TURI_SFRAME_GROUP_AGGREGATE_VALUE_HPP
virtual void print(std::ostream &os) const
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
virtual flexible_type emit() const =0
virtual bool support_type(flex_type_enum type) const =0
virtual void add_element_simple(const flexible_type &flex)=0
virtual void combine(const group_aggregate_value &other)=0
virtual group_aggregate_value * new_instance() const =0
virtual flex_type_enum set_input_types(const std::vector< flex_type_enum > &types)
std::shared_ptr< group_aggregate_value > get_builtin_group_aggregator(const std::string &)
std::set< T > values(const std::map< Key, T > &map)
Definition: stl_util.hpp:386
virtual void add_element(const std::vector< flexible_type > &values)
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
virtual std::string name() const =0
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
virtual void load(iarchive &iarc)=0
virtual void save(oarchive &oarc) const =0