6 #ifndef SFRAME_QUERY_ENGINE_broadcast_queue_HPP 7 #define SFRAME_QUERY_ENGINE_broadcast_queue_HPP 11 #include <core/storage/fileio/general_fstream.hpp> 12 #include <core/storage/fileio/fixed_size_cache_manager.hpp> 13 #include <core/logging/assertions.hpp> 20 struct broadcast_queue_serializer {
21 void save(oarchive& oarc,
const T& t) {
24 void load(iarchive& iarc, T& t) {
174 template <
typename T,
typename Serializer = broadcast_queue_serializer<T>>
182 size_t cache_limit = 128,
183 const Serializer& serializer = Serializer()):
184 m_cache_limit(cache_limit), m_serializer(serializer), m_consumers(num_consumers) {
185 if (m_cache_limit == 0) m_cache_limit = 1;
190 while(!m_pop_queues.empty()) release_pop_queue_front();
193 m_push_queue.write_handle.reset();
194 if (!m_push_queue.file_name.empty()) {
197 m_push_queue.file_name.clear();
200 delete_all_cache_files();
211 m_cache_limit = cache_limit;
212 if (m_cache_limit == 0) m_cache_limit = 1;
219 m_push_queue.element_cache.push_back(el);
220 ++m_push_queue.nelements;
223 if (m_push_queue.write_handle) {
224 for (
auto& c: m_consumers) {
225 if (c.reading_from_push_queue() && c.element_offset == 0) {
231 if (!m_push_queue.write_handle) {
235 if (m_push_queue.nelements >= 2 * m_cache_limit) {
238 if (m_push_queue.nelements >= 2 * m_cache_limit) {
244 oarchive oarc(*m_push_queue.write_handle);
245 m_serializer.save(oarc, m_push_queue.element_cache.front());
246 m_push_queue.element_cache.pop_front();
247 for (
auto& c: m_consumers) {
248 if (c.reading_from_push_queue()) --c.element_offset;
257 return m_consumers[consumer].nelements_popped == nelements_pushed;
263 bool pop(
size_t consumer, T& ret) {
264 DASSERT_LT(consumer, m_consumers.size());
266 auto& cc = m_consumers[consumer];
268 if (!cc.reading_from_push_queue()) {
269 if(cc.file_offset >= cc.current_pop_queue->file_length) {
270 if (cc.current_pop_queue->next_queue ==
nullptr &&
271 m_push_queue.write_handle) {
275 cc.current_pop_queue = cc.current_pop_queue->next_queue;
277 if (m_pop_queues.front().unique()) release_pop_queue_front();
278 return pop(consumer, ret);
280 auto& pq = cc.current_pop_queue;
281 pq->read_handle->seekg(cc.file_offset, std::ios_base::beg);
283 m_serializer.load(iarc, ret);
284 cc.file_offset = pq->read_handle->tellg();
286 if (m_push_queue.element_cache.size() <= cc.element_offset)
return false;
287 ret = m_push_queue.element_cache[cc.element_offset];
291 ++cc.nelements_popped;
299 return m_element_count;
306 while(!m_allocated_filenames.empty()) {
308 m_allocated_filenames.pop();
314 size_t m_cache_limit = 0;
315 size_t m_element_count = 0;
316 Serializer m_serializer;
324 std::string file_name;
326 size_t file_length = 0;
328 std::shared_ptr<general_ifstream> read_handle;
330 size_t nelements = 0;
332 std::shared_ptr<pop_queue> next_queue;
348 std::string file_name;
349 std::shared_ptr<general_ofstream> write_handle;
350 std::deque<T> element_cache;
352 size_t nelements = 0;
372 std::shared_ptr<pop_queue> current_pop_queue;
373 size_t element_offset = 0;
374 size_t file_offset = 0;
376 size_t nelements_popped = 0;
// Tells whether this consumer currently has no pop queue attached.
// Returns true exactly when current_pop_queue is null; does not modify the
// consumer (const member).
// NOTE(review): other code in this file treats a true result as "this
// consumer indexes into the push queue's element_cache via element_offset" —
// confirm against the full header, parts of which are missing from this listing.
378 bool reading_from_push_queue()
const {
379 return current_pop_queue ==
nullptr;
383 std::deque<std::shared_ptr<pop_queue> > m_pop_queues;
384 push_queue m_push_queue;
385 std::vector<consumer> m_consumers;
386 size_t nelements_pushed = 0;
388 std::queue<std::string> m_allocated_filenames;
// Produces the name to use for a new cache file.
// If m_allocated_filenames holds any previously released names, the front
// entry is copied into `ret` and removed from the queue. The last visible
// statement instead asks fileio::fixed_size_cache_manager (via
// get_instance()) for a temporary cache id, passing "dqueue".
// NOTE(review): the statement that hands `ret` back to the caller is not
// visible in this listing (original line 394 is missing) — presumably the
// recycled name is returned and the cache manager is only consulted when the
// queue is empty; confirm against the real header.
390 std::string get_cache_file() {
391 if (!m_allocated_filenames.empty()) {
392 auto ret = m_allocated_filenames.front();
393 m_allocated_filenames.pop();
396 return fileio::fixed_size_cache_manager::get_instance().get_temp_cache_id(
"dqueue");
// Makes a cache file name available for reuse.
// `f` is appended to m_allocated_filenames, the same queue get_cache_file()
// consults before requesting a new temporary id. This function only records
// the name; it performs no file-system operation itself.
400 void release_cache_file(
const std::string& f) {
401 m_allocated_filenames.push(f);
408 void trim_push_queue() {
409 size_t min_element_offset = (size_t)(-1);
410 for (
auto& c: m_consumers) {
412 if (c.reading_from_push_queue() ==
false) {
414 }
else if (c.element_offset < min_element_offset) {
415 min_element_offset = c.element_offset;
418 if (min_element_offset > 0) {
419 for (
auto& c: m_consumers) {
420 c.element_offset -= min_element_offset;
422 auto start = m_push_queue.element_cache.begin();
423 auto end = start + min_element_offset;
424 m_push_queue.nelements -= min_element_offset;
427 m_push_queue.element_cache.erase(m_push_queue.element_cache.begin(),
// Reports whether at least one entry of m_consumers is reading from the
// push queue, i.e. consumer::reading_from_push_queue() returns true for it.
// Implemented as a std::any_of scan starting at m_consumers.begin().
// NOTE(review): the end-iterator argument (original line 434) is missing
// from this listing — presumably m_consumers.end(); confirm.
432 bool has_push_queue_reader() {
433 return std::any_of(m_consumers.begin(),
435 [](
const consumer& c) {
436 return c.reading_from_push_queue();
444 void flush_push_queue() {
446 auto start = m_push_queue.element_cache.begin();
447 auto end = m_push_queue.element_cache.begin() + m_cache_limit;
453 if (has_push_queue_reader()) {
454 m_push_queue.file_name = get_cache_file();
455 m_push_queue.write_handle =
456 std::make_shared<general_ofstream>(m_push_queue.file_name);
458 std::vector<size_t> file_offsets;
459 file_offsets.reserve(m_cache_limit);
461 size_t filepos_ctr = 0;
462 for(; start != end; ++start) {
463 file_offsets.push_back(filepos_ctr);
464 m_serializer.save(oarc, *start);
465 m_push_queue.write_handle->write(oarc.buf, oarc.off);
466 filepos_ctr += oarc.off;
471 bool must_flip_queue =
false;
474 for (
auto& c: m_consumers) {
475 if (c.reading_from_push_queue() && c.element_offset < m_cache_limit) {
476 must_flip_queue =
true;
480 if (must_flip_queue) flip_queues();
482 for (
auto& c: m_consumers) {
484 if (c.reading_from_push_queue()) {
485 if (c.element_offset >= m_cache_limit) {
486 c.element_offset -= m_cache_limit;
488 DASSERT_EQ(must_flip_queue,
true);
490 c.current_pop_queue = m_pop_queues.back();
491 c.file_offset = file_offsets[c.element_offset];
492 c.element_offset = 0;
498 m_push_queue.file_name = get_cache_file();
499 m_push_queue.write_handle =
500 std::make_shared<general_ofstream>(m_push_queue.file_name);
501 oarchive oarc(*m_push_queue.write_handle);
502 for(; start != end; ++start) {
503 m_serializer.save(oarc, *start);
508 m_push_queue.element_cache.erase(m_push_queue.element_cache.begin(),
524 m_push_queue.write_handle.reset();
526 auto pq = std::make_shared<pop_queue>();
527 pq->file_name = std::move(m_push_queue.file_name);
528 m_push_queue.file_name.clear();
529 pq->read_handle = std::make_shared<general_ifstream>(pq->file_name);
530 pq->file_length = pq->read_handle->file_size();
531 pq->nelements = m_push_queue.nelements - m_push_queue.element_cache.size();
534 if (!m_pop_queues.empty()) {
535 auto last_elem = m_pop_queues.back();
536 last_elem->next_queue = pq;
538 m_pop_queues.push_back(pq);
540 m_push_queue.nelements = m_push_queue.element_cache.size();
// Retires the pop queue at the front of m_pop_queues.
// Order of operations: drop its read handle, hand its file name (if it has
// one) to release_cache_file() for reuse, clear the name, then remove the
// entry from the deque.
// Requires m_pop_queues to be non-empty: front() is called unconditionally.
543 void release_pop_queue_front() {
// `i` is a copy of the shared_ptr, so the pop_queue object stays alive for
// the rest of this function even after pop_front() below.
544 auto i = m_pop_queues.front();
// Release this queue's reference to the input stream before recycling the name.
545 i->read_handle.reset();
546 if (!i->file_name.empty()) {
547 release_cache_file(i->file_name);
// Clearing the name means a second call on the same object would not
// release the file name twice.
549 i->file_name.clear();
550 m_pop_queues.pop_front();
void set_cache_limit(size_t cache_limit)
bool empty(size_t consumer)
The serialization input archive object: given a reference to an istream, it reads from that stream to provide deserialization capabilities.
bool delete_path(const std::string &path, file_status status=file_status::FS_UNAVAILABLE)
void delete_all_cache_files()
The serialization output archive object: given a reference to an ostream, it writes to that stream to provide serialization capabilities.
bool pop(size_t consumer, T &ret)
broadcast_queue(size_t num_consumers, size_t cache_limit=128, const Serializer &serializer=Serializer())
size_t num_elements() const