// Turi Create 4.0 -- sparse_similarity_lookup_impl.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_ITEM_SIMILARITY_LOOKUP_CONSTRUCTION_H_
7 #define TURI_UNITY_ITEM_SIMILARITY_LOOKUP_CONSTRUCTION_H_
8 
9 #include <toolkits/sparse_similarity/item_processing.hpp>
10 #include <toolkits/sparse_similarity/index_mapper.hpp>
11 #include <toolkits/sparse_similarity/sparse_similarity_lookup.hpp>
12 #include <toolkits/sparse_similarity/similarities.hpp>
13 #include <toolkits/sparse_similarity/utilities.hpp>
14 #include <toolkits/sparse_similarity/neighbor_search.hpp>
15 #include <toolkits/sparse_similarity/sliced_itemitem_matrix.hpp>
16 #include <core/logging/table_printer/table_printer.hpp>
17 #include <core/generics/sparse_parallel_2d_array.hpp>
18 #include <core/storage/sframe_data/sframe.hpp>
19 #include <core/storage/sframe_data/sframe_iterators.hpp>
20 #include <core/util/logit_math.hpp>
21 #include <core/util/cityhash_tc.hpp>
22 #include <core/util/dense_bitset.hpp>
23 #include <core/util/sys_util.hpp>
24 #include <core/parallel/pthread_tools.hpp>
25 
26 namespace turi { namespace sparse_sim {
27 
28 ////////////////////////////////////////////////////////////////////////////////
29 
30 /** The main class for training and actually implementing the sparse
31  * similarity lookup stuff.
32  */
33 template <typename SimilarityType>
35  private:
36 
37  ////////////////////////////////////////////////////////////////////////////////
38  // Construction routines.
39 
40  public:
41  sparse_similarity_lookup_impl(SimilarityType&& _similarity,
42  const std::map<std::string, flexible_type>& _options)
43  : similarity(_similarity)
44  , max_item_neighborhood_size(_options.at("max_item_neighborhood_size"))
45  , item_prediction_buffers_by_thread(thread::cpu_count())
46  {
47  this->options = _options;
48  }
49 
50  std::string similarity_name() const {
51  return SimilarityType::name();
52  }
53 
54  private:
55 
56  ////////////////////////////////////////////////////////////////////////////////
57  // Type definitions based on the similarity type.
58 
 // Type aliases forwarded from the similarity type.  The "final"
 // variants are the values actually stored in / read from the lookup
 // tables; the non-final variants are accumulation state used while
 // processing interactions.
 typedef typename SimilarityType::item_data_type item_data_type;
 typedef typename SimilarityType::interaction_data_type interaction_data_type;
 typedef typename SimilarityType::final_item_data_type final_item_data_type;
 typedef typename SimilarityType::final_interaction_data_type final_interaction_data_type;
 typedef typename SimilarityType::prediction_accumulation_type prediction_accumulation_type;
 typedef std::vector<item_processing_info<SimilarityType> > item_info_vector;

 // If the final_item_data_type is unused, then we can ignore it.
 // (Compile-time constant; forwarded from the similarity traits.)
 static constexpr bool use_final_item_data() {
   return sparse_sim::use_final_item_data<SimilarityType>();
 }
70 
71  ////////////////////////////////////////////////////////////////////////////////
72  // Global containers needed for the various processing routines.
73 
 // The main similarity type used.
 SimilarityType similarity;
 // Total number of items; set by init_item_lookups() /
 // setup_by_raw_similarity().
 size_t total_num_items = 0;

 ////////////////////////////////////////////////////////////////////////////////
 // The lookup table data. This information is used in the
 // prediction and scoring routines.

 // One lookup entry: (neighbor item index, finalized interaction value).
 typedef std::pair<size_t, final_interaction_data_type> interaction_info_type;

 // After finalize_lookups(), item i's neighbors occupy
 // item_interaction_data[item_neighbor_boundaries[i] ..
 // item_neighbor_boundaries[i+1]), sorted by neighbor index.
 std::vector<size_t> item_neighbor_boundaries;
 std::vector<interaction_info_type> item_interaction_data;

 // Maximum number of neighbors retained per item (from options; may be
 // reduced by init_item_lookups() on allocation failure).
 size_t max_item_neighborhood_size = 0;

 // This is pulled in from the final_item_data. Some similarity
 // types use this for processing.
 std::vector<final_item_data_type> final_item_data;
92 
93  ////////////////////////////////////////////////////////////////////////////////
94  // Stuff for proper progress printing and tracking.
95 
 /** Tracks the progress of the entire process.
  *
  *  Progress is measured in item pairs: a full run covers
  *  num_items * num_items pairs.  increment_item_counter() is called
  *  from many threads; printing is rate-limited and serialized.
  */
 struct _progress_tracker {

   _progress_tracker(size_t _num_items)
     : num_items(_num_items)
     , table({{"Elapsed Time (Constructing Lookups)", 0},
              {"Total % Complete", 0},
              {"Items Processed", 0}})
   {
   }

   void print_header() { table.print_header(); }
   void print_break() { table.print_line_break(); }

   // Prints the final 100% row.  Forces the pair counter to its
   // terminal value so any later reads are consistent.
   void print_footer() {
     item_pair_count = num_items * num_items;
     double percent_complete = 100.0;

     table.print_row(
         progress_time(),
         percent_complete,
         num_items);

     table.print_footer();
   }

   // Adds `counter` processed pairs; occasionally emits a progress row.
   void increment_item_counter(size_t counter = 1) {
     item_pair_count += counter;

     if(UNLIKELY(table.time_for_next_row() && !in_print_next_row)) {
       // Because this function can be called a lot, we just set a
       // flag to tell other threads to hold off. It will still be
       // ordered correctly -- if we miss a few entries, that's fine.
       // The situation we want to prevent is every thread suddenly
       // calling _print_next_row because time_for_next_row() is
       // true; While _print_next_row is fine with this, it this
       // would slow things down in an inner loop. Thus we use the
       // atomic in_print_next_row flag to deter this situation.
       in_print_next_row = true;
       _print_next_row();
       in_print_next_row = false;
     }
   }

  private:

   GL_HOT_NOINLINE
   void _print_next_row() {

     // This lock prevents multiple threads from accessing this at
     // the same time, which means things will always be in order.
     std::unique_lock<simple_spinlock> lg(_print_next_row_counter_lock, std::defer_lock);

     bool acquired_lock = lg.try_lock();

     // Another thread is already printing; skip rather than block.
     if(!acquired_lock) {
       return;
     }

     // Snapshot the atomic counter once, then derive everything from it.
     size_t _item_pair_count = item_pair_count;
     size_t items_processed = _item_pair_count / num_items;

     double n_total_items = double(num_items) * num_items;
     double prop_complete = std::min(1.0, double(_item_pair_count) / n_total_items);

     // Approximate to the nearest 0.25%
     double percent_complete = std::floor(4 * 100.0 * prop_complete) / 4;

     table.print_timed_progress_row(
         progress_time(),
         percent_complete,
         items_processed);
   }

   size_t num_items = 0;
   // Number of (i, j) pairs processed so far, across all threads.
   atomic<size_t> item_pair_count;
   // Best-effort "someone is printing" flag; see increment_item_counter().
   atomic<int> in_print_next_row;
   simple_spinlock _print_next_row_counter_lock;
   table_printer table;
 };
178 
179  ////////////////////////////////////////////////////////////////////////////////
180  // Management functions for building the lookup tables.
181 
 // Stuff needed for building the intermediate structures.
 // Number of neighbors currently stored for each item's heap.
 std::vector<uint32_t> item_neighbor_counts;
 // One spinlock per item, guarding that item's heap slots in
 // item_interaction_data during insert_into_lookup().
 std::vector<simple_spinlock> item_interaction_locks;
185 
186  /** Initialize the item lookup tables. Call before the item
187  * lookups are used.
188  */
189  void init_item_lookups(size_t num_items, const std::vector<final_item_data_type>& _final_item_data) {
190 
191  // Assign these.
192  total_num_items = num_items;
193  item_neighbor_counts.assign(total_num_items, 0);
194  item_interaction_locks.resize(total_num_items);
195 
196  ALLOCATION_RETRY:
197  try {
198  item_interaction_data.reserve(total_num_items * max_item_neighborhood_size);
199 
200  } catch(const std::bad_alloc& e) {
201  // Attempt to handle the allocations in this realm properly.
202  // If it drops to ridiculously low item similarity numbers,
203  // then we've got a problem, but usually when this happens the
204  // user has set max_item_neighborhood_size to way too large of
205  // a number, like the total number of items.
206  if(max_item_neighborhood_size >= 16) {
207 
208  size_t new_max_item_neighborhood_size = std::min<size_t>(64, max_item_neighborhood_size / 2);
209 
211  << "Error allocating proper lookup tables with max_item_neighborhood_size = "
212  << max_item_neighborhood_size << "; reattempting with max_item_neighborhood_size = "
213  << new_max_item_neighborhood_size << "." << std::endl;
214 
215  max_item_neighborhood_size = new_max_item_neighborhood_size;
216  options.at("max_item_neighborhood_size") = max_item_neighborhood_size;
217  goto ALLOCATION_RETRY;
218 
219  } else {
220  std::ostringstream ss;
221  ss << "Out-of-Memory error allocating proper lookup tables with max_item_neighborhood_size = "
222  << max_item_neighborhood_size << ". This currently requires a lookup table of "
223  << (max_item_neighborhood_size * total_num_items * 16)
224  << " bytes. Please attempt with fewer items or use a machine with more memory."
225  << std::endl;
226  log_and_throw(ss.str().c_str());
227  }
228  }
229 
230  item_interaction_data.assign(total_num_items * max_item_neighborhood_size,
231  {0, final_interaction_data_type()});
232 
233  // Copy over the item vertex data.
234  if(use_final_item_data()) {
235  ASSERT_EQ(_final_item_data.size(), num_items);
236  final_item_data = _final_item_data;
237  }
238  }
239 
240  ////////////////////////////////////////////////////////////////////////////////
241 
242  /** Insert a new item into the lookups. Is completely threadsafe.
243  */
245  void insert_into_lookup(
246  size_t item_a, size_t item_b,
247  const final_interaction_data_type& value) {
248 
249  std::pair<size_t, final_interaction_data_type> p = {item_b, value};
250  auto& count = item_neighbor_counts[item_a];
251 
252  final_item_data_type _unused;
253 
254  auto item_comparitor = [&](const interaction_info_type& p1, const interaction_info_type& p2)
255  GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
256 
257  DASSERT_LT(item_a, total_num_items);
258  DASSERT_LT(p1.first, total_num_items);
259  DASSERT_LT(p2.first, total_num_items);
260 
261  return similarity.compare_interaction_values(
262  p1.second,
263  p2.second,
264  use_final_item_data() ? final_item_data[item_a] : _unused,
265  use_final_item_data() ? final_item_data[p1.first] : _unused,
266  use_final_item_data() ? final_item_data[p2.first] : _unused);
267  };
268 
269  // Before locking the item, make sure that the new item is likely
270  // to actually go on the heap.
271  if(count == max_item_neighborhood_size
272  && !(item_comparitor(p, item_interaction_data[item_a * max_item_neighborhood_size + count - 1]))) {
273  return;
274  } else {
275 
276  // Put this in a separate function so the above is much more
277  // easy to inline. Below involes a lot of math.
278  auto insert_on_heap = [&]() GL_GCC_ONLY(GL_HOT_NOINLINE) {
279 
280  // Now, lock the item.
281  std::lock_guard<simple_spinlock> lg(item_interaction_locks[item_a]);
282 
283  // Make sure this item is not already in there.
284 #ifdef NDEBUG
285  {
286  for(size_t i = 0; i < count; ++i) {
287  DASSERT_NE(item_interaction_data[item_a*max_item_neighborhood_size + i].first, p.first);
288  }
289  }
290 #endif
291 
292  if(count < max_item_neighborhood_size) {
293  item_interaction_data[item_a*max_item_neighborhood_size + count] = p;
294  ++count;
295 
296  if(count == max_item_neighborhood_size) {
297  std::make_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
298  item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
299  item_comparitor);
300  }
301  } else {
302  if(LIKELY(item_comparitor(p, item_interaction_data[item_a * max_item_neighborhood_size + count - 1]))) {
303 
304  item_interaction_data[item_a * max_item_neighborhood_size + count - 1] = p;
305  std::push_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
306  item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
307  item_comparitor);
308  std::pop_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
309  item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
310  item_comparitor);
311 
312  // Check that the popped item is not better than the new one.
313  DASSERT_FALSE(item_comparitor(item_interaction_data[item_a * max_item_neighborhood_size + count - 1], p));
314  }
315  }
316  };
317 
318  insert_on_heap();
319  }
320  }
321 
 /** Finalize the lookup tables. After calling this, things are
  *  ready to be used.
  *
  *  Compacts the fixed-stride per-item heaps into a contiguous
  *  CSR-style layout (boundaries in item_neighbor_boundaries),
  *  dropping entries at or below the "threshold" option, then sorts
  *  each item's neighbor range by neighbor index.
  */
 GL_HOT_NOINLINE_FLATTEN
 void finalize_lookups() {
   double threshold = options.at("threshold");

   // First, go through and remove all the empty space.
   size_t current_position = 0;
   item_neighbor_boundaries.resize(total_num_items + 1);
   for(size_t i = 0; i < total_num_items; ++i) {
     item_neighbor_boundaries[i] = current_position;

     // Apply threshholding.  The in-place compaction is safe: the
     // write index (current_position + write_pos) never passes the
     // read index (i * max_item_neighborhood_size + j), since every
     // earlier item contributed at most max_item_neighborhood_size
     // entries.
     size_t write_pos = 0;
     for(size_t j = 0; j < item_neighbor_counts[i]; ++j) {
       if(item_interaction_data[i * max_item_neighborhood_size + j].second > threshold) {
         item_interaction_data[current_position + write_pos]
             = item_interaction_data[i * max_item_neighborhood_size + j];
         ++write_pos;
       }
     }

     current_position += write_pos;
   }

   item_neighbor_boundaries[total_num_items] = current_position;
   item_interaction_data.resize(current_position);
   // The build-time counters are no longer needed; free their memory.
   item_neighbor_counts.clear();
   item_neighbor_counts.shrink_to_fit();

   // Now, sort each max_item_neighborhood_size spot.
   atomic<size_t> current_idx = 0;

   in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
     while(true) {
       // Atomically claim the next unprocessed item.
       size_t idx = (++current_idx) - 1;
       if(idx >= total_num_items) {
         break;
       }

       // Sort this item's neighbor range by neighbor item index.
       std::sort(item_interaction_data.begin() + item_neighbor_boundaries[idx],
                 item_interaction_data.begin() + item_neighbor_boundaries[idx + 1],
                 [](const std::pair<size_t, final_interaction_data_type>& p1,
                    const std::pair<size_t, final_interaction_data_type>& p2) {
                   return p1.first < p2.first;
                 });
     }
   });
 }
372 
373  ////////////////////////////////////////////////////////////////////////////////
374 
 /** Builds the lookup table directly from user-supplied similarity data.
  *
  *  \param num_items            Total number of items.
  *  \param item_data            Per-item values; only read when the
  *                              similarity type uses final item data.
  *  \param _interaction_data    SFrame holding the raw similarity rows.
  *  \param item_column          Column name of the first item index.
  *  \param similar_item_column  Column name of the neighbor item index.
  *  \param similarity_column    Column name of the similarity value.
  *  \param add_reverse          If true, also inserts each edge reversed.
  */
 void setup_by_raw_similarity(
     size_t num_items,
     const flex_list& item_data,
     const sframe& _interaction_data,
     const std::string& item_column,
     const std::string& similar_item_column,
     const std::string& similarity_column,
     bool add_reverse = false) {

   total_num_items = num_items;

   {
     std::vector<final_item_data_type> _final_item_data;

     if(use_final_item_data()) {
       ASSERT_EQ(item_data.size(), total_num_items);

       _final_item_data.resize(total_num_items);
       for(size_t i = 0; i < total_num_items; ++i) {
         similarity.import_final_item_value(_final_item_data[i], item_data[i]);
       }
     }

     init_item_lookups(num_items, _final_item_data);
   }

   // Pretty much no magic here. Just read it all out and dump it
   // into the item_interaction_data lookup.

   sframe interaction_data = _interaction_data.select_columns(
       {item_column, similar_item_column, similarity_column});

   if(interaction_data.column_type(0) != flex_type_enum::INTEGER) {
     log_and_throw("Items in provided data must be integers in the set {0, ..., num_items}.");
   }

   if(interaction_data.column_type(1) != flex_type_enum::INTEGER) {
     log_and_throw("Similar items in provided data must be integers in the set {0, ..., num_items}.");
   }

   // NOTE(review): the similarity column's type is not validated here;
   // presumably import_final_interaction_value handles any conversion --
   // confirm against the similarity implementations.

   parallel_sframe_iterator_initializer it_init(interaction_data);

   in_parallel([&](size_t thread_idx, size_t num_threads) {

     final_interaction_data_type final_interaction_data = final_interaction_data_type();

     for(parallel_sframe_iterator it(it_init, thread_idx, num_threads); !it.done(); ++it) {
       size_t item_a = it.value(0);
       size_t item_b = it.value(1);
       // Self-similarities are ignored.
       if(item_a == item_b) continue;

       const flexible_type& sim_value = it.value(2);

       if(item_a >= num_items || item_b >= num_items) {
         // Cold path: build the error message out of line.
         auto raise_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
           std::ostringstream ss;
           ss << "Out of range item index encountered in row " << it.row_index()
              << "; item index = " << std::max(item_a, item_b) << " >= " << num_items << " = num_items."
              << std::endl;
           log_and_throw(ss.str().c_str());
         };

         raise_error();
       }

       similarity.import_final_interaction_value(final_interaction_data, sim_value);
       insert_into_lookup(item_a, item_b, final_interaction_data);

       if(add_reverse) {
         insert_into_lookup(item_b, item_a, final_interaction_data);
       }
     }
   });

   // Now, finalize the lookups and we're done!
   finalize_lookups();
 }
455 
456  ////////////////////////////////////////////////////////////////////////////////
457 
 /** Estimates the density of the matrix, so we can get an accurate
  *  picture of how many passes will be needed to properly fit
  *  everything in memory.
  *
  *  \param item_info       Per-item statistics; only num_users is read.
  *  \param items_per_user  Number of items each user interacted with.
  *  \return Estimated fraction of (i, j) entries that will be hit.
  */
 double estimate_sparse_matrix_density(
     const item_info_vector& item_info,
     const std::vector<size_t>& items_per_user) {

   // Fixed seed: the estimate is deterministic across runs.
   random::seed(0);

   size_t degree_approximation_threshold = options.at("degree_approximation_threshold");
   size_t num_items = item_info.size();

   // This is a somewhat challenging one to figure out, and the
   // critical value is the expected density of the matrix. To get
   // this, we choose a number of edges as specific points, then go
   // through and calculate the probability that each of these has
   // been hit on a pass through the data.
   //
   // We have the exact marginal probability an item i is chosen from
   // the item counts. Call this p_i.
   //
   // For a given user that has rated $n_u$ items, we assume that the
   // items are chosen iid according the probability an item is
   // chosen is sampling p_i without replacement. For ease of use,
   // we relax the "without replacement", and just assume it's chosen
   // with replacement. Then the probability a given user has chosen
   // item i in his full collection is $1 - (1 - p_i)^n_u$.
   //
   // Now, we are actually looking at interactions, so a given edge
   // has two parts -- (i, j). Furthermore, we limit the number of
   // edges considered by one user to degree_approximation_threshold;
   // which we need to account for as well. Thus, the probability
   // that this edge is hit by user u is:
   //
   // r_{iju} (1 - (1 - p_i)^{n_u}) (1 - (1 - p_j)^{n*_u})
   //
   // where {n^*_u} = min(degree_approximation_threshold, n_u).
   //
   // Then, the probability q_{ij} that a given edge (i, j) is hit is
   // given by:
   //
   // q_{ij} = 1 - \prod_u (1 - r_{iju})
   //
   // Using logs for numerical stability:
   //
   // log(r_{iju}) = (log(1 - exp(n_u * log(1 - p_i) ) )
   //                + log(1 - exp(n^*_u * log(1 - p_j) ) ) )
   //
   //   = log1me(n_u * log(1 - p_i) ) + log1me(n*_u * log(1 - p_j) )
   //
   // where log1me is the numerically stable version of log(1 - exp(x)).
   //
   // Thus log(1 - q_ij) = sum_u log(1 - exp(log(r_{iju})) )
   //                    = sum_u log1me( log1me(n_u * log(1 - p_i) )
   //                                  + log1me(n*_u * log(1 - p_j) ) )
   //

   ////////////////////////////////////////////////////////////////////////////////
   // Calculate log(1 - p_i) for all items.

   std::vector<double> item_log1mp(num_items);

   size_t total_item_counts = 0;

   for(const auto& ii : item_info) {
     total_item_counts += ii.num_users;
   }

   // Calculate log(1 - p_i) for all i.
   parallel_for(size_t(0), num_items, [&](size_t i) {
     DASSERT_GT(item_info[i].num_users, 0);
     double hit_p = double(item_info[i].num_users) / total_item_counts;
     item_log1mp[i] = std::log1p(-hit_p);
   });

   ////////////////////////////////////////////////////////////////////////////////
   // create function to calculate log(1 - r_iju) given i, j, u.
   // This is what is accumulated as things grow.

   // This is where all the math happens
   auto calc_log_hit_prob_accumulation = [&](
       size_t i, size_t j, size_t user_item_count) {

     size_t clipped_user_item_count
         = std::min(user_item_count, degree_approximation_threshold);

     // Clamp each term away from 0 so log1me stays in-domain.
     double log_riju = (
         log1me(std::min<double>(-1e-16, clipped_user_item_count * item_log1mp[i]))
         + log1me(std::min<double>(-1e-16, clipped_user_item_count * item_log1mp[j])));

     // std::ostringstream ss;
     // ss << "hit_probability (" << i << ", " << j << ", " << user_item_count << ") = "
     //    << std::exp(log_riju) << "; log prob miss = " << log1me(log_riju)
     //    << "; item_log1mp[i] = " << item_log1mp[i]
     //    << "; item_log1mp[j] = " << item_log1mp[j]
     //    << "; user_item_count[j] = " << user_item_count << std::endl;

     // std::cerr << ss.str() << std::endl;

     // NOTE(review): the clamp below uses +1e-16 where the siblings
     // above use -1e-16; harmless since log_riju is always negative
     // here, but likely intended to be -1e-16 -- confirm.
     return log1me(std::min<double>(1e-16, log_riju));
   };

   // One Monte Carlo sample: a random (i, j) pair and its accumulated
   // log(1 - q_ij).
   struct _sample {
     size_t i = 0, j = 0;
     double log_1_m_q = 0;
     double estimated_prob = 0;
   };

   size_t num_samples = options.at("sparse_density_estimation_sample_size");

   // Don't need an insane number of samples if we don't have that
   // many passes through the data to consider.
   num_samples = std::min(num_samples, num_items * num_items);

   // Sample a number of locations here.
   std::vector<_sample> samples(num_samples);

   // Go through and do the estimation.  Each thread owns a disjoint
   // slice of `samples`, so no synchronization is needed.
   in_parallel(
       [&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {

     size_t sample_start_idx = (thread_idx * samples.size()) / num_threads;
     size_t sample_end_idx = ((thread_idx + 1) * samples.size()) / num_threads;

     for(size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
       auto& s = samples[s_idx];
       s.i = random::fast_uniform<size_t>(0, num_items - 1);
       s.j = random::fast_uniform<size_t>(0, num_items - 1);
       s.log_1_m_q = 0;
     }

     double mult_factor = 1;

     std::vector<size_t> item_count_distribution;

     static constexpr size_t user_count_dist_sample_size = 5000;

     // If we have a ton of users, then we subsample this part here.
     if(items_per_user.size() > user_count_dist_sample_size) {
       // Do an iid sample here.
       item_count_distribution.resize(user_count_dist_sample_size);
       for(size_t i = 0; i < user_count_dist_sample_size; ++i) {
         size_t idx = random::fast_uniform<size_t>(0, items_per_user.size() - 1);
         item_count_distribution[i] = items_per_user[idx];
       }
       // Scale each subsampled user's contribution back up.
       mult_factor = double(items_per_user.size()) / item_count_distribution.size();
     } else {
       item_count_distribution = items_per_user;
     }

     // Accumulate log(1 - q) over (sub)sampled users for each sample.
     for(size_t idx = 0; idx < item_count_distribution.size(); ++idx) {
       for(size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
         auto& s = samples[s_idx];
         size_t count = item_count_distribution[idx];
         s.log_1_m_q += mult_factor * calc_log_hit_prob_accumulation(s.i, s.j, count);
       }
     }

     // q = 1 - exp(log(1 - q)), computed stably with expm1.
     for(size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
       auto& s = samples[s_idx];
       s.estimated_prob = -std::expm1(s.log_1_m_q);
       DASSERT_LE(s.estimated_prob, 1.0 + 1e-6);
       DASSERT_GE(s.estimated_prob, 0.0 - 1e-6);
     }
   });

   double total_prob = 0;

   for(const _sample& s : samples) {
     total_prob += s.estimated_prob;
   }

   double estimated_density = total_prob / samples.size();

   return estimated_density;
 }
635 
636  /** Calculate the slice structure of the full matrix.
637  *
638  */
639  std::vector<size_t> calculate_slice_structure(
640  size_t num_items, size_t max_slices, double bytes_per_interaction) {
641 
642  size_t target_memory_usage = options.at("target_memory_usage");
643 
644  size_t target_num_items_per_slice
645  = std::ceil(target_memory_usage / bytes_per_interaction);
646 
647  // Make sure it's at least 1.
648  target_num_items_per_slice = std::max(num_items, target_num_items_per_slice);
649 
650  auto slice_boundaries = calculate_upper_triangular_slice_structure(
651  num_items, target_num_items_per_slice, max_slices);
652 
653  return slice_boundaries;
654  }
655 
656  /** Bytes per item in the dense case.
657  */
658  double bytes_per_item_dense() {
659  return sizeof(interaction_data_type);
660  }
661 
662  /** Bytes per item in the sparse case.
663  */
664  double bytes_per_item_sparse(const item_info_vector& item_info,
665  const std::vector<size_t>& items_per_user) {
666 
667  logstream(LOG_INFO) << "Estimating relative cost of doing sparse lookups vs. dense lookups."
668  << std::endl;
669 
670  double estimated_density = estimate_sparse_matrix_density(item_info, items_per_user);
671 
672  logstream(LOG_INFO) << "Estimated sparse matrix density at "
673  << estimated_density << ". " << std::endl;
674 
675  // The 1.7 here comes from the average memory usage per element factor of
676  // google's dense_hash_set table. We store 1 index and 1 edge per element.
677  double estimated_memory_usage_per_element
678  = estimated_density * (1.7 * (sizeof(size_t) + sizeof(interaction_data_type) ));
679 
680  return estimated_memory_usage_per_element;
681  }
682 
683  ////////////////////////////////////////////////////////////////////////////////
684 
 /** Get the threshold user count value above which we assume the
  *  individual effect of a single edge is negligible. This allows
  *  us to prune a user's items to something more manageable.
  *
  *  \param item_info  Per-item statistics; num_users is read.
  *  \param item_list  This user's (item index, rating) pairs; must have
  *                    more than degree_approximation_threshold entries.
  *  \return The num_users count of the user's
  *          degree_approximation_threshold-th least-frequent item.
  */
 size_t get_item_count_threshold_for_user(
     const item_info_vector& item_info,
     const std::vector<std::pair<size_t, double> >& item_list) {

   size_t degree_approximation_threshold =
       options.at("degree_approximation_threshold");

   DASSERT_GT(item_list.size(), degree_approximation_threshold);

   // Extract just the item indices for the selection below.
   std::vector<size_t> items(item_list.size());
   for(size_t i = 0; i < item_list.size(); ++i) {
     items[i] = item_list[i].first;
   }

   // For all the users that have over
   // degree_approximation_threshold ratings, we register the
   // least frequently occuring items and only look at the
   // incoming edges to those.  nth_element partitions in O(n)
   // without fully sorting.
   std::nth_element(
       items.begin(),
       items.begin() + degree_approximation_threshold,
       items.end(),
       [&](size_t i, size_t j) { return item_info[i].num_users < item_info[j].num_users; });

   size_t item_count_threshold = item_info[items[degree_approximation_threshold]].num_users;

   // Two checks to make sure our math is indeed correct.
   // We want to make sure that approximately
   // degree_approximation_threshold items with the
   // fewest hit counts -- therefore, the items most
   // likely to be influenced by this user -- are the
   // ones we hit below. If the math in determining this
   // threshhold is correct, then there should be at
   // least degree_approximation_threshold items with
   // equal to or fewer than item_count_threshold users,
   // and fewer than degree_approximation_threshold items
   // with fewer than item_count_threshold users, as
   // item_count_threshold should be the count of the
   // degree_approximation_threshold item if they are
   // sorted by user count.

#ifndef NDEBUG
   {
     size_t n1 = std::count_if(
         item_list.begin(), item_list.end(),
         [&](std::pair<size_t, double> p) {
           return item_info[p.first].num_users <= item_count_threshold;
         });

     DASSERT_TRUE(n1 > degree_approximation_threshold);

     size_t n2 = std::count_if(
         item_list.begin(), item_list.end(),
         [&](std::pair<size_t, double> p) {
           return item_info[p.first].num_users < item_count_threshold;
         });

     DASSERT_TRUE(n2 <= degree_approximation_threshold);
   }
#endif

   return item_count_threshold;
 }
752 
753  ////////////////////////////////////////////////////////////////////////////////
754  //
755 
756  /** A utility to run nearest neighbors to eliminate some of the items.
757  */
758  size_t preprocess_nearest_neighbors(
759  dense_bitset& item_in_nearest_neighbors,
760  const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data,
761  const item_info_vector& item_info,
762  const std::vector<size_t>& items_per_user,
763  std::shared_ptr<_progress_tracker> progress_tracker) {
764 
765  // Check some of the input terms.
766  DASSERT_EQ(item_in_nearest_neighbors.size(), item_info.size());
767  DASSERT_EQ(item_in_nearest_neighbors.popcount(), 0);
768 
769  size_t num_items = item_info.size();
770  size_t num_users = items_per_user.size();
771 
772  std::string force_mode = options.at("training_method");
773  DASSERT_TRUE(force_mode == "auto" || force_mode.substr(0,2) == "nn");
774 
775  // The minimum number of users that hit an item has to be at least
776  // num_users / nearest_neighbors_user_count_ratio_threshhold. In
777  // that case, the nearest_neighbors_user_count_ratio_threshhold is
778  // fixed.
779  size_t nearest_neighbors_user_count_ratio_threshhold = 32;
780 
781  // To make nearest neighbors worth it, we should actually run it
782  // on as many items as are feasible to do. This is per thread, as
783  // each thread has an iterator which then reads local query rows,
784  // but the most expensive part is reading in the data.
785  size_t nearest_neighbors_min_num_items = 32;
786  nearest_neighbors_min_num_items = std::min(
787  item_info.size(), nearest_neighbors_min_num_items);
788 
789  // Now, do we have enough to run nearest neighbors on the data?
790  // Not worth it if we don't have enough items to run through, but
791  // removing a single common item can have significant impact on
792  // the later performance.
793  size_t n_in_nearest_neighbors = 0;
794 
795  // Any items with more users than this we allocate to nearest neighbors.
796  size_t user_count_threshold
797  = num_users / nearest_neighbors_user_count_ratio_threshhold;
798 
799  for(size_t i = 0; i < num_items; ++i) {
800  if(item_info[i].num_users > user_count_threshold) {
801  item_in_nearest_neighbors.set_bit(i);
802  ++n_in_nearest_neighbors;
803  }
804  }
805 
806  ////////////////////////////////////////////////////////////////////////////////
807  // Clip to make sure we are fitting within the number of items.
808 
809  // Now, handle the edge cases.
810  if(force_mode == "auto") {
811 
812  // If there is no advantage to running anything here, then just
813  // skip this step.
814  if(n_in_nearest_neighbors == 0) {
815  return 0;
816  }
817 
818  // Finally, if we can efficiently handle all the items, then we
819  // should.
820  if(nearest_neighbors_min_num_items == item_info.size()) {
821  item_in_nearest_neighbors.fill();
822  n_in_nearest_neighbors = item_info.size();
823  }
824 
825  } else {
826 
827  if(force_mode == "nn") {
828  // Force everything to be done with nearest neighbors.
829  item_in_nearest_neighbors.fill();
830  n_in_nearest_neighbors = item_info.size();
831  } else if(force_mode.substr(0, 2) == "nn") {
832 
833  // We are forced to do nearest neighbors here, but we need to
834  // make sure that we don't actually do it on all of them so
835  // some are left over for testing the next stage.
836 
837  // Make sure that the minimum does not force us to do all the
838  // items.
839  if(nearest_neighbors_min_num_items == item_info.size()) {
840  nearest_neighbors_min_num_items /= 2;
841  }
842 
843  // If this is the case, the nth_element operation below will
844  // fill in the items.
845  if(n_in_nearest_neighbors == 0) {
846  // Pass. The minimum number was not reached. This is just
847  // to make this case explicit -- it will be continued below.
848  }
849 
850  // In this case, we reset the mask and let the nth_element
851  // operation below pick off the top items.
852  if(n_in_nearest_neighbors == item_info.size()) {
853  item_in_nearest_neighbors.clear();
854 
855  // Guaranteed to hit the nth_element mode below.
856  n_in_nearest_neighbors = nearest_neighbors_min_num_items - 1;
857  }
858  }
859  }
860 
861  ////////////////////////////////////////////////////////////////////////////////
862  // Step 2: Check to make sure the ne nearest neighbors worth it,
863  // take the transpose and subsequent nearest neighbors operations
864  // are worth it -- so pick the nearest_neighbors_min_num_items
865  // number most expensive number of items and run them through
866  // nearest neighbors.
867 
868  if(n_in_nearest_neighbors < nearest_neighbors_min_num_items) {
869 
870  // Okay, we need to make the nearest neighbors worth it, so move
871  // some more items over there.
872 
873  std::vector<std::pair<size_t, size_t> > count_buffer(item_info.size());
874  for(size_t i = 0; i < item_info.size(); ++i) {
875  count_buffer[i] = {item_info[i].num_users, i};
876  }
877 
878  std::nth_element(count_buffer.begin(),
879  count_buffer.begin() + nearest_neighbors_min_num_items,
880  count_buffer.end(),
881  [](const std::pair<size_t, size_t>& p1,
882  const std::pair<size_t, size_t>& p2) {
883  return p1.first > p2.first;
884  });
885 
886  DASSERT_GE(count_buffer.at(nearest_neighbors_min_num_items - 1).first,
887  count_buffer.at(nearest_neighbors_min_num_items).first);
888 
889  // Add these to the mask.
890  for(size_t i = 0; i < nearest_neighbors_min_num_items; ++i) {
891  item_in_nearest_neighbors.set_bit(count_buffer[i].second);
892  }
893 
894  n_in_nearest_neighbors = nearest_neighbors_min_num_items;
895  }
896 
897  DASSERT_EQ(item_in_nearest_neighbors.popcount(), n_in_nearest_neighbors);
898 
899  logprogress_stream << "Processing the " << n_in_nearest_neighbors
900  << " most common items by brute force search." << std::endl;
901 
902  ////////////////////////////////////////////////////////////////////////////////
903  // Step 3: Transpose the array so it's a by-item list of the users
904  // for each item.
905 
906  std::vector<size_t> users_per_item(item_info.size());
907  for(size_t i = 0; i < item_info.size(); ++i) {
908  users_per_item[i] = item_info[i].num_users;
909  }
910 
911  std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > > transposed_data
913  data, users_per_item, options.at("target_memory_usage"));
914 
915  ////////////////////////////////////////////////////////////////////////////////
916  // Step 4: Run it through a nearest neighbors brute-force search.
917 
918  /** Function to process the pairs coming in.
919  */
920  auto process_item_pair = [&](size_t item_a, size_t item_b, const final_interaction_data_type& value)
921  GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
922 
923  DASSERT_NE(item_a, item_b);
924 
925  insert_into_lookup(item_a, item_b, value);
926  insert_into_lookup(item_b, item_a, value);
927 
928  // Increment this by one.
929  progress_tracker->increment_item_counter();
930  };
931 
932  /** A function that allows certain pairs to be skipped and not
933  * processed. We skip the transpose of pairs already in the
934  * query list, and identical indices.
935  */
936  auto skip_pair = [&](size_t query_idx, size_t ref_idx) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
937 
938  if(query_idx == ref_idx)
939  return true;
940 
941  if(query_idx < ref_idx) {
942  return item_in_nearest_neighbors.get(ref_idx);
943  }
944 
945  return false;
946  };
947 
948  progress_tracker->print_header();
949 
950  /** Now send it off to the brute force similarity lookup tables.
951  */
952  brute_force_all_pairs_similarity_with_vector_reference(
953  /* Reference data. */
954  transposed_data,
955  item_info,
956 
957  /* Query data. Same set. */
958  transposed_data,
959  item_info,
960 
961  /* The similarity. */
962  similarity,
963 
964  /* Process function. */
965  process_item_pair,
966 
967  /* metadata -- the number of dimensions; in this case, it's the number of users. */
968  num_users,
969  options.at("target_memory_usage"),
970 
971  /* Skip processing of items we don't want to. */
972  skip_pair,
973 
974  /* the mask. */
975  &item_in_nearest_neighbors);
976 
977  // And we're done!
978  return n_in_nearest_neighbors;
979  }
980 
981  /** The main function that processes the sparse matrix sarray.
982  */
983  template <typename ItemItemContainer>
984  void _train_with_sparse_matrix_sarray(
985  ItemItemContainer&& interaction_data,
986  const std::vector<size_t>& slice_boundaries,
987  const item_info_vector& item_info,
988  const std::vector<size_t>& items_per_row,
989  const vector_index_mapper& index_mapper,
990  std::shared_ptr<_progress_tracker> progress_tracker,
991  const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data) {
992 
993  ////////////////////////////////////////////////////////////////////////////////
994  // Step 1. Define constants needed later on, along with common lookup tables.
995 
996  // Locks, in case they are needed. Just hash to a particular
997  // point in this array to avoid contention.
998  static constexpr bool use_interaction_locks = (
999  // If we require edge locking for the similarity types...
1000  SimilarityType::require_interaction_locking()
1001  // ... and we aren't using the sparse_parallel_2d_array, which
1002  // has edge locking built in.
1003  && !std::is_same<ItemItemContainer,
1005 
1006  static constexpr size_t n_interaction_locks = (use_interaction_locks ? 1024 : 1);
1007  std::array<simple_spinlock, n_interaction_locks> interaction_locks;
1008 
1009  // Now that that is set up, get the rest.
1010  const size_t num_items = item_info.size();
1011  TURI_ATTRIBUTE_UNUSED_NDEBUG const size_t n = data->size();
1012  DASSERT_EQ(items_per_row.size(), n);
1013 
1014  const size_t random_seed = (options.count("random_seed")
1015  ? size_t(options.at("random_seed"))
1016  : size_t(0));
1017 
1018  size_t degree_approximation_threshold = options.at("degree_approximation_threshold");
1019 
1020  simple_spinlock pruned_user_item_count_thresholds_lock;
1021  std::map<size_t, size_t> pruned_user_item_count_thresholds;
1022 
1023  ////////////////////////////////////////////////////////////////////////////////
1024  // Variables for the progress tracking.
1025 
1026  progress_tracker->print_header();
1027 
1028  // Calculate the total number of operations registered through a
1029  // pass through the data (ignoring sampling).
1030 
1031  size_t total_interactions_to_register = 0;
1032 
1033  // These quantities are in the original, non-index-mapped values,
1034  // so the actual updates will have to be in terms of that as well.
1035  for(size_t m : items_per_row) {
1036  total_interactions_to_register += std::min(degree_approximation_threshold, m) * m;
1037  }
1038 
1039  // For each item, we register this quantity after processing each
1040  // row, so multiply this by the number of passes.
1041  total_interactions_to_register *= (slice_boundaries.size() - 1);
1042 
1043  // Now, what we actually report is in terms of the number of
1044  // item-item interactions, so each row needs to be scaled by this
1045  // amount.
1046 
1047  double progress_register_scale = (double(num_items) * num_items) / (total_interactions_to_register);
1048 
1049  ////////////////////////////////////////////////////////////////////////////////
1050  // At the beginning of each slice, this function is called. We
1051  // use that to set up the information and lookups for process_row.
1052 
1053  auto init_slice = [&](size_t slice_idx, size_t item_idx_start, size_t item_idx_end) {
1054 
1055  // The matrix starts from (item_idx_start, item_idx_start).
1056  size_t slice_height = item_idx_end - item_idx_start;
1057  size_t slice_width = num_items - item_idx_start;
1058  DASSERT_GE(slice_height, 1);
1059 
1060  // Now reset the edge data container.
1061  interaction_data.clear();
1062  interaction_data.resize(slice_height, slice_width);
1063  };
1064 
1065  ////////////////////////////////////////////////////////////////////////////////
1066  // The workhorse function; called for every row in the data and
1067  // for every slice. This processes all the items in this row,
1068  // then clears the row so
1069  // iterate_through_sparse_item_array_by_slice continues to the
1070  // next row after this.
1071 
1072  auto process_row = [&](size_t thread_idx, size_t row_idx,
1073  size_t item_idx_start, size_t item_idx_end,
1074  std::vector<std::pair<size_t, double> >& item_list)
1075  GL_GCC_ONLY(GL_HOT_INLINE) {
1076 
1077  ////////////////////////////////////////////////////////////////////////////////
1078  // If the index mapping is enabled, remap the indices to
1079  // the current internal ones.
1080 
1081  do {
1082  // First, report this to the progress tracker based on progress size.
1083  {
1084  size_t m = item_list.size();
1085  size_t n_interactions_to_register
1086  = size_t(progress_register_scale * std::min(m, degree_approximation_threshold) * m);
1087 
1088  progress_tracker->increment_item_counter(n_interactions_to_register);
1089  }
1090 
1091  index_mapper.remap_sparse_vector(item_list);
1092 
1093  // It may be that the above cleared out all the items.
1094  if(item_list.empty()) {
1095  break;
1096  }
1097 
1098  ////////////////////////////////////////////////////////////////////////////////
1099  // Check if we need to threshold this one to make things
1100  // computationally feasible.
1101 
1102  size_t item_count_threshold = std::numeric_limits<size_t>::max();
1103 
1104  // Data structures for the sampling. We just use a
1105  // simple hash-based sampling that is deterministic by
1106  // row, so the entire thing is deterministic by
1107  // random_seed.
1108  size_t rng_gen_value = 0;
1109 
1110  // items with hash above this are excluded.
1111  size_t rng_64bit_threshhold = std::numeric_limits<size_t>::max();
1112 
1113  // Exclusion function.
1114  auto exclude_item_by_sampling = [&](size_t idx) {
1115  rng_gen_value = hash64(rng_gen_value, idx);
1116  return rng_gen_value >= rng_64bit_threshhold;
1117  };
1118 
1119  // Do we need to approximate this interaction?
1120  bool approximation_enabled = (item_list.size() > degree_approximation_threshold);
1121 
1122  if(approximation_enabled) {
1123  // The approximation used here has two part:
1124  //
1125  // - For incoming edges -- that is, vertices who's
1126  // nearest neighors we are choosing, we pick the top
1127  // degree_approximation_threshold items that have
1128  // the lowest counts. The rational is that
1129  // processing single row will have the most impact
1130  // on these items, since the fewest other items
1131  // process them.
1132  //
1133  // - For outgoing edges -- that is, vertices that we
1134  // process here -- we sample down to
1135  // degree_approximation_threshold items. That means
1136  // that we will always have at most
1137  // degree_approximation_threshold**2 items processed
1138  // per user.
1139 
1140  ////////////////////////////////////////////////////////////////////////////////
1141  // Step 1: Set up the pruning part.
1142 
1143  // Get the pruning threshold
1144  {
1145  std::lock_guard<simple_spinlock> lg(pruned_user_item_count_thresholds_lock);
1146 
1147  auto it = pruned_user_item_count_thresholds.find(row_idx);
1148 
1149  if(it == pruned_user_item_count_thresholds.end()) {
1150  item_count_threshold
1151  = get_item_count_threshold_for_user(item_info, item_list);
1152  pruned_user_item_count_thresholds[row_idx] = item_count_threshold;
1153  } else {
1154  item_count_threshold = it->second;
1155  }
1156  }
1157 
1158  ////////////////////////////////////////////////////////////////////////////////
1159  // Step 2: Set up the random sampling for the inner part.
1160 
1161  rng_gen_value = hash64(random_seed, row_idx);
1162  rng_64bit_threshhold = ((std::numeric_limits<size_t>::max() / item_list.size())
1163  * degree_approximation_threshold);
1164  }
1165 
1166  // Set the iteration bounds on the incoming list of
1167  // items based on which slice we are in. Requires a
1168  // search if they are not at the end.
1169  size_t list_idx_start, list_idx_end;
1170 
1171  std::tie(list_idx_start, list_idx_end)
1172  = find_slice_boundary_indices(item_list, item_idx_start,item_idx_end);
1173 
1174  // If this one is empty, ignore.
1175  if(UNLIKELY(list_idx_start == list_idx_end)) {
1176  break;
1177  }
1178 
1179  // Make sure we have indeed found the correct boundaries.
1180  DASSERT_LT(item_list[list_idx_end-1].first, item_idx_end);
1181  DASSERT_GE(item_list[list_idx_end-1].first, item_idx_start);
1182  DASSERT_LT(item_list[list_idx_start].first, item_idx_end);
1183  DASSERT_GE(item_list[list_idx_start].first, item_idx_start);
1184 
1185  ////////////////////////////////////////////////////////////////////////////////
1186  // Now that list_idx_start and list_idx_end are set, iterate
1187  // over the elements in it.
1188  for(size_t idx_a = list_idx_start; idx_a < list_idx_end; ++idx_a) {
1189 
1190  size_t item_a = item_list[idx_a].first;
1191  const auto& value_a = item_list[idx_a].second;
1192 
1193  // If this is one of the ones we've determined not to
1194  // worry about by threshold count, then skip it.
1195  if(item_info[item_a].num_users > item_count_threshold)
1196  continue;
1197 
1198  // Only do the upper slice of the triangle --
1199  // everything is assumed to be symmetric.
1200  for(size_t idx_b = idx_a + 1; idx_b < item_list.size(); ++idx_b) {
1201  if(exclude_item_by_sampling(idx_b))
1202  continue;
1203 
1204  size_t item_b = item_list[idx_b].first;
1205  const auto& value_b = item_list[idx_b].second;
1206 
1207  size_t row_idx = item_a - item_idx_start;
1208  size_t col_idx = item_b - item_idx_start;
1209 
1210  DASSERT_LT(row_idx, col_idx);
1211 
1212  auto update_interaction_f = [&](interaction_data_type& edge)
1213  GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
1214 
1215  // Apply the edge.
1216  similarity.update_interaction(
1217  edge,
1218  item_info[item_a].item_data, item_info[item_b].item_data,
1219  value_a, value_b);
1220  };
1221 
1222  if(use_interaction_locks) {
1223  std::lock_guard<simple_spinlock> lg(
1224  interaction_locks[hash64(row_idx, col_idx) % n_interaction_locks]);
1225 
1226  interaction_data.apply(row_idx, col_idx, update_interaction_f);
1227  } else {
1228  interaction_data.apply(row_idx, col_idx, update_interaction_f);
1229  }
1230  }
1231  }
1232 
1233  } while(false);
1234 
1235  // Clear the item list so the iteration function doesn't iterate
1236  // through the resulting elements. (see docs for
1237  // iterate_through_sparse_item_array_by_slice).
1238  item_list.clear();
1239  };
1240 
1241  ////////////////////////////////////////////////////////////////////////////////
1242  // The function to process a given element. We're not using that
1243  // one here, so just declare it to be empty.
1244 
1245  auto empty_process_element = [](size_t thread_idx, size_t row_idx,
1246  size_t item_idx_start, size_t item_idx_end,
1247  size_t item_idx, double value)
1248  GL_GCC_ONLY(GL_HOT_INLINE)
1249  {};
1250 
1251 
1252  ////////////////////////////////////////////////////////////////////////////////
1253  // At the end of every slice, go through and process all the
1254  // lookup tables that process_row filled. Between init_slice and
1255  // finalize_slice was one complete run through the data.
1256 
1257  auto finalize_slice = [&](size_t slice_idx, size_t item_idx_start, size_t item_idx_end) {
1258 
1259  ////////////////////////////////////////////////////////////////////////////////
1260  // Process all of these edges, then go back and do another pass
1261  // on the next slice if necessary. Note that there is
1262  // guaranteed to be no thread accessing the same data at once.
1263 
1264  interaction_data.apply_all(
1265  [&](size_t row_idx, size_t col_idx, const interaction_data_type& edge)
1266  GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
1267 
1268  final_interaction_data_type final_interaction_data = final_interaction_data_type();
1269 
1270  size_t item_a = item_idx_start + row_idx;
1271  size_t item_b = item_idx_start + col_idx;
1272 
1273  DASSERT_LT(item_a, item_b);
1274 
1275  this->similarity.finalize_interaction(
1276  final_interaction_data,
1277  item_info[item_a].final_item_data,
1278  item_info[item_b].final_item_data,
1279  edge,
1280  item_info[item_a].item_data,
1281  item_info[item_b].item_data);
1282 
1283  // Remap the indices if needed.
1284  size_t abs_item_a = index_mapper.map_internal_index_to_data_index(item_a);
1285  size_t abs_item_b = index_mapper.map_internal_index_to_data_index(item_b);
1286 
1287  // Insert into the central lookup.
1288  insert_into_lookup(abs_item_a, abs_item_b, final_interaction_data);
1289  insert_into_lookup(abs_item_b, abs_item_a, final_interaction_data);
1290  });
1291  };
1292 
1293  // Now, run the above functions!
1295  data,
1296  slice_boundaries,
1297  init_slice,
1298  process_row,
1299  empty_process_element,
1300  finalize_slice);
1301  }
1302 
  ////////////////////////////////////////////////////////////////////////////////
  // Top-level training driver: selects among nn / dense / sparse strategies.
1305 
1306  /** Algorithm for implementation of the sparse_similarity similarities.
1307  */
1308  std::map<std::string, flexible_type>
1309  train_from_sparse_matrix_sarray(
1310  size_t num_items,
1311  const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data) {
1312 
1313  turi::timer total_timer;
1314  total_timer.start();
1315  auto progress_tracker = std::make_shared<_progress_tracker>(num_items);
1316 
1317  std::map<std::string, flexible_type> ret;
1318 
1319  // For debugging purposes, this
1320  std::string force_mode = options.at("training_method");
1321 
1322  ////////////////////////////////////////////////////////////////////////////////
1323  // Step 1 -- set up the vertices. Based on this, we end up
1324  // dividing some items into nearest neighbors, some into
1325  // dense symmetric similarity, some into dense non-symmetric similarity,
1326  // and some into sparse similarity depending on the item counts of each.
1327 
1328  item_info_vector item_info;
1329  vector_index_mapper index_mapper;
1330  std::vector<size_t> items_per_user;
1331  bool nearest_neighbors_run = false;
1332 
1333  calculate_item_processing_colwise(
1334  item_info, similarity, data, num_items, &items_per_user);
1335 
1336  size_t num_items_remaining = item_info.size();
1337 
1338  logprogress_stream << "Setting up lookup tables." << std::endl;
1339  {
1340  std::vector<final_item_data_type> _final_item_data(num_items, final_item_data_type());
1341 
1342  for(size_t i = 0; i < item_info.size(); ++i) {
1343  _final_item_data[i] = item_info[i].final_item_data;
1344  }
1345 
1346  init_item_lookups(num_items, _final_item_data);
1347  }
1348 
1349  ////////////////////////////////////////////////////////////////////////////////
1350  // Since the dense pass option can be called in multiple places, put
1351 
1352  auto attempt_dense_pass = [&](size_t pass_limit) {
1353 
1354  std::vector<size_t> dense_slice_structure
1355  = calculate_slice_structure(num_items_remaining, pass_limit, bytes_per_item_dense());
1356 
1357  if(dense_slice_structure.empty()) {
1358  return false;
1359  }
1360 
1361  size_t num_dense_passes = dense_slice_structure.size() - 1;
1362 
1363  // If we can do it in a couple passes, then just do it and don't
1364  // do any of the preprocessing steps.
1365  if(num_dense_passes <= pass_limit) {
1366 
1367  if(num_dense_passes == 1) {
1369  << "Processing data in one pass using dense lookup tables."
1370  << std::endl;
1371  } else {
1373  << "Processing data in " << num_dense_passes << " passes using dense lookup tables."
1374  << std::endl;
1375  }
1376 
1378 
1379  {
1380  matrix_type dense_container;
1381 
1382  // Reserve to avoid expensive reallocations that cause bad allocs.
1383  if(num_dense_passes != 1) {
1384  size_t target_memory_usage = options.at("target_memory_usage");
1385  dense_container.reserve(target_memory_usage / sizeof(interaction_data_type));
1386  }
1387 
1388  // Set up the slices for the edge processing.
1389  _train_with_sparse_matrix_sarray(
1390  dense_container, dense_slice_structure, item_info,
1391  items_per_user, index_mapper, progress_tracker, data);
1392  }
1393 
1394  return true;
1395  } else {
1396  return false;
1397  }
1398  };
1399 
1400  ////////////////////////////////////////////////////////////////////////////////
1401  // Step 1: See if we can do it all with one pass of the max_num_items.
1402  size_t max_data_passes = options.at("max_data_passes");
1403 
1404  if(force_mode == "auto") {
1405  bool success = attempt_dense_pass(4);
1406  if(success) {
1407 
1408  // Record the actual training method into the dense.
1409  options["training_method"] = "dense";
1410  goto ITEM_SIM_DONE;
1411  }
1412  }
1413 
1414  if(force_mode == "dense") {
1415  bool success = attempt_dense_pass(max_data_passes);
1416  if(success) {
1417  goto ITEM_SIM_DONE;
1418  } else {
1419  std::ostringstream ss;
1420  ss <<"Not enough allowed memory to use training_method = \"dense\" with "
1421  << "max_data_passes = " << max_data_passes
1422  << "; consider increasing target_memory_usage "
1423  << " or max_data_passes." << std::endl;
1424  log_and_throw(ss.str().c_str());
1425  }
1426  }
1427 
1428  ////////////////////////////////////////////////////////////////////////////////
1429  // Step 2: First do a nearest neighbors preprocessing step to
1430  // handle the most expensive items.
1431 
1432  if(force_mode == "auto" || force_mode.substr(0, 2) == "nn") {
1433 
1434  dense_bitset item_in_nearest_neighbors(num_items);
1435 
1436  size_t n_in_nearest_neighbors = preprocess_nearest_neighbors(
1437  item_in_nearest_neighbors, data, item_info, items_per_user, progress_tracker);
1438 
1439  if(n_in_nearest_neighbors == num_items) {
1440  // The nearest neighbors has taken care of everything. We're
1441  // done.
1442 
1443  // Record the actual training method into the dense.
1444  options["training_method"] = "nn";
1445 
1446  goto ITEM_SIM_DONE;
1447  } else if(n_in_nearest_neighbors == 0) {
1448  // nearest neighbors did nothing. So we don't need to remap
1449  // the indices.
1450  nearest_neighbors_run = false;
1451  } else {
1452 
1453  // Apply this mapping to the vertex data
1454  item_in_nearest_neighbors.invert();
1455  num_items_remaining = index_mapper.set_index_mapping_from_mask(item_in_nearest_neighbors);
1456  index_mapper.remap_vector(item_info);
1457  DASSERT_EQ(num_items_remaining, item_info.size());
1458  nearest_neighbors_run = true;
1459 
1460  // Register we're going on to other methods.
1461  progress_tracker->print_break();
1462  }
1463  }
1464 
1465  ////////////////////////////////////////////////////////////////////////////////
1466  // Step 3: Are we forced to do a dense pass after this?
1467 
1468  if(force_mode == "nn:dense") {
1469  bool success = attempt_dense_pass(max_data_passes);
1470  if(success) {
1471  goto ITEM_SIM_DONE;
1472  } else {
1473  std::ostringstream ss;
1474  ss <<"Not enough allowed memory to use training_method = \"nn:dense\" with "
1475  << "max_data_passes = " << max_data_passes
1476  << "; consider increasing target_memory_usage "
1477  << " or max_data_passes." << std::endl;
1478  log_and_throw(ss.str().c_str());
1479  }
1480  }
1481 
1482  ////////////////////////////////////////////////////////////////////////////////
1483  // Step 3: Pick how many passes wo
1484 
1485 
1486  // This is just to make sure we don't get in an infinite loop
1487  // here.
1488  for(size_t attempt = 0; ; ++attempt) {
1489 
1490  auto error_out = []() {
1491  log_and_throw("Unable to determine reasonable way to run "
1492  "item_similarity given constrained running parameters. "
1493  "To fix, try: (1) increasing target_memory_usage, "
1494  "(2) increasing max_data_passes, or (3) forcing nearest "
1495  "neighbors mode with training_method='nn'.");
1496  };
1497 
1498  // A safegaurd against there being an infinite loop.
1499  if(attempt >= 16) {
1500  error_out();
1501  }
1502 
1503  // First, try to do a sparse pass.
1504  double bpi_sparse = bytes_per_item_sparse(item_info, items_per_user);
1505 
1506  logstream(LOG_INFO) << "Bytes per item in sparse matrix = " << bpi_sparse << std::endl;
1507 
1508  std::vector<size_t> sparse_slice_structure
1509  = calculate_slice_structure(num_items_remaining, max_data_passes, bpi_sparse);
1510 
1511  bool sparse_possible = !sparse_slice_structure.empty();
1512 
1513  size_t num_sparse_passes = (sparse_possible
1514  ? sparse_slice_structure.size() - 1
1515  : std::numeric_limits<size_t>::max());
1516 
1517  if(sparse_possible) {
1518  logstream(LOG_INFO) << "Estimated " << num_sparse_passes
1519  << " passes needed for sparse matrix." << std::endl;
1520  } else {
1521  logstream(LOG_INFO) << "Number of data passes too high for sparse matrix. " << std::endl;
1522  }
1523 
1524  // Are we disabling the dense mode by forcing the sparse mode?
1525  // If so, then we keep trying until it works in the sparse mode.
1526  bool disable_dense = (force_mode == "sparse" || force_mode == "nn:sparse");
1527 
1528  // If we are not disabling the dense mode, then attempt to do it
1529  // by allocating enough of the
1530  if(!disable_dense) {
1531 
1532  size_t dense_mode_allowed_passes = max_data_passes;
1533 
1534  // By rough guestimation and some benchmarking, it seems that
1535  // the sparse mode occurs a 4-8x-ish penalty over the dense
1536  // mode. Furthermore, the memory usage in the dense mode is
1537  // more predictable, so we should favor it slightly. Thus, if
1538  // we are able to do the dense mode in less than 4 times the
1539  // passes it would take to do the sparse mode, just run with
1540  // that.
1541 
1542  if(sparse_possible) {
1543  dense_mode_allowed_passes = std::min(8*num_sparse_passes, max_data_passes);
1544  }
1545 
1546  bool success = attempt_dense_pass(dense_mode_allowed_passes);
1547 
1548  if(success) {
1549  if(nearest_neighbors_run) {
1550  options["training_method"] = "nn:dense";
1551  } else {
1552  options["training_method"] = "dense";
1553  }
1554  goto ITEM_SIM_DONE;
1555  }
1556  }
1557 
1558  // Okay, dense didn't work. So let's do sparse if possible.
1559  if(sparse_possible) {
1560 
1561  if(num_sparse_passes == 1) {
1563  << "Processing data in one pass using sparse lookup tables."
1564  << std::endl;
1565  } else {
1567  << "Processing data in " << num_sparse_passes
1568  << " passes using sparse lookup tables."
1569  << std::endl;
1570  }
1571 
1573 
1574  _train_with_sparse_matrix_sarray(
1575  matrix_type(), sparse_slice_structure, item_info,
1576  items_per_user, index_mapper, progress_tracker, data);
1577 
1578  // Record what we actually did for future reference.
1579  if(nearest_neighbors_run) {
1580  options["training_method"] = "nn:sparse";
1581  } else {
1582  options["training_method"] = "sparse";
1583  }
1584 
1585  goto ITEM_SIM_DONE;
1586  }
1587 
1588  ////////////////////////////////////////////////////////////////////////////////
1589  // Okay, it appears that neither method above helps. Time to
1590  // apply approximations and do it again.
1591 
1592  // If we are doing it with training_method == "auto", then lower
1593  // the degree approximation threshold. This may help us do it
1594  // approximately.
1595 
1596  size_t degree_approximation_threshold = options.at("degree_approximation_threshold");
1597 
1598  // This number is somewhat arbitrary, but not sure what else to do here...
1599  if(degree_approximation_threshold > 8) {
1601  << "Unable to fit dataset processing into limit of max_data_passes="
1602  << size_t(options.at("max_data_passes")) << " and target_memory_usage="
1603  << size_t(options.at("target_memory_usage")) << " "
1604  << "bytes. Employing more aggressive approximations; "
1605  << "increase target_memory_usage, "
1606  << "nearest_neighbors_interaction_proportion_threshold, "
1607  << "or max_data_passes to avoid this. "
1608  << std::endl;
1609 
1611  << " Setting degree_approximation_threshold="
1612  << degree_approximation_threshold << std::endl;
1613 
1614  options["degree_approximation_threshold"] /= 2;
1615  continue;
1616  } else {
1617  error_out();
1618  }
1619  }
1620 
1621  ITEM_SIM_DONE:
1622 
1623  ////////////////////////////////////////////////////////////////////////////////
1624  // Finalize it! Put in data.
1625  progress_tracker->print_footer();
1626 
1627  logprogress_stream << "Finalizing lookup tables." << std::endl;
1628 
1629  finalize_lookups();
1630 
1631  ret.insert(options.begin(), options.end());
1632  ret["training_time"] = total_timer.current_time();
1633  return ret;
1634  }
1635 
  ////////////////////////////////////////////////////////////////////////////////
  // Score item for the update

  // Per-thread prediction accumulation buffers used by score_items().
  // Indexed by thread::thread_id() (sized to thread::cpu_count() in the
  // constructor) so concurrent callers never share a buffer; mutable
  // because score_items() is const.
  mutable std::vector<std::vector<prediction_accumulation_type> >
      item_prediction_buffers_by_thread;
1641 
  /** Score all items in a list of item predictions given a list of
   *  user-item interactions.
   *
   *  For every (item, rating) pair in user_item_data, the recorded
   *  neighbors of that item contribute to an accumulation buffer; the
   *  accumulated values are then finalized into item_predictions[i].second.
   *  Items with indices >= total_num_items are skipped (scored 0).
   *
   *  Returns the total number of neighbor updates performed.
   */
  size_t score_items(std::vector<std::pair<size_t, double> >& item_predictions,
                     const std::vector<std::pair<size_t, double> >& user_item_data) const {

    // Placeholder passed when use_final_item_data() is false.
    final_item_data_type _unused;

    // Use this in case we are already inside an in_parallel loop.
    size_t outer_thread_idx = thread::thread_id();

    DASSERT_LT(outer_thread_idx, item_prediction_buffers_by_thread.size());

    // Get the buffer we'll be using here; one accumulation slot per item.
    auto& item_prediction_buffer = item_prediction_buffers_by_thread[outer_thread_idx];
    item_prediction_buffer.assign(total_num_items, prediction_accumulation_type());

    atomic<size_t> num_updates = 0;

    // The function that actually does the similarity calculations.
    // use_unsafe_update_method selects the non-atomic update variant;
    // it is only passed as true on the single-threaded path below.
    auto _run_scoring = [&](
        size_t user_item_data_start, size_t user_item_data_end,
        bool use_unsafe_update_method) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {

      for(size_t i = user_item_data_start; i < user_item_data_end; ++i) {
        size_t item = user_item_data[i].first;
        double score = user_item_data[i].second;

        // Ignore out-of-range items.
        if(item >= total_num_items) {
          continue;
        }

        // Walk this item's neighbor list in the CSR-style layout
        // (boundaries index into item_interaction_data).  Note: this
        // inner `i` deliberately shadows the outer loop index.
        for(size_t i = item_neighbor_boundaries[item]; i < item_neighbor_boundaries[item+1]; ++i) {
          const auto& item_neighbor = item_interaction_data[i];

          ++num_updates;

          if(use_unsafe_update_method) {
            similarity.update_prediction_unsafe(
                item_prediction_buffer[item_neighbor.first],
                item_neighbor.second,
                use_final_item_data() ? final_item_data[item] : _unused,
                use_final_item_data() ? final_item_data[item_neighbor.first] : _unused,
                score);
          } else {
            similarity.update_prediction(
                item_prediction_buffer[item_neighbor.first],
                item_neighbor.second,
                use_final_item_data() ? final_item_data[item] : _unused,
                use_final_item_data() ? final_item_data[item_neighbor.first] : _unused,
                score);
          }
        }
      }
    };

    // If possible, do the above in parallel to get accurate
    // recommendations.
    in_parallel([&](size_t thread_idx, size_t num_threads) {
      bool parallel_here = (num_threads != 1);

      if(parallel_here) {
        // Not inside an in_parallel call outside of this one; we
        // can totally parallelize this.  All threads write into the
        // same buffer, so the safe update variant is used.
        size_t user_item_data_start = (thread_idx * user_item_data.size()) / num_threads;
        size_t user_item_data_end = ((thread_idx+1) * user_item_data.size()) / num_threads;

        _run_scoring(user_item_data_start, user_item_data_end, false);
      } else {
        // Means likely that we are already in an in_parallel call;
        // only this thread touches the buffer, so the unsafe (faster)
        // variant is fine.
        _run_scoring(0, user_item_data.size(), true);
      }
    });

    in_parallel([&](size_t thread_idx, size_t num_threads) {
      size_t item_index_start = (thread_idx * item_predictions.size()) / num_threads;
      size_t item_index_end = ((thread_idx+1) * item_predictions.size()) / num_threads;

      for(size_t i = item_index_start; i < item_index_end; ++i) {
        // Finalize the result, dumping the scores into the main scoring function.
        auto& p = item_predictions[i];
        size_t item = p.first;

        // Out-of-range items score 0.
        if(item >= total_num_items) {
          p.second = 0;
          continue;
        }

        p.second = similarity.finalize_prediction(
            item_prediction_buffer[item],
            use_final_item_data() ? final_item_data[item] : _unused,
            user_item_data.size());

        DASSERT_TRUE(std::isfinite(p.second));
      }
    });

    // We're done!
    return size_t(num_updates);
  }
1742 
1743  ////////////////////////////////////////////////////////////////////////////////
1744  // Routines for loading and serialization.
1745 
1746  size_t get_version() const { return 1; }
1747 
1748  void save(turi::oarchive& oarc) const {
1749 
1750  oarc << get_version();
1751 
1752  oarc << total_num_items
1753  << final_item_data
1754  << item_neighbor_boundaries
1755  << item_interaction_data;
1756  }
1757 
1758  /** Load things.
1759  */
1760  void load(turi::iarchive& iarc) {
1761  size_t version = 0;
1762 
1763  iarc >> version;
1764 
1765  ASSERT_MSG(version == 1,
1766  "Item similarity lookup does not support loading from this version.");
1767 
1768  iarc >> total_num_items
1769  >> final_item_data
1770  >> item_neighbor_boundaries
1771  >> item_interaction_data;
1772  }
1773 
1774  ////////////////////////////////////////////////////////////////////////////////
  // Utilities for other operations
1776 
1777  /** For a given item, return the recorded closest nearest items.
1778  *
1779  */
1780  void get_similar_items(
1781  std::vector<std::pair<size_t, flexible_type> >& similar_items,
1782  size_t item,
1783  size_t top_k) const {
1784 
1785  final_item_data_type _unused;
1786 
1787  auto item_comparitor = [&](const interaction_info_type& p1, const interaction_info_type& p2) -> bool {
1788 
1789  return similarity.compare_interaction_values(
1790  p1.second,
1791  p2.second,
1792  use_final_item_data() ? final_item_data[item] : _unused,
1793  use_final_item_data() ? final_item_data[p1.first] : _unused,
1794  use_final_item_data() ? final_item_data[p2.first] : _unused);
1795  };
1796 
1797  if(item >= total_num_items) {
1798  similar_items.clear();
1799  return;
1800  }
1801 
1802  // Put them in a buffer that can be sorted by the item_comparitor function.
1803  std::vector<std::pair<size_t, final_interaction_data_type> > item_buffer(
1804  item_interaction_data.begin() + item_neighbor_boundaries[item],
1805  item_interaction_data.begin() + item_neighbor_boundaries[item + 1]);
1806 
1807  if(top_k < item_buffer.size()) {
1808  std::nth_element(item_buffer.begin(),
1809  item_buffer.begin() + top_k,
1810  item_buffer.end(),
1811  item_comparitor);
1812  item_buffer.resize(top_k);
1813  }
1814 
1815  std::sort(item_buffer.begin(), item_buffer.end(), item_comparitor);
1816 
1817  similar_items.resize(item_buffer.size());
1818  for(size_t i = 0; i < item_buffer.size(); ++i) {
1819  similar_items[i].first = item_buffer[i].first;
1820  similar_items[i].second = similarity.export_similarity_score(item_buffer[i].second);
1821  }
1822  }
1823 
1824  ////////////////////////////////////////////////////////////////////////////////
1825  // Debugging
1826 
1827  bool _debug_check_equal(const sparse_similarity_lookup& _other) const {
1828  auto other = dynamic_cast<const sparse_similarity_lookup_impl*>(&_other);
1829 
1830  std::map<std::pair<size_t, size_t>, final_interaction_data_type> edges_this, edges_other;
1831 
1832  for(size_t i = 0; i < total_num_items; ++i) {
1833  for(size_t j = item_neighbor_boundaries[i]; j < item_neighbor_boundaries[i + 1]; ++j) {
1834  edges_this[{i, item_interaction_data[j].first}] = item_interaction_data[j].second;
1835  }
1836  }
1837 
1838  for(size_t i = 0; i < total_num_items; ++i) {
1839  for(size_t j = other->item_neighbor_boundaries[i];
1840  j < other->item_neighbor_boundaries[i + 1]; ++j) {
1841 
1842  edges_other[{i, other->item_interaction_data[j].first}] = other->item_interaction_data[j].second;
1843  }
1844  }
1845 
1846  std::vector<std::pair<std::pair<size_t, size_t>, final_interaction_data_type> > in_this_not_that;
1847  std::vector<std::pair<std::pair<size_t, size_t>, final_interaction_data_type> > in_that_not_this;
1848  std::vector<std::pair<std::pair<size_t, size_t>,
1849  std::pair<final_interaction_data_type, final_interaction_data_type> > > diff_values;
1850 
1851  for(const auto& p : edges_this) {
1852  if(edges_other.count(p.first) == 0) {
1853  in_this_not_that.push_back(p);
1854  } else if(std::abs(edges_other.at(p.first) - p.second) > 1e-6) {
1855  diff_values.push_back( {p.first, {p.second, edges_other.at(p.first)} } );
1856  }
1857  }
1858 
1859  for(const auto& p : edges_other) {
1860  if(edges_this.count(p.first) == 0) {
1861  in_that_not_this.push_back(p);
1862  }
1863  }
1864 
1865  bool failed = false;
1866 
1867  if(!in_this_not_that.empty()) {
1868  std::cout << "IN THIS, NOT OTHER: " << std::endl;
1869  for(const auto& p : in_this_not_that) {
1870  std::cout << " (" << p.first.first << ", " << p.first.second << "): "
1871  << p.second << std::endl;
1872  }
1873  failed = true;
1874  }
1875 
1876  if(!in_that_not_this.empty()) {
1877  std::cout << "IN OTHER, NOT THIS: " << std::endl;
1878  for(const auto& p : in_that_not_this) {
1879  std::cout << " (" << p.first.first << ", " << p.first.second << "): "
1880  << p.second << std::endl;
1881  }
1882  failed = true;
1883  }
1884 
1885  if(!diff_values.empty()) {
1886  std::cout << "Differing Values: " << std::endl;
1887  for(const auto& p : diff_values) {
1888  std::cout << " (" << p.first.first << ", " << p.first.second << "): "
1889  << "(this = " << p.second.first << ", other = " << p.second.second << ")"
1890  << std::endl;
1891  }
1892  failed = true;
1893  }
1894 
1895  return !failed;
1896  }
1897 };
1898 
1899 }}
1900 
#endif /* TURI_UNITY_ITEM_SIMILARITY_LOOKUP_CONSTRUCTION_H_ */
void start()
Reset the timer.
Definition: timer.hpp:75
#define logstream(lvl)
Definition: logger.hpp:276
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
Definition: lambda_omp.hpp:93
GL_HOT_INLINE std::pair< size_t, size_t > find_slice_boundary_indices(const std::vector< std::pair< size_t, T > > &v, size_t item_index_lb, size_t item_index_ub)
Definition: utilities.hpp:26
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
GL_HOT_FLATTEN void remap_sparse_vector(std::vector< std::pair< size_t, T > > &data_vect) const
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
flex_type_enum column_type(size_t i) const
Definition: sframe.hpp:374
size_t map_internal_index_to_data_index(size_t internal_idx) const GL_HOT_INLINE_FLATTEN
#define LOG_INFO
Definition: logger.hpp:101
std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > transpose_sparse_sarray(std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > data, const std::vector< size_t > &item_counts, size_t max_memory_usage)
Definition: utilities.hpp:340
void iterate_through_sparse_item_array_by_slice(const std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > &data, const std::vector< size_t > &slice_delimiters, SliceInitFunction &&init_slice, RowProcessFunction &&preprocess_row, ElementProcessFunction &&process_element, SliceFinalizeFunction &&finalize_slice)
Definition: utilities.hpp:146
bool get(size_t b) const
Returns the value of the bit b.
void fill()
Sets all bits to 1.
std::map< std::string, flexible_type > options
static size_t cpu_count()
static GL_HOT_INLINE_FLATTEN double log1me(double x)
Definition: logit_math.hpp:56
double current_time() const
Returns the elapsed time in seconds since turi::timer::start was last called.
Definition: timer.hpp:83
#define logprogress_stream
Definition: logger.hpp:325
static uint64_t hash64(const char *s, size_t len)
#define LOG_ERROR
Definition: logger.hpp:97
#define GL_HOT_INLINE
#define DASSERT_FALSE(cond)
Definition: assertions.hpp:365
sframe select_columns(const std::vector< std::string > &names) const
size_t set_index_mapping_from_mask(const dense_bitset &is_active_entry)
void clear()
Sets all bits to 0.
#define GL_HOT_INLINE_FLATTEN
static size_t thread_id()
size_t size() const
Returns the number of bits in this bitset.
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
Definition: lambda_omp.hpp:35
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
bool set_bit(size_t b)
Atomically sets the bit at position b to true returning the old value.
GL_HOT_FLATTEN void remap_vector(std::vector< T > &data_vect) const
A simple class that can be used for benchmarking/timing up to microsecond resolution.
Definition: timer.hpp:59
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364