// NOTE(review): this listing is fragmentary. The stray leading integers look
// like line numbers from the original header, and the lines between them are
// not visible here -- including where `nworkers` and `threads` are declared
// and where this function's braces close.
//
// The first line below carries the include guard, three #include directives
// and the opening of in_parallel().
//
// in_parallel(fn): launches tasks that each call `fn` with a worker index and
// the worker count.
//   fn - callable taking (thread_id, num_threads); bound by const reference.
6 #ifndef TURI_PARALLEL_LAMBDA_OMP_HPP 7 #define TURI_PARALLEL_LAMBDA_OMP_HPP 11 #include <type_traits> 13 #include <core/util/basic_types.hpp> 14 #include <core/parallel/thread_pool.hpp> 35 inline void in_parallel(
const std::function<
void (
size_t thread_id,
36 size_t num_threads)>& fn) {
// One task per worker index i in [0, nworkers).
48 for (
unsigned int i = 0;i < nworkers; ++i) {
// The task captures `fn` by reference and `i` / `nworkers` by value, then
// calls fn(i, nworkers). `i` is also passed as launch()'s second argument.
// NOTE(review): because `fn` is captured by reference, the tasks presumably
// complete before this function returns -- the join is not visible; confirm.
49 threads.
launch([&fn, i, nworkers]() { fn(i, nworkers); }, i);
// Index-range loop over [begin, end). Presumably this is
// parallel_for(begin, end, fn); the line naming the function and declaring
// `begin` / `end` is missing from this listing -- TODO confirm.
//   FunctionType - type of the per-index callable `fn` (taken by const ref).
92 template <
typename FunctionType>
95 const FunctionType& fn) {
// Plain sequential index loop over [begin, end). Its body, and the condition
// under which this path is chosen instead of the chunked one below, are not
// visible here.
101 for(
size_t i = begin; i < end; ++i) {
// Chunked path: nlen is the number of indices; split_size is the (fractional)
// number of indices per worker.
// NOTE(review): size_t is unsigned, so `end - begin` wraps if end < begin.
// Whether callers guarantee begin <= end is not visible here -- confirm.
106 size_t nlen = end - begin;
107 double split_size = (double)nlen / nworkers;
// One chunk per worker index i in [0, nworkers).
108 for (
size_t i = 0; i < nworkers; ++i) {
// Chunk bounds come from a double expression converted back to size_t.
109 size_t worker_begin = begin + split_size * i;
110 size_t worker_end = begin + split_size * (i + 1);
// The last worker's chunk is forced to stop exactly at `end`.
111 if (i == nworkers - 1) worker_end = end;
// The task captures `fn` by reference and its own chunk bounds by value.
112 threads.
launch([&fn, worker_begin, worker_end]() {
// Walk this worker's half-open range [worker_begin, worker_end); the loop
// body is not visible in this listing.
113 size_t worker_iter = worker_begin;
114 while (worker_iter < worker_end) {
// Reduction over the index range [begin, end). Presumably this is
// fold_reduce(begin, end, fn, base); the line naming the function and
// declaring `begin` / `end` is missing from this listing -- TODO confirm.
//   FunctionType - type of the callable `fn`; the visible call site invokes it
//                  as fn(index, accumulator).
//   ReduceType   - accumulator type.
//   base         - initial accumulator value; defaults to ReduceType().
157 template <
typename FunctionType,
typename ReduceType>
160 const FunctionType& fn,
161 ReduceType base = ReduceType()) {
// Sequential path: a single accumulator seeded with `base` and a plain index
// loop over [begin, end). The loop body and the condition selecting this path
// are not visible here.
166 ReduceType acc = base;
167 for(
size_t i = begin; i < end; ++i) {
// Chunked path: nlen is the number of indices; split_size is the (fractional)
// number of indices per worker.
// NOTE(review): size_t is unsigned, so `end - begin` wraps if end < begin.
174 size_t nlen = end - begin;
175 double split_size = (double)nlen / nworkers;
// One accumulator per worker, each seeded with `base`.
// NOTE(review): this `acc` must live in a different scope from the scalar
// `acc` above; the enclosing branch structure is not visible here.
177 std::vector<ReduceType> acc(nworkers, base);
// One chunk per worker index i in [0, nworkers).
178 for (
size_t i = 0;i < nworkers; ++i) {
// Chunk bounds come from a double expression converted back to size_t.
179 size_t worker_begin = begin + split_size * i;
180 size_t worker_end = begin + split_size * (i + 1);
// The last worker's chunk is forced to stop exactly at `end`.
181 if (i == nworkers - 1) worker_end = end;
// The task captures `fn` and the accumulator vector by reference, and its
// chunk bounds and worker index by value.
182 threads.
launch([&fn, &acc, worker_begin, worker_end, i]() {
183 size_t worker_iter = worker_begin;
184 while (worker_iter < worker_end) {
// Each worker passes only its own slot acc[i] to fn.
185 fn(worker_iter, acc[i]);
// Result starts from `base`, then the per-worker accumulators are visited in
// index order. How each one is combined into `ret` is not visible here.
191 ReduceType ret = base;
192 for (
size_t i = 0; i < acc.size(); ++i) {
// Iterator-range loop over [iter_begin, iter_end). Presumably this is the
// iterator overload of parallel_for; the line naming the function and
// declaring `iter_begin` is missing from this listing -- TODO confirm.
//   RandomAccessIterator - iterator type of the range.
//   FunctionType         - type of the callable `fn` (taken by const ref).
228 template <
typename RandomAccessIterator,
typename FunctionType>
230 RandomAccessIterator iter_end,
231 const FunctionType& fn,
// Unnamed trailing parameter whose default is a value-initialized
// iterator_category of RandomAccessIterator. The default has to convert to
// std::random_access_iterator_tag, so a call relying on it presumably only
// compiles for random-access iterators.
232 std::random_access_iterator_tag =
typename std::iterator_traits<RandomAccessIterator>::iterator_category()) {
// Sequential path: walk a copy of iter_begin until it reaches iter_end. The
// loop body and the condition selecting this path are not visible here.
237 RandomAccessIterator iter = iter_begin;
238 while (iter != iter_end) {
// Chunked path: nlen is the element count of the range.
245 size_t nlen = std::distance(iter_begin, iter_end);
// Fractional number of elements per worker.
247 double split_size = (double)nlen / nworkers;
// One chunk per worker index i in [0, nworkers).
249 for (
size_t i = 0;i < nworkers; ++i) {
// Here the chunk bounds are offsets from iter_begin (starting at 0), taken
// from a double expression converted back to size_t.
250 size_t worker_begin = split_size * i;
251 size_t worker_end = split_size * (i + 1);
// The last worker's chunk is forced to stop exactly at nlen.
252 if (i == nworkers - 1) worker_end = nlen;
// The task captures `fn` and `iter_begin` by reference and the offsets by
// value. The call that receives this lambda is not visible in this listing.
// NOTE(review): `iter_begin` is captured by reference, so the task must
// finish before this function returns -- the join is not visible; confirm.
254 [&fn, worker_begin, worker_end, &iter_begin]() {
// Turn the offsets into iterators by random-access addition.
255 RandomAccessIterator my_begin = iter_begin + worker_begin;
256 RandomAccessIterator my_end = iter_begin + worker_end;
// Walk this worker's sub-range; the loop body is not visible here.
257 while (my_begin != my_end) {
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
void launch(const boost::function< void(void)> &spawn_function, size_t virtual_threadid=size_t(-1))
static thread_pool & get_instance()
ReduceType fold_reduce(size_t begin, size_t end, const FunctionType &fn, ReduceType base=ReduceType())
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
static tls_data & get_tls_data()
Static helper routines.
thread_pool & get_parfor_thread_pool()