Turi Create  4.0
groupby.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_GROUPBY_HPP
7 #define TURI_SFRAME_GROUPBY_HPP
8 
9 #include <core/parallel/mutex.hpp>
10 #include<memory>
11 #include<vector>
12 #include<core/storage/sframe_data/sarray.hpp>
13 #include<core/storage/sframe_data/sframe.hpp>
14 
15 namespace turi {
16 
17 
18 /**
19  * \ingroup sframe_physical
20  * \addtogroup groupby_aggregate Groupby Aggregation
21  * \{
22  */
23 
24 /**
25  * Groups the rows of an sframe by the values in key_column.
26  *
27  * Like a sort, but not: presumably rows with equal keys are only made adjacent, with no ordering guaranteed across different keys (TODO confirm against the definition).
    *
    * \param sframe_in  The sframe to group; taken by value.
    * \param key_column Presumably the name of the column whose values form the grouping key (verify against the definition); taken by value.
    * \return The resulting sframe.
28  */
29 sframe group(sframe sframe_in, std::string key_column);
30 
// Forward declaration: hash_bucket_container refers to hash_bucket before
// the full class definition that appears later in this header.
template<typename T>
class hash_bucket;
34 
35 /**
36  * A container holding a collection of "hash_bucket"s. Each hash_bucket
37  * stores its values in sorted order. If every element is added to a bucket
38  * chosen by its hash value, then all elements in the container are partially
39  * sorted, i.e. grouped.
40  *
41  * Below is an example of using it to group an sframe by its first column.
42  *
43  * \code
44  * typedef std::vector<flexible_type> value_type;
45  * sframe sf = ...;
46  *
47  * hash_bucket_container<std::vector<flexible_type>> hash_container(
48  * sf.num_segments(),
49  * [](const value_type& a, const value_type& b) { return a[0] < b[0]; }
50  * );
51  *
52  * parallel_for(0, sf.num_segments(), [&](size_t i) {
53  * auto iter = sf.get_reader().begin(i);
54  * auto end = sf.get_reader().end(i);
55  * while (iter != end) {
56  * size_t hash = (*iter)[0].hash();
57  * hash_container.add(*iter, hash % hash_container.num_buckets());
58  * ++iter;
59  * }
60  * });
61  *
62  * sframe outsf;
63  * hash_container.sort_and_write(outsf);
64  *
65  * \endcode
66  *
67  * Each hash_bucket has an in memory buffer, and is backed by an sarray segment.
68  * When the buffer is full, it is sorted and written into the sarray segment as
69  * a sorted chunk.
70  *
71  * The sort_and_write function then merges the sorted chunks and writes the
72  * result out to a new sarray or sframe.
73  */
74 template<typename T>
76 
77  public:
78  // type of the stored value
79  typedef T value_type;
80 
81  private:
82  // type of the sarray disk backend.
84 
85  // type of the comparator used for comparing the values within each bucket.
86  typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
87 
88  public:
89  /// Constructs a container with n buckets, and a comparator for sorting the values.
91  size_t num_buckets,
92  comparator_type comparator = std::less<value_type>()
93  ) {
94  sarray_sink.reset(new sink_type());
95  sarray_sink->open_for_write(num_buckets);
96  for (size_t i = 0; i < num_buckets; ++i) {
97  buckets.push_back(new hash_bucket<value_type>(buffer_size, sarray_sink, i, comparator));
98  }
99  };
100 
101  // Delete copy and copy assignment
102  hash_bucket_container(const hash_bucket_container& other) = delete;
103  hash_bucket_container& operator=(const hash_bucket_container& other) = delete;
104 
105  // Destructor
106  ~hash_bucket_container() { for(auto bucket_ptr : buckets) { delete bucket_ptr; } }
107 
108  // Add a new element to the specified bucket.
109  void add(const value_type& val, size_t bucketid) {
110  DASSERT_LT(bucketid, buckets.size());
111  buckets[bucketid]->add(val);
112  };
113 
114  // Sort each bucket and write out the result to an sarray or sframe.
115  template<typename SIterableType>
116  void sort_and_write(SIterableType& out) {
117  parallel_for (0, num_buckets(), [&](size_t i) { buckets[i]->flush();} );
118  sarray_sink->close();
119  typedef typename SIterableType::iterator OutIterator;
120  DASSERT_EQ(out.num_segments(), buckets.size());
121  parallel_for(0, buckets.size(),
122  [&](size_t i) {
123  buckets[i]->template sort_and_write<OutIterator>(out.get_output_iterator(i));
124  });
125  out.close();
126  };
127 
128  // returns the number of buckets in the container.
129  size_t num_buckets() const { return buckets.size(); }
130 
131  private:
132 
133  // buffer size for each hash bucket.
134  // optimal size is about sqrt(N)
135  const size_t buffer_size = 1024 * 1024;
136 
137  // vector of hash bucket which stores elements in sorted order.
138  std::vector<hash_bucket<value_type>*> buckets;
139 
140  // the disk backend shared by all the buckets for dumping the buffer.
141  std::shared_ptr<sarray<std::string>> sarray_sink;
142 };
143 
144 /**
145  * Stores the elements that get hashed to the bucket in sorted order.
146  *
147  * The container has an in-memory buffer, and is backed by an sarray segment. When the buffer is full, it is sorted and written into the sarray segment as a sorted chunk.
148  *
149  * The sort_and_write function then merges the sorted chunks and outputs the result to the destination array.
150  */
151 template<typename T>
152 class hash_bucket {
153  typedef T value_type;
154  typedef sarray<std::string>::iterator sink_iterator_type;
155  typedef sarray<std::string> sink_type;
156  typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
157 
158  public:
159  /// construct with given sarray and the segmentid as sink.
160  hash_bucket(size_t buffer_size,
161  std::shared_ptr<sink_type> sink,
162  size_t segmentid,
163  comparator_type comparator,
164  bool deduplicate = false);
165 
166  hash_bucket(const hash_bucket& other) = delete;
167 
168  hash_bucket& operator=(const hash_bucket& other) = delete;
169 
170  /// Add a new element to the container.
171  void add(const value_type& val);
172  void add(value_type&& val);
173 
174  /// Flush the last buffer
175  void flush();
176 
177  /// Sort all elements in the container and writes to the output.
178  /// If deduplicate is true, only output unique elements.
179  template<typename OutIterator>
180  void sort_and_write(OutIterator out);
181 
182 
183  private:
184  /// Writes the content into the sarray segment backend.
185  void save_buffer(std::vector<value_type>& swap_buffer);
186 
187  /// The segment id to dump the buffer.
188  size_t segmentid;
189 
190  /// The sarray storing the elements.
191  std::shared_ptr<sarray<std::string>> sink;
192 
193  /// Internal output iterator for the sarray_sink segment.
194  sink_iterator_type out_iter;
195 
196  /// Storing the size of each sorted chunk.
197  std::vector<size_t> chunk_size;
198 
199  /// Guarding the sarray sink from parallel access.
200  turi::mutex sink_mutex;
201 
202  /// Buffer that stores the incoming elements.
203  std::vector<value_type> buffer;
204 
205  /// The limit of the buffer size.
206  size_t buffer_size;
207 
208  /// Guarding the buffer from parallel access.
209 #ifdef __APPLE__
210  simple_spinlock buffer_mutex;
211 #else
212  turi::mutex buffer_mutex;
213 #endif
214 
215  /// Comparator for sorting the values.
216  comparator_type comparator;
217 
218  /// If true only keep the unique items.
219  bool deduplicate;
220 
221  private:
222  inline value_type deserialize(const std::string& buf) {
223  value_type ret;
224  iarchive iarc(buf.c_str(), buf.length());
225  iarc >> ret;
226  return ret;
227  };
228 };
229 
230 /// \}
231 } // end of turi
232 #endif
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
Definition: lambda_omp.hpp:93
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
sframe group(sframe sframe_in, std::string key_column)
hash_bucket_container(size_t num_buckets, comparator_type comparator=std::less< value_type >())
Constructs a container with n buckets, and a comparator for sorting the values.
Definition: groupby.hpp:90