ranking_sgd_solver_explicit.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_SGD_EXPLICIT_RANKING_SGD_SOLVER_CLASS_H_
#define TURI_SGD_EXPLICIT_RANKING_SGD_SOLVER_CLASS_H_

#include <map>
#include <vector>
#include <random>
#include <type_traits>
#include <core/util/code_optimization.hpp>
#include <toolkits/ml_data_2/ml_data.hpp>
#include <toolkits/factorization/ranking_sgd_solver_base.hpp>
namespace turi { namespace factorization {

/** Ranking when the target is present.
 *
 * ============================================================
 *
 * When the target is present, simultaneously attempt to fit the model
 * to the targets, while penalizing items that are predicted above the
 * unobserved-rating value (see unobserved_rating_value() below).
 */
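/* Illustratively, for each observed (user, item, target) row the solver
 * descends on the usual fit loss, and for each sampled unobserved item
 * whose prediction exceeds the unobserved-rating value it descends on a
 * second term scaled by ranking_regularization. A minimal standalone
 * sketch of that combined objective (hypothetical helper; squared loss
 * assumed here purely for illustration):
 *
 * \code
 * double combined_loss(double fx_pos, double y,   // observed prediction, target
 *                      double fx_neg, double v,   // negative-sample prediction, unobserved value
 *                      double rr) {               // ranking_regularization
 *   double fit = (fx_pos - y) * (fx_pos - y);
 *   // The ranking term only fires when the unobserved item is
 *   // predicted above the unobserved-rating value v.
 *   double rank = (fx_neg > v) ? rr * (fx_neg - v) * (fx_neg - v) : 0.0;
 *   return fit + rank;
 * }
 * \endcode
 */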
template <class SGDInterface>
class explicit_ranking_sgd_solver final : public ranking_sgd_solver_base<SGDInterface> {
 public:

  /** Constructor.
   */
  explicit_ranking_sgd_solver(
      const std::shared_ptr<sgd::sgd_interface_base>& main_interface,
      const v2::ml_data& train_data,
      const std::map<std::string, flexible_type>& options)

      : ranking_sgd_solver_base<SGDInterface>(main_interface, train_data, options)
      , ranking_regularization(options.at("ranking_regularization"))
      , _unobserved_rating_value(options.at("unobserved_rating_value"))
      , num_sampled_negative_examples(options.at("num_sampled_negative_examples"))
      , observation_block_size(options.at("sgd_sampling_block_size"))
  {
    DASSERT_GT(ranking_regularization, 0);

    // If no explicit unobserved rating value was supplied (i.e. it was
    // left at the lowest-float sentinel), derive one from the target
    // statistics.
    if(!(_unobserved_rating_value > std::numeric_limits<float>::lowest())
       && train_data.has_target()) {

      const auto& target_stats = train_data.metadata()->target_statistics();

      if(target_stats->stdev(0) == 0) {
        _unobserved_rating_value = target_stats->mean(0) - 1;

      } else {
        _unobserved_rating_value = (target_stats->mean(0)
                                    - 1.96 * target_stats->stdev(0));
      }
    }

    DASSERT_TRUE(std::isfinite(_unobserved_rating_value));

    // Set up all the thread-local buffers.
    size_t max_n_threads = thread::cpu_count();
    thread_x_buffers.resize(max_n_threads);
    thread_candidate_pool.resize(max_n_threads);
    thread_order_index_buffers.resize(max_n_threads);
    thread_negative_example_flags.resize(max_n_threads);
    thread_item_observed.resize(max_n_threads);
  }
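  /* For example, with target mean 3.5 and standard deviation 1.0, the
   * default works out to 3.5 - 1.96 * 1.0 = 1.54, i.e. roughly the lower
   * 2.5% tail of a normal fit to the observed targets (illustrative
   * numbers only; the degenerate stdev == 0 case falls back to mean - 1):
   *
   * \code
   * #include <cassert>
   * #include <cmath>
   *
   * int main() {
   *   double mean = 3.5, stdev = 1.0;
   *   double v = (stdev == 0) ? (mean - 1) : (mean - 1.96 * stdev);
   *   assert(std::fabs(v - 1.54) < 1e-9);
   *   return 0;
   * }
   * \endcode
   */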

 private:

  ////////////////////////////////////////////////////////////////////////////////
  //
  //  Typedefs -- need to pull these in from the base class.

  typedef typename ranking_sgd_solver_base<SGDInterface>::neg_sample_proc_buffer
      neg_sample_proc_buffer;

  double ranking_regularization = 0;
  double _unobserved_rating_value = NAN;
  size_t num_sampled_negative_examples = 0;
  size_t observation_block_size = 0;

  static constexpr bool using_logistic_loss =
      std::is_same<typename SGDInterface::LossModelProfile, loss_logistic>::value;

  inline double unobserved_rating_value() const {
    return using_logistic_loss ? 0 : _unobserved_rating_value;
  }

  /// Vector of (row data, target value) pairs.
  typedef std::pair<std::vector<v2::ml_data_entry>, double> x_buffer_row_type;
  typedef std::vector<x_buffer_row_type> x_buffer_type;

  // Working buffers, one per thread.
  std::vector<x_buffer_type> thread_x_buffers;

  std::vector<std::vector<std::pair<double, size_t> > > thread_candidate_pool;
  std::vector<std::vector<size_t> > thread_order_index_buffers;
  std::vector<dense_bitset> thread_negative_example_flags;
  std::vector<dense_bitset> thread_item_observed;
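  // Each worker thread indexes the buffers above by its thread id, so the
  // storage is allocated once in the constructor and reused across blocks
  // and iterations rather than reallocated inside the SGD hot loop.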

  /** The main method that runs the explicit ranking SGD pass over one
   *  block of data.
   *
   *  \param[in] iteration The current outer SGD iteration, used together
   *  with the block index to seed the local shuffle generator.
   *
   *  \param[in] thread_idx The thread index determining this block.
   *
   *  \param[in] num_threads The number of threads.
   *
   *  \param[in] block_idx The index of the data block this call works on.
   *
   *  \param[in] num_blocks The total number of data blocks.
   *
   *  \param[in] data The ml_data instance we're working with.
   *  Primarily needed for the metadata.
   *
   *  \param[in] iface The working SGD interface.
   *
   *  \param[in] step_size The current SGD step size, set by the
   *  higher level algorithm.
   *
   *  \param[in,out] error_detected Set to true if a numerical error
   *  is detected.
   *
   *  \return (loss, rank_loss) -- loss is the cumulative estimated
   *  loss value for this thread on predicting the training data, and
   *  rank_loss is the weighted loss on the negative examples.
   */
  std::pair<double, double> run_sgd_thread(
      size_t iteration,
      size_t thread_idx, size_t num_threads,
      size_t block_idx, size_t num_blocks,
      const v2::ml_data& data,
      SGDInterface* iface,
      double step_size,
      volatile bool& error_detected) GL_HOT {

    // Use a separate random engine for shuffling. We want
    // calculate_objective to track the same calls to the turicreate
    // random number generator, so that the exact objective computed at
    // the end covers the same pass over the data points as the one we
    // track here. It's a small thing internally, but important from a
    // user experience perspective.

    std::default_random_engine shuffle_g(static_cast<uint32_t>(hash64(iteration, block_idx)));

    static constexpr size_t ITEM_COLUMN_INDEX = 1;

    double loss_value = 0, rank_loss_value = 0;

    size_t n_items = data.metadata()->column_size(1);

    x_buffer_type& x_buffer = thread_x_buffers[thread_idx];
    dense_bitset& negative_example_flag = thread_negative_example_flags[thread_idx];

    size_t min_buffer_size = (11 * observation_block_size) / 10;
    if(x_buffer.size() < min_buffer_size) {

      // Give a little extra room to avoid expensive resizes.
      x_buffer.resize(min_buffer_size);
      negative_example_flag.resize(min_buffer_size);
    }

    // Now, go through and extract everything from this block.
    auto it = data.get_block_iterator(block_idx, num_blocks);

    neg_sample_proc_buffer neg_exm_buffer;

    // A dense bitset tracking which items have been observed so far.
    dense_bitset& item_observed = thread_item_observed[thread_idx];
    item_observed.resize(n_items);
    item_observed.clear();

    // The pool of candidate examples from which the negative examples
    // are chosen.
    std::vector<std::pair<double, size_t> >& candidate_pool = thread_candidate_pool[thread_idx];

    if(candidate_pool.size() != n_items)
      candidate_pool.resize(n_items);

    ////////////////////////////////////////////////////////////////////////////////
    // The main loop.

    while(!it.done() && !error_detected) {

      size_t n_items_in_buffer = 0;

      DASSERT_EQ(negative_example_flag.size(), x_buffer.size());
      DASSERT_TRUE(negative_example_flag.empty());

      // Fill up the buffer as much as possible.
      while(!it.done() && !error_detected && n_items_in_buffer < observation_block_size) {

        DASSERT_TRUE(item_observed.empty());

        size_t start_of_positive_examples = n_items_in_buffer;
        size_t n_taken_items = 0;

        size_t write_idx = start_of_positive_examples;

        while(!it.done()) {

          if(UNLIKELY(x_buffer.size() <= write_idx)) {
            auto resize = [&]() GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
              size_t new_size = (5*(write_idx + 4)) / 4;
              x_buffer.resize(new_size);
              negative_example_flag.resize(new_size);
            };

            resize();
          }

          auto& x = x_buffer[write_idx].first;
          it.fill_observation(x);
          x_buffer[write_idx].second = it.target_value();
          negative_example_flag.clear_bit_unsync(write_idx);

          size_t item = x[ITEM_COLUMN_INDEX].index;
          DASSERT_LT(item, n_items);

          bool old_bit = item_observed.set_bit_unsync(item);

          // Account for the possibility that there are duplicate items.
          if(!old_bit)
            ++n_taken_items;

          DASSERT_FALSE(x.empty());
          DASSERT_FALSE(x_buffer[write_idx].first.empty());

          ++write_idx;
          ++it;

          if(it.done() || it.is_start_of_new_block())
            break;
        }

        size_t n_rows = write_idx - start_of_positive_examples;

        n_items_in_buffer += n_rows;

        ////////////////////////////////////////////////////////////
        // Step 2.2: Sample candidate negative examples for these rows.

        // If this user has rated every item, there are no negative
        // examples to sample for this group, so skip it.
        if(UNLIKELY(n_taken_items == n_items)) {
          item_observed.clear();
          continue;
        }

        size_t target_num_negative_examples = n_rows;

        // Now attempt to fill up the buffer with candidate negative items.
        size_t negative_sample_start_index = n_items_in_buffer;
        size_t current_write_idx = negative_sample_start_index;

        // Make sure we'll have enough room to add in the negative examples.
        size_t required_x_buffer_size = current_write_idx + target_num_negative_examples;

        if(UNLIKELY(x_buffer.size() <= required_x_buffer_size)) {
          x_buffer.resize(required_x_buffer_size);
          negative_example_flag.resize(required_x_buffer_size);
        }

        // Now, for each positive example, select a candidate negative
        // example. These go in the block [negative_sample_start_index, ...).
        for(size_t pos_idx = start_of_positive_examples;
            pos_idx < n_items_in_buffer && !error_detected;
            ++pos_idx) {

          // If there are no more items to sample, stop.
          if(n_taken_items == n_items)
            break;

          double score = this->choose_negative_example(
              thread_idx, data, iface,
              x_buffer[current_write_idx].first,
              x_buffer[pos_idx].first,
              item_observed,
              n_rows, n_items,
              n_taken_items,
              neg_exm_buffer);

          if(UNLIKELY(!std::isfinite(score))) {
            error_detected = true;
            break;
          }

          // Only keep this candidate if it is an active negative example.
          if(using_logistic_loss || score > unobserved_rating_value()) {
            negative_example_flag.set_bit_unsync(current_write_idx);
            ++n_taken_items;
            DASSERT_LT(current_write_idx, x_buffer.size());
            DASSERT_LT(ITEM_COLUMN_INDEX, x_buffer[current_write_idx].first.size());
            DASSERT_LT(x_buffer[current_write_idx].first[ITEM_COLUMN_INDEX].index, item_observed.size());

            item_observed.set_bit_unsync(x_buffer[current_write_idx].first[ITEM_COLUMN_INDEX].index);
            ++current_write_idx;
          }
        }

        if(UNLIKELY(error_detected))
          break;

        size_t num_negative_examples = current_write_idx - negative_sample_start_index;

        n_items_in_buffer = current_write_idx;
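
        // At this point the current group occupies a contiguous slice of
        // the buffer (illustrative layout):
        //
        //   [ earlier groups | positives: n_rows | negatives: num_negative_examples ]
        //                     ^ start_of_positive_examples
        //                                         ^ negative_sample_start_index
        //
        // Positive rows keep their observed targets; negative rows are
        // marked in negative_example_flag and will be pushed toward
        // unobserved_rating_value() in the descent pass below.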

        // Clear the item_observed buffer. It's important to do this
        // now, since the steps below will alter the negative examples.

        this->clear_item_observed_buffer(
            item_observed, n_rows + num_negative_examples, n_items,

            // This lambda maps buffer offsets 0, ..., n_rows +
            // num_negative_examples - 1 to the corresponding item index;
            // offsets 0, ..., n_rows - 1 are the positive examples.
            [&](size_t i) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
              return x_buffer[i + start_of_positive_examples].first[ITEM_COLUMN_INDEX].index;
            } );

        DASSERT_TRUE(item_observed.empty());

#ifndef NDEBUG
        for(size_t i = 0; i < n_items_in_buffer; ++i) {
          DASSERT_FALSE(x_buffer[i].first.empty());
        }
#endif

      } // End of while loop; loop while buffer has room and block has data.

      if(UNLIKELY(error_detected))
        break;

      ////////////////////////////////////////////////////////////////////////////////
      // Part 2: The buffer is now full; run the descent steps.

      DASSERT_LE(n_items_in_buffer, x_buffer.size());

      ////////////////////////////////////////////////////////////////////////////////
      // Part 2.1: Shuffle the visit order so that the descent steps are
      // taken in random order.

      std::vector<size_t>& descent_order_indices = thread_order_index_buffers[thread_idx];
      descent_order_indices.resize(n_items_in_buffer);
      std::iota(descent_order_indices.begin(), descent_order_indices.end(), size_t(0));
      std::shuffle(descent_order_indices.begin(), descent_order_indices.end(), shuffle_g);
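
      /* Standalone sketch of this visit-order pattern (illustrative, using
       * only the standard library): fill 0..n-1 with std::iota, then
       * shuffle with a locally seeded engine so the order is reproducible
       * per block.
       *
       * \code
       * #include <algorithm>
       * #include <cstddef>
       * #include <numeric>
       * #include <random>
       * #include <vector>
       *
       * int main() {
       *   std::default_random_engine g(42);                       // deterministic seed
       *   std::vector<std::size_t> order(8);
       *   std::iota(order.begin(), order.end(), std::size_t(0));  // 0, 1, ..., 7
       *   std::shuffle(order.begin(), order.end(), g);            // randomized visit order
       *   return 0;
       * }
       * \endcode
       */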

      ////////////////////////////////////////////////////////////////////////////////
      // Part 2.2: Now, go through and do a descent step on each of these.

      for(size_t i = 0; i < n_items_in_buffer; ++i) {

        size_t index = descent_order_indices[i];
        DASSERT_LT(index, n_items_in_buffer);

        const auto& row = x_buffer[index];

        bool is_negative_example = negative_example_flag.get(index);

        double rr_scale = using_logistic_loss ? 1.0 : ranking_regularization;

        double r = (is_negative_example
                    ? unobserved_rating_value()
                    : row.second);

        double ss = (is_negative_example
                     ? step_size * rr_scale
                     : step_size);
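
        // For example, with step_size = 0.05 and ranking_regularization =
        // 0.25, a negative example is pulled toward
        // unobserved_rating_value() with an effective step of
        // 0.05 * 0.25 = 0.0125 (illustrative numbers; under logistic loss
        // rr_scale is 1 and the step is unchanged).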

        // Only apply the regularization descent operation if it's a
        // positive example.
        bool apply_regularization = !is_negative_example;

        DASSERT_FALSE(row.first.empty());

        double descent_loss = iface->apply_sgd_step(
            thread_idx, row.first, r, ss, apply_regularization);

        if(is_negative_example) {
          rank_loss_value += rr_scale * descent_loss;
        } else {
          loss_value += descent_loss;
        }

        // Test for numerical issues.
        if(!std::isfinite(loss_value + rank_loss_value)) {
          error_detected = true;
          break;
        }
      } // End loop over points in the buffer.

      negative_example_flag.clear();

      // Further checks.
      if(!iface->state_is_numerically_stable()) {
        error_detected = true;
      }
    }

    if(error_detected)
      return {NAN, NAN};
    else
      return {loss_value, rank_loss_value};
  }

  ////////////////////////////////////////////////////////////////////////////////

  /** Calculate the loss value for the block of data assigned to a
   *  particular thread.
   *
   *  \param[in] thread_idx The thread index determining this block.
   *
   *  \param[in] num_threads The number of threads.
   *
   *  \param[in] data The ml_data instance we're working with.
   *  Primarily needed for the metadata.
   *
   *  \param[in] iface The working SGD interface.
   *
   *  \return (loss, rank_loss) -- loss is the cumulative estimated
   *  loss value for this thread on predicting the training data, and
   *  rank_loss is the weighted loss on the negative examples.
   */
  std::pair<double, double> run_loss_calculation_thread(
      size_t thread_idx, size_t num_threads,
      const v2::ml_data& data,
      SGDInterface* iface) const {

    double loss_value = 0, rank_loss_value = 0;

    size_t n_items = data.metadata()->column_size(1);

    x_buffer_type x_buffer;

    // Start out with room for 4K items per user; grows as needed.
    x_buffer.resize(4*1024);

    std::vector<v2::ml_data_entry> negative_example_x;

    neg_sample_proc_buffer neg_exm_buffer;

    dense_bitset item_observed(n_items);

    for(auto it = data.get_block_iterator(thread_idx, num_threads); !it.done();) {

      ////////////////////////////////////////////////////////////
      // Step 2.1: Fill up the buffer with potential positive
      // examples.

      size_t n_rows, n_rated_items;

      std::tie(n_rows, n_rated_items) =
          this->fill_x_buffer_with_users_items(
              x_buffer, it, n_items, item_observed);

      ////////////////////////////////////////////////////////////
      // Step 2.2: Loop through these rows, scoring each one.

      if(n_rated_items == n_items) {

        ////////////////////////////////////////////////////////////
        // Step 2.2, case 1: All items have been rated, so there are no
        // negative examples to sample; just measure the fit to this
        // user. This case is uncommon, but an important corner case.

        for(size_t i = 0; i < n_rows; ++i) {
          const std::vector<v2::ml_data_entry>& x = x_buffer[i].first;
          double y = x_buffer[i].second;

          double fx_hat = iface->calculate_fx(thread_idx, x);
          double loss = iface->loss_model.loss(fx_hat, y);

          ASSERT_GE(loss, 0);

          loss_value += loss;
        }

      } else {

        ////////////////////////////////////////////////////////////
        // Step 2.2, case 2: Not all items have been rated. Sample a
        // negative item for each row, and score it.

        for(size_t i = 0; i < n_rows; ++i) {

          const std::vector<v2::ml_data_entry>& x = x_buffer[i].first;
          double y = x_buffer[i].second;

          DASSERT_GE(x.size(), 2);

          // Get the loss value from the positive example.
          double fx_hat = iface->calculate_fx(thread_idx, x);
          loss_value += iface->loss_model.loss(fx_hat, y);

          // Choose a negative example.
          double negative_example_fx =
              this->choose_negative_example(
                  thread_idx,
                  data,
                  iface,
                  negative_example_x, x,
                  item_observed,
                  n_rows, n_items, n_rated_items,
                  neg_exm_buffer);

          if(!std::isfinite(negative_example_fx) || std::fabs(negative_example_fx) > 1e10) {
            return {NAN, NAN};
          }

          // Debug: make sure the sampled negative example is not among
          // this user's observed items.
#ifndef NDEBUG
          for(size_t x_check = 0; x_check < n_rows; ++x_check) {
            DASSERT_NE(negative_example_x[1].index, x_buffer[x_check].first[1].index);
          }
#endif

          if(using_logistic_loss || negative_example_fx > unobserved_rating_value()) {

            double loss = iface->loss_model.loss(negative_example_fx, unobserved_rating_value());

            ASSERT_GE(loss, 0.0);

            double rr_scale = using_logistic_loss ? 1.0 : ranking_regularization;

            rank_loss_value += (rr_scale * loss);
          }

        } // End loop over points in the buffer.
      }

      ////////////////////////////////////////////////////////////
      // Step 3: Clear out the item_observed buffer.

      this->clear_item_observed_buffer(item_observed, n_rows, n_items,
                                       [&](size_t i) { return x_buffer[i].first[1].index; } );
    }

    return {loss_value, rank_loss_value};
  }

};
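
/* Hypothetical usage sketch. The option keys below are the ones read by
 * the constructor above; the interface type, data objects, and values are
 * assumptions for illustration, not a documented API:
 *
 * \code
 * std::map<std::string, flexible_type> opts = {
 *   {"ranking_regularization",        0.25},
 *   {"unobserved_rating_value",       std::numeric_limits<float>::lowest()},  // i.e. "unset"
 *   {"num_sampled_negative_examples", 4},
 *   {"sgd_sampling_block_size",       64 * 1024}
 * };
 *
 * explicit_ranking_sgd_solver<my_sgd_interface> solver(
 *     main_interface, train_data, opts);   // my_sgd_interface etc. are hypothetical
 * \endcode
 */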

}}

#endif