Turi Create  4.0
blocking_queue.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_BLOCKING_QUEUE_HPP
7 #define TURI_BLOCKING_QUEUE_HPP
8 
9 
10 
#include <deque>
#include <list>
#include <mutex>

#include <core/parallel/pthread_tools.hpp>
#include <core/random/random.hpp>
15 
16 namespace turi {
17 
18  /**
19  * \ingroup util
20  * \brief Implements a blocking queue useful for producer/consumer models
21  */
22  template<typename T>
24  protected:
25 
26 typedef typename std::deque<T> queue_type; ///< container type used to store the queued elements
27 
28 bool m_alive; ///< set to true by the constructor and start_blocking(); cleared by stop_blocking()
29 queue_type m_queue; ///< the queued elements; front is dequeued first
30 mutex m_mutex; ///< taken by most member functions before touching the state declared here
31 conditional m_conditional; ///< signalled on enqueue; threads waiting for data sleep on it
32 conditional m_empty_conditional; ///< signalled when a dequeue/swap leaves the queue empty
33 
34 volatile uint16_t sleeping; ///< count of threads currently waiting on m_conditional
35 volatile uint16_t sleeping_on_empty; ///< count of threads currently waiting on m_empty_conditional
36 
37 
38  public:
39 
40  //! creates a blocking queue
41  blocking_queue() : m_alive(true),sleeping(0),sleeping_on_empty(0) { }
42 
43  //! Add an element to the blocking queue
44  inline void enqueue(const T& elem) {
45  m_mutex.lock();
46  m_queue.push_back(elem);
47  // Signal threads waiting on the queue
48  if (sleeping) m_conditional.signal();
49  m_mutex.unlock();
50  }
51 
52  //! Add an element to the blocking queue
53  inline void enqueue_to_head(const T& elem) {
54  m_mutex.lock();
55  m_queue.push_front(elem);
56  // Signal threads waiting on the queue
57  if (sleeping) m_conditional.signal();
58  m_mutex.unlock();
59  }
60 
61 
62 
63  inline void enqueue_conditional_signal(const T& elem, size_t signal_at_size) {
64  m_mutex.lock();
65  m_queue.push_back(elem);
66  // Signal threads waiting on the queue
67  if (sleeping && m_queue.size() >= signal_at_size) m_conditional.signal();
68  m_mutex.unlock();
69  }
70 
71 
72  bool empty_unsafe() {
73  return m_queue.empty();
74  }
75 
76  void begin_critical_section() {
77  m_mutex.lock();
78  }
79 
80 
81  bool is_alive() {
82  return m_alive;
83  }
84 
85  void swap(queue_type &q) {
86  m_mutex.lock();
87  q.swap(m_queue);
88  if (m_queue.empty() && sleeping_on_empty) {
89  m_empty_conditional.signal();
90  }
91  m_mutex.unlock();
92  }
93 
94  inline std::pair<T, bool> try_dequeue_in_critical_section() {
95  T elem = T();
96  // Wait while the queue is empty and this queue is alive
97  if (m_queue.empty() || m_alive == false) {
98  return std::make_pair(elem, false);
99  }
100  else {
101  elem = m_queue.front();
102  m_queue.pop_front();
103  if (m_queue.empty() && sleeping_on_empty) {
104  m_empty_conditional.signal();
105  }
106  return std::make_pair(elem, true);
107  }
108  }
109 
110  void end_critical_section() {
111  m_mutex.unlock();
112  }
113 
114 
115  inline std::pair<T, bool> dequeue_and_begin_critical_section_on_success() {
116  m_mutex.lock();
117  T elem = T();
118  bool success = false;
119  // Wait while the queue is empty and this queue is alive
120  while(m_queue.empty() && m_alive) {
121  sleeping++;
122  m_conditional.wait(m_mutex);
123  sleeping--;
124  }
125  // An element has been added or a signal was raised
126  if(!m_queue.empty()) {
127  success = true;
128  elem = m_queue.front();
129  m_queue.pop_front();
130  if (m_queue.empty() && sleeping_on_empty) {
131  m_empty_conditional.signal();
132  }
133  }
134  if (!success) m_mutex.unlock();
135  return std::make_pair(elem, success);
136  }
137 
138  /// Returns immediately of queue size is >= immedeiate_size
139  /// Otherwise, it will poll over 'ns' nanoseconds or on a signal
140  /// until queue is not empty.
141  inline bool timed_wait_for_data(size_t ns, size_t immediate_size) {
142  m_mutex.lock();
143  bool success = false;
144  // Wait while the queue is empty and this queue is alive
145  if (m_queue.size() < immediate_size) {
146  do {
147  sleeping++;
148  m_conditional.timedwait_ns(m_mutex, ns);
149  sleeping--;
150  }while(m_queue.empty() && m_alive);
151  }
152  // An element has been added or a signal was raised
153  if(!m_queue.empty()) {
154  success = true;
155  }
156  m_mutex.unlock();
157 
158  return success;
159  }
160 
161 
162  /// Returns immediately of queue size is >= immedeiate_size
163  /// Otherwise, it will poll over 'ns' nanoseconds or on a signal
164  /// until queue is not empty.
165  inline bool try_timed_wait_for_data(size_t ns, size_t immediate_size) {
166  m_mutex.lock();
167  bool success = false;
168  // Wait while the queue is empty and this queue is alive
169  if (m_queue.size() < immediate_size) {
170  if (m_queue.empty() && m_alive) {
171  sleeping++;
172  m_conditional.timedwait_ns(m_mutex, ns);
173  sleeping--;
174  }
175  }
176  // An element has been added or a signal was raised
177  if(!m_queue.empty()) {
178  success = true;
179  }
180  m_mutex.unlock();
181 
182  return success;
183  }
184 
185 
186 
187  inline bool wait_for_data() {
188 
189  m_mutex.lock();
190  bool success = false;
191  // Wait while the queue is empty and this queue is alive
192  while(m_queue.empty() && m_alive) {
193  sleeping++;
194  m_conditional.wait(m_mutex);
195  sleeping--;
196  }
197  // An element has been added or a signal was raised
198  if(!m_queue.empty()) {
199  success = true;
200  }
201  m_mutex.unlock();
202 
203  return success;
204  }
205 
206 
207  /**
208  * Blocks until an element is available in the queue
209  * or until stop_blocking() is called.
210  * The return value is a pair of <T value, bool success>
211  * If "success" if set, then "value" is valid and
212  * is an element popped from the queue.
213  * If "success" is false, stop_blocking() was called
214  * and the queue has been destroyed.
215  */
216  inline std::pair<T, bool> dequeue() {
217 
218  m_mutex.lock();
219  T elem = T();
220  bool success = false;
221  // Wait while the queue is empty and this queue is alive
222  while(m_queue.empty() && m_alive) {
223  sleeping++;
224  m_conditional.wait(m_mutex);
225  sleeping--;
226  }
227  // An element has been added or a signal was raised
228  if(!m_queue.empty()) {
229  success = true;
230  elem = m_queue.front();
231  m_queue.pop_front();
232  if (m_queue.empty() && sleeping_on_empty) {
233  m_empty_conditional.signal();
234  }
235  }
236  m_mutex.unlock();
237 
238  return std::make_pair(elem, success);
239  }
240 
241  /**
242  * Returns an element if the queue has an entry.
243  * returns [item, false] otherwise.
244  */
245  inline std::pair<T, bool> try_dequeue() {
246  if (m_queue.empty() || m_alive == false) return std::make_pair(T(), false);
247  m_mutex.lock();
248  T elem = T();
249  // Wait while the queue is empty and this queue is alive
250  if (m_queue.empty() || m_alive == false) {
251  m_mutex.unlock();
252  return std::make_pair(elem, false);
253  }
254  else {
255  elem = m_queue.front();
256  m_queue.pop_front();
257  if (m_queue.empty() && sleeping_on_empty) {
258  m_empty_conditional.signal();
259  }
260  }
261  m_mutex.unlock();
262 
263  return std::make_pair(elem, true);
264  }
265 
266  //! Returns true if the queue is empty
267  inline bool empty() {
268  m_mutex.lock();
269  bool res = m_queue.empty();
270  m_mutex.unlock();
271  return res;
272  }
273 
274  /** Wakes up all threads waiting on the queue whether
275  or not an element is available. Once this function is called,
276  all existing and future dequeue operations will return with failure.
277  Note that there could be elements remaining in the queue after
278  stop_blocking() is called.
279  */
280  inline void stop_blocking() {
281  m_mutex.lock();
282  m_alive = false;
283  m_conditional.broadcast();
284  m_empty_conditional.broadcast();
285  m_mutex.unlock();
286  }
287 
288  /**
289  Resumes operation of the blocking_queue. Future calls to
290  dequeue will proceed as normal.
291  */
292  inline void start_blocking() {
293  m_mutex.lock();
294  m_alive = true;
295  m_mutex.unlock();
296  }
297 
298  //! get the current size of the queue
299  inline size_t size() {
300  return m_queue.size();
301  }
302 
303  /**
304  * The conceptual "reverse" of dequeue().
305  * This function will block until the queue becomes empty, or
306  * until stop_blocking() is called.
307  * Returns true on success.
308  * Returns false if the queue is no longer alive
309  */
311  m_mutex.lock();
312  // if the queue still has elements in it while I am still alive, wait
313  while (m_queue.empty() == false && m_alive == true) {
314  sleeping_on_empty++;
315  m_empty_conditional.wait(m_mutex);
316  sleeping_on_empty--;
317  }
318  m_mutex.unlock();
319  // if I am alive, the queue must be empty. i.e. success
320  // otherwise I am dead
321  return m_alive;
322  }
323 
324  /**
325  * Causes any threads currently blocking on a dequeue to wake up
326  * and evaluate the state of the queue. If the queue is empty,
327  * the threads will return back to sleep immediately. If the queue
328  * is destroyed through stop_blocking, all threads will return.
329  */
330  void broadcast() {
331  m_mutex.lock();
332  m_conditional.broadcast();
333  m_mutex.unlock();
334  }
335 
336 
337 
338  /**
339  * Causes any threads blocking on "wait_until_empty()" to wake
340  * up and evaluate the state of the queue. If the queue is not empty,
341  * the threads will return back to sleep immediately. If the queue
342  * is empty, all threads will return.
343  */
345  m_mutex.lock();
346  m_empty_conditional.broadcast();
347  m_mutex.unlock();
348  }
349 
350 
351  ~blocking_queue() {
352  m_alive = false;
353  broadcast();
355  }
356  }; // end of blocking_queue class
357 
358 
359 } // end of namespace turi
360 
361 
362 #endif
Implements a blocking queue useful for producer/consumer models.
void wait(const mutex &mut) const
blocking_queue()
creates a blocking queue
void enqueue_to_head(const T &elem)
Add an element to the blocking queue.
bool empty()
Returns true if the queue is empty.
std::pair< T, bool > try_dequeue()
int timedwait_ns(const mutex &mut, size_t ns) const
Like wait() but with a time limit of "ns" nanoseconds.
bool try_timed_wait_for_data(size_t ns, size_t immediate_size)
void lock() const
Acquires a lock on the mutex.
Definition: mutex.hpp:64
bool timed_wait_for_data(size_t ns, size_t immediate_size)
void unlock() const
Releases a lock on the mutex.
Definition: mutex.hpp:73
size_t size()
get the current size of the queue
std::pair< T, bool > dequeue()
void signal() const
Signals one waiting thread to wake up.
void broadcast() const
Wakes up all waiting threads.
void enqueue(const T &elem)
Add an element to the blocking queue.