Turi Create  4.0
groupby_aggregate_impl.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
7 #define TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
8 
9 #include <memory>
10 #include <vector>
11 #include <cstdint>
12 #include <functional>
13 #include <unordered_set>
14 #include <core/storage/sframe_data/sframe.hpp>
15 #include <core/util/cityhash_tc.hpp>
16 #include <core/parallel/mutex.hpp>
17 #include <core/storage/sframe_data/group_aggregate_value.hpp>
18 #include <core/generics/hopscotch_map.hpp>
19 
20 namespace turi {
21 
22 
23 /**
24  * \ingroup sframe_physical
25  * \addtogroup groupby_aggregate Groupby Aggregation
26  * \{
27  */
28 
29 /**
30  * \internal
31  * Groupby Aggregation Implementation Detail
32  */
33 namespace groupby_aggregate_impl {
34 /**
35  * A description of a group operation.
36  */
38  /// The column number of operate on
39  std::vector<size_t> column_numbers;
40  /// The aggregator
41  std::shared_ptr<group_aggregate_value> aggregator;
42 };
43 
44 
45 
46 /**
47  * This class manages all the intermedate aggregation result of a given key.
48  * It contains a key, and an array of multiple aggregated values for that key
49  * (each aggregated value is for a different aggregator. For instance, one
50  * could be sum, one could be count).
51  * It then provides
52  */
54 
55  /// The key of this aggregation
56  std::vector<flexible_type> key;
57  /// All the aggregated values
58  mutable std::vector<std::unique_ptr<group_aggregate_value> > values;
59 
60  /// A cache of the hash of the key
61  size_t hash_val;
62 
63  groupby_element() = default;
64 
65  /**
66  * Constructs a group element from a key, and a description of all
67  * the group operations. All the aggregated values will be initialized as
68  * new empty values.
69  */
70  groupby_element(std::vector<flexible_type> group_key,
71  const std::vector<group_descriptor>& group_desc);
72 
73  /**
74  * Constructs a groupby_element from a string which contains a
75  * serialization of the element. The array of all the descriptors is
76  * required.
77  */
78  groupby_element(const std::string& val,
79  const std::vector<group_descriptor>& group_desc);
80 
81  /**
82  * Constructs a group element from a key, and a description of all
83  * the group operations. All the aggregated values will be initialized as
84  * new empty values.
85  */
86  void init(std::vector<flexible_type> group_key,
87  const std::vector<group_descriptor>& group_desc);
88 
89 
90  /// Writes the group result into an output archive
91  void save(oarchive& oarc) const;
92 
93  /**
94  * Loads the group result from an input archive and a group
95  * operation descriptor
96  */
97  void load(iarchive& iarc,
98  const std::vector<group_descriptor>& group_desc);
99 
100  /**
101  * Provides a total ordering on group-by elements
102  */
103  bool operator>(const groupby_element& other) const;
104 
105  /**
106  * Provides a total ordering on group-by elements
107  */
108  bool operator<(const groupby_element& other) const;
109 
110  /**
111  * Returns true if this, and other have identical keys
112  */
113  bool operator==(const groupby_element& other) const;
114 
115  /**
116  * Combines values another groupby element which is performing the
117  * same set of operations
118  */
119  void operator+=(const groupby_element& other);
120 
121  template <typename T>
122  void add_element(const T& val,
123  const std::vector<group_descriptor>& group_desc) const;
124 
125  static size_t hash_key(const std::vector<flexible_type>& key);
126 
127  static size_t hash_key(const std::vector<flexible_type>& key, size_t keylen);
128 
129  static size_t hash_key(const sframe_rows::row& key);
130 
131  static size_t hash_key(const sframe_rows::row& key, size_t keylen);
132 
133  size_t hash() const;
134 
135  void compute_hash();
136 };
137 
138 
139 } // namespace grouby_aggregate_impl
140 
141 /// \}
142 } // namespace turi
143 
144 // we need to put the hash struct in std
145 namespace std {
146 
147 /**
148  * \ingroup sframe_physical
149  * \addtogroup groupby_aggregate Groupby Aggregation
150  * Hash function.
151  *
152  * This allows us to add groupby_element to an std::unordered_set
153  */
154 template<>
156  size_t operator()(
157  const turi::groupby_aggregate_impl::groupby_element& element) const {
158  return element.hash();
159  }
160 };
161 } // namespace std
162 
163 
164 
165 
166 namespace turi {
167 
168 /**
169  * \ingroup sframe_physical
170  * \addtogroup groupby_aggregate Groupby Aggregation
171  * \{
172  */
173 
174 namespace groupby_aggregate_impl {
175 
176 /**
177  * This maintains the complete aggregation result for an SFrame.
178  *
179  * This class essentially implements the entire groupby aggregation algorithm.
180  * Aggregation groups are defined using \ref define_group.
181  * After which, rows are inserted using the add method.
182  *
183  * num_segments is the maximum degree of parallelism permissible. This is
184  * the number of segments of the output SFrame.
185  *
186  * Keys are first hashed into a segment, and each segment is then executed
187  * independently.
188  * For each segment, rows are aggregated in memory as much as possible until we
189  * exceed the max_buffer_size number of keys stored in memory. If more keys are
190  * needed the existing keys are all sorted and flushed out to disk. This
191  * process repeats until all data is read.
192  *
193  * Then when \ref group_and_write is called, a k-way merge is performed across
194  * all the sorted ranges of keys on disk to write the final output.
195  */
197 
198  public:
199  /**
200  * \param max_buffer_size Maximum number of aggregators to store in memory per segment
201  * \param num_segments Maximum degree of parallelism
202  */
203  group_aggregate_container(size_t max_buffer_size,
204  size_t num_segments);
205 
206  /// Deleted copy constructor
208 
209  /// Deleted assignment operator
211  operator=(const group_aggregate_container& other) = delete;
212 
213  /**
214  * Adds a new group operation which groups the values of a column
215  */
216  void define_group(std::vector<size_t> column_numbers,
217  std::shared_ptr<group_aggregate_value> aggregator);
218 
219  /// Add a new element to the container.
220  void add(const std::vector<flexible_type>& val,
221  size_t num_keys);
222 
223  /// Add a new element to the container.
224  void add(const sframe_rows::row& val,
225  size_t num_keys);
226 
227  /// Sort all elements in the container and writes to the output.
228  void group_and_write(sframe& out);
229 
230  /// init tls data members
231  void init_tls();
232 
233  void flush_tls();
234 
235  private:
236  /*********************** helper functions *******************/
237  inline void throw_if_not_initialized() const {
238  if (UNLIKELY(!tss_.init_))
239  log_and_throw("group_aggregate_container is not initialized");
240  }
241 
242  /// Writes the content into the sarray segment backend.
243  void flush_segment(size_t segmentid);
244  /// merge all local buffers into global buffer
245  void merge_local_buffer_set();
246 
247  /************************ data members **********************/
248  /// collection of all the group operations
249  std::vector<group_descriptor> group_descriptors;
250  /// constants
251  const size_t max_buffer_size;
252  const size_t num_segments;
253 
254  struct segment_information {
256  /// The temporary storage for the grouped values
257  size_t segment_id_;
258  };
259 
260  using vec_segment_t = std::vector<segment_information>;
261  using vec_chunk_t = std::vector<size_t>;
262 
263  struct tls_segment_set {
264  size_t id_ = 0;
265  bool init_{false};
266  vec_segment_t segments_{};
267  };
268 
269  /// used during initialization to mark task_id (each thread should have one task_id)
270  unsigned task_id_;
271  /* life cycle is from the start of a thread to end of a thread */
272  static thread_local tls_segment_set tss_;
273 
274  struct sa_buffer_t {
275  sa_buffer_t() = default;
276 
277  sa_buffer_t(size_t num_segments)
278  : sa_buffer_ptr_(new sarray<std::string>()),
279  sa_seg_chunks_(num_segments),
280  sa_seg_locks_(num_segments),
281  refctr_(0) {};
282 
283  // avoid inadvertent copying sarray; exclusive ownership
284  std::unique_ptr<sarray<std::string>> sa_buffer_ptr_;
285  std::vector<vec_chunk_t> sa_seg_chunks_;
286  std::vector<turi::simple_spinlock> sa_seg_locks_;
287  turi::atomic<unsigned> refctr_;
288  };
289 
290  std::vector<sa_buffer_t> local_buffer_set_;
291 
292  std::atomic<unsigned> merry_go_round_;
293 
294  /// gloabl buffer lock, used during the initialization
295  turi::simple_spinlock gl_buffer_lock_;
296  /// global buffer
297  sarray<std::string> gl_buffer_;
298  /// lock guard for each segment in global buffer
299  std::vector<turi::mutex> gl_lock_pool_;
300  /// Storing the size of each sorted chunk for each segment
301  std::vector<vec_chunk_t> gl_chunk_size_set_;
302  /// global buffer reader
303  std::unique_ptr<sarray<std::string>::reader_type> reader;
304 
305  /// Sort all elements in the container and writes to the output.
306  void group_and_write_segment(sframe& out,
307  std::shared_ptr<sarray<std::string>::reader_type> reader,
308  size_t segmentid);
309 };
310 
311 
312 } // namespace groupby_aggregate_impl
313 
314 /// \}
315 } // namespace turi
316 
317 
318 #endif // TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
STL namespace.
size_t hash_val
A cache of the hash of the key.
std::vector< flexible_type > key
The key of this aggregation.
std::vector< size_t > column_numbers
The column numbers to operate on.
std::vector< std::unique_ptr< group_aggregate_value > > values
All the aggregated values.
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
std::shared_ptr< group_aggregate_value > aggregator
The aggregator.