// Turi Create 4.0 — broadcast_queue.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef SFRAME_QUERY_ENGINE_broadcast_queue_HPP
7 #define SFRAME_QUERY_ENGINE_broadcast_queue_HPP
#include <algorithm>
#include <deque>
#include <memory>
#include <queue>
#include <string>
#include <vector>

#include <core/logging/assertions.hpp>
#include <core/storage/fileio/fixed_size_cache_manager.hpp>
#include <core/storage/fileio/general_fstream.hpp>
14 
15 namespace turi {
16 class oarchive;
17 class iarchive;
18 
19 template <typename T>
20 struct broadcast_queue_serializer {
21  void save(oarchive& oarc, const T& t) {
22  oarc << t;
23  }
24  void load(iarchive& iarc, T& t) {
25  iarc >> t;
26  }
27 };
28 /**
29  * \ingroup sframe_query_engine
30  * \addtogroup Utilities Utilities
31  * \{
32  */
33 /**
34  * This implements a external memory single producer, multiple-consumer queue
35  * where every consumer sees all the produced elements.
36  *
37  * This class is *not* thread safe.
38  *
39  * \tparam T Datatype to be saved. Must be serializable
40  *
41  * This class guarantees very high efficiency if the total number
42  * of elements do not exceed the cache limit.
43  *
 * The key design constraints are that files are either open for reading
 * or writing, but not both simultaneously. Random writes are prohibited, but
 * random reads are allowed.
47  *
48  * Single consumer queue
49  * ---------------------
50  * The single consumer case is easy to understand so we will explain that first.
51  *
52  * 1. push file
53  * 2. center queue
54  * 3. pop file
55  *
56  * Elements enter at the top and leave from the bottom.
57  *
58  * When data is pushed, we push directly into the center queue. But if the
59  * center queue is full, we start writing to the push_file.
60  *
 * When data is popped, we first read from the pop-file. When there are no more
 * pop files, we read from the center queue. When the center queue is empty,
 * we close the push file, swap it to a pop file, and start reading from the
 * pop file.
65  *
66  * Thus there can be at most one pop file.
67  *
68  * There are two interesting properties.
69  * Where n is the total number of elements, and k is the limit on the size
70  * of center_queue.
71  *
 * 1) no more than n / k files will be created even under
 * adversarial schedulings of consumer and producer.
74  *
75  * (Simple proof. Everytime a push file becomes a pop file, the center is
76  * empty. Hence at least k elements must be inserted
77  * into the center queue before a new push file is started.)
78  *
79  * 2) Assuming producer and consumer rates match. i.e. we alternate
80  * producing 'q' elements and consuming 'q' elements. At most a proportion of
81  * (q - k)/q elements will be written to or read from disk.
82  *
83  * Multiple consumer queue 1
84  * -------------------------
85  * In the multiple consumer case, every consumer must see every element. This
86  * makes the datastructure substantially trickier. Specifically you want to make
87  * sure the setting where there are 2 or more consumers and all consumers are
88  * very slow except for one which is as fast as the producer, do not result in:
89  * - producer writes a single element write to a push file
90  * - fast consumer needs the element, closes that push file turns it into a
91  * pop file and reads it.
92  * - This results in one file per element.
93  *
94  * In other words, we must try to preserve the property (1) above, that at most
95  * O(n / k) files are created for some datastructure cache parameter k.
96  *
97  * A simple method is to shift to a pure caching model.
98  * 1. memory queue
99  * 2. pop file
100  * 3. pop file
101  * 4. pop file
102  * 5. ...
103  *
104  * When we push:
105  * - It goes into the memory queue
106  * - When memory queue exceeds some threshold, it gets written to disk
107  * and a new memory queue is started.
108  *
109  * Every consumer remembers a current file position pointer, and
110  * - pops from file
111  * - or reads from the in memory queue. (a little bit of bookkeeping
112  * is needed since the queue may get flushed to disk)
113  *
114  * While this satisfies the property (1), this does not satisfy property (2).
115  * i.e. Assuming producer and consumer rates match. i.e. we alternate
116  * producing 'q' elements and every consumer consuming 'q' elements.
117  * When q > k, This procedure will write every element to file.
118  *
119  *
120  * Multiple consumer queue 2
121  * -------------------------
122  * Lets soften the cache a bit by allowing caching of up to 2k elements.
123  * When 2k elements are reached, we flush the first k elements to a file.
124  *
125  * It is easy to show that at most O(n / k) files are created since each file
126  * must have at least k elements. Similarly, when producer and consumer rates
127  * match. At most a proportion of (q - k)/q elements will be written to or read
128  * from disk.
129  *
130  * This thus satisfies the desirable properties of the single consumer queue.
131  *
132  * The architecture is hence:
133  * 1. memory queue
134  * 1. push file
135  * 2. pop file
136  * 3. pop file
137  * 4. pop file
138  * 5. ...
139  *
140  * On push:
141  * - push into memory_queue
142  * - if memory_queue size exceeds 2k,
143  * - flush first k into a new push_file
144  * - If there is a consumer reading from the memory queue, flip push file
145  * to a pop file. (make the
146  * push file into a pop file and update all consumers so that
147  * they are reading from the pop file.)
148  * - else if push file exists,
149  * - pop first element of memory_queue to push_file
150  *
151  * On pop:
152  * - If I am reading from pop file just read next element from pop file
153  * advancing to next pop file if necessary
154  * - If there are no more pop files, but if there is a push file, flip push
155  * file to a pop file.
156  * - If I am reading from memory queue, advance to the
157  * next memory queue element
158  *
159  * Optimizations
160  * -------------
161  * More optimizations are needed to minimize the number of files created.
162  * 1. Use a pool of file names which we can reuse rather than create a new one
163  * everytime we need a new file.
164  *
165  * 2. Once we reach 2k, we dump the first k to a file. but we don't close the
166  * file. After which every new insertion will shift the queue. one element
167  * gets written to disk, as one element gets inserted. Only when the file must
168  * be read by a consumer, then it gets flushed.
169  *
170  * 3. Delay the creation of the "pop file" from the "push file" as late as we
171  * can. i.e. we only create the pop file when there is a consumer who needs to
172  * read from the data just written to the push file
173  */
174 template <typename T, typename Serializer = broadcast_queue_serializer<T>>
176  public:
177  /**
178  * Constructs a disk backed queue
179  * \param cache limit Number of elements to cache
180  */
181  explicit broadcast_queue(size_t num_consumers,
182  size_t cache_limit = 128,
183  const Serializer& serializer = Serializer()):
184  m_cache_limit(cache_limit), m_serializer(serializer), m_consumers(num_consumers) {
185  if (m_cache_limit == 0) m_cache_limit = 1;
186  }
  /**
   * Releases everything held by the queue: all consumer state, every
   * pop-queue file, the push file, and the pool of recycled cache files.
   *
   * NOTE(review): this clears m_consumers, so the queue cannot be used for
   * further push/pop after reset(); it is invoked from the destructor.
   */
  void reset() {
    // Drop all consumer references to pop queues first so the shared_ptrs
    // below are the sole owners.
    m_consumers.clear();
    // clear the pop queue, deleting files if necessary
    while(!m_pop_queues.empty()) release_pop_queue_front();

    // clear the push queue, deleting files if necessary
    m_push_queue.write_handle.reset();
    if (!m_push_queue.file_name.empty()) {
      fileio::delete_path(m_push_queue.file_name);
    }
    m_push_queue.file_name.clear();

    // release_pop_queue_front() recycles file names into
    // m_allocated_filenames rather than deleting them; delete them now.
    delete_all_cache_files();
  }
202 
  ~broadcast_queue() {
    // Close all file handles and remove every temporary cache file.
    reset();
  }
206  /**
207  * Sets the cache limit
208  * \param cache limit Number of elements to cache
209  */
210  void set_cache_limit(size_t cache_limit) {
211  m_cache_limit = cache_limit;
212  if (m_cache_limit == 0) m_cache_limit = 1;
213  }
214 
  /**
   * Pushes an element into the queue. Every consumer will see this element.
   * \param el The element to push; it is copied into the in-memory cache.
   */
  void push(const T& el) {
    m_push_queue.element_cache.push_back(el);
    ++m_push_queue.nelements;
    ++nelements_pushed;

    // If a rolling push file is open and some consumer's next element is
    // the front of the in-memory cache (element_offset == 0), that element
    // is about to be evicted to disk below. Flip the push file into a pop
    // file first so the consumer can read the spilled data (and so the
    // --element_offset adjustment below cannot wrap past zero).
    if (m_push_queue.write_handle) {
      for (auto& c: m_consumers) {
        if (c.reading_from_push_queue() && c.element_offset == 0) {
          flip_queues();
          break;
        }
      }
    }
    if (!m_push_queue.write_handle) {
      // no output file yet
      // push until 2 * cache limit
      // try to trim first. Then flush.
      if (m_push_queue.nelements >= 2 * m_cache_limit) {
        trim_push_queue();
      }
      if (m_push_queue.nelements >= 2 * m_cache_limit) {
        flush_push_queue();
      }
    } else {
      // we have an output file. this is now a rolling cache:
      // evict the oldest in-memory element to the push file, then rebase
      // every in-memory reader's offset by one to match.
      oarchive oarc(*m_push_queue.write_handle);
      m_serializer.save(oarc, m_push_queue.element_cache.front());
      m_push_queue.element_cache.pop_front();
      for (auto& c: m_consumers) {
        if (c.reading_from_push_queue()) --c.element_offset;
      }
    }
  }
252  /**
253  * if this returns false, the next call to pop(consumer) will succeed,
254  * Otherwise it will fail.
255  */
256  bool empty(size_t consumer) {
257  return m_consumers[consumer].nelements_popped == nelements_pushed;
258  }
259  /**
260  * Pops and element from the queue.
261  * Returns true on success, false on failure (queue is empty)
262  */
263  bool pop(size_t consumer, T& ret) {
264  DASSERT_LT(consumer, m_consumers.size());
265  // current consumer
266  auto& cc = m_consumers[consumer];
267 
268  if (!cc.reading_from_push_queue()) {
269  if(cc.file_offset >= cc.current_pop_queue->file_length) {
270  if (cc.current_pop_queue->next_queue == nullptr &&
271  m_push_queue.write_handle) {
272  flip_queues();
273  }
274  // advance to next queue if this queue is finished
275  cc.current_pop_queue = cc.current_pop_queue->next_queue;
276  cc.file_offset = 0;
277  if (m_pop_queues.front().unique()) release_pop_queue_front();
278  return pop(consumer, ret);
279  }
280  auto& pq = cc.current_pop_queue;
281  pq->read_handle->seekg(cc.file_offset, std::ios_base::beg);
282  iarchive iarc(*(pq->read_handle));
283  m_serializer.load(iarc, ret);
284  cc.file_offset = pq->read_handle->tellg();
285  } else {
286  if (m_push_queue.element_cache.size() <= cc.element_offset) return false;
287  ret = m_push_queue.element_cache[cc.element_offset];
288  ++cc.element_offset;
289  trim_push_queue();
290  }
291  ++cc.nelements_popped;
292  return true;
293  }
294 
295  /**
296  * Returns the number of elements in the queue.
297  */
298  size_t num_elements() const {
299  return m_element_count;
300  }
301 
302  /**
303  * Deletes all unused cache files
304  */
306  while(!m_allocated_filenames.empty()) {
307  fileio::delete_path(m_allocated_filenames.front());
308  m_allocated_filenames.pop();
309  }
310  }
311 
 private:

  /// Maximum number of elements to cache in memory (always >= 1)
  size_t m_cache_limit = 0;
  /// NOTE(review): appears never to be written anywhere in this class;
  /// see num_elements()
  size_t m_element_count = 0;
  /// Serializer used to write/read elements to/from cache files
  Serializer m_serializer;

  /**
   * A queue of this is managed by m_pop_queues.
   * Each pop_queue points to a file containing the data for this queue
   */
  struct pop_queue {
    /// file name of this queue
    std::string file_name;
    /// length of file
    size_t file_length = 0;
    /// a ifstream handle
    std::shared_ptr<general_ifstream> read_handle;
    /// Number of elements in this queue
    size_t nelements = 0;
    /// The next queue in this list of queues
    std::shared_ptr<pop_queue> next_queue;
  };
334 
  /**
   * There is only one of this; this is where data gets pushed.
   * There are 2 possible states for this structure.
   *
   * When there are < 2 * cache_limit elements:
   *   All data is in element_cache
   *   write_handle is nullptr
   * When there are >= 2 * cache_limit elements:
   *   The first nelements - cache_limit elements are
   *   stored in the file using write_handle
   *   element_cache contains the most recently inserted cache_limit elements
   */
  struct push_queue {
    /// Backing file for spilled elements; empty while fully in-memory
    std::string file_name;
    /// Open write handle to file_name; nullptr while fully in-memory
    std::shared_ptr<general_ofstream> write_handle;
    /// The most recently pushed elements, held in memory
    std::deque<T> element_cache;
    /// The total number of elements stored (in-memory and in the file)
    size_t nelements = 0;
  };
354 
  /**
   * There is one of this for each consumer.
   *
   * if current_pop_queue is nullptr the consumer is reading from the push_queue
   * in which case element_offset is the index into push_queue::element_cache
   * for the NEXT element to read.
   *
   * if current_pop_queue is not null, we are reading from a pop_queue and
   * file_offset is the file position to seek to for the NEXT element to read.
   *
   * Global Invariant
   * ----------------
   * When current_pop_queue is nullptr, push_queue.nelements cannot be
   * greater than 2 * cache_limit elements. (we cannot maintain the
   * element_offset index correctly when the rolling cache is used).
   */
  struct consumer {
    /// Pop queue currently being read; nullptr means reading the push queue
    std::shared_ptr<pop_queue> current_pop_queue;
    /// Next index into push_queue::element_cache (push-queue mode only)
    size_t element_offset = 0;
    /// File position of the next element to read (pop-queue mode only)
    size_t file_offset = 0;
    /// total elements popped so far
    size_t nelements_popped = 0;

    /// True when this consumer reads from the in-memory push queue.
    bool reading_from_push_queue() const {
      return current_pop_queue == nullptr;
    }
  };
382 
  /// Pop queues ordered oldest-first; also linked via pop_queue::next_queue
  std::deque<std::shared_ptr<pop_queue> > m_pop_queues;
  /// The single push queue where new elements arrive
  push_queue m_push_queue;
  /// Per-consumer read cursor state
  std::vector<consumer> m_consumers;
  /// Total number of elements ever pushed into the queue
  size_t nelements_pushed = 0;
  /// we keep a pool of cache files to recycle
  std::queue<std::string> m_allocated_filenames;
389 
390  std::string get_cache_file() {
391  if (!m_allocated_filenames.empty()) {
392  auto ret = m_allocated_filenames.front();
393  m_allocated_filenames.pop();
394  return ret;
395  } else {
396  return fileio::fixed_size_cache_manager::get_instance().get_temp_cache_id("dqueue");
397  }
398  }
399 
  /// Returns a cache file name to the recycling pool for later reuse.
  void release_cache_file(const std::string& f) {
    m_allocated_filenames.push(f);
  }
403 
404  /**
405  * If all readers are in the push queue, and
406  * reading directly from elements, we might be able to trim the queue.
407  */
408  void trim_push_queue() {
409  size_t min_element_offset = (size_t)(-1);
410  for (auto& c: m_consumers) {
411  // we have a reader that is not reading from push queue. cannot trim
412  if (c.reading_from_push_queue() == false) {
413  return;
414  } else if (c.element_offset < min_element_offset) {
415  min_element_offset = c.element_offset;
416  }
417  }
418  if (min_element_offset > 0) {
419  for (auto& c: m_consumers) {
420  c.element_offset -= min_element_offset;
421  }
422  auto start = m_push_queue.element_cache.begin();
423  auto end = start + min_element_offset;
424  m_push_queue.nelements -= min_element_offset;
425 
426  // remove those m_cache_limit elements from the in memory cache
427  m_push_queue.element_cache.erase(m_push_queue.element_cache.begin(),
428  end);
429  }
430  }
431 
432  bool has_push_queue_reader() {
433  return std::any_of(m_consumers.begin(),
434  m_consumers.end(),
435  [](const consumer& c) {
436  return c.reading_from_push_queue();
437  });
438  }
439 
  /**
   * To be called when the size of the in memory cache exceeds 2 *
   * the cache limit. Writes the oldest m_cache_limit elements out to a
   * fresh push file and erases them from the in-memory cache; if some
   * consumer's next element is among those flushed, the file is
   * immediately flipped into a pop file and that consumer is repointed
   * at the correct file offset.
   */
  void flush_push_queue() {
    // write out m_cache_limit elements
    auto start = m_push_queue.element_cache.begin();
    auto end = m_push_queue.element_cache.begin() + m_cache_limit;

    // if there is a consumer reading from the push_queue, we need to
    // completely close the file and shift it to the pull queue.
    // this requires a whole bunch of datastructure updates to be
    // performed by flip_queues
    if (has_push_queue_reader()) {
      m_push_queue.file_name = get_cache_file();
      m_push_queue.write_handle =
          std::make_shared<general_ofstream>(m_push_queue.file_name);
      // remember the offset of each element so a consumer whose next
      // element was flushed can be repointed at the right file position
      std::vector<size_t> file_offsets;
      file_offsets.reserve(m_cache_limit);
      // a default-constructed oarchive serializes into its own malloc'd
      // buffer, letting us track the byte offset of each element
      oarchive oarc;
      size_t filepos_ctr = 0;
      for(; start != end; ++start) {
        file_offsets.push_back(filepos_ctr);
        m_serializer.save(oarc, *start);
        m_push_queue.write_handle->write(oarc.buf, oarc.off);
        filepos_ctr += oarc.off;
        // rewind the archive buffer for the next element
        oarc.off = 0;
      }
      // the oarchive buffer is malloc-allocated; release it explicitly
      free(oarc.buf);

      bool must_flip_queue = false;
      // now we need to update all the consumers
      // which are reading directly from elements.
      // the queue must be flipped if any consumer's next element is among
      // the m_cache_limit elements just written out
      for (auto& c: m_consumers) {
        if (c.reading_from_push_queue() && c.element_offset < m_cache_limit) {
          must_flip_queue = true;
          break;
        }
      }
      if (must_flip_queue) flip_queues();
      // NOTE(review): flip_queues() snapshots element_cache.size() before
      // the erase at the bottom of this function, so after a flip both
      // pq->nelements and m_push_queue.nelements look off by
      // m_cache_limit — confirm the intended accounting against upstream.

      for (auto& c: m_consumers) {
        // reading from elements
        if (c.reading_from_push_queue()) {
          if (c.element_offset >= m_cache_limit) {
            // next element is still in memory; rebase past flushed prefix
            c.element_offset -= m_cache_limit;
          } else {
            DASSERT_EQ(must_flip_queue, true);
            // convert element offset to file_offset
            c.current_pop_queue = m_pop_queues.back();
            c.file_offset = file_offsets[c.element_offset];
            c.element_offset = 0;
          }
        }
      }
    } else {
      // no push queue reader. open a push file and just dump
      m_push_queue.file_name = get_cache_file();
      m_push_queue.write_handle =
          std::make_shared<general_ofstream>(m_push_queue.file_name);
      oarchive oarc(*m_push_queue.write_handle);
      for(; start != end; ++start) {
        m_serializer.save(oarc, *start);
      }
    }

    // remove those m_cache_limit elements from the in memory cache
    m_push_queue.element_cache.erase(m_push_queue.element_cache.begin(),
                                     end);

  }
512 
  /**
   * Converts the file currently managed by the push_queue into a new
   * pop_queue appended to m_pop_queues, and updates the push queue's
   * element accounting. The write handle is closed before the file is
   * reopened for reading (files are never open for reading and writing
   * simultaneously).
   */
  void flip_queues() {
    // here we are just doing a lot of invariant fixing to convert the push
    // queue file to a pull queue
    //
    // close the push queue handle
    m_push_queue.write_handle.reset();
    // make a new pop queue which takes ownership of the push file's name
    auto pq = std::make_shared<pop_queue>();
    pq->file_name = std::move(m_push_queue.file_name);
    m_push_queue.file_name.clear();
    pq->read_handle = std::make_shared<general_ifstream>(pq->file_name);
    pq->file_length = pq->read_handle->file_size();
    // elements in the file = total tracked minus those still in memory
    pq->nelements = m_push_queue.nelements - m_push_queue.element_cache.size();
    // insert into the queue
    // update the linked list managed by pop_queue::next_queue
    if (!m_pop_queues.empty()) {
      auto last_elem = m_pop_queues.back();
      last_elem->next_queue = pq;
    }
    m_pop_queues.push_back(pq);
    // update the nelement counter in the push_queue
    m_push_queue.nelements = m_push_queue.element_cache.size();
  }
542 
543  void release_pop_queue_front() {
544  auto i = m_pop_queues.front();
545  i->read_handle.reset();
546  if (!i->file_name.empty()) {
547  release_cache_file(i->file_name);
548  }
549  i->file_name.clear();
550  m_pop_queues.pop_front();
551  }
552 };
553 
554 /// \}
555 } // turicreate
556 #endif
// Doxygen member-index residue from extraction, preserved as comments:
//   void set_cache_limit(size_t cache_limit)
//   bool empty(size_t consumer)
//   iarchive — the serialization input archive object which, provided with a
//     reference to an istream, will read from the istream, providing
//     deserialization capabilities. (Definition: iarchive.hpp:60)
//   bool delete_path(const std::string &path,
//                    file_status status = file_status::FS_UNAVAILABLE)
//   void push(const T &el)
//   oarchive — the serialization output archive object which, provided with a
//     reference to an ostream, will write to the ostream, providing
//     serialization capabilities. (Definition: oarchive.hpp:80)
//   bool pop(size_t consumer, T &ret)
//   broadcast_queue(size_t num_consumers, size_t cache_limit = 128,
//                   const Serializer &serializer = Serializer())
//   size_t num_elements() const