Turi Create
4.0
#include <core/storage/query_engine/util/broadcast_queue.hpp>
Public Member Functions
          broadcast_queue (size_t num_consumers, size_t cache_limit=128, const Serializer &serializer=Serializer())
  void    set_cache_limit (size_t cache_limit)
  void    push (const T &el)
  bool    empty (size_t consumer)
  bool    pop (size_t consumer, T &ret)
  size_t  num_elements () const
  void    delete_all_cache_files ()
This implements an external-memory, single-producer, multiple-consumer queue in which every consumer sees all the produced elements.
This class is not thread safe.
Template Parameters
  T  Datatype to be saved. Must be serializable.
This class guarantees very high efficiency if the total number of elements does not exceed the cache limit.
The key design constraints are that files are either open for reading or for writing, but never both simultaneously. Random writes are prohibited, but random reads are allowed.
The single-consumer case is easier to understand, so we explain it first.
Elements enter at the top and leave from the bottom.
When data is pushed, we push directly into the center queue. But if the center queue is full, we start writing to the push_file.
When data is popped, we first read from the pop file. When the pop file is exhausted, we read from the center queue. When the center queue is empty, the push file is closed and swapped into the pop file, and reading continues from it.
Thus there can be at most one pop file.
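The push/pop rules above can be modelled in a few lines of C++. This is a hypothetical in-memory sketch, not the Turi Create implementation: the class name single_consumer_model is invented, a std::vector stands in for each on-disk push or pop file, and serialization is omitted. It also counts the files created and the elements routed through "disk", so the two properties discussed next can be checked.

```cpp
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical in-memory model of the single-consumer design.
// std::vector stands in for an on-disk file; no real I/O happens.
template <typename T>
class single_consumer_model {
 public:
  explicit single_consumer_model(std::size_t k) : k_(k) {}

  void push(const T& el) {
    // Assumption of this sketch: once a push file is open, every later push
    // goes to it until it is swapped out, which keeps FIFO order.
    if (push_file_open_ || center_.size() >= k_) {
      if (!push_file_open_) {
        push_file_open_ = true;
        ++files_created_;
      }
      push_file_.push_back(el);
      ++disk_elements_;
    } else {
      center_.push_back(el);
    }
  }

  bool pop(T& ret) {
    // 1. The pop file holds the oldest elements, so read it first.
    if (pop_pos_ < pop_file_.size()) {
      ret = pop_file_[pop_pos_++];
      return true;
    }
    // 2. Then read from the in-memory center queue.
    if (!center_.empty()) {
      ret = center_.front();
      center_.pop_front();
      return true;
    }
    // 3. Center is empty: close the push file and swap it into the pop file.
    if (push_file_open_) {
      pop_file_ = std::move(push_file_);
      push_file_.clear();
      push_file_open_ = false;
      pop_pos_ = 0;
      ret = pop_file_[pop_pos_++];  // non-empty: opening it pushed an element
      return true;
    }
    return false;  // queue is empty
  }

  std::size_t files_created() const { return files_created_; }
  std::size_t disk_elements() const { return disk_elements_; }

 private:
  std::size_t k_;                 // limit on the size of the center queue
  std::deque<T> center_;
  std::vector<T> push_file_, pop_file_;
  std::size_t pop_pos_ = 0;       // read offset into pop_file_
  bool push_file_open_ = false;
  std::size_t files_created_ = 0, disk_elements_ = 0;
};
```

Reading order is always pop file, then center queue, then push file, and since the swap only happens when both of the first two are empty, at most one pop file exists at a time.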
This design has two useful properties, where n is the total number of elements and k is the limit on the size of center_queue:
1) No more than n / k files are created, even under adversarial scheduling of the consumer and producer.
(Proof sketch: every time a push file becomes a pop file, the center queue is empty. Hence at least k elements must be inserted into the center queue before a new push file is started.)
2) If the producer and consumer rates match, i.e. we alternate producing q elements and consuming q elements, then at most a fraction (q - k)/q of the elements is written to or read from disk.
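The accounting behind property (2) can be written out for one round. This is a reconstruction from the push/pop rules above, assuming the queue is empty at the start of each round and q > k:

$$
\underbrace{q}_{\text{pushed}} \;=\; \underbrace{k}_{\text{center queue}} \;+\; \underbrace{(q-k)}_{\text{push file}},
\qquad
\text{fraction through disk} \;=\; \frac{q-k}{q}.
$$

The consumer then reads k elements from the center queue, the push file is swapped into the pop file, and the remaining q - k elements are read back from it, leaving the queue empty for the next round. For example, with k = 4 and q = 10, 6 of every 10 elements pass through disk; when q <= k nothing touches disk.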
In the multiple-consumer case, every consumer must see every element, which makes the data structure substantially trickier. Specifically, when there are two or more consumers and all of them are very slow except for one that is as fast as the producer, the queue must not end up creating a large number of files.
In other words, we must try to preserve property (1) above: at most O(n / k) files are created, for some data-structure cache parameter k.
A simple method is to shift to a pure caching model.
When we push:
Every consumer remembers a current file position pointer, and
While this satisfies property (1), it does not satisfy property (2). Suppose the producer and consumer rates match, i.e. we alternate producing q elements and having every consumer consume q elements. When q > k, this procedure writes every element to a file.
Let's soften the cache a bit by allowing caching of up to 2k elements. When 2k elements are reached, we flush the first k elements to a file.
It is easy to show that at most O(n / k) files are created, since each file holds at least k elements. Similarly, when producer and consumer rates match, at most a fraction (q - k)/q of the elements is written to or read from disk.
This therefore satisfies both desirable properties of the single-consumer queue.
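The 2k soft cache can be sketched the same way. Again this is a hypothetical in-memory model (broadcast_model, file and all member names are invented for illustration, and std::vector stands in for on-disk files): elements get global sequence numbers, each consumer keeps its own read position, the oldest k cached elements are flushed to a new file whenever the cache reaches 2k, and files and cache entries are dropped once every consumer has read past them. That last trimming step is an assumption of this sketch; without it the cache would always fill to 2k and almost every element would be flushed.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical in-memory model of the multiple-consumer "soft cache":
// up to 2k elements are cached; at 2k the oldest k are flushed to a "file".
// Assumes k >= 1.
template <typename T>
class broadcast_model {
 public:
  broadcast_model(std::size_t num_consumers, std::size_t k)
      : k_(k), pos_(num_consumers, 0) {}

  void push(const T& el) {
    cache_.push_back(el);
    ++next_seq_;
    if (cache_.size() >= 2 * k_) {
      // Flush the oldest k cached elements into a new file.
      auto mid = cache_.begin() + static_cast<std::ptrdiff_t>(k_);
      file f{cache_begin_, std::vector<T>(cache_.begin(), mid)};
      files_.push_back(std::move(f));
      cache_.erase(cache_.begin(), mid);
      cache_begin_ += k_;
      ++files_created_;
      disk_elements_ += k_;
    }
  }

  bool pop(std::size_t consumer, T& ret) {
    std::size_t seq = pos_[consumer];
    if (seq >= next_seq_) return false;  // this consumer has seen everything
    if (seq >= cache_begin_) {
      ret = cache_[seq - cache_begin_];
    } else {
      // Older elements live in a file; each consumer reads at its own offset.
      for (const file& f : files_) {
        if (seq >= f.begin && seq < f.begin + f.data.size()) {
          ret = f.data[seq - f.begin];
          break;
        }
      }
    }
    ++pos_[consumer];
    trim();
    return true;
  }

  std::size_t files_created() const { return files_created_; }
  std::size_t disk_elements() const { return disk_elements_; }

 private:
  struct file {
    std::size_t begin;  // sequence number of the file's first element
    std::vector<T> data;
  };

  // Drop files and cached elements that every consumer has already read.
  void trim() {
    std::size_t min_pos = *std::min_element(pos_.begin(), pos_.end());
    while (!files_.empty() &&
           files_.front().begin + files_.front().data.size() <= min_pos) {
      files_.pop_front();
    }
    while (!cache_.empty() && cache_begin_ < min_pos) {
      cache_.pop_front();
      ++cache_begin_;
    }
  }

  std::size_t k_;
  std::vector<std::size_t> pos_;  // next sequence number for each consumer
  std::deque<T> cache_;           // holds sequence numbers [cache_begin_, next_seq_)
  std::size_t cache_begin_ = 0;
  std::size_t next_seq_ = 0;
  std::deque<file> files_;
  std::size_t files_created_ = 0, disk_elements_ = 0;
};
```

Because every flush writes exactly k elements, n pushes create at most n / k files no matter how far apart the consumers drift, which is property (1) for the multiple-consumer case.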
The resulting architecture is:
On push:
On pop:
More optimizations are needed to minimize the number of files created.
Definition at line 175 of file broadcast_queue.hpp.
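broadcast_queue (size_t num_consumers, size_t cache_limit=128, const Serializer &serializer=Serializer())  [inline, explicit]
Constructs a disk-backed queue.
Parameters
  cache_limit  Number of elements to cache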
Definition at line 181 of file broadcast_queue.hpp.
void delete_all_cache_files ()  [inline]
Deletes all unused cache files.
Definition at line 305 of file broadcast_queue.hpp.
bool empty (size_t consumer)  [inline]
If this returns false, the next call to pop(consumer) will succeed; otherwise it will fail.
Definition at line 256 of file broadcast_queue.hpp.
size_t num_elements () const  [inline]
Returns the number of elements in the queue.
Definition at line 298 of file broadcast_queue.hpp.
bool pop (size_t consumer, T &ret)  [inline]
Pops an element from the queue. Returns true on success, false on failure (queue is empty).
Definition at line 263 of file broadcast_queue.hpp.
void push (const T &el)  [inline]
Pushes an element into the queue.
Definition at line 218 of file broadcast_queue.hpp.
void set_cache_limit (size_t cache_limit)  [inline]
Sets the cache limit.
Parameters
  cache_limit  Number of elements to cache
Definition at line 210 of file broadcast_queue.hpp.