Turi Create  4.0
hilbert_parallel_for.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SGRAPH_HILBERT_PARALLE_FOR_HPP
7 #define TURI_SGRAPH_HILBERT_PARALLE_FOR_HPP
#include <algorithm>
#include <atomic>
#include <functional>
#include <utility>
#include <vector>
#include <core/parallel/lambda_omp.hpp>
#include <core/storage/sgraph_data/hilbert_curve.hpp>
#include <core/storage/sgraph_data/sgraph_constants.hpp>
#include <core/util/blocking_queue.hpp>
14 
15 namespace turi {
16 
17 
18 
19 /**
20  * \ingroup sgraph_physical
21  * \addtogroup sgraph_compute SGraph Compute
22  * \{
23  */
24 
25 /**
26  * Graph Computation Functions
27  */
28 namespace sgraph_compute {
29 
30 /**
31  * This performs a parallel sweep over an n*n grid following the Hilbert
32  * curve ordering. The parallel sweep is broken into two parts. A "preamble"
33  * callback which is called sequentially, which contains a list of all the
34  * coordinates to be executed in the next pass, and a function which is
35  * executed on every coordinate in the pass.
36  *
37  * The function abstractly implements the following:
38  *
39  * \code
40  * for i = 0 to n*n step parallel_limit
41  * // collect all the coordinates to be run in this pass
42  * std::vector<pair<size_t, size_t> > coordinates
43  * for j = i to min(i + parallel_limit, n*n)
44  * coordinates.push_back(convert_hilbert_curve_to_coordinates(j))
45  * // run the preamble
46  * preamble(coordinates)
47  *
48  * parallel for over coordinate in coordinates:
49  * fn(coordinate)
50  * \endcode
51  *
52  * n must be at least 2 and a power of 2.
53  */
54 inline void hilbert_blocked_parallel_for(size_t n,
55  std::function<void(std::vector<std::pair<size_t, size_t> >) > preamble,
56  std::function<void(std::pair<size_t, size_t>)> fn,
57  size_t parallel_limit = SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS) {
58  for (size_t i = 0;i < n*n; i += parallel_limit) {
59  std::vector<std::pair<size_t, size_t> > coordinates;
60  // accumulate the list of coordinates to run
61  size_t lastcoord_this_pass = std::min(i + parallel_limit, n*n);
62  for(size_t j = i; j < lastcoord_this_pass; ++j) {
63  coordinates.push_back(hilbert_index_to_coordinate(j, n));
64  }
65  preamble(coordinates);
66  parallel_for(coordinates.begin(), coordinates.end(), fn);
67  }
68 }
69 
70 /**
71  * Non blocking version.
72  */
73 inline void hilbert_parallel_for(size_t n,
74  std::function<void(std::vector<std::pair<size_t, size_t> >) > preamble,
75  std::function<void(std::pair<size_t, size_t>)> fn) {
76 
77  blocking_queue<std::pair<size_t, size_t> > coordinates_queue;
78  std::vector<std::pair<size_t, size_t> > coordinates;
79  for (size_t i = 0;i < n*n; i ++) {
80  auto coord = hilbert_index_to_coordinate(i, n);
81  coordinates_queue.enqueue(coord);
82  coordinates.push_back(coord);
83  }
84  preamble(coordinates);
85 
86  parallel_for(0, n*n, [&](size_t i) {
87  auto coord_success_pair = coordinates_queue.dequeue();
88  ASSERT_TRUE(coord_success_pair.second);
89  fn(coord_success_pair.first);
90  });
91 }
92 
93 } // sgraph_compute
94 
95 /// \}
96 } // turicreate
97 #endif // TURI_SGRAPH_HILBERT_PARALLE_FOR_HPP
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
Definition: lambda_omp.hpp:93
Implements a blocking queue useful for producer/consumer models.
void hilbert_parallel_for(size_t n, std::function< void(std::vector< std::pair< size_t, size_t > >) > preamble, std::function< void(std::pair< size_t, size_t >)> fn)
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
std::pair< T, bool > dequeue()
size_t SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS
void hilbert_blocked_parallel_for(size_t n, std::function< void(std::vector< std::pair< size_t, size_t > >) > preamble, std::function< void(std::pair< size_t, size_t >)> fn, size_t parallel_limit=SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS)
std::pair< size_t, size_t > hilbert_index_to_coordinate(size_t s, size_t n)
void enqueue(const T &elem)
Add an element to the blocking queue.