Turi Create 4.0
factorization_model_impl.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_FACTORIZATION_GENERAL_LINEAR_MODEL_SECOND_ORDER_H_
#define TURI_FACTORIZATION_GENERAL_LINEAR_MODEL_SECOND_ORDER_H_

#include <Eigen/Core>
#include <cmath>
#include <vector>
#include <string>
#include <map>
#include <algorithm>
#include <core/util/branch_hints.hpp>
#include <model_server/lib/variant_deep_serialize.hpp>
#include <core/data/flexible_type/flexible_type.hpp>
#include <toolkits/ml_data_2/ml_data.hpp>
#include <core/storage/serialization/serialization_includes.hpp>
#include <toolkits/ml_data_2/ml_data_iterators.hpp>
#include <toolkits/factorization/factorization_model.hpp>
#include <toolkits/factorization/factors_to_sframe.hpp>
#include <model_server/lib/extensions/model_base.hpp>
#include <core/util/fast_top_k.hpp>

namespace turi { namespace factorization {

/** The model factor mode. This enum determines the particular mode
 * that the base class operates in.
 */
enum class model_factor_mode {
  factorization_machine,
  matrix_factorization,
  pure_linear_model};
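// The modes differ in which features receive latent factors (see
// internal_setup below): factorization_machine gives every feature dimension
// a factor row, matrix_factorization gives factors only to the first two
// columns (user and item), and pure_linear_model uses no factors at all.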

////////////////////////////////////////////////////////////////////////////////

template <model_factor_mode _factor_mode, flex_int _num_factors_if_known>
class factorization_model_impl final : public factorization_model {
 public:

  ////////////////////////////////////////////////////////////////////////////////
  // Set up the types and constants governing the behavior of the model.

  static constexpr model_factor_mode factor_mode = _factor_mode;
  static constexpr flex_int num_factors_if_known = _num_factors_if_known;

  typedef Eigen::Matrix<float, 1, num_factors_if_known, Eigen::RowMajor> factor_type;
  typedef Eigen::Matrix<float, Eigen::Dynamic, num_factors_if_known, Eigen::RowMajor> factor_matrix_type;
  typedef Eigen::Matrix<float, Eigen::Dynamic, 1> vector_type;
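  // When _num_factors_if_known is a positive compile-time constant, these are
  // fixed-size Eigen types; when it is Eigen::Dynamic, the number of factor
  // columns is determined at runtime by num_factors() below.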

  ////////////////////////////////////////////////////////////////////////////////
  // Declare flags governing how the model works.

  size_t _num_factors = 0;
  inline size_t num_factors() const {
    if(_num_factors == 0)
      DASSERT_TRUE(factor_mode == model_factor_mode::pure_linear_model);

    return _num_factors_if_known == Eigen::Dynamic ? _num_factors : _num_factors_if_known;
  }

  size_t num_factor_dimensions = 0;

  bool enable_intercept_term = true;
  bool enable_linear_features = true;
  bool nmf_mode = false;

  ////////////////////////////////////////////////////////////////////////////////
  // Declare model variables.

  volatile double w0 = NAN;
  vector_type w;
  factor_matrix_type V;

  ////////////////////////////////////////////////////////////////////////////////
  // Declare variables for calculating things.

  size_t n_threads = 1;
  size_t max_row_size = 0;

  struct calculate_fx_processing_buffer {
    mutable factor_matrix_type XV;
    mutable factor_type xv_accumulator;
  };
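  // Per-thread scratch space for calculate_fx: XV holds the row-wise products
  // x_i * V.row(i) for the current observation, and xv_accumulator holds their
  // running sum.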

  std::vector<calculate_fx_processing_buffer> buffers;

  /** Sets up the processing buffers. Called after internal_setup and
   * after internal_load.
   */
  void setup_buffers() {

    if(num_factors() == 0) {
      DASSERT_TRUE(factor_mode == model_factor_mode::pure_linear_model);
    }

    // Set the number of threads
    n_threads = thread::cpu_count();

    // Set up the intermediate computing buffers
    buffers.resize(n_threads);
    for(calculate_fx_processing_buffer& buffer : buffers) {
      buffer.xv_accumulator.resize(num_factors());
      buffer.XV.resize(max_row_size, num_factors());
    }

    recommend_cache.resize(n_threads);
  }


 public:

  /// Clone the current model
  std::shared_ptr<factorization_model> clone() const {
    return std::shared_ptr<factorization_model>(new factorization_model_impl(*this));
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Functions to initialize and calculate parts of the model.

  /** Set up some of the internal processing constants and buffers,
   * etc.
   */
  void internal_setup(const v2::ml_data& train_data) {
    // Set the number of factors; this is model dependent.
    switch(factor_mode) {
      case model_factor_mode::factorization_machine:
        _num_factors = options.at("num_factors");
        num_factor_dimensions = n_total_dimensions;
        break;
      case model_factor_mode::matrix_factorization:
        _num_factors = options.at("num_factors");
        num_factor_dimensions = index_sizes[0] + index_sizes[1];
        break;
      case model_factor_mode::pure_linear_model:
        _num_factors = 0;
        num_factor_dimensions = 0;
        break;
    }

    nmf_mode = options.at("nmf");
    enable_linear_features = !nmf_mode;
    enable_intercept_term = !nmf_mode;

    if(num_factors_if_known != Eigen::Dynamic)
      DASSERT_EQ(num_factors_if_known, _num_factors);

    max_row_size = train_data.max_row_size();

    setup_buffers();

    w.resize(n_total_dimensions);
    V.resize(num_factor_dimensions, num_factors());

    reset_state(options.at("random_seed"), 0.001);
  }

  /** Initialize the model at a random starting point. The result is
   * deterministic given the random seed and the number of threads.
   */
  void reset_state(size_t random_seed, double sd) GL_HOT {

    // Scale the factor standard deviation down by roughly sqrt(num_factors());
    // otherwise the pairwise interaction terms, which sum num_factors()
    // products of factor entries, could start out far larger than sd.
    const double V_sd = sd / (1 + std::sqrt(num_factors()));

    size_t num_factor_init_random = index_sizes[0] + index_sizes[1];
    size_t num_factor_init_zero = num_factor_dimensions - num_factor_init_random;

    in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_FLATTEN) {
        random::seed(hash64(random_seed, thread_idx, num_threads));

        // Compute the w part.
        if(enable_linear_features) {

          size_t start_w_idx = (thread_idx * n_total_dimensions) / num_threads;
          size_t end_w_idx = ((thread_idx + 1) * n_total_dimensions) / num_threads;

          for(size_t i = start_w_idx; i < end_w_idx; ++i)
            w[i] = (sd > 0) ? random::fast_uniform<double>(-sd/2, sd/2) : 0;
        } else {
          w.setZero();
        }

        // Randomly initialize the factor rows of the first two columns
        // (user and item).
        {
          size_t start_V_idx = (thread_idx * num_factor_init_random) / num_threads;
          size_t end_V_idx = ((thread_idx + 1) * num_factor_init_random) / num_threads;

          for(size_t i = start_V_idx; i < end_V_idx; ++i) {
            for(size_t j = 0; j < num_factors(); ++j) {

              double lb = nmf_mode ? 0 : -V_sd / 2;
              double ub = nmf_mode ? V_sd : V_sd / 2;

              // Now, to promote diversity at the beginning, only have
              // a handful of the factor terms on each particular
              // factor vector be initialized to a larger value than
              // the rest. On the rest, just downscale the std dev of
              // the starting value by 1000 or so.
              //
              // Here, each latent factor starts off with about 8
              // terms that are large and the rest small. On
              // experiments with the Amazon dataset (350m
              // observations, num_factors > 100), this gave good
              // starting values and didn't diverge on reset.

              V(i, j) = (V_sd > 0) ? random::fast_uniform<double>(lb, ub) : 0;

              if(random::fast_uniform<size_t>(0, num_factors()) > std::min<size_t>(4ULL, num_factors() / 2))
                V(i, j) /= 1000;
            }
          }
        }

        // Zero-initialize the remaining factor rows (the side-feature
        // dimensions used in factorization machine mode).
        if(num_factor_init_zero > 0) {
          size_t start_V_idx = num_factor_init_random + (thread_idx * num_factor_init_zero) / num_threads;
          size_t end_V_idx = num_factor_init_random + ((thread_idx + 1) * num_factor_init_zero) / num_threads;

          for(size_t i = start_V_idx; i < end_V_idx; ++i) {
            for(size_t j = 0; j < num_factors(); ++j) {
              V(i, j) = 0;
            }
          }
        }

    });

    w0 = nmf_mode ? 0 : target_mean;
  }

  /** Calculate the linear function value at the given point.
   *
   * x is the observation vector in the standard ml_data_entry format.
   * Each entry of x is an ml_data_entry structure containing the
   * column index, index, and value of each observation point. See
   * ml_data_iterator for more information on it.
   *
   * In the context of the recommender system, x[0] is the info about
   * the user and x[1] is the info about the item. x[0].index is the
   * user's index, and x[1].index is the item's index. As for all
   * categorical variables, the value is 1.
   */
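  // For example (illustrative only), a single observation for user 3 and
  // item 7 with no side features would contain two entries:
  //
  //   x = { {column_index: 0, index: 3, value: 1},
  //         {column_index: 1, index: 7, value: 1} }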
  double calculate_fx(size_t thread_idx, const std::vector<v2::ml_data_entry>& x) const GL_HOT_FLATTEN {

    const size_t x_size = x.size();

    // Depending on the model, do the calculation in the most efficient way possible.

    switch(factor_mode) {

      ////////////////////////////////////////////////////////////////////////////////
      // Case 1: Factorization Machine

      case model_factor_mode::factorization_machine: {

        factor_matrix_type& XV = buffers[thread_idx].XV;

        DASSERT_GE(size_t(XV.rows()), x_size);

        factor_type& xv_accumulator = buffers[thread_idx].xv_accumulator;
        xv_accumulator.setZero();

        double fx_value = w0;

        size_t idx = 0;

        for(size_t j = 0; j < x_size; ++j) {
          const v2::ml_data_entry& v = x[j];

          // Check if this feature has been seen before; if not, the
          // corresponding factors are assumed to be zero and to have no
          // effect on any of the totals below; thus we just skip them.
          if(__unlikely__(v.index >= index_sizes[v.column_index]))
            continue;

          const size_t global_idx = index_offsets[v.column_index] + v.index;

          double value_shift, value_scale;
          std::tie(value_shift, value_scale) = this->column_shift_scales[global_idx];

          double xv = value_scale * (x[j].value - value_shift);

          XV.row(idx) = xv * V.row(global_idx);
          xv_accumulator += XV.row(idx);

          fx_value += xv * w[global_idx];

          ++idx;
        }

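        // Add in the pairwise interaction term using the standard
        // factorization machine identity
        //
        //   sum_{i<j} (v_i . v_j) x_i x_j
        //     = 0.5 * ( ||sum_i x_i v_i||^2 - sum_i ||x_i v_i||^2 ),
        //
        // where xv_accumulator = sum_i x_i v_i and XV.row(j) = x_j v_j.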
        for(size_t j = 0; j < idx; ++j)
          fx_value += 0.5*(xv_accumulator.dot(XV.row(j)) - XV.row(j).squaredNorm());

        return fx_value;
      }

      ////////////////////////////////////////////////////////////////////////////////
      // Case 2: Matrix Factorization

      case model_factor_mode::matrix_factorization: {

        factor_matrix_type& XV = buffers[thread_idx].XV;

        DASSERT_GE(size_t(XV.rows()), x_size);
        DASSERT_EQ(size_t(XV.cols()), num_factors());

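        // For matrix factorization the score decomposes as
        //
        //   fx = w0 + w_user + w_item + (V_user . V_item)
        //        + linear terms for any remaining side columns,
        //
        // computed in the three steps below.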
        ////////////////////////////////////////////////////////////////////////////////
        //
        // Step 1: Calculate the first two dimensions

        double fx_value = w0;

        for(size_t j : {0, 1}) {
          const v2::ml_data_entry& v = x[j];

          // The column index is just going to be 0 or 1.
          DASSERT_EQ(v.column_index, j);

          // Check if this feature has been seen before; if not, the
          // corresponding factors are assumed to be zero and to have no
          // effect on any of the totals below; thus we just skip them.
          if(__unlikely__(v.index >= index_sizes[v.column_index])) {

            XV.row(j).setZero();

          } else {

            // Get the global index
            const size_t global_idx = index_offsets[j] + v.index;

            // No column scaling on the first two dimensions under the MF model.
            DASSERT_EQ(v.value, 1);

            XV.row(j) = V.row(global_idx);

            // Add in the contribution
            fx_value += w[global_idx];
          }
        }

        ////////////////////////////////////////////////////////////////////////////////
        //
        // Step 2: Pull in the contribution from the product terms.

        fx_value += XV.row(0).dot(XV.row(1));

        ////////////////////////////////////////////////////////////////////////////////
        // Step 3: Calculate the dimensions past the first two. These
        // can have anything in them.

        for(size_t j = 2; j < x_size; ++j) {
          const v2::ml_data_entry& v = x[j];

          // Check if this feature has been seen before; if not, the
          // corresponding factors are assumed to be zero and to have no
          // effect on any of the totals below; thus we just skip them.
          if(__unlikely__(v.index >= index_sizes[v.column_index]))
            continue;

          // Get the global index
          const size_t global_idx = index_offsets[v.column_index] + v.index;

          // Set the scaling on this column
          double value_shift, value_scale;
          std::tie(value_shift, value_scale) = column_shift_scales[global_idx];

          double xv = value_scale * (v.value - value_shift);

          fx_value += xv * w[global_idx];
        }

        return fx_value;
      }

      ////////////////////////////////////////////////////////////////////////////////
      // Case 3: Linear Model

      case model_factor_mode::pure_linear_model: {

        double fx_value = w0;

        for(size_t j = 0; j < x_size; ++j) {
          const v2::ml_data_entry& v = x[j];

          // Check if this feature has been seen before; if not, the
          // corresponding factors are assumed to be zero and to have no
          // effect on any of the totals below; thus we just skip them.
          if(__unlikely__(v.index >= index_sizes[v.column_index]))
            continue;

          // Get the global index
          const size_t global_idx = index_offsets[v.column_index] + v.index;

          // Set the scaling on this column
          double value_shift, value_scale;
          std::tie(value_shift, value_scale) = column_shift_scales[global_idx];

          double xv = value_scale * (v.value - value_shift);

          fx_value += xv * w[global_idx];
        }

        return fx_value;
      }

      default:
        return 0;
    }
  }

  /** Calculate the linear function value at the given point.
   *
   * This is an overload of the above function that does not require
   * the thread_idx parameter.
   */
  double calculate_fx(const std::vector<v2::ml_data_entry>& x) const GL_HOT_FLATTEN {
    size_t thread_idx = thread::thread_id();

    ASSERT_MSG(thread_idx < buffers.size(),
               "Threading set up in nonstandard way; thread_id() larger than cpu_count().");

    return calculate_fx(thread_idx, x);
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Item-to-item similarity scores used by the recommender.

  void get_item_similarity_scores(
      size_t item, std::vector<std::pair<size_t, double> >& sim_scores) const {

    switch(factor_mode) {
      case model_factor_mode::factorization_machine:
      case model_factor_mode::matrix_factorization:
        {
          // Just go through calculating the cosine metric
          if(item >= index_sizes[1]) {
            for(auto& p : sim_scores) {
              p.second = 0;
            }
            return;
          }

          auto base_row = V.row(index_offsets[1] + item);

          float it_r = base_row.squaredNorm();

          for(auto& p : sim_scores) {
            if(p.first >= index_sizes[1]) {
              p.second = 0;
              continue;
            }

            size_t idx = index_offsets[1] + p.first;
            auto item_row = V.row(idx);
            p.second = item_row.dot(base_row) / std::sqrt(it_r * item_row.squaredNorm());
          }

          break;
        }
      case model_factor_mode::pure_linear_model:
        {
          // Do nothing here.
          return;
        }
    }
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Serialization parameters.

  std::map<std::string, variant_type> get_serialization_parameters() const {

    std::map<std::string, variant_type> save_parameters;

    std::string factor_mode_str;

    switch(factor_mode) {
      case model_factor_mode::factorization_machine:
        factor_mode_str = "factorization_machine";
        break;
      case model_factor_mode::matrix_factorization:
        factor_mode_str = "matrix_factorization";
        break;
      case model_factor_mode::pure_linear_model:
        factor_mode_str = "pure_linear_model";
        break;
    }

    save_parameters["factor_mode"] = to_variant(factor_mode_str);

    size_t __num_factors_if_known = num_factors_if_known;

    save_parameters["num_factors_if_known"] = to_variant(__num_factors_if_known);

    return save_parameters;
  }


  ////////////////////////////////////////////////////////////////////////////////
  // Saving and loading of the model.

  size_t get_version() const { return 1; }

  /** Save routine.
   */
  void save_impl(turi::oarchive& oarc) const {

    std::map<std::string, variant_type> terms;

    // Dump the model parameters.
    terms["_num_factors"] = to_variant(num_factors());
    terms["num_factor_dimensions"] = to_variant(num_factor_dimensions);
    terms["enable_intercept_term"] = to_variant(enable_intercept_term);
    terms["enable_linear_features"] = to_variant(enable_linear_features);
    terms["nmf_mode"] = to_variant(nmf_mode);
    terms["max_row_size"] = to_variant(max_row_size);

    variant_deep_save(to_variant(terms), oarc);

    // Now dump out the numerical data.
    double _w0 = w0;
    oarc << _w0 << w << V;
  }

  /** Load routine.
   */
  void load_version(turi::iarchive& iarc, size_t version) {
    DASSERT_EQ(version, 1);

    variant_type terms_v;

    variant_deep_load(terms_v, iarc);

    auto terms = variant_get_value<std::map<std::string, variant_type> >(terms_v);

#define __EXTRACT(varname) \
    varname = variant_get_value<decltype(varname)>(terms.at(#varname));

    __EXTRACT(_num_factors);
    __EXTRACT(num_factor_dimensions);
    __EXTRACT(enable_intercept_term);
    __EXTRACT(enable_linear_features);
    __EXTRACT(nmf_mode);
    __EXTRACT(max_row_size);

#undef __EXTRACT

    // Now load back the numerical data.
    double _w0;
    iarc >> _w0 >> w >> V;
    w0 = _w0;

    setup_buffers();
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Retrieve the coefficients of the model.

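  // The returned map has the following layout (assembled below):
  //
  //   "intercept"        -- w0 (present unless the intercept is disabled,
  //                         e.g. in nmf mode),
  //   <user column name> -- sframe of per-user linear terms and/or factors,
  //   <item column name> -- sframe of per-item linear terms and/or factors,
  //   "side_data"        -- a single sframe covering all remaining columns,
  //                         keyed by feature name and index.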
  std::map<std::string, variant_type> get_coefficients() const {

    std::map<std::string, variant_type> ret;

    ////////////////////////////////////////
    // Add in the intercept

    if(enable_intercept_term)
      ret["intercept"] = double(w0);

    ////////////////////////////////////////
    // Add in the user and item terms
    {
      bool include_V_term = true;
      bool include_w_term = enable_linear_features;

      switch(factor_mode) {
        case model_factor_mode::factorization_machine:
        case model_factor_mode::matrix_factorization:
          include_V_term = true;
          break;
        case model_factor_mode::pure_linear_model:
          include_V_term = false;
          break;
      }

      for(size_t col_idx : {0, 1} ) {

        std::string k = metadata->column_name(col_idx);

        sframe res = fill_linear_model_sframe_from_eigen_data(
            metadata,
            col_idx,

            index_sizes[col_idx],

            include_w_term,
            index_offsets[col_idx],
            "linear_terms",
            w,

            include_V_term,
            index_offsets[col_idx],
            "factors",
            V);

        std::shared_ptr<unity_sframe> lt_sf(new unity_sframe);
        lt_sf->construct_from_sframe(res);

        ret[k] = to_variant(lt_sf);
      }
    }

    ////////////////////////////////////////////////////////////////////////////////

    // Now, do the same thing for the remaining side columns, but
    // include them as one sframe with all the "indices" given as
    // strings.
    {
      std::vector<sframe> additional_columns;
      bool include_V_term = true;
      bool include_w_term = enable_linear_features;

      switch(factor_mode) {
        case model_factor_mode::factorization_machine:
          include_V_term = true;
          break;
        case model_factor_mode::pure_linear_model:
        case model_factor_mode::matrix_factorization:
          include_V_term = false;
          break;
      }

      for(size_t col_idx = 2; col_idx < metadata->num_columns(); ++col_idx) {

        std::string k = metadata->column_name(col_idx);

        sframe res = fill_linear_model_sframe_from_eigen_data(
            metadata,
            col_idx,

            index_sizes[col_idx],

            include_w_term,
            index_offsets[col_idx],
            "linear_terms",
            w,

            include_V_term,
            index_offsets[col_idx],
            "factors",
            V);

        // Convert the index column to strings so all the side columns can be
        // appended into a single sframe.
        {

          std::shared_ptr<sarray<flexible_type> > new_x(new sarray<flexible_type>);

          new_x->open_for_write();
          new_x->set_type(flex_type_enum::STRING);
          auto it_out = new_x->get_output_iterator(0);

          std::shared_ptr<sarray<flexible_type> > old_x = res.select_column(k);
          auto reader = old_x->get_reader();
          size_t num_segments = old_x->num_segments();

          for(size_t sidx = 0; sidx < num_segments; ++sidx) {
            auto src_it = reader->begin(sidx);
            auto src_it_end = reader->end(sidx);

            for(; src_it != src_it_end; ++src_it, ++it_out)
              *it_out = flex_string(*src_it);
          }

          new_x->close();

          res = res.remove_column(res.column_index(k));
          res = res.add_column(new_x, "index");

          std::shared_ptr<sarray<flexible_type> > name_column(
              new sarray<flexible_type>(
                  flexible_type(k), res.num_rows()));

          res = res.add_column(name_column, "feature");
        }

        additional_columns.push_back(std::move(res));
      }

      ////////////////////////////////////////////////////////////////////////////////
      // Now append everything into a single "side_data" sframe.

      if(!additional_columns.empty()) {
        sframe all_res = additional_columns[0];

        for(size_t i = 1; i < additional_columns.size(); ++i)
          all_res = all_res.append(additional_columns[i]);

        std::shared_ptr<unity_sframe> lt_sf(new unity_sframe);

        std::vector<std::string> names = {"feature", "index"};

        if(include_w_term)
          names.push_back("linear_terms");

        if(include_V_term)
          names.push_back("factors");

        lt_sf->construct_from_sframe(all_res.select_columns(names));

        ret["side_data"] = to_variant(lt_sf);
      }
    }

    return ret;
  }


  /** Scores all the candidate items in `scores`, filling in each score.
   * Used by the recommender system.
   */
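  // scores arrives with scores[i].first set to each candidate item's index;
  // this routine fills in scores[i].second with the predicted score, keeps
  // only the top_k highest-scoring entries, and translates them to final
  // predictions if the loss model requires it.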
  void score_all_items(
      std::vector<std::pair<size_t, double> >& scores,
      const std::vector<v2::ml_data_entry>& query_row,
      size_t top_k,
      const std::shared_ptr<v2::ml_data_side_features>& known_side_features) const {

    DASSERT_GE(query_row.size(), 2);
    DASSERT_EQ(query_row[USER_COLUMN_INDEX].column_index, USER_COLUMN_INDEX);
    DASSERT_EQ(query_row[ITEM_COLUMN_INDEX].column_index, ITEM_COLUMN_INDEX);

    bool has_side_features = (known_side_features != nullptr);
    bool has_additional_columns = has_side_features || (query_row.size() > 2);

    size_t user = query_row[USER_COLUMN_INDEX].index;

    // Direct it to the appropriate function
    if(factor_mode == model_factor_mode::matrix_factorization
       && !has_additional_columns
       && user < index_sizes[USER_COLUMN_INDEX]) {

      _score_all_items_simple_mf(scores, user, top_k);

    } else {
      std::vector<v2::ml_data_entry> x = query_row;

      if(has_side_features) {
        _score_all_items_general_purpose<true>(scores, std::move(x), top_k, known_side_features);
      } else {
        _score_all_items_general_purpose<false>(scores, std::move(x), top_k, nullptr);
      }
    }
  }

  // A cache of the vector values to avoid memory reallocations.
  mutable std::vector<vector_type> recommend_cache;

  /** Scores items in the simple matrix factorization case. Here, we
   * use a single matrix-vector product for speed.
   */
  void _score_all_items_simple_mf(
      std::vector<std::pair<size_t, double> >& scores,
      size_t user,
      size_t top_k) const GL_HOT {

    size_t items_offset = index_offsets[ITEM_COLUMN_INDEX];
    size_t num_items = index_sizes[ITEM_COLUMN_INDEX];

    size_t thread_idx = thread::thread_id();

    vector_type& cached_user_item_product = recommend_cache[thread_idx];

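    // A single matrix-vector product scores every item at once:
    //
    //   cached_user_item_product[j] = V.row(item j) . V.row(user) + w[item j]
    //
    // The user-side terms (w0 and the user's linear term) are identical for
    // every item, so they are added as a single scalar adjustment below.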
    cached_user_item_product.noalias() =
        V.middleRows(items_offset, num_items) * V.row(user).transpose()
        + w.segment(items_offset, num_items);

    size_t user_global_index = index_offsets[USER_COLUMN_INDEX] + user;

    DASSERT_LT(user_global_index, w.size());

    double adjustment = (w0 + w[user_global_index]);

    // Fill in the score for each candidate item.
    for(size_t i = 0; i < scores.size(); ++i) {

      size_t item = scores[i].first;

      double raw_score = (item < index_sizes[ITEM_COLUMN_INDEX]
                          ? cached_user_item_product[item]
                          : 0);

      scores[i].second = adjustment + raw_score;
    }

    auto p_less_than = [](const std::pair<size_t, double>& p1,
                          const std::pair<size_t, double>& p2) {
      return p1.second < p2.second;
    };

    // Now, pull off the top k elements for these.
    extract_and_sort_top_k(scores, top_k, p_less_than);

    // If the loss function translates the raw scores, apply that translation
    // to the surviving top-k entries.
    if(loss_model->prediction_is_translated()) {
      for(size_t i = 0; i < scores.size(); ++i) {
        scores[i].second = loss_model->translate_fx_to_prediction(scores[i].second);
      }
    }
  }

  /** Run the recommendations when the routine uses something more
   * than just the straight matrix factorization. In this case, we
   * call the calculate_fx function to get the scores, then get the
   * top_k out, then translate just those to the correct scores.
   */
  template <bool has_side_features>
  void _score_all_items_general_purpose(
      std::vector<std::pair<size_t, double> >& scores,
      std::vector<v2::ml_data_entry>&& x,
      size_t top_k,
      const std::shared_ptr<v2::ml_data_side_features>& known_side_features) const {

    size_t thread_idx = thread::thread_id();

    // Remember the size of this vector for the rest of the rounds;
    // resizing x back to this size drops any item side features appended
    // on a previous iteration.

    const size_t x_base_size = x.size();

    // Score each candidate item with the full model.
    for(size_t i = 0; i < scores.size(); ++i) {

      size_t item = scores[i].first;

      if(has_side_features)
        x.resize(x_base_size);

      x[ITEM_COLUMN_INDEX].index = item;

      // Add in any side information concerning the item to the
      // observation vector.

      if(has_side_features)
        known_side_features->add_partial_side_features_to_row(x, ITEM_COLUMN_INDEX, item);

      // Get the raw score.
      double raw_score = calculate_fx(thread_idx, x);

      // Record the raw score for this item.
      scores[i] = {item, raw_score};
    }

    auto p_less_than_2 = [](const std::pair<size_t, double>& p1,
                            const std::pair<size_t, double>& p2) {
      return p1.second < p2.second;
    };

    // Now, pull off the top k elements for these.
    extract_and_sort_top_k(scores, top_k, p_less_than_2);

    // If the loss function translates the raw scores, apply that translation
    // to the surviving top-k entries.
    if(loss_model->prediction_is_translated()) {
      for(size_t i = 0; i < scores.size(); ++i) {
        scores[i].second = loss_model->translate_fx_to_prediction(scores[i].second);
      }
    }
  }


  mutable mutex factor_norm_lock;
  mutable bool factor_norms_computed = false;
  mutable vector_type factor_norms;

  /** Computes the cosine similarity between the factor vector at a
   * particular index within a column and the factor vectors of all
   * other indices in that column.
   */
  void calculate_intracolumn_similarity(vector_type& dest, size_t column_index, size_t ref_index) const {

    dest.resize(index_sizes[column_index]);

    if(V.rows() == 0) {
      dest.setZero();
      return;
    }

    // Get the factor norms if we need them.
    if(!factor_norms_computed) {
      std::lock_guard<mutex> lg(factor_norm_lock);

      if(!factor_norms_computed) {
        factor_norms.noalias() = V.rowwise().norm();
        factor_norms_computed = true;
      }
    }

    size_t start_idx = index_offsets[column_index];
    size_t block_size = index_sizes[column_index];

    dest.noalias() = V.middleRows(start_idx, block_size) * V.row(start_idx + ref_index).transpose();
    dest.array() /= (V.row(start_idx + ref_index).norm() * factor_norms.segment(start_idx, block_size).array());
  }

};
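// For illustration, concrete model types are specializations of this template;
// for example, one might write (hypothetical typedef names):
//
//   typedef factorization_model_impl<
//       model_factor_mode::matrix_factorization, 8> mf_model_8_factors;
//   typedef factorization_model_impl<
//       model_factor_mode::matrix_factorization, Eigen::Dynamic> mf_model_dynamic;
//
// A positive compile-time factor count lets Eigen use fixed-size factor rows
// in the inner loops, while Eigen::Dynamic supports any num_factors chosen at
// runtime.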

}}

#endif