Turi Create  4.0
pthread_tools.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_PTHREAD_TOOLS_HPP
#define TURI_PTHREAD_TOOLS_HPP


#include <cstdlib>
#include <cstring>  // memset, used by spinrwlock below
#include <core/parallel/pthread_h.h>
#include <semaphore.h>
#include <sched.h>
#include <signal.h>
#include <sys/time.h>
#include <vector>
#include <list>
#include <queue>
#include <iostream>
#include <boost/function.hpp>
#include <core/logging/assertions.hpp>
#include <core/parallel/atomic_ops.hpp>
#include <core/util/branch_hints.hpp>
#include <boost/unordered_map.hpp>
#undef _POSIX_SPIN_LOCKS
#define _POSIX_SPIN_LOCKS -1

#ifdef _WIN32
typedef long suseconds_t;
#endif

#include <core/parallel/mutex.hpp>
#include <core/util/any.hpp>


namespace turi {

#if _POSIX_SPIN_LOCKS >= 0
  /**
   * \ingroup threading
   *
   * Wrapper around pthread's spinlock.
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class spinlock {
  private:
    // mutable not actually needed
    mutable pthread_spinlock_t m_spin;
  public:
    /// constructs a spinlock
    spinlock () {
      int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
      ASSERT_TRUE(!error);
    }

    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    spinlock(const spinlock&) {
      int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
      ASSERT_TRUE(!error);
    }

    // not copyable
    void operator=(const spinlock& m) { }


    /// Acquires a lock on the spinlock
    inline void lock() const {
      int error = pthread_spin_lock( &m_spin );
      ASSERT_TRUE(!error);
    }
    /// Releases a lock on the spinlock
    inline void unlock() const {
      int error = pthread_spin_unlock( &m_spin );
      ASSERT_TRUE(!error);
    }
    /// Non-blocking attempt to acquire a lock on the spinlock
    inline bool try_lock() const {
      return pthread_spin_trylock( &m_spin ) == 0;
    }
    ~spinlock(){
      int error = pthread_spin_destroy( &m_spin );
      ASSERT_TRUE(!error);
    }
    friend class conditional;
  }; // End of spinlock
#define SPINLOCK_SUPPORTED 1
#else
  //! if spinlocks are not supported, spinlock is typedefed to a mutex.
  typedef mutex spinlock;
#define SPINLOCK_SUPPORTED 0
#endif
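
  /* Usage sketch for \ref spinlock (the mutex fallback above exposes the
   * same lock()/unlock()/try_lock() interface). `shared_counter` and
   * `increment` are hypothetical names introduced for this example.
   * \code
   * turi::spinlock lock;
   * size_t shared_counter = 0;
   *
   * void increment() {
   *   lock.lock();       // busy-waits while contended; keep sections short
   *   ++shared_counter;
   *   lock.unlock();
   * }
   * \endcode
   */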

  /**
   * \ingroup threading
   * If pthread spinlocks are not implemented,
   * this provides a simple alternate spin lock implementation.
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class simple_spinlock {
  private:
    // mutable not actually needed
    mutable volatile char spinner;
  public:
    /// constructs a spinlock
    simple_spinlock () {
      spinner = 0;
    }

    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    simple_spinlock(const simple_spinlock&) {
      spinner = 0;
    }

    // not copyable
    void operator=(const simple_spinlock& m) { }


    /// Acquires a lock on the spinlock
    inline void lock() const {
      while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
    }
    /// Releases a lock on the spinlock
    inline void unlock() const {
      __sync_synchronize();
      spinner = 0;
    }
    /// Non-blocking attempt to acquire a lock on the spinlock
    inline bool try_lock() const {
      return (__sync_lock_test_and_set(&spinner, 1) == 0);
    }
    ~simple_spinlock(){
      ASSERT_TRUE(spinner == 0);
    }
  };
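
  /* The lock() loop above is a test-and-test-and-set spinlock: the plain
   * read of `spinner` spins on the locally cached value, and the atomic
   * __sync_lock_test_and_set is only attempted once the lock looks free.
   * For comparison, a sketch of the same pattern with C++11 atomics
   * (assumes <atomic>; this is not how the header implements it):
   * \code
   * #include <atomic>
   * std::atomic<char> spinner{0};
   * void lock() {
   *   // spin on a cheap load first, then attempt the atomic exchange
   *   while (spinner.load(std::memory_order_relaxed) == 1 ||
   *          spinner.exchange(1, std::memory_order_acquire)) { }
   * }
   * void unlock() { spinner.store(0, std::memory_order_release); }
   * \endcode
   */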

  /**
   * \ingroup threading
   * If pthread spinlocks are not implemented,
   * this provides a simple alternate spin lock implementation.
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class padded_simple_spinlock {
  private:
    // mutable not actually needed
    mutable volatile char spinner;
    // char padding[63];
  public:
    /// constructs a spinlock
    padded_simple_spinlock () {
      spinner = 0;
    }

    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    padded_simple_spinlock(const padded_simple_spinlock&) {
      spinner = 0;
    }

    // not copyable
    void operator=(const padded_simple_spinlock& m) { }


    /// Acquires a lock on the spinlock
    inline void lock() const {
      while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
    }
    /// Releases a lock on the spinlock
    inline void unlock() const {
      __sync_synchronize();
      spinner = 0;
    }
    /// Non-blocking attempt to acquire a lock on the spinlock
    inline bool try_lock() const {
      return (__sync_lock_test_and_set(&spinner, 1) == 0);
    }
    ~padded_simple_spinlock(){
      ASSERT_TRUE(spinner == 0);
    }
  };


  /**
   * \ingroup threading
   * Wrapper around pthread's condition variable
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class conditional {
  private:
    mutable pthread_cond_t m_cond;

  public:
    conditional() {
      int error = pthread_cond_init(&m_cond, NULL);
      ASSERT_TRUE(!error);
    }

    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    conditional(const conditional&) {
      int error = pthread_cond_init(&m_cond, NULL);
      ASSERT_TRUE(!error);
    }

    // not copyable
    void operator=(const conditional& m) { }


    /// Waits on condition. The mutex must already be acquired. Caller
    /// must be careful about spurious wakes.
    inline void wait(const mutex& mut) const {
#ifdef _WIN32
      mut.locked = false;
      int error = pthread_cond_wait(&m_cond, &mut.m_mut);
      mut.locked = true;
#else
      int error = pthread_cond_wait(&m_cond, &mut.m_mut);
#endif
      ASSERT_MSG(!error, "Condition variable wait error %d", error);
    }
    /// Waits on condition. The mutex must already be acquired. Caller
    /// must be careful about spurious wakes.
    inline void wait(std::unique_lock<mutex>& mut) const {
      // take over the pointer
      auto lock_ptr = mut.mutex();
      mut.release();

      wait(*lock_ptr);
      // put it back into the unique lock
      std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
      mut.swap(retlock);
    }
    /// Waits on condition. The mutex must already be acquired.
    /// Returns only when the predicate evaluates to true.
    template <typename Predicate>
    inline void wait(const mutex& mut, Predicate pred) const {
      while (!pred()) wait(mut);
    }
    /// Waits on condition. The mutex must already be acquired.
    /// Returns only when the predicate evaluates to true.
    template <typename Predicate>
    inline void wait(std::unique_lock<mutex>& mut, Predicate pred) const {
      while (!pred()) wait(mut);
    }

    /// Wait till a timeout time
    inline int timedwait(const mutex& mut, const struct timespec& timeout) const {
#ifdef _WIN32
      mut.locked = false;
      int ret = pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
      mut.locked = true;
#else
      int ret = pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
#endif
      return ret;
    }

    /// Like wait() but with a time limit of "sec" seconds
    inline int timedwait(const mutex& mut, size_t sec) const {
      struct timespec timeout;
      struct timeval tv;
      struct timezone tz;
      gettimeofday(&tv, &tz);
      timeout.tv_nsec = tv.tv_usec * 1000;
      timeout.tv_sec = tv.tv_sec + (time_t)sec;
      return timedwait(mut, timeout);
    }

    /// Like wait() but with a time limit of "ms" milliseconds
    inline int timedwait_ms(const mutex& mut, size_t ms) const {
      struct timespec timeout;
      struct timeval tv;
      gettimeofday(&tv, NULL);
      // convert ms to s and ns
      size_t s = ms / 1000;
      ms = ms % 1000;
      size_t ns = ms * 1000000;
      // convert timeval to timespec
      timeout.tv_nsec = tv.tv_usec * 1000;
      timeout.tv_sec = tv.tv_sec;

      // add the time
      timeout.tv_nsec += (suseconds_t)ns;
      timeout.tv_sec += (time_t)s;
      // shift the nsec to sec if overflow
      if (timeout.tv_nsec > 1000000000) {
        timeout.tv_sec ++;
        timeout.tv_nsec -= 1000000000;
      }
      return timedwait(mut, timeout);
    }
    /// Like wait() but with a time limit of "ns" nanoseconds
    inline int timedwait_ns(const mutex& mut, size_t ns) const {
      struct timespec timeout;
      struct timeval tv;
      gettimeofday(&tv, NULL);
      assert(ns > 0);
      // convert ns to s and ns
      size_t s = ns / 1000000000;
      ns = ns % 1000000000;

      // convert timeval to timespec
      timeout.tv_nsec = tv.tv_usec * 1000;
      timeout.tv_sec = tv.tv_sec;

      // add the time
      timeout.tv_nsec += (suseconds_t)ns;
      timeout.tv_sec += (time_t)s;
      // shift the nsec to sec if overflow
      if (timeout.tv_nsec > 1000000000) {
        timeout.tv_sec ++;
        timeout.tv_nsec -= 1000000000;
      }
      return timedwait(mut, timeout);
    }

    /// Like wait() but with a time limit of "sec" seconds
    inline int timedwait(std::unique_lock<mutex>& mut, size_t sec) const {
      // take over the pointer
      auto lock_ptr = mut.mutex();
      mut.release();

      int ret = timedwait(*lock_ptr, sec);

      // put it back into the unique lock
      std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
      mut.swap(retlock);
      return ret;
    }
    /// Like wait() but with a time limit of "ms" milliseconds
    inline int timedwait_ms(std::unique_lock<mutex>& mut, size_t ms) const {
      // take over the pointer
      auto lock_ptr = mut.mutex();
      mut.release();

      int ret = timedwait_ms(*lock_ptr, ms);

      // put it back into the unique lock
      std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
      mut.swap(retlock);
      return ret;
    }
    /// Like wait() but with a time limit of "ns" nanoseconds
    inline int timedwait_ns(std::unique_lock<mutex>& mut, size_t ns) const {
      // take over the pointer
      auto lock_ptr = mut.mutex();
      mut.release();

      int ret = timedwait_ns(*lock_ptr, ns);

      // put it back into the unique lock
      std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
      mut.swap(retlock);
      return ret;
    }

    /// Signals one waiting thread to wake up
    inline void signal() const {
      int error = pthread_cond_signal(&m_cond);
      ASSERT_MSG(!error, "Condition variable signal error %d", error);
    }
    /// Signals one waiting thread to wake up. Synonym for signal()
    inline void notify_one() const {
      signal();
    }
    /// Wakes up all waiting threads
    inline void broadcast() const {
      int error = pthread_cond_broadcast(&m_cond);
      ASSERT_MSG(!error, "Condition variable broadcast error %d", error);
    }
    /// Synonym for broadcast
    inline void notify_all() const {
      broadcast();
    }
    ~conditional() noexcept {
      int error = pthread_cond_destroy(&m_cond);
      if (error) {
        try {
          std::cerr << "Condition variable destroy error " << error
                    << std::endl;
        } catch (...) {
        }
        abort();
      }
    }
  }; // End conditional
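
  /* Usage sketch for \ref conditional: the standard wait-with-predicate
   * pattern guarding a flag. `m`, `cv`, and `ready` are hypothetical
   * names introduced for this example.
   * \code
   * turi::mutex m;
   * turi::conditional cv;
   * bool ready = false;
   *
   * // waiting thread: the predicate overload loops over spurious wakes
   * m.lock();
   * cv.wait(m, [&]{ return ready; });
   * m.unlock();
   *
   * // signaling thread
   * m.lock();
   * ready = true;
   * cv.signal();   // or cv.broadcast() to wake all waiters
   * m.unlock();
   * \endcode
   */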

#ifdef __APPLE__
  /**
   * Custom implementation of a semaphore.
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class semaphore {
  private:
    conditional cond;
    mutex mut;
    mutable volatile size_t semvalue;
    mutable volatile size_t waitercount;

  public:
    semaphore() {
      semvalue = 0;
      waitercount = 0;
    }
    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    semaphore(const semaphore&) {
      semvalue = 0;
      waitercount = 0;
    }

    // not copyable
    void operator=(const semaphore& m) { }

    inline void post() const {
      mut.lock();
      if (waitercount > 0) {
        cond.signal();
      }
      semvalue++;
      mut.unlock();
    }
    inline void wait() const {
      mut.lock();
      waitercount++;
      while (semvalue == 0) {
        cond.wait(mut);
      }
      waitercount--;
      semvalue--;
      mut.unlock();
    }
    ~semaphore() {
      ASSERT_TRUE(waitercount == 0);
      ASSERT_TRUE(semvalue == 0);
    }
  }; // End semaphore
#else
  /**
   * Wrapper around pthread's semaphore
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class semaphore {
  private:
    mutable sem_t m_sem;

  public:
    semaphore() {
      int error = sem_init(&m_sem, 0, 0);
      ASSERT_TRUE(!error);
    }

    /** Copy constructor which does not copy. Do not use!
        Required for compatibility with some STL implementations (LLVM),
        which use the copy constructor for vector resize
        rather than the standard constructor. */
    semaphore(const semaphore&) {
      int error = sem_init(&m_sem, 0, 0);
      ASSERT_TRUE(!error);
    }

    // not copyable
    void operator=(const semaphore& m) { }

    inline void post() const {
      int error = sem_post(&m_sem);
      ASSERT_TRUE(!error);
    }
    inline void wait() const {
      int error = sem_wait(&m_sem);
      ASSERT_TRUE(!error);
    }
    ~semaphore() {
      int error = sem_destroy(&m_sem);
      ASSERT_TRUE(!error);
    }
  }; // End semaphore
#endif
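
  /* Usage sketch for \ref semaphore: a producer/consumer handoff.
   * `work_ready` is a hypothetical name introduced for this example.
   * \code
   * turi::semaphore work_ready;
   *
   * // producer thread
   * // ... enqueue an item ...
   * work_ready.post();   // increments the count, waking one waiter
   *
   * // consumer thread
   * work_ready.wait();   // blocks until post() has been called
   * // ... dequeue the item ...
   * \endcode
   * Note the custom implementation asserts the count returns to zero in
   * its destructor, so every post() should be matched by a wait() before
   * the semaphore is destroyed.
   */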

#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
#ifdef __arm64
#define cpu_relax() asm volatile("yield\n": : :"memory")
#else
#define cpu_relax() asm volatile("pause\n": : :"memory")
#endif

  /**
   * \class spinrwlock
   * rwlock built around "spinning"
   * source adapted from http://locklessinc.com/articles/locks/
   * "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors"
   * John Mellor-Crummey and Michael Scott
   */
  class spinrwlock {

    union rwticket {
      unsigned u;
      unsigned short us;
      __extension__ struct {
        unsigned char write;
        unsigned char read;
        unsigned char users;
      } s;
    };
    mutable bool writing;
    mutable volatile rwticket l;
  public:
    spinrwlock() {
      memset(const_cast<rwticket*>(&l), 0, sizeof(rwticket));
    }
    inline void writelock() const {
      unsigned me = atomic_xadd(&l.u, (1<<16));
      unsigned char val = (unsigned char)(me >> 16);

      while (val != l.s.write) asm volatile("pause\n": : :"memory");
      writing = true;
    }

    inline void wrunlock() const{
      rwticket t = *const_cast<rwticket*>(&l);

      t.s.write++;
      t.s.read++;

      *(volatile unsigned short *) (&l) = t.us;
      writing = false;
      __asm("mfence");
    }

    inline void readlock() const {
      unsigned me = atomic_xadd(&l.u, (1<<16));
      unsigned char val = (unsigned char)(me >> 16);

      while (val != l.s.read) asm volatile("pause\n": : :"memory");
      l.s.read++;
    }

    inline void rdunlock() const {
      atomic_inc(&l.s.write);
    }

    inline void unlock() const {
      if (!writing) rdunlock();
      else wrunlock();
    }
  };
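
  /* How the rwticket layout above works (assuming the little-endian
   * byte order this code targets): the 32-bit `u` packs three 8-bit
   * counters, `write`, `read`, and `users`. Each arriving thread takes
   * a ticket by adding 1<<16 (incrementing `users`), then spins until
   * its ticket number matches `write` (writers) or `read` (readers).
   * A usage sketch with a hypothetical shared structure:
   * \code
   * turi::spinrwlock lock;
   *
   * // reader
   * lock.readlock();
   * // ... read the shared structure ...
   * lock.rdunlock();
   *
   * // writer
   * lock.writelock();
   * // ... mutate the shared structure ...
   * lock.wrunlock();
   * \endcode
   */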


#define RW_WAIT_BIT 0
#define RW_WRITE_BIT 1
#define RW_READ_BIT 2

#define RW_WAIT 1
#define RW_WRITE 2
#define RW_READ 4

  struct spinrwlock2 {
    mutable unsigned int l;

    spinrwlock2():l(0) {}
    void writelock() const {
      while (1) {
        unsigned state = l;

        /* No readers or writers? */
        if (state < RW_WRITE)
        {
          /* Turn off RW_WAIT, and turn on RW_WRITE */
          if (cmpxchg(&l, state, RW_WRITE) == state) return;

          /* Someone else got there... time to wait */
          state = l;
        }

        /* Turn on writer wait bit */
        if (!(state & RW_WAIT)) atomic_set_bit(&l, RW_WAIT_BIT);

        /* Wait until can try to take the lock */
        while (l > RW_WAIT) cpu_relax();
      }
    }

    void wrunlock() const {
      atomic_add(&l, -RW_WRITE);
    }

    void readlock() const {
      while (1) {
        /* A writer exists? */
        while (l & (RW_WAIT | RW_WRITE)) cpu_relax();

        /* Try to get read lock */
        if (!(atomic_xadd(&l, RW_READ) & (RW_WAIT | RW_WRITE))) return;

        /* Undo */
        atomic_add(&l, -RW_READ);
      }
    }

    void rdunlock() const {
      atomic_add(&l, -RW_READ);
    }
  };
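
  /* Bit layout used by spinrwlock2: bit 0 (RW_WAIT) marks a waiting
   * writer, bit 1 (RW_WRITE) marks the active writer, and bits 2 and up
   * count readers in units of RW_READ. For example, a lock value of
   * 3 * RW_READ = 12 means three active readers and no writer, while a
   * value of RW_WRITE = 2 means one writer holds the lock exclusively.
   * Because readlock() spins while either RW_WAIT or RW_WRITE is set,
   * a waiting writer blocks new readers, giving writers preference.
   */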

#undef atomic_xadd
#undef cmpxchg
#undef atomic_inc
#undef atomic_set_bit
#undef atomic_add
#undef RW_WAIT_BIT
#undef RW_WRITE_BIT
#undef RW_READ_BIT
#undef RW_WAIT
#undef RW_WRITE
#undef RW_READ


  /**
   * \class rwlock
   * Wrapper around pthread's rwlock
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class rwlock {
  private:
    mutable pthread_rwlock_t m_rwlock;
  public:
    rwlock() {
      int error = pthread_rwlock_init(&m_rwlock, NULL);
      ASSERT_TRUE(!error);
    }
    ~rwlock() {
      int error = pthread_rwlock_destroy(&m_rwlock);
      ASSERT_TRUE(!error);
    }

    // not copyable
    void operator=(const rwlock& m) { }

    /**
     * \todo: Remove!
     *
     * Copy constructor which does not copy. Do not use! Required for
     * compatibility with some STL implementations (LLVM), which use
     * the copy constructor for vector resize rather than the
     * standard constructor. */
    rwlock(const rwlock &) {
      int error = pthread_rwlock_init(&m_rwlock, NULL);
      ASSERT_TRUE(!error);
    }

    inline void readlock() const {
      pthread_rwlock_rdlock(&m_rwlock);
      //ASSERT_TRUE(!error);
    }
    inline void writelock() const {
      pthread_rwlock_wrlock(&m_rwlock);
      //ASSERT_TRUE(!error);
    }
    inline bool try_readlock() const {
      return pthread_rwlock_tryrdlock(&m_rwlock) == 0;
    }
    inline bool try_writelock() const {
      return pthread_rwlock_trywrlock(&m_rwlock) == 0;
    }
    inline void unlock() const {
      pthread_rwlock_unlock(&m_rwlock);
      //ASSERT_TRUE(!error);
    }
    inline void rdunlock() const {
      unlock();
    }
    inline void wrunlock() const {
      unlock();
    }
  }; // End rwlock
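
  /* Usage sketch for \ref rwlock: many concurrent readers, exclusive
   * writers. `table` is a hypothetical shared container.
   * \code
   * turi::rwlock lock;
   * std::vector<int> table;
   *
   * // reader
   * lock.readlock();
   * // ... look up values in table ...
   * lock.rdunlock();
   *
   * // writer
   * lock.writelock();
   * table.push_back(42);
   * lock.wrunlock();
   * \endcode
   */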



  /**
   * \ingroup threading
   * This is a simple sense-reversing barrier implementation.
   * In addition to standard barrier functionality, this also
   * provides a "cancel" function which can be used to destroy
   * the barrier, releasing all threads stuck in the barrier.
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
  class cancellable_barrier {
  private:
    turi::mutex mutex;
    turi::conditional conditional;
    mutable size_t needed;
    mutable size_t called;

    mutable bool barrier_sense;
    mutable bool barrier_release;
    bool alive;

    // not copyconstructible
    cancellable_barrier(const cancellable_barrier&) { }


  public:
    /// Construct a barrier which will only fall when numthreads enter
    cancellable_barrier(size_t numthreads) {
      needed = numthreads;
      called = 0;
      barrier_sense = false;
      barrier_release = true;
      alive = true;
    }

    // not copyable
    void operator=(const cancellable_barrier& m) { }

    void resize_unsafe(size_t numthreads) {
      needed = numthreads;
    }

    /**
     * \warning: This barrier is NOT safely reusable with this cancel
     * definition
     */
    inline void cancel() {
      alive = false;
      conditional.broadcast();
    }
    /// Wait on the barrier until numthreads has called wait
    inline void wait() const {
      if (!alive) return;
      mutex.lock();
      // set waiting;
      called++;
      bool listening_on = barrier_sense;
      if (called == needed) {
        // if I have reached the required limit, wake up. Set waiting
        // to 0 to make sure everyone wakes up
        called = 0;
        barrier_release = barrier_sense;
        barrier_sense = !barrier_sense;
        // clear all waiting
        conditional.broadcast();
      } else {
        // while no one has broadcasted, sleep
        while(barrier_release != listening_on && alive) conditional.wait(mutex);
      }
      mutex.unlock();
    }
  }; // end of cancellable_barrier
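
  /* Usage sketch for \ref cancellable_barrier: synchronizing phases
   * across a fixed pool of workers. `nworkers` and `worker` are
   * hypothetical names introduced for this example.
   * \code
   * const size_t nworkers = 4;
   * turi::cancellable_barrier barrier(nworkers);
   *
   * void worker() {
   *   // ... phase 1 work ...
   *   barrier.wait();   // falls once all nworkers threads arrive
   *   // ... phase 2 work ...
   * }
   * // From a controller thread, barrier.cancel() releases any threads
   * // still stuck in wait(); the barrier is then dead (not reusable).
   * \endcode
   */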

  /**
   * \class barrier
   * Wrapper around pthread's barrier
   *
   * Before you use, see \ref parallel_object_intricacies.
   */
#ifdef __linux__
  /**
   * \ingroup threading
   * Wrapper around pthread's barrier
   */
  class barrier {
  private:
    mutable pthread_barrier_t m_barrier;
    // not copyconstructable
    barrier(const barrier&) { }
  public:
    /// Construct a barrier which will only fall when numthreads enter
    barrier(size_t numthreads) {
      pthread_barrier_init(&m_barrier, NULL, (unsigned)numthreads); }
    // not copyable
    void operator=(const barrier& m) { }
    void resize_unsafe(size_t numthreads) {
      pthread_barrier_destroy(&m_barrier);
      pthread_barrier_init(&m_barrier, NULL, (unsigned)numthreads);
    }
    ~barrier() { pthread_barrier_destroy(&m_barrier); }
    /// Wait on the barrier until numthreads has called wait
    inline void wait() const { pthread_barrier_wait(&m_barrier); }
  };

#else
  /* On systems where pthread_barrier is not available, fall back to the
   * cancellable_barrier implementation above.
   */
  typedef cancellable_barrier barrier;
#endif


  inline void prefetch_range(void *addr, size_t len) {
    char *cp;
    char *end = (char*)(addr) + len;

    for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 0);
  }
  inline void prefetch_range_write(void *addr, size_t len) {
    char *cp;
    char *end = (char*)(addr) + len;

    for (cp = (char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 1);
  }
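
  /* Usage sketch for prefetch_range: warming the cache ahead of a
   * sequential scan. The 64-byte stride assumes a 64-byte cache line.
   * `buf` is a hypothetical buffer introduced for this example.
   * \code
   * std::vector<double> buf(1 << 20);
   * prefetch_range(buf.data(), buf.size() * sizeof(double));
   * double sum = 0;
   * for (double x : buf) sum += x;  // reads now hit warmer cache lines
   * \endcode
   */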



  /**
   * \ingroup threading
   * A collection of routines for creating and managing threads.
   *
   * The thread object performs limited exception forwarding.
   * Exceptions of type const char* thrown within a thread will be caught
   * and forwarded to the join() function.
   * If the call to join() is wrapped by a try-catch block, the exception
   * will be caught safely and thread cleanup will be completed properly.
   */
  class thread {
  public:

    /**
     * This class contains the data unique to each thread. All threads
     * are guaranteed to have an associated turicreate thread_specific
     * data. The thread object is copyable.
     */
    class tls_data {
    public:
      inline tls_data(size_t thread_id);
      inline size_t thread_id() { return thread_id_; }
      inline void set_thread_id(size_t t) { thread_id_ = t; }
      any& operator[](const size_t& id);
      bool contains(const size_t& id) const;
      size_t erase(const size_t& id);
      inline void set_in_thread_flag(bool val) { in_thread = val; }
      inline bool is_in_thread() { return in_thread; }
    private:
      size_t thread_id_;
      bool in_thread = false;
      std::unique_ptr<boost::unordered_map<size_t, any> > local_data;
    }; // end of thread specific data


    /// Static helper routines
    // ===============================================================

    /**
     * Get the thread specific data associated with this thread
     */
    static tls_data& get_tls_data();

    /** Get the id of the calling thread. This will typically be the
        index in the thread group, between 0 and ncpus. */
    static inline size_t thread_id() { return get_tls_data().thread_id(); }

    /** Set the id of the calling thread. This will typically be the
        index in the thread group, between 0 and ncpus. */
    static inline void set_thread_id(size_t t) { get_tls_data().set_thread_id(t); }

    /**
     * Get a reference to an any object
     */
    static inline any& get_local(const size_t& id) {
      return get_tls_data()[id];
    }

    /**
     * Check to see if there is an entry in the local map
     */
    static inline bool contains(const size_t& id) {
      return get_tls_data().contains(id);
    }

    /**
     * Removes the entry from the local map.
     * @return number of elements erased.
     */
    static inline size_t erase(const size_t& id){
      return get_tls_data().erase(id);
    }

    /**
     * This static method joins the invoking thread with the other
     * thread object. This thread will not return from the join
     * routine until the other thread completes its run.
     */
    static void join(thread& other);

    // Called just before thread exits. Can be used
    // to do special cleanup... (needed for Java JNI)
    static void thread_destroy_callback();
    static void set_thread_destroy_callback(void (*callback)());


    /**
     * Return the number of processing units (individual cores) on this
     * system
     */
    static size_t cpu_count();


  private:

    struct invoke_args{
      size_t m_thread_id;
      boost::function<void(void)> spawn_routine;
      invoke_args(size_t m_thread_id, const boost::function<void(void)> &spawn_routine)
        : m_thread_id(m_thread_id), spawn_routine(spawn_routine) { };
    };

    //! Little helper function used to launch threads
    static void* invoke(void *_args);

  public:

    /**
     * Creates a thread with a user-defined associated thread ID
     */
    inline thread(size_t thread_id = 0) :
      m_stack_size(0),
      m_p_thread(0),
      m_thread_id(thread_id),
      thread_started(false){
      // Calculate the stack size in bytes
      const int BYTES_PER_MB = 1048576;
      const int DEFAULT_SIZE_IN_MB = 8;
      m_stack_size = DEFAULT_SIZE_IN_MB * BYTES_PER_MB;
    }

    /**
     * Execute this function to spawn a new thread running the
     * spawn_routine function
     */
    void launch(const boost::function<void (void)> &spawn_routine);

    /**
     * Same as launch() except that you can specify a CPU on which to
     * run the thread. This is currently only supported on Linux; if
     * invoked on a non-Linux system this will be equivalent to
     * launch() without affinity.
     */
    void launch(const boost::function<void (void)> &spawn_routine, size_t cpu_id);


    /**
     * Join the calling thread with this thread.
     * const char* exceptions
     * thrown by the thread are forwarded to the join() function.
     */
    inline void join() {
      join(*this);
    }

    /// Returns true if the thread is still running
    inline bool active() const {
      return thread_started;
    }

    inline ~thread() { }

    /// Returns the pthread thread id
    inline pthread_t pthreadid() {
      return m_p_thread;
    }
  private:


    //! The size of the internal stack for this thread
    size_t m_stack_size;

    //! The internal pthread object
    pthread_t m_p_thread;

    //! the thread's id
    size_t m_thread_id;

    bool thread_started;
  }; // End of class thread
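
  /* Usage sketch for \ref thread: launching and joining, with the
   * const char* exception forwarding described above.
   * \code
   * turi::thread t(0);
   * t.launch([]() {
   *   // ... work ...
   *   // throw "something went wrong";  // forwarded to join()
   * });
   * try {
   *   t.join();
   * } catch (const char* msg) {
   *   std::cerr << "thread failed: " << msg << std::endl;
   * }
   * \endcode
   * (boost::function<void(void)> accepts any callable, so the lambda
   * above is just one way to supply spawn_routine.)
   */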



  /**
   * \ingroup threading
   * Manages a collection of threads.
   *
   * The thread_group object performs limited exception forwarding.
   * Exceptions of type const char* thrown within a thread will be caught
   * and forwarded to the join() function.
   * If the call to join() is wrapped by a try-catch block, the exception
   * will be caught safely and thread cleanup will be completed properly.
   *
   * If multiple threads are running in the thread-group, the master should
   * test if running_threads() is > 0, and retry the join().
   */
  class thread_group {
  private:
    size_t m_thread_counter;
    size_t threads_running;
    mutex mut;
    conditional cond;
    std::queue<std::pair<pthread_t, const char*> > joinqueue;
    // not implemented
    thread_group& operator=(const thread_group &thrgrp);
    thread_group(const thread_group&);
    static void invoke(boost::function<void (void)> spawn_function, thread_group *group);
  public:
    /**
     * Initializes a thread group.
     */
    thread_group() : m_thread_counter(0), threads_running(0) { }

    /**
     * Launch a single thread which calls spawn_function. No CPU affinity
     * is set, so which core it runs on is up to the OS scheduler.
     */
    void launch(const boost::function<void (void)> &spawn_function);

    /**
     * Launch a single thread which calls spawn_function. Also sets CPU
     * affinity.
     */
    void launch(const boost::function<void (void)> &spawn_function, size_t cpu_id);

    /** Waits for all threads to complete execution. const char* exceptions
        thrown by threads are forwarded to the join() function.
    */
    void join();

    /// Returns the number of running threads.
    inline size_t running_threads() {
      return threads_running;
    }
    //! Destructor. Waits for all threads to complete execution
    inline ~thread_group(){ join(); }

  }; // End of thread group
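
  /* Usage sketch for \ref thread_group: fan work out across cores and
   * join once. `nworkers` and `do_work` are hypothetical names
   * introduced for this example.
   * \code
   * turi::thread_group group;
   * size_t nworkers = turi::thread::cpu_count();
   * for (size_t i = 0; i < nworkers; ++i) {
   *   group.launch([i]() { do_work(i); }, i);  // pin worker i to cpu i
   * }
   * group.join();  // wrap in try-catch to receive const char* exceptions
   * \endcode
   */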

  /// Runs f in a new thread. Convenience function for creating a new thread quickly.
  inline thread launch_in_new_thread(const boost::function<void (void)> &f,
                                     size_t cpuid = size_t(-1)) {
    thread thr;
    if (cpuid != size_t(-1)) thr.launch(f, cpuid);
    else thr.launch(f);
    return thr;
  }

  /// an integer value padded to 64 bytes
  struct padded_integer {
    size_t val;
    char __pad__[64 - sizeof(size_t)];
  };


  /**
   * Convenience typedef to be equivalent to std::condition_variable
   *
   */
  typedef conditional condition_variable;
}; // End Namespace

#endif