Turi Create  4.0
aggregates.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_ENGINE_UTIL_AGGREGATES_HPP
7 #define TURI_SFRAME_QUERY_ENGINE_UTIL_AGGREGATES_HPP
8 #include <core/storage/sframe_data/sframe.hpp>
9 #include <core/storage/serialization/serialization_includes.hpp>
10 #include <core/storage/query_engine/operators/reduce.hpp>
11 #include <core/storage/query_engine/planning/planner.hpp>
12 
13 namespace turi {
14 
15 /**
16  * \ingroup sframe_query_engine
17  * \addtogroup Utilities Utilities
18  * \{
19  */
20 namespace query_eval {
21 /**
22  * Implements a generic aggregator where a function is called on each add method.
23  *
24  * Any attempt to combine, or save, will result in a failure.
25  */
26 template <typename T, typename AggregateFunctionType>
28  public:
29  generic_aggregator():value(T()) { }
30  generic_aggregator(AggregateFunctionType fn, const T& t):
31  fn(fn), initial_value(t), value(t) { }
32 
33  /// Returns a new empty instance of sum with the same type
35  generic_aggregator* ret =
36  new generic_aggregator(fn, initial_value);
37  return ret;
38  }
39 
40  /// Adds a new element to be summed
41  void add_element_simple(const flexible_type& flex) {
42  fn(flex, value);
43  }
44 
45  /// Emits the result
46  flexible_type emit() const {
47  // we just emit strings
48  std::stringstream strm;
49  oarchive oarc(strm);
50  oarc << value;
51  return strm.str();
52  }
53 
54  void combine(const group_aggregate_value& flex) {
55  ASSERT_TRUE(false);
56  }
57 
59  ASSERT_TRUE(false);
60  return false;
61  }
62 
63  /// Serializer
64  void save(oarchive& oarc) const {
65  ASSERT_TRUE(false);
66  }
67 
68  /// Deserializer
69  void load(iarchive& iarc) {
70  ASSERT_TRUE(false);
71  }
72 
73  std::string name() const {
74  ASSERT_TRUE(false);
75  return "";
76  }
77  private:
78  AggregateFunctionType fn;
79  T initial_value;
80  T value;
81 };
82 
83 /**
84  * Performs a reduction on input in parallel, this function decides the
85  * degree of parallelism, usually depend on number of CPUs.
86  *
87  * \param input The iterator supports parallel batch reading of items
88  * \param reduce_fn The reduction function to use. This must be of the form
89  * bool f(const array_value_type&, reduction_type&).
90  * \param init The initial value to use in the reduction
91  *
92  * \tparam ResultType The type of the intermediate result. Must be serializable
93  * \tparam ReduceFunctionType The result type of each reduction.
94  * \tparam AggregateFunctionType The type of the reduce function
95  *
96  *
97  */
98 template <typename ResultType,
99  typename ReduceFunctionType,
100  typename AggregateFunctionType>
101 ResultType reduce(
102  std::shared_ptr<planner_node> input,
103  ReduceFunctionType reduce_fn,
104  AggregateFunctionType aggregate_fn,
105  ResultType init = ResultType()) {
106 
108  auto output = op_reduce::make_planner_node(input, agg, flex_type_enum::STRING);
109  sframe sf = planner().materialize(output);
110  auto sfreader = sf.get_reader(1);
111  auto iter = sfreader->begin(0);
112  ResultType result = init;
113  ResultType curval;
114  while(iter != sfreader->end(0)) {
115  // data is serialized in an archive:w
116  std::string st = (*iter)[0];
117  iarchive iarc(st.c_str(), st.length());
118  iarc >> curval;
119  aggregate_fn(curval, result);
120  ++iter;
121  }
122  return result;
123 }
124 
125 
126 /// \}
127 
128 } // namespace query_eval
129 } // namespace turi
130 #endif
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
group_aggregate_value * new_instance() const
Returns a new instance of this aggregator with the same function and initial value.
Definition: aggregates.hpp:34
sframe materialize(std::shared_ptr< planner_node > tip, materialize_options exec_params=materialize_options())
void add_element_simple(const flexible_type &flex)
Adds a new element to the aggregation by applying the aggregate function to it.
Definition: aggregates.hpp:41
ResultType reduce(std::shared_ptr< planner_node > input, ReduceFunctionType reduce_fn, AggregateFunctionType aggregate_fn, ResultType init=ResultType())
Definition: aggregates.hpp:101
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
flexible_type emit() const
Emits the result.
Definition: aggregates.hpp:46
std::unique_ptr< reader_type > get_reader() const
void combine(const group_aggregate_value &flex)
Definition: aggregates.hpp:54
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
void load(iarchive &iarc)
Deserializer.
Definition: aggregates.hpp:69
void save(oarchive &oarc) const
Serializer.
Definition: aggregates.hpp:64
bool support_type(flex_type_enum) const
Definition: aggregates.hpp:58