Turi Create  4.0
turi::broadcast_queue< T, Serializer > Class Template Reference

#include <core/storage/query_engine/util/broadcast_queue.hpp>

Public Member Functions

 broadcast_queue (size_t num_consumers, size_t cache_limit=128, const Serializer &serializer=Serializer())
 
void set_cache_limit (size_t cache_limit)
 
void push (const T &el)
 
bool empty (size_t consumer)
 
bool pop (size_t consumer, T &ret)
 
size_t num_elements () const
 
void delete_all_cache_files ()
 

Detailed Description

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
class turi::broadcast_queue< T, Serializer >

This implements an external-memory, single-producer, multiple-consumer queue where every consumer sees all of the produced elements.

This class is not thread safe.

Template Parameters
T  Datatype to be saved. Must be serializable.

This class guarantees very high efficiency as long as the total number of elements does not exceed the cache limit.
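As an illustration of these semantics, here is a minimal in-memory stand-in (a hypothetical `toy_broadcast_queue`, not the real class): every consumer tracks its own read position, so each one sees every pushed element. Unlike the real queue, this sketch never spills to disk and never discards elements that all consumers have read.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// In-memory stand-in for the broadcast_queue interface, illustrating its
// semantics only: one producer, and every consumer sees every element.
template <typename T>
class toy_broadcast_queue {
 public:
  explicit toy_broadcast_queue(size_t num_consumers)
      : positions_(num_consumers, 0) {}

  void push(const T& el) { elements_.push_back(el); }

  bool empty(size_t consumer) const {
    return positions_[consumer] >= elements_.size();
  }

  // Returns true on success, false if this consumer has seen everything.
  bool pop(size_t consumer, T& ret) {
    if (empty(consumer)) return false;
    ret = elements_[positions_[consumer]++];
    return true;
  }

 private:
  std::vector<T> elements_;        // all elements ever pushed
  std::vector<size_t> positions_;  // per-consumer read position
};
```

A slow consumer simply keeps an earlier position; nothing it has not yet read can be lost.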

The key design constraints are that files are either open for reading or open for writing, but never both simultaneously. Random writes are prohibited, but random reads are allowed.

Single consumer queue

The single-consumer case is easier to understand, so we explain it first.

  1. push file
  2. center queue
  3. pop file

Elements enter at the top and leave from the bottom.

When data is pushed, it goes directly into the center queue; but if the center queue is full, we start writing to the push file.

When data is popped, we first read from the pop file. When the pop file is exhausted, we read from the center queue. When the center queue is also empty, we close the push file, swap it to become the pop file, and start reading from it.

Thus there can be at most one pop file.
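The single-consumer flow can be sketched with `std::vector`s standing in for the push and pop files (the name `single_consumer_sketch` is illustrative, not from the library):

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Sketch of the single-consumer flow: center queue of capacity k, with
// overflow written to a push file that is later flipped into a pop file.
template <typename T>
class single_consumer_sketch {
 public:
  explicit single_consumer_sketch(size_t k) : k_(k) {}

  void push(const T& el) {
    // Once a push file is started, newer elements must keep going to it,
    // otherwise FIFO order would break.
    if (center_.size() < k_ && push_file_.empty()) {
      center_.push_back(el);
    } else {
      push_file_.push_back(el);
    }
  }

  bool pop(T& ret) {
    if (!pop_file_.empty()) {            // 1. drain the pop file first
      ret = pop_file_.front();
      pop_file_.erase(pop_file_.begin());
      return true;
    }
    if (!center_.empty()) {              // 2. then the center queue
      ret = center_.front();
      center_.pop_front();
      return true;
    }
    if (!push_file_.empty()) {           // 3. flip push file -> pop file
      pop_file_.swap(push_file_);
      return pop(ret);
    }
    return false;                        // queue is empty
  }

 private:
  size_t k_;
  std::deque<T> center_;
  std::vector<T> push_file_;  // stand-in for the on-disk push file
  std::vector<T> pop_file_;   // stand-in for the on-disk pop file
};
```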

There are two interesting properties, where n is the total number of elements and k is the size limit of the center queue:

1) No more than n/k files will be created, even under adversarial scheduling of consumer and producer.

(Simple proof: every time a push file becomes a pop file, the center queue is empty. Hence at least k elements must be inserted into the center queue before a new push file is started.)

2) Assume the producer and consumer rates match, i.e. we alternate producing q elements and consuming q elements. Then at most a proportion (q - k)/q of the elements will be written to or read from disk.
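A quick arithmetic illustration of property (2), assuming matched rates (the function name is illustrative):

```cpp
#include <cassert>

// Fraction of elements that touch disk per cycle when we alternate producing
// and consuming q elements with an in-memory capacity of k (property 2).
double spill_fraction(double q, double k) {
  if (q <= k) return 0.0;  // everything fits in memory, nothing spills
  return (q - k) / q;
}
```

For example, with q = 256 and k = 128, half of the elements touch disk; once q drops to k or below, none do.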

Multiple consumer queue 1

In the multiple-consumer case, every consumer must see every element, which makes the data structure substantially trickier. Specifically, consider the setting where there are two or more consumers, and all consumers are very slow except for one, which is as fast as the producer. We must make sure this does not result in:

  • the producer writing a single element to a push file,
  • the fast consumer needing that element, so it closes the push file, turns it into a pop file, and reads it,
  • resulting in one file per element.

In other words, we must try to preserve property (1) above: at most O(n/k) files are created, for some cache parameter k of the data structure.

A simple method is to shift to a pure caching model.

  1. memory queue
  2. pop file
  3. pop file
  4. pop file
  5. ...

When we push:

  • It goes into the memory queue
  • When the memory queue exceeds some threshold, it gets written to disk and a new memory queue is started.

Every consumer remembers a current file position pointer, and either

  • pops from a file,
  • or reads from the in-memory queue (a little bookkeeping is needed, since the queue may get flushed to disk).

While this satisfies property (1), it does not satisfy property (2). Assume the producer and consumer rates match, i.e. we alternate producing q elements and having every consumer consume q elements. When q > k, this procedure writes every element to file.

Multiple consumer queue 2

Let's soften the cache a bit by allowing caching of up to 2k elements. When 2k elements are reached, we flush the first k elements to a file.

It is easy to show that at most O(n/k) files are created, since each file must hold at least k elements. Similarly, when the producer and consumer rates match, at most a proportion (q - k)/q of the elements will be written to or read from disk.
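The 2k flush rule can be sketched as follows (a hypothetical `flush_at_2k` toy, with vectors standing in for files). Because each flushed file holds exactly k elements, the file count is bounded by n/k:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Sketch of the softened caching rule: buffer up to 2k elements in memory,
// and when 2k is reached, flush the oldest k into a new "file" (a vector).
template <typename T>
struct flush_at_2k {
  explicit flush_at_2k(size_t k) : k_(k) {}

  void push(const T& el) {
    memory_.push_back(el);
    if (memory_.size() == 2 * k_) {
      // Flush the oldest k elements; at least k elements per file
      // guarantees at most n/k files overall.
      files_.emplace_back(memory_.begin(), memory_.begin() + k_);
      memory_.erase(memory_.begin(), memory_.begin() + k_);
    }
  }

  size_t num_files() const { return files_.size(); }

  size_t k_;
  std::deque<T> memory_;               // in-memory queue, size < 2k
  std::vector<std::vector<T>> files_;  // stand-ins for on-disk files
};
```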

This thus satisfies both desirable properties of the single-consumer queue.

The architecture is hence:

  1. memory queue
  2. push file
  3. pop file
  4. pop file
  5. pop file
  6. ...

On push:

  • push into the memory queue
  • if the memory queue size exceeds 2k,
    • flush the first k elements into a new push file
    • if there is a consumer reading from the memory queue, flip the push file to a pop file (make the push file into a pop file and update all consumers so that they read from the pop file)
  • else if a push file exists,
    • pop the first element of the memory queue to the push file

On pop:

  • If the consumer is reading from a pop file, it reads the next element from that pop file, advancing to the next pop file if necessary.
  • If there are no more pop files but a push file exists, it flips the push file to a pop file.
  • If the consumer is reading from the memory queue, it advances to the next memory-queue element.
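The push and pop rules above can be combined into one simplified sketch. In this toy, every flushed file holds exactly k elements, so the element with global index g lives in file g/k; the push-file-to-pop-file flip and the deletion of fully-consumed files are elided, and all names are illustrative:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Combined sketch of the multi-consumer push/pop logic: a memory queue
// holding up to 2k elements, with the oldest k flushed to a new "file"
// (a vector) whenever 2k is reached. Each consumer tracks the global
// index of the next element it will read.
template <typename T>
class multi_consumer_sketch {
 public:
  multi_consumer_sketch(size_t num_consumers, size_t k)
      : k_(k), next_(num_consumers, 0) {}

  void push(const T& el) {
    memory_.push_back(el);
    if (memory_.size() == 2 * k_) {  // flush the oldest k to a new file
      files_.emplace_back(memory_.begin(), memory_.begin() + k_);
      memory_.erase(memory_.begin(), memory_.begin() + k_);
    }
  }

  bool pop(size_t consumer, T& ret) {
    size_t g = next_[consumer];
    size_t flushed = k_ * files_.size();  // elements already in files
    if (g < flushed) {
      ret = files_[g / k_][g % k_];       // read from a pop file
    } else if (g - flushed < memory_.size()) {
      ret = memory_[g - flushed];         // read from the memory queue
    } else {
      return false;                       // consumer has seen everything
    }
    ++next_[consumer];
    return true;
  }

 private:
  size_t k_;
  std::vector<T> memory_;
  std::vector<std::vector<T>> files_;
  std::vector<size_t> next_;  // next global index, per consumer
};
```

A slow consumer simply keeps a small global index and reads older files; the fast consumer catches up to the memory queue without forcing one file per element.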

Optimizations

More optimizations are needed to minimize the number of files created.

  1. Use a pool of file names that we can reuse, rather than creating a new one every time we need a new file.
  2. Once we reach 2k elements, we dump the first k to a file, but we do not close the file. After that, every new insertion shifts the queue: one element is written to disk as one element is inserted. The file is only flushed when a consumer must read from it.
  3. Delay the conversion of the push file into a pop file as late as possible, i.e. we only create the pop file when there is a consumer who needs to read the data just written to the push file.
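Optimization (1) can be sketched as a small free pool of reusable names (the `file_name_pool` name and the name format are illustrative, not the library's):

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Recycle file names through a free pool instead of minting a new name
// every time a file is needed.
class file_name_pool {
 public:
  std::string acquire() {
    if (!free_.empty()) {
      std::string name = free_.back();  // reuse a released name
      free_.pop_back();
      return name;
    }
    return "cache_file_" + std::to_string(counter_++);
  }

  void release(std::string name) { free_.push_back(std::move(name)); }

  size_t names_created() const { return counter_; }

 private:
  std::vector<std::string> free_;  // names available for reuse
  size_t counter_ = 0;             // how many distinct names were minted
};
```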

Definition at line 175 of file broadcast_queue.hpp.

Constructor & Destructor Documentation

◆ broadcast_queue()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
turi::broadcast_queue< T, Serializer >::broadcast_queue ( size_t  num_consumers,
size_t  cache_limit = 128,
const Serializer &  serializer = Serializer() 
)
inline explicit

Constructs a disk-backed queue.

Parameters
num_consumers Number of consumers
cache_limit Number of elements to cache

Definition at line 181 of file broadcast_queue.hpp.

Member Function Documentation

◆ delete_all_cache_files()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
void turi::broadcast_queue< T, Serializer >::delete_all_cache_files ( )
inline

Deletes all unused cache files

Definition at line 305 of file broadcast_queue.hpp.

◆ empty()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
bool turi::broadcast_queue< T, Serializer >::empty ( size_t  consumer)
inline

If this returns false, the next call to pop(consumer) will succeed; otherwise it will fail.

Definition at line 256 of file broadcast_queue.hpp.

◆ num_elements()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
size_t turi::broadcast_queue< T, Serializer >::num_elements ( ) const
inline

Returns the number of elements in the queue.

Definition at line 298 of file broadcast_queue.hpp.

◆ pop()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
bool turi::broadcast_queue< T, Serializer >::pop ( size_t  consumer,
T &  ret 
)
inline

Pops an element from the queue. Returns true on success, and false on failure (the queue is empty for this consumer).

Definition at line 263 of file broadcast_queue.hpp.

◆ push()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
void turi::broadcast_queue< T, Serializer >::push ( const T &  el)
inline

Pushes an element into the queue.

Definition at line 218 of file broadcast_queue.hpp.

◆ set_cache_limit()

template<typename T, typename Serializer = broadcast_queue_serializer<T>>
void turi::broadcast_queue< T, Serializer >::set_cache_limit ( size_t  cache_limit)
inline

Sets the cache limit.

Parameters
cache_limit Number of elements to cache

Definition at line 210 of file broadcast_queue.hpp.


The documentation for this class was generated from the following file:

  • core/storage/query_engine/util/broadcast_queue.hpp