6 #ifndef TURI_SFRAME_SARRAY_SORTED_BUFFER_HPP 7 #define TURI_SFRAME_SARRAY_SORTED_BUFFER_HPP 9 #include<core/parallel/mutex.hpp> 13 #include<core/storage/sframe_data/sarray.hpp> 14 #include<core/storage/sframe_data/sframe.hpp> 44 typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
49 comparator_type comparator,
50 bool deduplicate =
false);
/**
 * Add a new element to the container.
 *
 * This overload accepts the value as an rvalue reference.
 *
 * \param val       The element to add.
 * \param thread_id Defaults to 0. NOTE(review): presumably selects which
 *                  entry of buffer_array / buffer_mutex_array is used --
 *                  confirm against the definition.
 */
57 void add(value_type&& val,
size_t thread_id = 0);
/**
 * Add a new element to the container.
 *
 * This overload accepts the value by const reference.
 *
 * \param val       The element to add.
 * \param thread_id Defaults to 0. NOTE(review): presumably selects which
 *                  entry of buffer_array / buffer_mutex_array is used --
 *                  confirm against the definition.
 */
58 void add(
const value_type& val,
size_t thread_id = 0);
62 template<
typename OutIterator>
65 size_t approx_size()
const {
66 if (sink->is_opened_for_write()){
70 for (
auto i : chunk_size) ret += i;
/**
 * Takes a shared pointer to a vector of values (`swap_buffer`).
 *
 * NOTE(review): presumably sorts the swapped-out buffer and writes it to the
 * sink as one chunk -- the definition is not visible here; confirm.
 */
80 void save_buffer(std::shared_ptr<std::vector<value_type>> swap_buffer);
/// Shared handle to the sink; the visible code queries
/// is_opened_for_write() and get_reader() on it.
83 std::shared_ptr<sink_type> sink;
/// NOTE(review): presumably the output iterator into `sink` -- confirm.
86 sink_iterator_type out_iter;
/// Row count of each chunk, in order. The merge code accumulates these to
/// derive each chunk's [row_start, row_end) range, and approx_size() sums them.
89 std::vector<size_t> chunk_size;
/// A vector of value buffers. NOTE(review): presumably one buffer per
/// thread, indexed by the thread_id passed to add() -- confirm.
95 std::vector<std::vector<value_type>> buffer_array;
/// The same member name is declared twice with different lock types
/// (simple_spinlock vs. turi::mutex). NOTE(review): presumably these are
/// alternatives selected by a preprocessor conditional whose directives are
/// not visible in this listing -- confirm.
102 std::vector<simple_spinlock> buffer_mutex_array;
104 std::vector<turi::mutex> buffer_mutex_array;
/// Ordering predicate over value_type; the merge code applies it to the
/// value half of each (value, chunk index) pair.
108 comparator_type comparator;
122 template<
typename OutIterator>
124 std::shared_ptr<typename sink_type::reader_type> reader = std::move(sink->get_reader());
126 size_t segment_start = 0;
130 std::vector<sarray_reader_buffer<T>> chunk_readers;
132 size_t prev_row_start = segment_start;
133 for (
size_t i = 0; i < chunk_size.size(); ++i) {
134 size_t row_start = prev_row_start;
135 size_t row_end = row_start + chunk_size[i];
136 prev_row_start = row_end;
141 std::unordered_set<size_t> remaining_chunks;
144 std::vector< std::pair<value_type, size_t> > pq;
146 auto pair_comparator = [=](
const std::pair<value_type, size_t>& a,
147 const std::pair<value_type, size_t>& b) {
148 return !comparator(a.first, b.first);
152 for (
size_t i = 0; i < chunk_readers.size(); ++i) {
153 if (chunk_readers[i].has_next()) {
154 pq.push_back({chunk_readers[i].next(), i});
155 remaining_chunks.insert(i);
158 std::make_heap(pq.begin(), pq.end(), pair_comparator);
160 bool is_first_elem =
true;
161 value_type prev_value;
162 while (!pq.empty()) {
165 std::tie(value,
id) = pq.front();
166 std::pop_heap(pq.begin(), pq.end(), pair_comparator);
169 if ((value != prev_value) || is_first_elem) {
171 *out = std::move(value);
173 is_first_elem =
false;
176 *out = std::move(value);
179 if (chunk_readers[
id].has_next()) {
180 pq.push_back({chunk_readers[id].next(),
id});
181 std::push_heap(pq.begin(), pq.end(), pair_comparator);
183 remaining_chunks.erase(
id);
189 if (remaining_chunks.size()) {
190 size_t id = *remaining_chunks.begin();
191 while(chunk_readers[
id].has_next()) {
192 value_type value = chunk_readers[id].next();
194 if ((value != prev_value) || is_first_elem) {
196 *out = std::move(value);
198 is_first_elem =
false;
201 *out = std::move(value);
sarray_sorted_buffer(size_t buffer_size, comparator_type comparator, bool deduplicate=false)
Construct with the given sarray and the segment id as sink.
void sort_and_write(OutIterator out)
#define ASSERT_TRUE(cond)
void add(value_type &&val, size_t thread_id=0)
Add a new element to the container.
void close()
Flush the last buffer, and close the sarray.