6 #ifndef TURI_BLOCKING_QUEUE_HPP 7 #define TURI_BLOCKING_QUEUE_HPP 13 #include <core/parallel/pthread_tools.hpp> 14 #include <core/random/random.hpp> 26 typedef typename std::deque<T> queue_type;
34 volatile uint16_t sleeping;
35 volatile uint16_t sleeping_on_empty;
46 m_queue.push_back(elem);
48 if (sleeping) m_conditional.
signal();
55 m_queue.push_front(elem);
57 if (sleeping) m_conditional.
signal();
63 inline void enqueue_conditional_signal(
const T& elem,
size_t signal_at_size) {
65 m_queue.push_back(elem);
67 if (sleeping && m_queue.size() >= signal_at_size) m_conditional.
signal();
73 return m_queue.empty();
76 void begin_critical_section() {
85 void swap(queue_type &q) {
88 if (m_queue.empty() && sleeping_on_empty) {
89 m_empty_conditional.
signal();
94 inline std::pair<T, bool> try_dequeue_in_critical_section() {
97 if (m_queue.empty() || m_alive ==
false) {
98 return std::make_pair(elem,
false);
101 elem = m_queue.front();
103 if (m_queue.empty() && sleeping_on_empty) {
104 m_empty_conditional.
signal();
106 return std::make_pair(elem,
true);
110 void end_critical_section() {
115 inline std::pair<T, bool> dequeue_and_begin_critical_section_on_success() {
118 bool success =
false;
120 while(m_queue.empty() && m_alive) {
122 m_conditional.
wait(m_mutex);
126 if(!m_queue.empty()) {
128 elem = m_queue.front();
130 if (m_queue.empty() && sleeping_on_empty) {
131 m_empty_conditional.
signal();
134 if (!success) m_mutex.
unlock();
135 return std::make_pair(elem, success);
143 bool success =
false;
145 if (m_queue.size() < immediate_size) {
150 }
while(m_queue.empty() && m_alive);
153 if(!m_queue.empty()) {
167 bool success =
false;
169 if (m_queue.size() < immediate_size) {
170 if (m_queue.empty() && m_alive) {
177 if(!m_queue.empty()) {
187 inline bool wait_for_data() {
190 bool success =
false;
192 while(m_queue.empty() && m_alive) {
194 m_conditional.
wait(m_mutex);
198 if(!m_queue.empty()) {
220 bool success =
false;
222 while(m_queue.empty() && m_alive) {
224 m_conditional.
wait(m_mutex);
228 if(!m_queue.empty()) {
230 elem = m_queue.front();
232 if (m_queue.empty() && sleeping_on_empty) {
233 m_empty_conditional.
signal();
238 return std::make_pair(elem, success);
246 if (m_queue.empty() || m_alive ==
false)
return std::make_pair(T(),
false);
250 if (m_queue.empty() || m_alive ==
false) {
252 return std::make_pair(elem,
false);
255 elem = m_queue.front();
257 if (m_queue.empty() && sleeping_on_empty) {
258 m_empty_conditional.
signal();
263 return std::make_pair(elem,
true);
269 bool res = m_queue.empty();
300 return m_queue.size();
313 while (m_queue.empty() ==
false && m_alive ==
true) {
315 m_empty_conditional.
wait(m_mutex);
Implements a blocking queue useful for producer/consumer models.
void wait(const mutex &mut) const
blocking_queue()
Creates a blocking queue.
void enqueue_to_head(const T &elem)
Add an element to the head (front) of the blocking queue.
bool empty()
Returns true if the queue is empty.
std::pair< T, bool > try_dequeue()
int timedwait_ns(const mutex &mut, size_t ns) const
Like wait() but with a time limit of "ns" nanoseconds.
bool try_timed_wait_for_data(size_t ns, size_t immediate_size)
void lock() const
Acquires a lock on the mutex.
bool timed_wait_for_data(size_t ns, size_t immediate_size)
void unlock() const
Releases a lock on the mutex.
void broadcast_blocking_empty()
size_t size()
Gets the current size of the queue.
std::pair< T, bool > dequeue()
void signal() const
Signals one waiting thread to wake up.
void broadcast() const
Wakes up all waiting threads.
void enqueue(const T &elem)
Add an element to the blocking queue.