Turi Create  4.0
thread_pool.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_THREAD_POOL_HPP
7 #define TURI_THREAD_POOL_HPP
8 
9 #include <boost/bind.hpp>
10 #include <core/parallel/pthread_tools.hpp>
11 #include <core/util/blocking_queue.hpp>
12 
13 namespace turi {
14 
15 class thread_pool;
16 
17 /**
18  * This class adds a task queueing structure on top of a thread_pool, while
19  * providing the same interface as the thread_pool.
20  * Multiple parallel_task_queue objects can be associated with the same
21  * thread_pool and each parallel_task_queue instance has its own indepedendent
22  * task joining capability.
23  *
24  * The parallel_task_queue also provides exception forwarding.
25  * exception throws within a thread of type const char* will be caught
26  * and forwarded to the join() function.
27  * If the call to join() is wrapped by a try-catch block, the exception
28  * will be caught safely and thread cleanup will be completed properly.
29  *
30  * Usage:
31  * \code
32  * parallel_task_queue queue(thread_pool::get_instance());
33  *
34  * queue.launch(...)
35  * ...
36  * queue.join()
37  * \endcode
38  */
40  public:
41  /**
42  * Create a parallel_task_queue which is associated with a particular pool.
43  * All tasks used by the parallel task_queue will use the pool for its pool
44  * of threads.
45  */
47 
48  /**
49  * Launch a single task into the thread pool which calls spawn_function.
50  *
51  * If virtual_threadid is set, the target thread will appear to have
52  * thread ID equal to the requested thread ID.
53  */
54  void launch(const boost::function<void (void)> &spawn_function,
55  size_t virtual_threadid = size_t(-1));
56 
57 
58  /** Waits for all tasks to complete. const char* exceptions
59  thrown by threads are forwarded to the join() function.
60  Once this function returns normally, the queue is empty.
61 
62  Note that this function may not return if producers continually insert
63  tasks through launch.
64  */
65  void join();
66 
67  /**
68  * Destructor. Waits for all tasks to complete.
69  */
71 
72  private:
73  thread_pool& pool;
74  // protects the exception queue, and the task counters
75  mutex mut;
76  conditional event_condition; // to wake up the joining thread
77  std::queue<std::exception_ptr> exception_queue;
78  size_t tasks_inserted = 0;
79  size_t tasks_completed = 0;
80  bool waiting_on_join = false; // true if a thread is waiting in join
81 };
82 
83 
84  /**
85  * \ingroup util
86  * Manages a pool of threads.
87  *
88  * The interface is nearly identical to the \ref thread_group.
89  * The key difference is internal behavior. The thread pool preallocates a
90  * collection of threads which it keeps asleep. When tasks are issued
91  * through the "launch" function, threads are woken up to perform the
92  * tasks.
93  *
94  * The thread_pool object does not perform exception forwarding, use
95  * parallel_task_queue for that.
96  *
97  * If multiple threads are running in the thread-group, the master should
98  * test if running_threads() is > 0, and retry the join().
99  *
100  */
101  class thread_pool {
102  private:
103  thread_group threads;
105  size_t pool_size;
106 
107  mutex mut;
108  conditional event_condition;
109  size_t tasks_inserted = 0;
110  size_t tasks_completed = 0;
111  bool waiting_on_join = false;
112 
113  bool cpu_affinity;
114  // not implemented
115  thread_pool& operator=(const thread_pool &thrgrp);
116  thread_pool(const thread_pool&);
117 
118  /**
119  Called by each thread. Loops around a queue of tasks.
120  */
121  void wait_for_task();
122 
123  /**
124  Creates all the threads in the thread pool.
125  Resets the task and exception queue
126  */
127  void spawn_thread_group();
128 
129  /**
130  Destroys the thread pool.
131  Also destroys the task queue
132  */
133  void destroy_all_threads();
134  public:
135 
136  /** Initializes a thread pool with nthreads.
137  * If affinity is set, the nthreads will by default stripe across
138  * the available cores on the system.
139  */
140  thread_pool(size_t nthreads = 2, bool affinity = false);
141 
142  /**
143  * Set the number of threads in the queue
144  */
145  void resize(size_t nthreads);
146 
147  /**
148  * Get the number of threads
149  */
150  size_t size() const;
151 
152 
153  /**
154  * Queues a single task into the thread pool which calls spawn_function.
155  *
156  * If virtual_threadid is set, the target thread will appear to have
157  * thread ID equal to the requested thread ID.
158  */
159  void launch(const boost::function<void (void)> &spawn_function,
160  size_t virtual_threadid = size_t(-1));
161 
162  void join();
163 
164  /**
165  * Changes the CPU affinity. Note that pthread does not provide
166  * a way to change CPU affinity on a currently started thread.
167  * This function therefore waits for all threads in the pool
168  * to finish their current task, and destroy all the threads. Then
169  * new threads are created with the new affinity setting.
170  */
171  void set_cpu_affinity(bool affinity);
172 
173  /**
174  Gets the CPU affinity.
175  */
176  bool get_cpu_affinity() { return cpu_affinity; };
177 
178 
179  /**
180  * Returns a singleton instance of the thread pool
181  */
182  static thread_pool& get_instance();
183 
184  /**
185  * Frees the singleton instance of the thread pool
186  */
187  static void release_instance();
188 
189  //! Destructor. Cleans up all threads
190  ~thread_pool();
191  };
192 
193 }
194 #endif
void launch(const boost::function< void(void)> &spawn_function, size_t virtual_threadid=size_t(-1))
Implements a blocking queue useful for producer/consumer models.
parallel_task_queue(thread_pool &pool)