Turi Create 4.0
utilities.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_SPARSE_SIMILARITY_UTILITIES_H
#define TURI_SPARSE_SIMILARITY_UTILITIES_H

#include <core/storage/sframe_data/sarray.hpp>
#include <vector>
#include <core/parallel/pthread_tools.hpp>
#include <core/util/try_finally.hpp>
#include <core/logging/table_printer/table_printer.hpp>
#include <core/storage/sframe_data/sarray_iterators.hpp>

namespace turi {

/** Given a sorted sparse vector of (index, value) pairs, efficiently
 * find and return a pair of indices (i1, i2) into the vector such
 * that (item_index_lb <= v[i].first < item_index_ub) holds exactly
 * for the elements i = i1, ..., i2 - 1.
 */
template <typename T>
GL_HOT_INLINE
std::pair<size_t, size_t>
find_slice_boundary_indices(const std::vector<std::pair<size_t, T> >& v,
                            size_t item_index_lb, size_t item_index_ub) {

  // The comparison function for finding the correct start and
  // end items for the loop.
  auto idx_cmp_f = [](const std::pair<size_t, T>& p1, const std::pair<size_t, T>& p2) {
    return p1.first < p2.first;
  };

  // We only work on sorted vectors, so make sure this one is indeed sorted.
  DASSERT_TRUE(std::is_sorted(v.begin(), v.end(), idx_cmp_f));

  // Handle the edge cases.
  if(v.empty()) {
    return {0, 0};
  }

  if(item_index_lb <= v.front().first && v.back().first < item_index_ub) {
    return {0, v.size()};
  }

  // Okay, we need to actually find the indices. Do this with binary
  // searches.
  size_t list_idx_start = 0;
  size_t list_idx_end = v.size();

  // If we don't start at the beginning, move the list start up to
  // the correct lower bound.
  if(!( item_index_lb <= v.front().first) ) {
    auto lb_it = std::lower_bound(v.begin(), v.end(),
                                  std::make_pair(item_index_lb, T()), idx_cmp_f);
    list_idx_start = std::distance(v.begin(), lb_it);
  }

  // Likewise, if we don't end at the last element, move the end back
  // to the correct upper bound.
  if(!(v.back().first < item_index_ub)) {
    auto ub_it = std::lower_bound(v.begin() + list_idx_start,
                                  v.end(),
                                  std::make_pair(item_index_ub, T()), idx_cmp_f);
    list_idx_end = std::distance(v.begin(), ub_it);
  }

  if(list_idx_start != list_idx_end) {
    // Make sure we have indeed found the correct boundaries.
    DASSERT_LT(v[list_idx_end - 1].first, item_index_ub);
    DASSERT_GE(v[list_idx_end - 1].first, item_index_lb);
    DASSERT_LT(v[list_idx_start].first, item_index_ub);
    DASSERT_GE(v[list_idx_start].first, item_index_lb);
  }

  return {list_idx_start, list_idx_end};
}

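/* A minimal usage sketch for find_slice_boundary_indices. The vector and
 * bounds below are hypothetical, made up purely for illustration:
 *
 *   std::vector<std::pair<size_t, double> > v =
 *       {{1, 0.5}, {4, 1.0}, {7, 2.0}, {9, 0.25}};
 *
 *   // Elements with 4 <= index < 9 are v[1] and v[2], so this yields {1, 3}.
 *   std::pair<size_t, size_t> bounds = find_slice_boundary_indices(v, 4, 9);
 *
 *   for(size_t i = bounds.first; i < bounds.second; ++i) {
 *     // ... process v[i] ...
 *   }
 */
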
/** Iterates through a sparse vector sarray by column slices. Each row
 * in `data` is assumed to be a sorted vector of (index, value) pairs,
 * and this function makes multiple passes through the data, with each
 * pass handling one contiguous slice of the indices in each row.
 * These slices are determined by the slice_delimiters vector.
 *
 * Parameters:
 * -------------
 *
 * data -- an sarray of vectors of (index, value) pairs. Each vector
 *     must be sorted by index.
 *
 * slice_delimiters -- a vector of length (num_slices + 1) giving the
 *     boundaries of the slices of indices. For example, [0, 5, 10, 12]
 *     gives 3 slices, with (0, 5), (5, 10), and (10, 12) being the
 *     slices processed.
 *
 * init_slice -- called at the beginning of each slice, before the
 *     data is processed. init_slice has the signature
 *
 *       init_slice(size_t slice_idx, size_t item_idx_start, size_t item_idx_end)
 *
 *     where slice_idx is the slice counter (0, 1, ...), and
 *     item_idx_start and item_idx_end are taken from
 *     slice_delimiters[slice_idx] and slice_delimiters[slice_idx + 1].
 *
 * preprocess_row -- the function called on every row. It has the
 *     signature:
 *
 *       preprocess_row(size_t thread_idx, size_t row_idx,
 *                      size_t slice_item_idx_start, size_t slice_item_idx_end,
 *                      std::vector<std::pair<size_t, T> >& row)
 *
 *     Here, row_idx is the row currently being processed,
 *     slice_item_idx_start and slice_item_idx_end are taken from the
 *     current slice, and row is the current row. The row may be
 *     modified, but any modifications must preserve the sorted order.
 *     If the row is empty upon return, element processing for that
 *     row is skipped.
 *
 * process_element -- the function called on every element. It has the
 *     signature:
 *
 *       process_element(size_t thread_idx, size_t row_idx,
 *                       size_t item_idx_start, size_t item_idx_end,
 *                       size_t item_idx, const T& value)
 *
 *     Here, row_idx is the row currently being processed,
 *     item_idx_start and item_idx_end are taken from the slice,
 *     item_idx is the index of the value in the row, and value is the
 *     value.
 *
 * finalize_slice -- the function called at the end of every slice. It
 *     has the same signature as init_slice:
 *
 *       finalize_slice(size_t slice_idx, size_t item_idx_start, size_t item_idx_end)
 */
template <typename T,
          typename SliceInitFunction,
          typename RowProcessFunction,
          typename ElementProcessFunction,
          typename SliceFinalizeFunction>
void iterate_through_sparse_item_array_by_slice(
    const std::shared_ptr<sarray<std::vector<std::pair<size_t, T> > > >& data,
    const std::vector<size_t>& slice_delimiters,
    SliceInitFunction&& init_slice,
    RowProcessFunction&& preprocess_row,
    ElementProcessFunction&& process_element,
    SliceFinalizeFunction&& finalize_slice) {

  const size_t n = data->size();

  if(n == 0)
    return;

  volatile bool user_cancellation = false;

  auto execute_user_cancellation = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
    user_cancellation = true;
    log_and_throw("Cancelled by user.");
  };

  auto check_user_cancellation = [&]() GL_GCC_ONLY(GL_HOT_INLINE) {
    if(user_cancellation || cppipc::must_cancel()) {
      execute_user_cancellation();
    }
  };

  for(size_t slice_idx = 0; slice_idx < slice_delimiters.size() - 1; ++slice_idx) {
    check_user_cancellation();

    // Get the boundaries of the current slice.
    size_t item_idx_start = slice_delimiters[slice_idx];
    size_t item_idx_end = slice_delimiters[slice_idx + 1];

    // Initialize the current slice.
    init_slice(slice_idx, item_idx_start, item_idx_end);

    // Check after the initialization function.
    check_user_cancellation();

    auto data_it = make_sarray_block_iterator(data);

    // Time to rock and roll.
    in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT) {

      // Buffer of rows read in from the sarray.
      std::vector<std::vector< std::pair<size_t, T> > > item_buffer_v;

      while(true) {
        // Check in case of cancellation.
        check_user_cancellation();

        size_t block_row_index_start = 0;
        if(data_it.read_next(&block_row_index_start, &item_buffer_v) ) {
          break;
        }

        size_t n_rows_read = item_buffer_v.size();

        for(size_t inner_idx = 0; inner_idx < n_rows_read && !user_cancellation; ++inner_idx) {

          // Check at the start here, before anything happens.
          check_user_cancellation();

          size_t row_idx = block_row_index_start + inner_idx;
          auto& item_list_nonconst = item_buffer_v[inner_idx];

          // Preprocess the row.
          preprocess_row(thread_idx, row_idx, item_idx_start, item_idx_end, item_list_nonconst);

          // If it's empty, then ignore it.
          if(item_list_nonconst.empty()) {
            continue;
          }

          // Check at the end of preprocessing a row.
          check_user_cancellation();

          const auto& item_list = item_list_nonconst;

          size_t list_idx_start = 0;
          size_t list_idx_end = item_list.size();

          // Restrict the indices to the current slice if needed.
          std::tie(list_idx_start, list_idx_end) = find_slice_boundary_indices(
              item_list, item_idx_start, item_idx_end);

          // If this one is empty, ignore it.
          if(UNLIKELY(list_idx_start == list_idx_end))
            continue;

          for(size_t idx_a = list_idx_start; idx_a < list_idx_end; ++idx_a) {

            DASSERT_GE(item_list[idx_a].first, item_idx_start);
            DASSERT_LT(item_list[idx_a].first, item_idx_end);

            process_element(thread_idx, row_idx,
                            item_idx_start, item_idx_end,
                            item_list[idx_a].first, item_list[idx_a].second);
          }
        } // End inner loop over rows.
      } // End outer loop over blocks.
    });

    // Check at the end of processing a slice, before the finalize
    // slice function is called.
    check_user_cancellation();

    finalize_slice(slice_idx, item_idx_start, item_idx_end);
  }
}

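/* A usage sketch for iterate_through_sparse_item_array_by_slice. This
 * hypothetical pass counts the stored elements falling in each of two
 * column slices; it assumes `data` holds sorted (index, value) rows of
 * type double with all column indices below 10:
 *
 *   std::vector<size_t> slice_delimiters = {0, 5, 10};
 *   std::vector<atomic<size_t> > slice_counts(2);
 *   size_t current_slice = 0;
 *
 *   iterate_through_sparse_item_array_by_slice(
 *       data, slice_delimiters,
 *       // init_slice: runs serially, so recording the slice index is safe.
 *       [&](size_t slice_idx, size_t, size_t) { current_slice = slice_idx; },
 *       // preprocess_row: leave each row untouched.
 *       [&](size_t, size_t, size_t, size_t,
 *           std::vector<std::pair<size_t, double> >&) {},
 *       // process_element: runs in parallel, hence the atomic counters.
 *       [&](size_t, size_t, size_t, size_t, size_t, const double&) {
 *         ++slice_counts[current_slice];
 *       },
 *       // finalize_slice: nothing to clean up here.
 *       [&](size_t, size_t, size_t) {});
 */
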
/** Iterates through a sparse vector sarray efficiently, calling a
 * prescribed function on each row.
 *
 * Parameters:
 * -------------
 *
 * data -- an sarray of vectors of (index, value) pairs. Each vector
 *     must be sorted by index.
 *
 * process_row -- the function called on every row. It has the
 *     signature:
 *
 *       process_row(size_t thread_idx, size_t row_idx,
 *                   std::vector<std::pair<size_t, T> >& row)
 *
 *     Here, row_idx is the row currently being processed and row is
 *     the current row. Any modifications to row are discarded after
 *     the call.
 */
template <typename T, typename RowProcessFunction>
void iterate_through_sparse_item_array(
    const std::shared_ptr<sarray<std::vector<std::pair<size_t, T> > > >& data,
    RowProcessFunction&& process_row) {

  // This is a convenience wrapper around the main function,
  // iterate_through_sparse_item_array_by_slice, called with a single
  // slice covering all indices, namely (0, inf).
  //
  // Because of this, most of the callbacks are empty.
  auto empty_slice_function = [&](size_t slice_idx, size_t item_idx_start, size_t item_idx_end)
      GL_GCC_ONLY(GL_HOT_INLINE)
  {};

  // This is the only callback that does real work: it forwards to
  // process_row, then clears the row.
  auto _process_row = [&](size_t thread_idx, size_t row_idx,
                          size_t slice_item_idx_start, size_t slice_item_idx_end,
                          std::vector<std::pair<size_t, T> >& row)
      GL_GCC_ONLY(GL_HOT_INLINE) {

    // Pass the appropriate parts to the process_row function.
    process_row(thread_idx, row_idx, row);

    // This disables further work on this row.
    row.clear();
  };

  auto empty_process_element = [&](size_t thread_idx, size_t row_idx,
                                   size_t item_idx_start, size_t item_idx_end,
                                   size_t item_idx, const T& value)
      GL_GCC_ONLY(GL_HOT_INLINE)
  {};

  // Now, pass these on to the main utility function.
  iterate_through_sparse_item_array_by_slice(
      data,
      {0, std::numeric_limits<size_t>::max()},
      empty_slice_function,
      _process_row,
      empty_process_element,
      empty_slice_function);
}

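/* A usage sketch for iterate_through_sparse_item_array. This hypothetical
 * scan finds the largest column index present in `data` (rows are sorted,
 * so the last entry of each row is its maximum). It assumes T = double and
 * that thread::cpu_count() bounds the thread indices used by in_parallel:
 *
 *   std::vector<size_t> max_idx_by_thread(thread::cpu_count(), 0);
 *
 *   iterate_through_sparse_item_array(
 *       data,
 *       [&](size_t thread_idx, size_t row_idx,
 *           std::vector<std::pair<size_t, double> >& row) {
 *         if(!row.empty()) {
 *           max_idx_by_thread[thread_idx] =
 *               std::max(max_idx_by_thread[thread_idx], row.back().first);
 *         }
 *       });
 *
 *   size_t max_index = *std::max_element(max_idx_by_thread.begin(),
 *                                        max_idx_by_thread.end());
 */
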
/** Transposes a sparse sarray of sorted vectors of entry pairs. Does
 * so efficiently, staying close to within the given memory bounds.
 *
 * The input data is an sarray of vectors of (column_index, value)
 * pairs. This is transposed into a similar sarray of vectors of
 * (row_index, value) pairs, where each output row consists of the
 * values matching that column_index in the original sarray. The
 * resulting vectors are sorted.
 *
 * To do this efficiently, the number of elements for each
 * column_index is required ahead of time. This is passed in as
 * item_counts, which should have size equal to the column
 * dimension.
 *
 * max_memory_usage gives the maximum memory, in bytes, allowed for
 * the computation. The larger max_memory_usage is, the fewer passes
 * through the data are required.
 */
template <typename T>
std::shared_ptr<sarray<std::vector<std::pair<size_t, T> > > > transpose_sparse_sarray(
    std::shared_ptr<sarray<std::vector<std::pair<size_t, T> > > > data,
    const std::vector<size_t>& item_counts,
    size_t max_memory_usage) {

  size_t num_items = item_counts.size();

  // Figure out how many items can be held in memory on each pass.
  std::vector<size_t> slice_delimiters = {0};

  size_t mem_usage = 0;

  for(size_t i = 0; i < item_counts.size(); ++i) {
    size_t this_mem_usage = sizeof(std::pair<size_t, T>) * item_counts[i];

    if(mem_usage + this_mem_usage > max_memory_usage) {
      // Item i starts a new slice, so its memory counts toward that slice.
      slice_delimiters.push_back(i);
      mem_usage = this_mem_usage;
    } else {
      mem_usage += this_mem_usage;
    }

    if(slice_delimiters.size() > 256) {
      log_and_throw( ( std::string("Memory limit of ")
                       + std::to_string(max_memory_usage)
                       + " too low to efficiently transpose sparse sarray.").c_str() );
    }
  }

  slice_delimiters.push_back(num_items);

  // Set up the transpose.
  const size_t n = data->size();
  const size_t num_slices = slice_delimiters.size() - 1;

  std::vector<size_t> row_locations;
  std::vector<atomic<size_t> > row_sizes;
  std::vector<std::pair<size_t, T> > slice_t_data;

  // If we have multiple passes, reserve the buffer up front so we
  // don't do expensive reallocs later. The budget is in bytes, so
  // convert it to an element count.
  if(num_slices > 1) {
    slice_t_data.reserve(max_memory_usage / sizeof(std::pair<size_t, T>));
  }

  logprogress_stream << "Transposing user-item lists for use in nearest neighbor search. "
                     << std::endl;

  table_printer table( { {"Elapsed Time (Data Transposition)", 0}, {"% Complete", 0} } );
  table.print_header();

  atomic<size_t> row_count = 0;
  size_t total_rows_to_process = n * num_slices;

  ////////////////////////////////////////////////////////////////////////////////
  // Set up the slice initialization. This lays out the in-memory
  // destination buffer for the columns in the current slice.

  auto init_slice = [&](size_t slice_idx, size_t item_idx_start, size_t item_idx_end) {
    size_t num_items_in_slice = item_idx_end - item_idx_start;

    row_locations.resize(num_items_in_slice + 1);

    size_t item_cumsum = 0;
    for(size_t i = 0; i < num_items_in_slice; ++i) {
      row_locations[i] = item_cumsum;
      item_cumsum += item_counts[i + item_idx_start];
    }

    row_sizes.resize(num_items_in_slice);
    std::fill(row_sizes.begin(), row_sizes.end(), 0);

    slice_t_data.resize(item_cumsum);
    row_locations[num_items_in_slice] = item_cumsum;
  };

  // No per-row preprocessing is needed; this hook only reports progress.
  auto empty_preprocess_row = [&](size_t thread_idx, size_t row_idx,
                                  size_t slice_item_idx_start, size_t slice_item_idx_end,
                                  std::vector<std::pair<size_t, T> >& row)
      GL_GCC_ONLY(GL_HOT_INLINE) {
    size_t cur_row_count = (++row_count) - 1;

    if(UNLIKELY(cur_row_count % 100 == 0)) {
      // Report progress with quarter-percent resolution.
      double percent_complete = double((400 * cur_row_count) / total_rows_to_process) / 4;

      table.print_timed_progress_row(progress_time(), percent_complete);
    }
  };

  ////////////////////////////////////////////////////////////////////////////////
  // Process each element within a slice. This means putting it in its
  // rightful spot in the transposed row.

  auto process_element = [&](size_t thread_idx, size_t row_idx,
                             size_t item_idx_start, size_t item_idx_end,
                             size_t item_idx, const T& value) {

    size_t internal_idx = item_idx - item_idx_start;
    size_t item_count_idx = (++(row_sizes[internal_idx])) - 1;

    DASSERT_LT(item_count_idx, item_counts[item_idx]);

    slice_t_data[row_locations[internal_idx] + item_count_idx] = {row_idx, value};
  };

  ////////////////////////////////////////////////////////////////////////////////
  // Set up writing out each slice.
  auto out_data = std::make_shared<sarray<std::vector<std::pair<size_t, T> > > >();
  out_data->open_for_write(1);
  auto it_out = out_data->get_output_iterator(0);

  auto finalize_slice = [&](size_t slice_idx, size_t item_idx_start, size_t item_idx_end) {
    size_t num_items_in_slice = item_idx_end - item_idx_start;

    DASSERT_EQ(row_locations.size(), num_items_in_slice + 1);

    atomic<size_t> process_idx = 0;

    // To do the writing, we allow one of the threads to write out finished rows.
    size_t write_idx = 0;
    dense_bitset idx_is_finished(num_items_in_slice);

    std::vector<std::pair<size_t, T> > row_out;
    auto flush_next_row = [&]() GL_GCC_ONLY(GL_HOT_INLINE) {
      DASSERT_LT(write_idx, num_items_in_slice);
      DASSERT_TRUE(idx_is_finished.get(write_idx));

      row_out.assign(slice_t_data.begin() + row_locations[write_idx],
                     slice_t_data.begin() + row_locations[write_idx + 1]);

      *it_out = row_out;
      ++it_out, ++write_idx;
    };

    // First, go through and sort each of the transposed rows in parallel.
    in_parallel([&](size_t thread_idx, size_t num_threads) {

      while(true) {

        if(thread_idx == 0) {
          // As long as thread 0 has rows ready to write, write them
          // out while the rest of the threads handle the sorting.
          // This gives us a head start on the writing that is
          // completed below.
          while(write_idx < num_items_in_slice && idx_is_finished.get(write_idx)) {
            flush_next_row();
          }
        }

        size_t idx = (++process_idx) - 1;

        if(idx >= num_items_in_slice) {
          break;
        }

        std::sort(slice_t_data.begin() + row_locations[idx],
                  slice_t_data.begin() + row_locations[idx + 1],
                  [](const std::pair<size_t, T>& p1,
                     const std::pair<size_t, T>& p2) {
                    return p1.first < p2.first;
                  });

        idx_is_finished.set_bit(idx);
      }
    });

    // Now, flush any remaining rows that were not written during the
    // parallel portion.
    while(write_idx < num_items_in_slice) {
      flush_next_row();
    }

  }; // End of slice finalization.

  // Now actually run all of the above.
  iterate_through_sparse_item_array_by_slice(
      data,
      slice_delimiters,
      init_slice,
      empty_preprocess_row,
      process_element,
      finalize_slice);

  out_data->close();

  table.print_row(progress_time(), 100);
  table.print_footer();

  return out_data;
}
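
/* A usage sketch for transpose_sparse_sarray. item_counts must give the
 * number of stored entries for each column index; one hypothetical way to
 * build it is a counting pass with iterate_through_sparse_item_array.
 * num_columns and the 256 MB budget here are illustrative assumptions:
 *
 *   std::vector<atomic<size_t> > counts(num_columns);
 *
 *   iterate_through_sparse_item_array(
 *       data,
 *       [&](size_t, size_t, std::vector<std::pair<size_t, double> >& row) {
 *         for(const auto& p : row) { ++counts[p.first]; }
 *       });
 *
 *   std::vector<size_t> item_counts(num_columns);
 *   for(size_t i = 0; i < num_columns; ++i) {
 *     item_counts[i] = counts[i];
 *   }
 *
 *   auto transposed = transpose_sparse_sarray(
 *       data, item_counts, size_t(256) * 1024 * 1024);
 */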

}

#endif /* TURI_SPARSE_SIMILARITY_UTILITIES_H */