Turi Create  4.0
sarray_sorted_buffer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_SARRAY_SORTED_BUFFER_HPP
7 #define TURI_SFRAME_SARRAY_SORTED_BUFFER_HPP
8 
9 #include<core/parallel/mutex.hpp>
10 #include<memory>
11 #include<vector>
12 #include<future>
13 #include<core/storage/sframe_data/sarray.hpp>
14 #include<core/storage/sframe_data/sframe.hpp>
15 
16 
17 namespace turi {
18 
19 
20 /**
21  * \internal
22  * \ingroup sframe_physical
23  * \addtogroup sframe_internal SFrame Internal
24  * \{
25  */
26 
27 /**
28  * An SArray backed buffer that stores elements in sorted order.
29  *
30  * The container has an in memory buffer, and is backed by an sarray segment.
31  * When the buffer is full, it is sorted and written into the sarray segment as a sorted chunk.
32  *
33  * - The add function is used to push element into the buffer.
34  * - When finishing adding elements, close() can be called to close the buffer.
35  * - The sort_and_write function then merges the sorted chunks and output to the destination array.
36  * - When deduplicate is set in the constructor, the buffer will ignore duplicated items.
37  */
38 
39 template<typename T>
41  typedef T value_type;
42  typedef typename sarray<T>::iterator sink_iterator_type;
43  typedef sarray<T> sink_type;
44  typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
45 
46  public:
47  /// construct with given sarray and the segmentid as sink.
48  sarray_sorted_buffer(size_t buffer_size,
49  comparator_type comparator,
50  bool deduplicate = false);
51 
52  sarray_sorted_buffer(const sarray_sorted_buffer& other) = delete;
53 
54  sarray_sorted_buffer& operator=(const sarray_sorted_buffer& other) = delete;
55 
56  /// Add a new element to the container.
57  void add(value_type&& val, size_t thread_id = 0);
58  void add(const value_type& val, size_t thread_id = 0);
59 
60  /// Sort all elements in the container and writes to the output.
61  /// If deduplicate is true, only output unique elements.
62  template<typename OutIterator>
63  void sort_and_write(OutIterator out);
64 
65  size_t approx_size() const {
66  if (sink->is_opened_for_write()){
67  return 0;
68  } else {
69  size_t ret = 0;
70  for (auto i : chunk_size) ret += i;
71  return ret;
72  }
73  }
74 
75  /// Flush the last buffer, and close the sarray.
76  void close();
77 
78  private:
79  /// Writes the content into the sarray segment backend.
80  void save_buffer(std::shared_ptr<std::vector<value_type>> swap_buffer);
81 
82  /// The sarray storing the elements.
83  std::shared_ptr<sink_type> sink;
84 
85  /// Internal output iterator for the sarray_sink segment.
86  sink_iterator_type out_iter;
87 
88  /// Storing the size of each sorted chunk.
89  std::vector<size_t> chunk_size;
90 
91  /// Guarding the sarray sink from parallel access.
92  turi::mutex sink_mutex;
93 
94  /// Buffer that stores the incoming elements.
95  std::vector<std::vector<value_type>> buffer_array;
96 
97  /// The limit of the buffer size
98  size_t buffer_size;
99 
100  /// Guarding the buffer from parallel access.
101 #ifdef __APPLE__
102  std::vector<simple_spinlock> buffer_mutex_array;
103 #else
104  std::vector<turi::mutex> buffer_mutex_array;
105 #endif
106 
107  /// Comparator for sorting the values.
108  comparator_type comparator;
109 
110  /// If true only keep the unique items.
111  bool deduplicate;
112 }; // end of sarray_sorted_buffer class
113 
114 /// \}
115 
116 /**************************************************************************/
117 /* */
118 /* Implementation */
119 /* */
120 /**************************************************************************/
121 template<typename T>
122 template<typename OutIterator>
124  std::shared_ptr<typename sink_type::reader_type> reader = std::move(sink->get_reader());
125  // prepare the begin row and end row for each chunk.
126  size_t segment_start = 0;
127 
128  // each chunk stores a sequential read of the segments,
129  // and elements in each chunk are already sorted.
130  std::vector<sarray_reader_buffer<T>> chunk_readers;
131 
132  size_t prev_row_start = segment_start;
133  for (size_t i = 0; i < chunk_size.size(); ++i) {
134  size_t row_start = prev_row_start;
135  size_t row_end = row_start + chunk_size[i];
136  prev_row_start = row_end;
137  chunk_readers.push_back(sarray_reader_buffer<T>(reader, row_start, row_end));
138  }
139 
140  // id of the chunks that still have elements.
141  std::unordered_set<size_t> remaining_chunks;
142 
143  // merge the chunks and write to the out iterator
144  std::vector< std::pair<value_type, size_t> > pq;
145  // comparator for the pair type
146  auto pair_comparator = [=](const std::pair<value_type, size_t>& a,
147  const std::pair<value_type, size_t>& b) {
148  return !comparator(a.first, b.first);
149  };
150 
151  // insert one element from each chunk into the priority queue.
152  for (size_t i = 0; i < chunk_readers.size(); ++i) {
153  if (chunk_readers[i].has_next()) {
154  pq.push_back({chunk_readers[i].next(), i});
155  remaining_chunks.insert(i);
156  }
157  }
158  std::make_heap(pq.begin(), pq.end(), pair_comparator);
159 
160  bool is_first_elem = true;
161  value_type prev_value;
162  while (!pq.empty()) {
163  size_t id;
164  value_type value;
165  std::tie(value, id) = pq.front();
166  std::pop_heap(pq.begin(), pq.end(), pair_comparator);
167  pq.pop_back();
168  if (deduplicate) {
169  if ((value != prev_value) || is_first_elem) {
170  prev_value = value;
171  *out = std::move(value);
172  ++out;
173  is_first_elem = false;
174  }
175  } else {
176  *out = std::move(value);
177  ++out;
178  }
179  if (chunk_readers[id].has_next()) {
180  pq.push_back({chunk_readers[id].next(), id});
181  std::push_heap(pq.begin(), pq.end(), pair_comparator);
182  } else {
183  remaining_chunks.erase(id);
184  }
185  }
186 
187  // At most one chunk will be left
188  ASSERT_TRUE(remaining_chunks.size() <= 1);
189  if (remaining_chunks.size()) {
190  size_t id = *remaining_chunks.begin();
191  while(chunk_readers[id].has_next()) {
192  value_type value = chunk_readers[id].next();
193  if (deduplicate) {
194  if ((value != prev_value) || is_first_elem) {
195  prev_value = value;
196  *out = std::move(value);
197  ++out;
198  is_first_elem = false;
199  }
200  } else {
201  *out = std::move(value);
202  ++out;
203  }
204  }
205  }
206 }
207 
208 }
209 #endif
sarray_sorted_buffer(size_t buffer_size, comparator_type comparator, bool deduplicate=false)
Construct a sorted buffer with the given in-memory buffer size limit, comparator, and deduplication flag.
void sort_and_write(OutIterator out)
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
void add(value_type &&val, size_t thread_id=0)
Add a new element to the container.
void close()
Flush the last buffer, and close the sarray.