6 #ifndef TURI_PTHREAD_TOOLS_HPP 7 #define TURI_PTHREAD_TOOLS_HPP 11 #include <core/parallel/pthread_h.h> 12 #include <semaphore.h> 20 #include <boost/function.hpp> 21 #include <core/logging/assertions.hpp> 22 #include <core/parallel/atomic_ops.hpp> 23 #include <core/util/branch_hints.hpp> 24 #include <boost/unordered_map.hpp> 25 #undef _POSIX_SPIN_LOCKS 26 #define _POSIX_SPIN_LOCKS -1 29 typedef long suseconds_t;
32 #include <core/parallel/mutex.hpp> 33 #include <core/util/any.hpp> 40 #if _POSIX_SPIN_LOCKS >= 0 51 mutable pthread_spinlock_t m_spin;
55 int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
64 int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
69 void operator=(
const spinlock& m) { }
73 inline void lock()
const {
74 int error = pthread_spin_lock( &m_spin );
78 inline void unlock()
const {
79 int error = pthread_spin_unlock( &m_spin );
84 return pthread_spin_trylock( &m_spin ) == 0;
87 int error = pthread_spin_destroy( &m_spin );
90 friend class conditional;
92 #define SPINLOCK_SUPPORTED 1 96 #define SPINLOCK_SUPPORTED 0 110 mutable volatile char spinner;
131 while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
135 __sync_synchronize();
140 return (__sync_lock_test_and_set(&spinner, 1) == 0);
158 mutable volatile char spinner;
180 while(spinner == 1 || __sync_lock_test_and_set(&spinner, 1));
184 __sync_synchronize();
189 return (__sync_lock_test_and_set(&spinner, 1) == 0);
207 mutable pthread_cond_t m_cond;
211 int error = pthread_cond_init(&m_cond, NULL);
220 int error = pthread_cond_init(&m_cond, NULL);
233 int error = pthread_cond_wait(&m_cond, &mut.m_mut);
236 int error = pthread_cond_wait(&m_cond, &mut.m_mut);
238 ASSERT_MSG(!error,
"Condition variable wait error %d", error);
242 inline void wait(std::unique_lock<mutex>& mut)
const {
244 auto lock_ptr = mut.mutex();
249 std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
254 template <
typename Predicate>
255 inline void wait(
const mutex& mut, Predicate pred)
const {
256 while (!pred()) wait(mut);
260 template <
typename Predicate>
261 inline void wait(std::unique_lock<mutex>& mut, Predicate pred)
const {
262 while (!pred()) wait(mut);
269 int ret = pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
272 int ret = pthread_cond_timedwait(&m_cond, &mut.m_mut, &timeout);
279 struct timespec timeout;
282 gettimeofday(&tv, &tz);
283 timeout.tv_nsec = tv.tv_usec * 1000;
284 timeout.tv_sec = tv.tv_sec + (time_t)sec;
285 return timedwait(mut, timeout);
290 struct timespec timeout;
292 gettimeofday(&tv, NULL);
294 size_t s = ms / 1000;
296 size_t ns = ms * 1000000;
298 timeout.tv_nsec = tv.tv_usec * 1000;
299 timeout.tv_sec = tv.tv_sec;
302 timeout.tv_nsec += (suseconds_t)ns;
303 timeout.tv_sec += (time_t)s;
305 if (timeout.tv_nsec > 1000000000) {
307 timeout.tv_nsec -= 1000000000;
309 return timedwait(mut, timeout);
313 struct timespec timeout;
315 gettimeofday(&tv, NULL);
318 size_t s = ns / 1000000;
322 timeout.tv_nsec = tv.tv_usec * 1000;
323 timeout.tv_sec = tv.tv_sec;
326 timeout.tv_nsec += (suseconds_t)ns;
327 timeout.tv_sec += (time_t)s;
329 if (timeout.tv_nsec > 1000000000) {
331 timeout.tv_nsec -= 1000000000;
333 return timedwait(mut, timeout);
337 inline int timedwait(std::unique_lock<mutex>& mut,
size_t sec)
const {
339 auto lock_ptr = mut.mutex();
342 int ret = timedwait(*lock_ptr, sec);
345 std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
350 inline int timedwait_ms(std::unique_lock<mutex>& mut,
size_t ms)
const {
352 auto lock_ptr = mut.mutex();
355 int ret = timedwait_ms(*mut.mutex(), ms);
358 std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
363 inline int timedwait_ns(std::unique_lock<mutex>& mut,
size_t ns)
const {
365 auto lock_ptr = mut.mutex();
368 int ret = timedwait_ns(*lock_ptr, ns);
371 std::unique_lock<mutex> retlock(*lock_ptr, std::adopt_lock);
378 int error = pthread_cond_signal(&m_cond);
379 ASSERT_MSG(!error,
"Condition variable signal error %d", error);
387 int error = pthread_cond_broadcast(&m_cond);
388 ASSERT_MSG(!error,
"Condition variable broadcast error %d", error);
395 int error = pthread_cond_destroy(&m_cond);
398 std::cerr <<
"Condition variable destroy error " << error
418 mutable volatile size_t semvalue;
419 mutable volatile size_t waitercount;
438 inline void post()
const {
440 if (waitercount > 0) {
446 inline void wait()
const {
449 while (semvalue == 0) {
473 int error = sem_init(&m_sem, 0,0);
482 int error = sem_init(&m_sem, 0,0);
489 inline void post()
const {
490 int error = sem_post(&m_sem);
493 inline void wait()
const {
494 int error = sem_wait(&m_sem);
498 int error = sem_destroy(&m_sem);
505 #define atomic_xadd(P, V) __sync_fetch_and_add((P), (V)) 506 #define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N)) 507 #define atomic_inc(P) __sync_add_and_fetch((P), 1) 508 #define atomic_add(P, V) __sync_add_and_fetch((P), (V)) 509 #define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V)) 511 #define cpu_relax() asm volatile("yield\n": : :"memory") 513 #define cpu_relax() asm volatile("pause\n": : :"memory") 528 __extension__
struct {
534 mutable bool writing;
535 mutable volatile rwticket l;
538 memset(const_cast<rwticket*>(&l), 0,
sizeof(rwticket));
540 inline void writelock()
const {
541 unsigned me = atomic_xadd(&l.u, (1<<16));
542 unsigned char val = (
unsigned char)(me >> 16);
544 while (val != l.s.write)
asm volatile(
"pause\n": : :
"memory");
548 inline void wrunlock()
const{
549 rwticket t = *
const_cast<rwticket*
>(&l);
554 *(
volatile unsigned short *) (&l) = t.us;
559 inline void readlock()
const {
560 unsigned me = atomic_xadd(&l.u, (1<<16));
561 unsigned char val = (
unsigned char)(me >> 16);
563 while (val != l.s.read)
asm volatile(
"pause\n": : :
"memory");
567 inline void rdunlock()
const {
568 atomic_inc(&l.s.write);
571 inline void unlock()
const {
572 if (!writing) rdunlock();
579 #define RW_WAIT_BIT 0 580 #define RW_WRITE_BIT 1 581 #define RW_READ_BIT 2 588 mutable unsigned int l;
590 spinrwlock2():l(0) {}
591 void writelock()
const {
596 if (state < RW_WRITE)
599 if (cmpxchg(&l, state, RW_WRITE) == state)
return;
606 if (!(state & RW_WAIT)) atomic_set_bit(&l, RW_WAIT_BIT);
609 while (l > RW_WAIT) cpu_relax();
613 void wrunlock()
const {
614 atomic_add(&l, -RW_WRITE);
617 void readlock()
const {
620 while (l & (RW_WAIT | RW_WRITE)) cpu_relax();
623 if (!(atomic_xadd(&l, RW_READ) & (RW_WAIT | RW_WRITE)))
return;
626 atomic_add(&l, -RW_READ);
630 void rdunlock()
const {
631 atomic_add(&l, -RW_READ);
638 #undef atomic_set_bit 656 mutable pthread_rwlock_t m_rwlock;
659 int error = pthread_rwlock_init(&m_rwlock, NULL);
663 int error = pthread_rwlock_destroy(&m_rwlock);
668 void operator=(
const rwlock& m) { }
678 int error = pthread_rwlock_init(&m_rwlock, NULL);
682 inline void readlock()
const {
683 pthread_rwlock_rdlock(&m_rwlock);
686 inline void writelock()
const {
687 pthread_rwlock_wrlock(&m_rwlock);
690 inline bool try_readlock()
const {
691 return pthread_rwlock_tryrdlock(&m_rwlock) == 0;
693 inline bool try_writelock()
const {
694 return pthread_rwlock_trywrlock(&m_rwlock) == 0;
696 inline void unlock()
const {
697 pthread_rwlock_unlock(&m_rwlock);
700 inline void rdunlock()
const {
703 inline void wrunlock()
const {
725 mutable size_t needed;
726 mutable size_t called;
728 mutable bool barrier_sense;
729 mutable bool barrier_release;
741 barrier_sense =
false;
742 barrier_release =
true;
749 void resize_unsafe(
size_t numthreads) {
767 bool listening_on = barrier_sense;
768 if (called == needed) {
772 barrier_release = barrier_sense;
773 barrier_sense = !barrier_sense;
778 while(barrier_release != listening_on && alive) conditional.
wait(mutex);
799 mutable pthread_barrier_t m_barrier;
805 pthread_barrier_init(&m_barrier, NULL, (
unsigned)numthreads); }
807 void operator=(
const barrier& m) { }
808 void resize_unsafe(
size_t numthreads) {
809 pthread_barrier_destroy(&m_barrier);
810 pthread_barrier_init(&m_barrier, NULL, (
unsigned)numthreads);
812 ~
barrier() { pthread_barrier_destroy(&m_barrier); }
814 inline void wait()
const { pthread_barrier_wait(&m_barrier); }
825 inline void prefetch_range(
void *addr,
size_t len) {
827 char *end = (
char*)(addr) + len;
829 for (cp = (
char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 0);
831 inline void prefetch_range_write(
void *addr,
size_t len) {
833 char *end = (
char*)(addr) + len;
835 for (cp = (
char*)(addr); cp < end; cp += 64) __builtin_prefetch(cp, 1);
867 inline size_t thread_id() {
return thread_id_; }
868 inline void set_thread_id(
size_t t) { thread_id_ = t; }
869 any& operator[](
const size_t&
id);
870 bool contains(
const size_t&
id)
const;
871 size_t erase(
const size_t&
id);
872 inline void set_in_thread_flag(
bool val) { in_thread = val; }
873 inline bool is_in_thread() {
return in_thread; }
876 bool in_thread =
false;
877 std::unique_ptr<boost::unordered_map<size_t, any> > local_data;
892 static inline size_t thread_id() {
return get_tls_data().thread_id(); }
896 static inline void set_thread_id(
size_t t) { get_tls_data().set_thread_id(t); }
902 return get_tls_data()[id];
909 return get_tls_data().contains(
id);
916 static inline size_t erase(
const size_t&
id){
917 return get_tls_data().erase(
id);
925 static void join(
thread& other);
929 static void thread_destroy_callback();
930 static void set_thread_destroy_callback(
void (*callback)());
937 static size_t cpu_count();
944 boost::function<void(void)> spawn_routine;
945 invoke_args(
size_t m_thread_id,
const boost::function<
void(
void)> &spawn_routine)
946 : m_thread_id(m_thread_id), spawn_routine(spawn_routine) { };
950 static void* invoke(
void *_args);
960 m_thread_id(thread_id),
961 thread_started(false){
963 const int BYTES_PER_MB = 1048576;
964 const int DEFAULT_SIZE_IN_MB = 8;
965 m_stack_size = DEFAULT_SIZE_IN_MB * BYTES_PER_MB;
972 void launch(
const boost::function<
void (
void)> &spawn_routine);
980 void launch(
const boost::function<
void (
void)> &spawn_routine,
size_t cpu_id);
994 return thread_started;
1007 size_t m_stack_size;
1010 pthread_t m_p_thread;
1015 bool thread_started;
1037 size_t m_thread_counter;
1038 size_t threads_running;
1041 std::queue<std::pair<pthread_t, const char*> > joinqueue;
1045 static void invoke(boost::function<
void (
void)> spawn_function,
thread_group *
group);
1056 void launch(
const boost::function<
void (
void)> &spawn_function);
1062 void launch(
const boost::function<
void (
void)> &spawn_function,
size_t cpu_id);
1071 return threads_running;
1081 size_t cpuid =
size_t(-1)) {
1083 if (cpuid !=
size_t(-1)) thr.
launch(f, cpuid);
1091 char __pad__[64 -
sizeof(size_t)];
thread(size_t thread_id=0)
bool active() const
Returns true if the thread is still running.
int timedwait(const mutex &mut, const struct timespec &timeout) const
Wait till a timeout time.
static bool contains(const size_t &id)
void notify_one() const
Signals one waiting thread to wake up. Synonym for signal()
int timedwait(std::unique_lock< mutex > &mut, size_t sec) const
Like wait() but with a time limit of "sec" seconds.
int timedwait(const mutex &mut, size_t sec) const
Like wait() but with a time limit of "sec" seconds.
void unlock() const
Releases a lock on the spinlock.
sframe group(sframe sframe_in, std::string key_column)
void wait(const mutex &mut) const
void wait() const
Waits on the barrier until numthreads threads have called wait().
void wait(const mutex &mut, Predicate pred) const
conditional(const conditional &)
thread launch_in_new_thread(const boost::function< void(void)> &f, size_t cpuid=size_t(-1))
Runs f in a new thread. Convenience function for creating a new thread quickly.
an integer value padded to 64 bytes
mutex()
constructs a mutex
static void set_thread_id(size_t t)
void lock() const
Acquires a lock on the spinlock.
int timedwait_ms(const mutex &mut, size_t ms) const
Like wait() but with a time limit of "ms" milliseconds.
size_t running_threads()
Returns the number of running threads.
int timedwait_ns(const mutex &mut, size_t ns) const
Like wait() but with a time limit of "ns" nanoseconds.
bool try_lock() const
Non-blocking attempt to acquire a lock on the spinlock.
mutex spinlock
If spinlocks are not supported, spinlock is a typedef for mutex.
void lock() const
Acquires a lock on the mutex.
padded_simple_spinlock()
constructs a spinlock
conditional condition_variable
padded_simple_spinlock(const padded_simple_spinlock &)
void unlock() const
Releases a lock on the mutex.
static size_t erase(const size_t &id)
void notify_all() const
Synonym for broadcast.
void unlock() const
Releases a lock on the spinlock.
#define ASSERT_TRUE(cond)
void wait(std::unique_lock< mutex > &mut, Predicate pred) const
void launch(const boost::function< void(void)> &spawn_routine)
static size_t thread_id()
bool try_lock() const
Non-blocking attempt to acquire a lock on the mutex.
pthread_t pthreadid()
Returns the pthread thread id.
void lock() const
Acquires a lock on the spinlock.
cancellable_barrier(size_t numthreads)
Constructs a barrier that falls only once numthreads threads have entered.
static any & get_local(const size_t &id)
~thread_group()
Destructor. Waits for all threads to complete execution.
simple_spinlock()
constructs a spinlock
semaphore(const semaphore &)
void wait(std::unique_lock< mutex > &mut) const
simple_spinlock(const simple_spinlock &)
void signal() const
Signals one waiting thread to wake up.
void broadcast() const
Wakes up all waiting threads.
bool try_lock() const
Non-blocking attempt to acquire a lock on the spinlock.
int timedwait_ms(std::unique_lock< mutex > &mut, size_t ms) const
Like wait() but with a time limit of "ms" milliseconds.
int timedwait_ns(std::unique_lock< mutex > &mut, size_t ns) const
Like wait() but with a time limit of "ns" nanoseconds.