Turi Create 4.0
als.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_ALS_H_
#define TURI_ALS_H_

#include <Eigen/Cholesky>
// ML-Data & options manager
#include <toolkits/ml_data_2/ml_data.hpp>
#include <model_server/lib/extensions/option_manager.hpp>
#include <core/logging/table_printer/table_printer.hpp>

// Factorization model impl
#include <toolkits/factorization/factorization_model_impl.hpp>
#include <algorithm>


// TODO: List of todo's for this file
//------------------------------------------------------------------------------

namespace turi {
namespace recsys {
namespace als {

// Typedefs
typedef factorization::factorization_model_impl
    <factorization::model_factor_mode::matrix_factorization, Eigen::Dynamic>
    model_type;
typedef Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic,
                      Eigen::RowMajor> DenseMatrix;
typedef Eigen::Matrix<float, Eigen::Dynamic, 1> DenseVector;

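// The update steps in both solvers below repeatedly assemble and solve small
// regularized least-squares systems of the form (lambda * I + V' V) x = b via
// an LDLT factorization. A minimal, self-contained Eigen sketch of that
// pattern (the dimensions and values are illustrative only, not taken from
// this file):
//
//   const size_t k = 8;                               // number of factors
//   DenseMatrix V = DenseMatrix::Random(100, k);      // factors of rated items
//   DenseVector r = DenseVector::Random(100);         // centered ratings
//   double lambda = 1e-2;                             // regularization
//   DenseMatrix A = lambda * DenseMatrix::Identity(k, k) + V.transpose() * V;
//   DenseVector b = V.transpose() * r;
//   DenseVector x = A.ldlt().solve(b);                // new factor vector
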
/**
 * Make sure that the two metadata objects share the same mappings.
 * --------------------------------------------------------------------------
 * user_mapping maps each user index of the second metadata (the one sorted by
 * item) to the corresponding user index of the first metadata (the one sorted
 * by user). Likewise, item_mapping does the same for items.
 *
 * This is required because this method needs two ml_data objects. The first
 * is sorted by user, while the second is sorted by item. However, they do not
 * share the same metadata, so we need to make sure that the index of each
 * user and item is consistent while iterating over both ml_data objects.
 *
 * The master metadata is the one saved in training_data_by_user; user_mapping
 * and item_mapping translate indices from training_data_by_item's metadata to
 * training_data_by_user's metadata.
 *
 * \param[in]  training_data_by_user ml-data sorted by user
 * \param[in]  training_data_by_item ml-data sorted by item
 * \param[out] user_mapping User index in the by-item metadata -> user index
 *                          in the by-user metadata
 * \param[out] item_mapping Item index in the by-item metadata -> item index
 *                          in the by-user metadata
 */
inline void get_common_user_item_local_index_mapping(
    const v2::ml_data& training_data_by_user,
    const v2::ml_data& training_data_by_item,
    std::vector<size_t>& user_mapping,
    std::vector<size_t>& item_mapping) {

  size_t num_users = training_data_by_user.metadata()->column_size(0);
  size_t num_items = training_data_by_user.metadata()->column_size(1);
  std::shared_ptr<v2::ml_data_internal::column_indexer> user_index_sorted_by_user =
      training_data_by_user.metadata()->indexer(0);
  std::shared_ptr<v2::ml_data_internal::column_indexer> item_index_sorted_by_user =
      training_data_by_user.metadata()->indexer(1);
  std::shared_ptr<v2::ml_data_internal::column_indexer> user_index_sorted_by_item =
      training_data_by_item.metadata()->indexer(1);
  std::shared_ptr<v2::ml_data_internal::column_indexer> item_index_sorted_by_item =
      training_data_by_item.metadata()->indexer(0);

  user_mapping.resize(num_users);
  item_mapping.resize(num_items);
  for (size_t u = 0; u < num_users; u++) {
    user_mapping[u] = user_index_sorted_by_user->immutable_map_value_to_index(
        user_index_sorted_by_item->map_index_to_value(u));
  }
  for (size_t i = 0; i < num_items; i++) {
    item_mapping[i] = item_index_sorted_by_user->immutable_map_value_to_index(
        item_index_sorted_by_item->map_index_to_value(i));
  }
}

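// Usage sketch (mirrors how the mapping is applied in the item steps below;
// data_by_user and data_by_item stand in for already-constructed ml_data
// objects):
//
//   std::vector<size_t> user_mapping, item_mapping;
//   get_common_user_item_local_index_mapping(data_by_user, data_by_item,
//                                            user_mapping, item_mapping);
//
//   // For an observation x read from data_by_item, translate its indices
//   // into the by-user metadata's local index space:
//   //   size_t user_id = user_mapping[x[1].index];
//   //   size_t item_id = item_mapping[x[0].index];
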
/**
 *
 * Solve a recommender problem with ALS.
 *
 * \param[in] training_data_by_user ml-data sorted by user
 * \param[in] training_data_by_item ml-data sorted by item
 * \param[in] options List of options provided by the user
 *
 *
 * Python pseudocode for this method is:
 * ---------------------------------------------------------------------------
 * lambda_ = 10
 * n_factors = 8
 * m, n = Q.shape
 * n_iterations = 20
 *
 * Q = rating  # For only those users and items that had a rating
 * W = Q > 0.5
 * W[W == True] = 1
 * W[W == False] = 0
 *
 * X = 5 * np.random.rand(m, n_factors)
 * Y = 5 * np.random.rand(n_factors, n)
 *
 * def get_error(Q, X, Y):
 *     return np.sum((Q - np.dot(X, Y))**2)
 *
 * weighted_errors = []
 * for ii in range(n_iterations):
 *     X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors),
 *                         np.dot(Y, Q.T)).T
 *     Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors),
 *                         np.dot(X.T, Q))
 *     weighted_errors.append(get_error(Q, X, Y))
 */
inline std::shared_ptr<factorization::factorization_model> als(
    const v2::ml_data& training_data_by_user,
    const v2::ml_data& training_data_by_item,
    const std::map<std::string, flexible_type>& options) {

  // Setup the model
  std::shared_ptr<model_type> model(new model_type);
  model->setup("squared_error", training_data_by_user, options);

  // Problem definition
  size_t num_users = training_data_by_user.metadata()->column_size(0);
  size_t num_factors = model->num_factors();
  size_t num_ratings = training_data_by_user.num_rows();
  double lambda = std::max<double>(1e-6,
      num_ratings * (double) options.at("regularization"));
  size_t max_iters = (size_t) options.at("max_iterations");
  size_t seed = (size_t) options.at("random_seed");
  double init_rand_sigma = (double) options.at("init_random_sigma");

  // Make sure that the two metadata objects share the same mappings.
  std::vector<size_t> user_mapping;
  std::vector<size_t> item_mapping;
  get_common_user_item_local_index_mapping(training_data_by_user, training_data_by_item,
                                           user_mapping, item_mapping);

  // Global variables needed
  DenseMatrix eye = DenseMatrix::Identity(num_factors, num_factors);
  double rmse = 0, best_rmse = 1e20;

  // Setup the table printer
  table_printer table({
      {"Iter.", 7},
      {"Elapsed time", 12},
      {"RMSE", 22}});
  table.print_header();
  table.print_row("Initial", progress_time(), "NA");
  table.print_line_break();

  // Init the model
  model->reset_state(seed, init_rand_sigma);
  model->w.setZero();

  double reset_fraction = 1;
  double reset_fraction_reduction_rate = 1e-2;

  // Each iteration of ALS
  // --------------------------------------------------------------------------
  size_t iter = 0;
  for (iter = 0; iter < max_iters; iter++) {

    // Step 1: User step
    // ------------------------------------------------------------------------
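    // For each user u with rated item set I_u, the loop below accumulates the
    // regularized normal equations
    //   (lambda * I + sum_{i in I_u} v_i v_i') x_u = sum_{i in I_u} r_ui v_i
    // where v_i is item i's factor row of V and r_ui is the rating with the
    // global bias w0 removed, and then solves them for the user factors x_u.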
    in_parallel([&](size_t thread_idx, size_t num_threads) {
      DenseMatrix A(num_factors, num_factors);
      DenseVector b(num_factors);
      std::vector<v2::ml_data_entry> x;
      size_t user_id, item_id = 0;
      double rating = 0;
      A = lambda * eye;
      b.setZero();
      for (auto it =
               training_data_by_user.get_block_iterator(thread_idx, num_threads);
           !it.done();) {

        it.fill_observation(x);
        user_id = x[0].index;
        item_id = num_users + x[1].index;
        rating = it.target_value() - model->w0;

        A += model->V.row(item_id).transpose() * model->V.row(item_id);
        b += rating * model->V.row(item_id);
        ++it;

        // Solve the system
        if (it.is_start_of_new_block() || it.done()) {
          model->V.row(user_id) = (A.ldlt().solve(b)).transpose();
          if (it.done()) break;

          // Reset for the next user
          A = lambda * eye;
          b.setZero();
        }
      }
    });

    // Step 2: Item Step
    // ------------------------------------------------------------------------
    // Symmetric to the user step: accumulate and solve the normal equations
    // for each item's factors.
    in_parallel([&](size_t thread_idx, size_t num_threads) {
      DenseMatrix A(num_factors, num_factors);
      DenseVector b(num_factors);
      std::vector<v2::ml_data_entry> x;
      size_t user_id, item_id = 0;
      double rating = 0.0;
      A = lambda * eye;
      b.setZero();

      for (auto it =
               training_data_by_item.get_block_iterator(thread_idx, num_threads);
           !it.done();) {

        // Update equations
        it.fill_observation(x);
        user_id = user_mapping[x[1].index];
        item_id = num_users + item_mapping[x[0].index];
        rating = it.target_value() - model->w0;

        A += model->V.row(user_id).transpose() * model->V.row(user_id);
        b += rating * model->V.row(user_id);
        ++it;

        // Solve the system
        if (it.is_start_of_new_block() || it.done()) {
          model->V.row(item_id) =
              (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
          if (it.done()) break;

          // Reset for the next item
          A = lambda * eye;
          b.setZero();
        }
      }
    });

    // Step 3: Calculate RMSE
    // ------------------------------------------------------------------------
    rmse = model->loss_model->reported_loss_value(
        model->calculate_loss(training_data_by_user));
    if (rmse < best_rmse) best_rmse = rmse;
    table.print_row(iter, progress_time(), rmse);

    // Step 4: Termination checking
    // ------------------------------------------------------------------------
    // The iteration count keeps ticking, so a reset model does not get the
    // full max_iterations budget. This ensures that the model does not keep
    // resetting indefinitely.
    if (rmse > 10 * best_rmse || std::isnan(rmse) || std::isinf(rmse)) {
      logprogress_stream << "Resetting model." << std::endl;
      reset_fraction *= reset_fraction_reduction_rate;
      model->reset_state(rand(), reset_fraction);
      model->w.setZero();

      continue;
    }
  }

  std::string termination_criterion = "";
  table.print_row("FINAL", progress_time(), rmse);
  table.print_footer();
  if (iter == max_iters) {
    termination_criterion = "Optimization Complete: Iteration limit reached.";
  }
  logprogress_stream << termination_criterion << std::endl;

  // Return the training stats
  std::map<std::string, variant_type> training_stats;
  training_stats["training_time"] = progress_time().elapsed_seconds;
  training_stats["num_iterations"] = to_variant(iter);
  training_stats["final_objective_value"] = to_variant(rmse);
  model->_training_stats = training_stats;
  return model;
}
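
// A rough calling sketch (the option values are illustrative defaults, not
// prescribed by this file; data_by_user and data_by_item stand in for ml_data
// objects built elsewhere, sorted by user and by item respectively):
//
//   std::map<std::string, flexible_type> options = {
//     {"regularization",    1e-8},
//     {"max_iterations",    25},
//     {"random_seed",       0},
//     {"init_random_sigma", 0.01}
//   };
//   auto trained_model = als(data_by_user, data_by_item, options);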


/**
 *
 * Solve a recommender problem with Implicit ALS [1].
 *
 * \param[in] training_data_by_user ml-data sorted by user
 * \param[in] training_data_by_item ml-data sorted by item
 * \param[in] options List of options provided by the user
 *
 * References:
 *
 * [1] Collaborative Filtering for Implicit Feedback Datasets (Yifan Hu et al.)
 *
 *
 * Python pseudocode for this method is:
 * ---------------------------------------------------------------------------
 * lambda_ = 10
 * n_factors = 8
 * m, n = Q.shape
 * n_iterations = 20
 *
 * X = 5 * np.random.rand(m, n_factors)
 * Y = 5 * np.random.rand(n_factors, n)
 *
 * def get_error(Q, X, Y, W):
 *     return np.sum((W * (Q - np.dot(X, Y)))**2)
 *
 * weighted_errors = []
 * for ii in range(n_iterations):
 *     for u, Wu in enumerate(W):
 *         X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) +
 *                                lambda_ * np.eye(n_factors),
 *                                np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T
 *     for i, Wi in enumerate(W.T):
 *         Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) +
 *                                  lambda_ * np.eye(n_factors),
 *                                  np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
 *     weighted_errors.append(get_error(Q, X, Y, W))
 */
inline std::shared_ptr<factorization::factorization_model> implicit_als(
    const v2::ml_data& training_data_by_user,
    const v2::ml_data& training_data_by_item,
    const std::map<std::string, flexible_type>& options) {

  // Setup the model
  std::shared_ptr<model_type> model(new model_type);
  model->setup("squared_error", training_data_by_user, options);

  // Problem definition
  size_t num_users = training_data_by_user.metadata()->column_size(0);
  size_t num_items = training_data_by_user.metadata()->column_size(1);
  size_t num_factors = model->num_factors();
  size_t num_ratings = training_data_by_user.num_rows();
  double lambda = std::max<double>(1e-6,
      num_ratings * (double) options.at("regularization"));
  size_t max_iters = (size_t) options.at("max_iterations");
  size_t seed = (size_t) options.at("random_seed");
  double init_rand_sigma = (double) options.at("init_random_sigma");

  // Make sure that the two metadata objects share the same mappings.
  std::vector<size_t> user_mapping;
  std::vector<size_t> item_mapping;
  get_common_user_item_local_index_mapping(training_data_by_user, training_data_by_item,
                                           user_mapping, item_mapping);


  // Global variables needed
  DenseMatrix eye = DenseMatrix::Identity(num_factors, num_factors);
  double rmse = 0, best_rmse = 1e20;
  std::vector<double> rmse_per_thread(
      turi::thread_pool::get_instance().size(), 0.0);

  // Setup the table printer
  table_printer table({
      {"Iter.", 7},
      {"Elapsed time", 12},
      {"Estimated Objective Value", 22}});
  table.print_header();
  table.print_row("Initial", progress_time(), "NA");
  table.print_line_break();

  // Init the model
  model->reset_state(seed, init_rand_sigma);
  model->w.setZero();
  model->w0 = 0;

  // Confidence-scaling constants used in the iALS paper [1]
  double alpha = options.at("ials_confidence_scaling_factor");
  size_t is_log_scaling = (options.at("ials_confidence_scaling_type") == "log");
  double eps = 1e-8;
  double reset_fraction = 1;
  double reset_fraction_reduction_rate = 1e-2;

  // Each iteration of ALS
  // --------------------------------------------------------------------------
  size_t iter = 0;
  for (iter = 0; iter < max_iters; iter++) {

    // Calculate the common base matrix to use.
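    // Following eq. (4) of [1], Y' Cu Y = Y' Y + Y' (Cu - I) Y, where Cu is
    // the diagonal confidence matrix of user u. The Y' Y part (plus the
    // regularizer) is shared by every user, so it is cached once per
    // iteration here; the per-user part Y' (Cu - I) Y only touches the items
    // that user actually interacted with and is accumulated inside the loop.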
    DenseMatrix A_cached(num_factors, num_factors);
    A_cached.triangularView<Eigen::Upper>() = lambda * eye +
        model->V.bottomRows(num_items).transpose() * model->V.bottomRows(num_items);

    // Step 1: User step
    // ------------------------------------------------------------------------
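    // Confidence weights follow [1]: each observed (user, item) pair is
    // treated as preference 1 with confidence c_ui = 1 + scaling, where
    //   linear scaling: scaling = alpha * r_ui
    //   log scaling:    scaling = alpha * log(1 + r_ui / eps)
    // and r_ui is the raw target value. The `scaling` term augments the
    // cached Gram matrix and the `1 + scaling` term weights the right-hand
    // side below.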
    in_parallel([&](size_t thread_idx, size_t num_threads) {
      DenseMatrix A(num_factors, num_factors);
      DenseVector b(num_factors);

      std::vector<v2::ml_data_entry> x;
      double scaling = 0.0;
      size_t user_id, item_id = 0;
      A.triangularView<Eigen::Upper>() = A_cached;

      b.setZero();

      for (auto it =
               training_data_by_user.get_block_iterator(thread_idx, num_threads);
           !it.done();) {
        it.fill_observation(x);
        user_id = x[0].index;
        item_id = num_users + x[1].index;
        if (is_log_scaling) {
          scaling = alpha * log(1 + it.target_value() / eps);
        } else {
          scaling = alpha * it.target_value();
        }
        A.triangularView<Eigen::Upper>() += scaling *
            model->V.row(item_id).transpose() * model->V.row(item_id);
        b += (1 + scaling) * model->V.row(item_id);
        ++it;

        // Update user factor
        if (it.is_start_of_new_block() || it.done()) {
          model->V.row(user_id) =
              (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
          if (it.done()) break;

          // Reset for the next user
          A.triangularView<Eigen::Upper>() = A_cached;
          b.setZero();
        }
      }
    });

    // Step 2: Item Step
    // ------------------------------------------------------------------------
    // Compute Yt C Y using equation (4) of [1]
    in_parallel([&](size_t thread_idx, size_t num_threads) {
      DenseMatrix A(num_factors, num_factors);
      DenseMatrix A_cached(num_factors, num_factors);
      DenseVector b(num_factors);

      std::vector<v2::ml_data_entry> x;
      size_t user_id, item_id = 0;
      double scaling = 0.0;
      A_cached.triangularView<Eigen::Upper>() = lambda * eye +
          model->V.topRows(num_users).transpose() * model->V.topRows(num_users);
      A.triangularView<Eigen::Upper>() = A_cached;
      b.setZero();

      for (auto it =
               training_data_by_item.get_block_iterator(thread_idx, num_threads);
           !it.done();) {
        // Update equations
        it.fill_observation(x);
        user_id = user_mapping[x[1].index];
        item_id = num_users + item_mapping[x[0].index];
        if (is_log_scaling) {
          scaling = alpha * log(1 + std::max(it.target_value(), 0.0) / eps);
        } else {
          scaling = alpha * std::max(it.target_value(), 0.0);
        }
        A.triangularView<Eigen::Upper>() += scaling *
            model->V.row(user_id).transpose() * model->V.row(user_id);
        b += (1 + scaling) * model->V.row(user_id);
        ++it;

        // Solve the system
        if (it.is_start_of_new_block() || it.done()) {
          model->V.row(item_id) =
              (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
          if (it.done()) break;

          // Reset for the next item
          A.triangularView<Eigen::Upper>() = A_cached;
          b.setZero();
        }
      }
    });

    // Step 3: Calculate RMSE
    // ------------------------------------------------------------------------
    rmse = model->loss_model->reported_loss_value(
        model->calculate_loss(training_data_by_user));
    if (rmse < best_rmse) best_rmse = rmse;
    table.print_row(iter, progress_time(), rmse);

    // Step 4: Termination checking
    // ------------------------------------------------------------------------
    // The iteration count keeps ticking, so a reset model does not get the
    // full max_iterations budget. This ensures that the model does not keep
    // resetting indefinitely.
    if (rmse > 10 * best_rmse || std::isnan(rmse) || std::isinf(rmse)) {
      logprogress_stream << "Resetting model." << std::endl;
      reset_fraction *= reset_fraction_reduction_rate;
      model->reset_state(rand(), reset_fraction);
      model->w.setZero();
      model->w0 = 0;
      continue;
    }
  }
  table.print_row("FINAL", progress_time(), rmse);
  table.print_footer();

  std::string termination_criterion = "";
  if (iter == max_iters) {
    termination_criterion = "Optimization Complete: Iteration limit reached.";
  }
  logprogress_stream << termination_criterion << std::endl;

  // Return the training stats
  std::map<std::string, variant_type> training_stats;
  training_stats["training_time"] = progress_time().elapsed_seconds;
  training_stats["num_iterations"] = to_variant(iter);
  training_stats["final_objective_value"] = to_variant(rmse);
  model->_training_stats = training_stats;
  return model;
}
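
// Calling implicit_als follows the same pattern as als above, with two extra
// options controlling the confidence weighting (values are illustrative only;
// data_by_user / data_by_item are ml_data objects built elsewhere):
//
//   std::map<std::string, flexible_type> options = {
//     {"regularization",                 1e-8},
//     {"max_iterations",                 25},
//     {"random_seed",                    0},
//     {"init_random_sigma",              0.01},
//     {"ials_confidence_scaling_factor", 1.0},
//     {"ials_confidence_scaling_type",   "log"}  // anything else => linear
//   };
//   auto trained_model = implicit_als(data_by_user, data_by_item, options);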


} // namespace als
} // namespace recsys
} // namespace turi


#endif