Turi Create  4.0
buffered_writer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>
#include <core/parallel/mutex.hpp>
#include <core/storage/sframe_data/sframe_constants.hpp>
9 
10 namespace turi {
11 /**
12  * \ingroup fileio
13  * Provide buffered write abstraction.
14  * The class manages buffered concurrent write to an output iterator.
15  *
16  * Example:
17  *
18  * Suppose there are M data sources randomly flow to N sinks. We can use
19  * buffered_writer to achieve efficient concurrent write.
20  *
21  * \code
22  *
23  * std::vector<input_iterator> sources; // size M
24  * std::vector<output_iterator> sinks; // size N
25  * std::vector<turi::mutex> sink_mutex; // size N
26  *
27  * parallel_for_each(s : sources) {
28  * std::vector<buffered_writer> writers;
29  * for (i = 1...N) {
30  * writers.push_back(buffered_writer(sinks[i], sink_mutex[i]));
31  * }
32  * while (s.has_next()) {
33  * size_t destination = random.randint(N);
34  * writers[destination].write(s.next());
35  * }
36  * for (i = 1...N) {
37  * writers[i].flush();
38  * }
39  * }
40  * \endcode
41  *
42  * Two parameters "soft_limit" and "hard_limit" are used to control the buffer
43  * size. When soft_limit is met, the writer will try to flush the buffer
44  * content to the sink. When hard_limit is met, the writer will force the flush.
45  */
46 template<typename ValueType, typename OutIterator>
48 public:
49  buffered_writer(OutIterator& out, turi::mutex& out_lock,
50  size_t soft_limit = SFRAME_WRITER_BUFFER_SOFT_LIMIT,
51  size_t hard_limit = SFRAME_WRITER_BUFFER_HARD_LIMIT) :
52  out(out), out_lock(out_lock),
53  soft_limit(soft_limit),
54  hard_limit(hard_limit) {
55  ASSERT_GT(hard_limit, soft_limit);
56  }
57 
58  /**
59  * Write the value to the buffer.
60  * Try flush when buffer exceeds soft limit and force
61  * flush when buffer exceeds hard limit.
62  */
63  void write(const ValueType& val) {
64  buffer.push_back(val);
65  if (buffer.size() >= soft_limit) {
66  bool locked = out_lock.try_lock();
67  if (locked || buffer.size() >= hard_limit) {
68  flush(locked);
69  }
70  }
71  }
72 
73  void write(ValueType&& val) {
74  buffer.push_back(val);
75  if (buffer.size() >= soft_limit) {
76  bool locked = out_lock.try_lock();
77  if (locked || buffer.size() >= hard_limit) {
78  flush(locked);
79  }
80  }
81  }
82 
83 
84  /**
85  * Flush the buffer to the output sink. Clear the buffer when finished.
86  */
87  void flush(bool is_locked = false) {
88  if (!is_locked) {
89  out_lock.lock();
90  }
91  std::lock_guard<turi::mutex> guard(out_lock, std::adopt_lock);
92  for (auto& val : buffer) {
93  *out++ = std::move(val);
94  }
95  buffer.clear();
96  }
97 
98 private:
99  OutIterator& out;
100  turi::mutex& out_lock;
101  size_t soft_limit;
102  size_t hard_limit;
103  std::vector<ValueType> buffer;
104 };
105 }
void flush(bool is_locked=false)
void lock() const
Acquires a lock on the mutex.
Definition: mutex.hpp:64
const size_t SFRAME_WRITER_BUFFER_HARD_LIMIT
bool try_lock() const
Non-blocking attempt to acquire a lock on the mutex.
Definition: mutex.hpp:82
void write(const ValueType &val)