Turi Create 4.0
sparse_parallel_2d_array.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_2D_SPARSE_PARALLEL_ARRAY
#define TURI_2D_SPARSE_PARALLEL_ARRAY

#include <core/util/bitops.hpp>
#include <core/util/cityhash_tc.hpp>
#include <core/parallel/pthread_tools.hpp>
#include <core/parallel/atomic.hpp>
#include <core/parallel/lambda_omp.hpp>
#include <sparsehash/dense_hash_set>

namespace turi {

/**
 * \ingroup turi
 * A sparse 2d array structure for holding items accessed by multiple
 * threads. Besides metadata operations, this structure essentially
 * provides two operations -- apply and apply_all. apply takes as
 * input a row index, a column index, and a function taking a
 * reference to an item. The item is created if it does not exist,
 * is locked, and then the function is called. The reference is
 * invalid as soon as the function exits. apply_all takes a function
 * that takes as input a row index, a column index, and a reference
 * to the item. apply_all calls this function on every entry of the
 * matrix in parallel, with the added guarantee that each row is
 * processed by the same thread.
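 *
 * Example usage -- a minimal sketch; the value type, sizes, and the
 * accumulation below are illustrative only:
 *
 * \code
 * sparse_parallel_2d_array<double> counts(1000, 1000);
 *
 * // Thread-safe accumulation into entry (5, 17); the entry is created
 * // on first access.
 * counts.apply(5, 17, [](double& v) { v += 1.0; });
 *
 * // Visit every stored entry; all entries of a given row i are
 * // visited from the same thread.
 * counts.apply_all([](size_t i, size_t j, const double& v) {
 *   // ... consume (i, j, v) ...
 * });
 * \endcode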
 */
template <typename T>
class sparse_parallel_2d_array {
 public:

  typedef T value_type;

  sparse_parallel_2d_array(size_t n_rows = 0, size_t n_cols = 0)
      : kv_temp_container_v(thread::cpu_count())
  {
    resize(n_rows, n_cols);
  }

  size_t rows() const { return n_rows; }
  size_t cols() const { return n_cols; }

  /** Provides concurrent access to a particular element. The access
   * must be done through the apply_f function, which should have the
   * signature apply_f(T&). It is assumed that all changes to the
   * element are completed by the time apply_f returns.
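   *
   * For example (a hypothetical value type; any default-constructible
   * T works):
   *
   * \code
   * struct stats { double sum = 0; size_t n = 0; };
   * sparse_parallel_2d_array<stats> A(100, 100);
   *
   * A.apply(3, 7, [](stats& s) { s.sum += 2.5; ++s.n; });
   * \endcode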
   */
  template <typename ApplyFunction>
  GL_HOT
  void apply(size_t i, size_t j, ApplyFunction&& apply_f) {
    DASSERT_LT(i, rows());
    DASSERT_LT(j, cols());

    size_t thread_idx = thread::thread_id();

    // Reuse this thread's scratch key/value container to avoid reallocations.
    kv_container& kv = kv_temp_container_v[thread_idx];
    kv.set_key(i, j, n_col_bits);

    // Pick the hash block; all entries of row i land in the same thread block.
    size_t base_idx = get_first_level_hash(i, j, kv);

    auto& hs = hash_maps[base_idx];

    // Lock only this block, insert the element if it is absent, then apply.
    std::lock_guard<simple_spinlock> lg(hs.access_lock);

    auto ret = hs.hash_map.insert(std::move(kv));

    apply_f(ret.first->value);
  }

  /** Provides non-locking access to a particular element. Cannot be
   * used in parallel.
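   *
   * For example, during single-threaded setup (assuming a double-valued
   * array A):
   *
   * \code
   * A(2, 3) = 1.0;   // Entry (2, 3) is created if it is not present.
   * \endcode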
   */
  GL_HOT_INLINE_FLATTEN
  T& operator()(size_t i, size_t j) {
    DASSERT_LT(i, rows());
    DASSERT_LT(j, cols());

    kv_container& kv = kv_temp_container_v[0];
    kv.set_key(i, j, n_col_bits);

    size_t base_idx = get_first_level_hash(i, j, kv);

    auto& hs = hash_maps[base_idx];
    auto ret = hs.hash_map.insert(std::move(kv));

    return ret.first->value;
  }

  /** Calls apply_f, in parallel, for every value currently in the
   * table. The signature of the apply function is assumed to be
   * apply_f(size_t i, size_t j, const T& t). Note that this is the
   * const overload.
   *
   * The storage and scheduling guarantee that each unique value of
   * i is handled by a single thread. In other words, there are
   * never two simultaneous calls to apply_f with the same value of
   * i.
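   *
   * This makes per-row aggregation safe without extra locking. For
   * example (assuming a double-valued array A and a row_sums buffer
   * sized to A.rows()):
   *
   * \code
   * std::vector<double> row_sums(A.rows(), 0.0);
   * A.apply_all([&](size_t i, size_t j, const double& v) {
   *   row_sums[i] += v;   // Only one thread ever touches row i.
   * });
   * \endcode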
   */
  template <typename ApplyFunction>
  void apply_all(ApplyFunction&& apply_f) const {

    atomic<size_t> current_block_idx = 0;

    in_parallel([&](size_t thread_idx, size_t num_threads)
                GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {

      // Threads claim whole thread blocks, so all entries of a given
      // row are visited by the thread that claims that block.
      while(true) {
        size_t block_idx = (++current_block_idx) - 1;

        if(block_idx >= n_thread_blocks) {
          break;
        }

        size_t start_idx = n_levels_per_block * block_idx;
        size_t end_idx = n_levels_per_block * (block_idx + 1);

        for(size_t i = start_idx; i < end_idx; ++i) {
          const hash_block& hb = hash_maps[i];

          for(auto it = hb.hash_map.begin(); it != hb.hash_map.end(); ++it) {
            const kv_container& kv = *it;
            const auto idx_pair = kv.get_indices(n_col_bits);
            const auto& value = kv.value;
            apply_f(idx_pair.first, idx_pair.second, value);
          }
        }
      }
    });
  }

  /** Calls apply_f, in parallel, for every value currently in the
   * table. The signature of the apply function is assumed to be
   * apply_f(size_t i, size_t j, T& t).
   *
   * The storage and scheduling guarantee that each unique value of
   * i is handled by a single thread. In other words, there are
   * never two simultaneous calls to apply_f with the same value of
   * i.
   *
   * This is the mutable overload.
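   *
   * For example, to rescale every stored value in place (assuming a
   * double-valued array A):
   *
   * \code
   * A.apply_all([](size_t i, size_t j, double& v) { v *= 0.5; });
   * \endcode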
   */
  template <typename ApplyFunction>
  void apply_all(ApplyFunction&& apply_f) {

    atomic<size_t> current_block_idx = 0;

    in_parallel([&](size_t thread_idx, size_t num_threads)
                GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
      while(true) {
        size_t block_idx = (++current_block_idx) - 1;

        if(block_idx >= n_thread_blocks) {
          break;
        }

        size_t start_idx = n_levels_per_block * block_idx;
        size_t end_idx = n_levels_per_block * (block_idx + 1);

        for(size_t i = start_idx; i < end_idx; ++i) {
          const hash_block& hb = hash_maps[i];

          for(auto it = hb.hash_map.begin(); it != hb.hash_map.end(); ++it) {
            const kv_container& kv = *it;
            const auto idx_pair = kv.get_indices(n_col_bits);
            apply_f(idx_pair.first, idx_pair.second, kv.value);
          }
        }
      }
    });
  }

  void clear() {
    parallel_for(size_t(0), hash_maps.size(), [&](size_t i) {
      hash_maps[i].hash_map.clear();
    });
  }

  void resize(size_t _n_rows, size_t _n_cols) {
    n_cols = _n_cols;
    n_rows = _n_rows;
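    // Reserve enough low-order key bits to hold any column index plus
    // one, so set_key() can pack (i, j) as (i << n_col_bits) + j + 1.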
    n_col_bits = bitwise_log2_ceil(n_cols + 1);
  }

////////////////////////////////////////////////////////////////////////////////

 private:

  ////////////////////////////////////////////////////////////////////////////////
  // The internal data structures to make this efficient.

  struct kv_container {
    size_t key = 0;
    mutable T value;

    // For the empty key, use the hash of 0.  This never occurs in
    // practice, as set_key always adds 1 to the packed index.
    static kv_container as_empty() {
      kv_container kv;
      kv.key = index_hash(0);
      kv.value = T();
      return kv;
    }

    ////////////////////////////////////////

    inline bool operator==(const kv_container& kv) const {
      return key == kv.key;
    }

    /** Sets the key by packing (i, j) into a single index,
     * (i << n_col_bits) + j + 1, and hashing it.
     */
    void set_key(size_t i, size_t j, size_t n_col_bits) GL_HOT_INLINE_FLATTEN {
      key = index_hash((i << n_col_bits) + j + 1);

#ifndef NDEBUG
      auto p = get_indices(n_col_bits);
      DASSERT_EQ(p.first, i);
      DASSERT_EQ(p.second, j);
#endif
    }
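
    // Worked example of the packing (hypothetical sizes): with
    // n_col_bits == 4, (i, j) == (3, 5) packs to (3 << 4) + 5 + 1 == 54,
    // which is then hashed; get_indices() applies reverse_index_hash()
    // and unpacks 54 back into (3, 5).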

    // Get the indices
    inline std::pair<size_t, size_t> get_indices(size_t n_col_bits) const GL_HOT_INLINE_FLATTEN {
      size_t idx = reverse_index_hash(key) - 1;
      return std::pair<size_t, size_t>{(idx >> n_col_bits),
                                       idx & bit_mask<size_t>(static_cast<int>(n_col_bits))};
    }
  };

  kv_container empty_container;

  // The goal of this two-level system is both to ensure that each row
  // index is always handled by the same thread, and to minimize
  // collisions among writing threads.
  static constexpr size_t n_thread_block_bits = 6;
  static constexpr size_t n_levels_per_block_bits = 5;
  static constexpr size_t n_thread_blocks = (size_t(1) << n_thread_block_bits);
  static constexpr size_t n_levels_per_block = (size_t(1) << n_levels_per_block_bits);
  static constexpr size_t n_level_bits = n_thread_block_bits + n_levels_per_block_bits;
  static constexpr size_t n_levels = (size_t(1) << n_level_bits);
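  // With the defaults above: 64 thread blocks * 32 levels per block,
  // i.e. 2048 second-level hash maps in total.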

  inline size_t get_first_level_hash(size_t i, size_t j, const kv_container& kv) const {

    // The first index points to the thread block we end up using.
    // All values within each block will be accessed by the same
    // thread in the apply_all call. After that, it's randomized to
    // reduce thread contention.
    size_t first_idx = i & bit_mask<size_t>(n_thread_block_bits);
    DASSERT_LT(first_idx, n_thread_blocks);

    size_t second_idx = kv.key >> (bitsizeof(size_t) - n_levels_per_block_bits);
    DASSERT_LT(second_idx, n_levels_per_block);

    size_t base_idx = first_idx * n_levels_per_block + second_idx;
    DASSERT_LT(base_idx, n_levels);
    return base_idx;
  }

  size_t n_rows = 0, n_cols = 0;
  size_t n_col_bits = 0;

  /** One second-level hash block: a spinlock together with the dense
   * hash set it protects.
   */
  struct hash_block {
    hash_block() {
      hash_map.set_empty_key(kv_container::as_empty());
    }

    simple_spinlock access_lock;

    struct trivial_kv_container_hash {
      GL_HOT_INLINE_FLATTEN size_t operator()(const kv_container& k) const {
        return k.key;
      }
    };

    google::dense_hash_set<kv_container, trivial_kv_container_hash> hash_map;
  };

  // The first level table for this.
  std::array<hash_block, n_levels> hash_maps;

  // Per-thread temporary containers, used to avoid reallocations.
  std::vector<kv_container> kv_temp_container_v;
};

}
#endif /* TURI_2D_SPARSE_PARALLEL_ARRAY */