// Turi Create 4.0 — core/parallel/lambda_omp.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_PARALLEL_LAMBDA_OMP_HPP
7 #define TURI_PARALLEL_LAMBDA_OMP_HPP
#include <functional>
#include <iterator>
#include <type_traits>
#include <utility>
#include <vector>
12 
13 #include <core/util/basic_types.hpp>
14 #include <core/parallel/thread_pool.hpp>
15 
16 namespace turi {
17 
18 /**
19  * Runs a provided function in parallel, passing the function the thread ID
20  * and the number of threads. The thread ID is always between 0 and
21  * #threads - 1.
22  *
23  * \ingroup threading
24  *
25  * \code
26  * in_parallel([&](size_t thrid, size_t num_threads) {
27  * std::cout << "Thread " << thrid << " out of "
28  * << num_threads << "\n";
29  * });
30  * \endcode
31  *
32  * \param fn The function to run. The function must take two size_t arguments:
33  * the thread ID and the number of threads.
34  */
35 inline void in_parallel(const std::function<void (size_t thread_id,
36  size_t num_threads)>& fn) {
37  size_t nworkers = thread_pool::get_instance().size();
38 
39  if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
40 
41  fn(0, 1);
42  return;
43 
44  } else {
45 
47 
48  for (unsigned int i = 0;i < nworkers; ++i) {
49  threads.launch([&fn, i, nworkers]() { fn(i, nworkers); }, i);
50  }
51  threads.join();
52  }
53 }
54 
55 /**
56  * Returns the thread pool dedicated for running parallel for jobs.
57  * \ingroup threading
58  */
60 
61 /**
62  * Runs a parallel for ranging from the integers 'begin' to 'end'.
63  * \ingroup threading
64  *
65  * When run single threaded, is equivalent to
66  * \code
67  * for(size_t i = begin; i < end; ++i) {
68  * fn(i);
69  * }
70  * \endcode
71  *
72  * Example:
73  * \code
74  * // performs an element wise multiplication of 'a' and 'b'
75  * std::vector<double> vec_multiple(const std::vector<double>& a,
76  * const std::vector<double>& b) {
77  * std::vector<double> ret(a.size(), 0); // allocate the return object
78  *
79  * parallel_for(0, a.size(), [&](size_t i) {
80  * ret[i] = a[i] * b[i];
81  * });
82  *
83  * return ret;
84  * }
85  * \endcode
86  *
87  * \param begin The beginning integer of the for loop
88  * \param end The ending integer of the for loop
89  * \param fn The function to run. The function must take a single size_t
90  * argument which is a current index.
91  */
92 template <typename FunctionType>
93 void parallel_for(size_t begin,
94  size_t end,
95  const FunctionType& fn) {
96 
97  size_t nworkers = thread_pool::get_instance().size();
98 
99  if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
100  // we do not support recursive calls to in parallel yet.
101  for(size_t i = begin; i < end; ++i) {
102  fn(i);
103  }
104  } else {
106  size_t nlen = end - begin; // total range
107  double split_size = (double)nlen / nworkers; // size of range each worker gets
108  for (size_t i = 0; i < nworkers; ++i) {
109  size_t worker_begin = begin + split_size * i; // beginning of this worker's range
110  size_t worker_end = begin + split_size * (i + 1); // end of this worker's range
111  if (i == nworkers - 1) worker_end = end;
112  threads.launch([&fn, worker_begin, worker_end]() {
113  size_t worker_iter = worker_begin;
114  while (worker_iter < worker_end) {
115  fn(worker_iter);
116  ++worker_iter;
117  }
118  }, i);
119  }
120  threads.join();
121  }
122 }
123 
124 
125 /**
126  * Runs a map reduce operation for ranging from the integers 'begin' to 'end'.
127  * \ingroup threading
128  *
129  * When run single threaded, is equivalent to
130  * \code
131  * T acc;
132  * for(size_t i = begin; i < end; ++i) {
133  * acc = fn(i, acc);
134  * }
135  * return acc;
136  * \endcode
137  *
138  * Example:
139  * \code
140  * // performs an inner product of 'a' and 'b'
141  * double vec_proc(const std::vector<double>& a,
142  * const std::vector<double>& b) {
143  *
144  * double result = fold_reduce(0, a.size(), [&](size_t i, double& acc) {
145  * acc += a[i] * b[i];
146  * return acc;
147  * }, 0.0);
148  * return result;
149  * }
150  * \endcode
151  *
152  * \param begin The beginning integer of the for loop
153  * \param end The ending integer of the for loop
154  * \param fn The function to run. The function must take a single size_t
155  * argument which is a current index.
156  */
157 template <typename FunctionType, typename ReduceType>
158 ReduceType fold_reduce (size_t begin,
159  size_t end,
160  const FunctionType& fn,
161  ReduceType base = ReduceType()) {
162  size_t nworkers = thread_pool::get_instance().size();
163 
164  if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
165  // we do not support recursive calls to in parallel yet.
166  ReduceType acc = base;
167  for(size_t i = begin; i < end; ++i) {
168  fn(i, acc);
169  }
170  return acc;
171  } else {
173 
174  size_t nlen = end - begin; // total rangeS
175  double split_size = (double)nlen / nworkers; // size of range each worker gets
176 
177  std::vector<ReduceType> acc(nworkers, base);
178  for (size_t i = 0;i < nworkers; ++i) {
179  size_t worker_begin = begin + split_size * i; // beginning of this worker's range
180  size_t worker_end = begin + split_size * (i + 1); // end of this worker's range
181  if (i == nworkers - 1) worker_end = end;
182  threads.launch([&fn, &acc, worker_begin, worker_end, i]() {
183  size_t worker_iter = worker_begin;
184  while (worker_iter < worker_end) {
185  fn(worker_iter, acc[i]);
186  ++worker_iter;
187  }
188  }, i);
189  }
190  threads.join();
191  ReduceType ret = base;
192  for (size_t i = 0; i < acc.size(); ++i) {
193  ret += acc[i];
194  }
195  return ret;
196  }
197 }
198 
199 /**
200  * Runs a parallel for over a random access iterator range.
201  * \ingroup threading
202  *
203  * When run single threaded, is equivalent to
204  * \code
205  * RandomAccessIterator iter = begin;
206  * while (iter != end) {
207  * fn(*iter);
208  * ++iter;
209  * }
210  * \endcode
211  *
212  * Example:
213  * \code
214  * // squares each element of the vector in place
215  * void square(std::vector<double>& v) {
216  * parallel_for(v.begin(), v.end(), [&](double& d) {
217  * d = d * d;
218  * });
219  *
220  * }
221  * \endcode
222  *
223  * \param begin The beginning integer of the for loop
224  * \param end The ending integer of the for loop
225  * \param fn The function to run. The function must take a single size_t
226  * argument which is a current index.
227  */
228 template <typename RandomAccessIterator, typename FunctionType>
229 inline void parallel_for(RandomAccessIterator iter_begin,
230  RandomAccessIterator iter_end,
231  const FunctionType& fn,
232  std::random_access_iterator_tag = typename std::iterator_traits<RandomAccessIterator>::iterator_category()) {
233 
234  size_t nworkers = thread_pool::get_instance().size();
235 
236  if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
237  RandomAccessIterator iter = iter_begin;
238  while (iter != iter_end) {
239  fn(*iter);
240  ++iter;
241  }
242  } else {
244 
245  size_t nlen = std::distance(iter_begin, iter_end); // number of elements
246 
247  double split_size = (double)nlen / nworkers; // number of elements per worker
248 
249  for (size_t i = 0;i < nworkers; ++i) {
250  size_t worker_begin = split_size * i; // index this worker starts at
251  size_t worker_end = split_size * (i + 1); // index this worker ends at
252  if (i == nworkers - 1) worker_end = nlen;
253  threads.launch(
254  [&fn, worker_begin, worker_end, &iter_begin]() {
255  RandomAccessIterator my_begin = iter_begin + worker_begin;
256  RandomAccessIterator my_end = iter_begin + worker_end;
257  while (my_begin != my_end) {
258  fn(*my_begin);
259  ++my_begin;
260  }
261  } );
262  }
263  threads.join();
264  }
265 }
266 
267 
268 };
269 
270 #endif
/* Doxygen index residue (hyperlink targets from the generated page; not part
 * of the original header):
 * void parallel_for(size_t begin, size_t end, const FunctionType &fn)
 *   Definition: lambda_omp.hpp:93
 * void launch(const boost::function<void(void)> &spawn_function, size_t virtual_threadid=size_t(-1))
 * static thread_pool &get_instance()
 * ReduceType fold_reduce(size_t begin, size_t end, const FunctionType &fn, ReduceType base=ReduceType())
 *   Definition: lambda_omp.hpp:158
 * size_t size() const
 * void in_parallel(const std::function<void(size_t thread_id, size_t num_threads)> &fn)
 *   Definition: lambda_omp.hpp:35
 * static tls_data &get_tls_data()
 *   Static helper routines.
 * thread_pool &get_parfor_thread_pool()
 */