// Headers and the factorization model type used by the ALS solvers below.
// The alias target is the matrix_factorization specialization of
// factorization_model_impl with an Eigen::Dynamic template argument
// (presumably meaning the number of factors is chosen at run time).
// NOTE(review): the alias name and terminating semicolon are on a line not
// shown in this listing; als()/implicit_als() refer to `model_type`, which is
// presumably this alias — verify.
9 #include <Eigen/Cholesky> 11 #include <toolkits/ml_data_2/ml_data.hpp> 12 #include <model_server/lib/extensions/option_manager.hpp> 13 #include <core/logging/table_printer/table_printer.hpp> 16 #include <toolkits/factorization/factorization_model_impl.hpp> 28 typedef factorization::factorization_model_impl
29 <factorization::model_factor_mode::matrix_factorization, Eigen::Dynamic>
31 typedef Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic,
32 Eigen::RowMajor> DenseMatrix;
33 typedef Eigen::Matrix<float, Eigen::Dynamic, 1> DenseVector;
60 void get_common_user_item_local_index_mapping(
61 const v2::ml_data& training_data_by_user,
62 const v2::ml_data& training_data_by_item,
63 std::vector<size_t>& user_mapping,
64 std::vector<size_t>& item_mapping) {
66 size_t num_users = training_data_by_user.metadata()->column_size(0);
67 size_t num_items = training_data_by_user.metadata()->column_size(1);
68 std::shared_ptr<v2::ml_data_internal::column_indexer> user_index_sorted_by_user =
69 training_data_by_user.metadata()->indexer(0);
70 std::shared_ptr<v2::ml_data_internal::column_indexer> item_index_sorted_by_user =
71 training_data_by_user.metadata()->indexer(1);
72 std::shared_ptr<v2::ml_data_internal::column_indexer> user_index_sorted_by_item =
73 training_data_by_item.metadata()->indexer(1);
74 std::shared_ptr<v2::ml_data_internal::column_indexer> item_index_sorted_by_item =
75 training_data_by_item.metadata()->indexer(0);
77 user_mapping.resize(num_users);
78 item_mapping.resize(num_items);
79 for(
size_t u = 0; u < num_users; u++){
80 user_mapping[u] = user_index_sorted_by_user->immutable_map_value_to_index(
81 user_index_sorted_by_item->map_index_to_value(u));
83 for(
size_t i = 0; i < num_items; i++){
84 item_mapping[i] = item_index_sorted_by_user->immutable_map_value_to_index(
85 item_index_sorted_by_item->map_index_to_value(i));
// als: explicit-feedback alternating least squares for the matrix
// factorization model.
//
// NOTE(review): this listing omits many lines of the body (loop headers,
// closing braces, the declarations of `iter`, `rating` and `it`, and the
// return statement). The notes below describe only what the visible lines show.
//
// Visible behaviour:
//  * Sets the model up with the "squared_error" loss and reads the options
//    "regularization", "max_iterations", "random_seed" and "init_random_sigma".
//  * Builds user_mapping / item_mapping so that indices read from
//    training_data_by_item can be translated into training_data_by_user's
//    index space.
//  * Each iteration runs two in_parallel passes:
//      - user pass (training_data_by_user): accumulate A += v_j^T v_j and
//        b += (target - w0) * v_j over a user's items j, then set
//        V.row(user) = solve(A, b);
//      - item pass (training_data_by_item): the same with users and items
//        swapped.
//    Item rows of model->V are addressed at offset num_users (see item_id).
//  * After each pass pair it reports the training loss. If the loss is NaN/inf
//    or more than 10x best_rmse, it re-initialises the model with a smaller
//    random sigma.
//  * Records training_time, num_iterations and final_objective_value in
//    model->_training_stats.
//
// @param training_data_by_user  ml_data with column 0 = user, column 1 = item.
// @param training_data_by_item  ml_data with column 0 = item, column 1 = user.
// @param options                training options (keys listed above).
126 inline std::shared_ptr<factorization::factorization_model> als(
127 const v2::ml_data& training_data_by_user,
128 const v2::ml_data& training_data_by_item,
129 const std::map<std::string, flexible_type>& options){
134 std::shared_ptr<model_type> model(
new model_type);
135 model->setup(
"squared_error", training_data_by_user, options);
138 size_t num_users = training_data_by_user.metadata()->column_size(0);
139 size_t num_factors = model->num_factors();
140 size_t num_ratings = training_data_by_user.num_rows();
// Regularisation is scaled by the number of ratings and floored at 1e-6.
141 double lambda = std::max<double>(1e-6,
142 num_ratings * (double) options.at(
"regularization"));
143 size_t max_iters = (size_t) options.at(
"max_iterations");
144 size_t seed = (size_t) options.at(
"random_seed");
145 double init_rand_sigma = (double) options.at(
"init_random_sigma");
148 std::vector<size_t> user_mapping;
149 std::vector<size_t> item_mapping;
150 get_common_user_item_local_index_mapping(training_data_by_user, training_data_by_item,
151 user_mapping, item_mapping);
// NOTE(review): `lambda` and `eye` are not referenced in the visible lines.
// Presumably A is initialised to lambda * eye in an elided line; a sized Eigen
// matrix is otherwise left uninitialised. Verify.
154 DenseMatrix eye = DenseMatrix::Identity(num_factors, num_factors);
// NOTE(review): `rmse` starts uninitialised; see the best_rmse note below.
155 double rmse, best_rmse = 1e20;
158 table_printer table({
160 {
"Elapsed time", 12},
162 table.print_header();
163 table.print_row(
"Initial", progress_time(),
"NA");
164 table.print_line_break();
167 model->reset_state(seed, init_rand_sigma);
// Divergence recovery: each restart passes reset_fraction as the new sigma,
// shrinking it by reset_fraction_reduction_rate (100x) every time.
170 double reset_fraction = 1;
171 double reset_fraction_reduction_rate = 1e-2;
176 for (iter = 0; iter < max_iters; iter++){
// User pass: each thread walks its own block range of the user-sorted data.
// Presumably blocks group one user's ratings, so threads write disjoint
// V rows. Verify.
180 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
181 DenseMatrix A(num_factors, num_factors);
182 DenseVector b(num_factors);
183 std::vector<v2::ml_data_entry> x;
// NOTE(review): user_id is left uninitialised. This is only safe if the
// (elided) loop never reaches the solve before reading an observation. Verify.
184 size_t user_id, item_id = 0;
189 training_data_by_user.get_block_iterator(thread_idx, num_threads);
192 it.fill_observation(x);
193 user_id = x[0].index;
// Item factor rows follow the user rows inside model->V.
194 item_id = num_users + x[1].index;
195 rating = it.target_value() - model->w0;
197 A += model->V.row(item_id).transpose() * model->V.row(item_id);
198 b += rating * model->V.row(item_id);
// A user's block is complete (or this thread's data is exhausted): solve
// A x = b for that user's factor row. A is built from full outer products,
// so it is symmetric. Plain ldlt(), which reads the lower triangle, is
// therefore equivalent to the selfadjointView<Upper> form used in the item pass.
202 if(it.is_start_of_new_block() || it.done()){
203 model->V.row(user_id) = (A.ldlt().solve(b)).transpose();
204 if (it.done())
break;
// Item pass: same as the user pass with the roles of users and items swapped.
215 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
216 DenseMatrix A(num_factors, num_factors);
217 DenseVector b(num_factors);
218 std::vector<v2::ml_data_entry> x;
219 size_t user_id, item_id = 0;
225 training_data_by_item.get_block_iterator(thread_idx, num_threads);
229 it.fill_observation(x);
// In the item-sorted data x[1] is the user and x[0] the item. Translate both
// into the user-sorted index space that model->V uses.
230 user_id = user_mapping[x[1].index];
231 item_id = num_users + item_mapping[x[0].index];
232 rating = it.target_value() - model->w0;
234 A += model->V.row(user_id).transpose() * model->V.row(user_id);
235 b += rating * model->V.row(user_id);
239 if(it.is_start_of_new_block() || it.done()){
240 model->V.row(item_id) =
241 (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
242 if (it.done())
break;
// NOTE(review): likely bug. This keeps the *larger* value and runs before
// rmse is recomputed, so on the first iteration it reads the uninitialised
// `rmse`. Because best_rmse starts at 1e20, the `10 * best_rmse` divergence
// test below in practice fires only on NaN/inf. The intended code is
// probably `if (rmse < best_rmse) best_rmse = rmse;` placed after the loss
// computation.
252 if (best_rmse < rmse) best_rmse = rmse;
253 rmse = model->loss_model->reported_loss_value(
254 model->calculate_loss(training_data_by_user));
255 table.print_row(iter, progress_time(), rmse);
262 if (rmse > 10 * best_rmse || std::isnan(rmse) || std::isinf(rmse)) {
264 reset_fraction *= reset_fraction_reduction_rate;
// NOTE(review): rand() is not derived from `seed`, so restarts are not
// controlled by "random_seed" unless the C PRNG is seeded elsewhere. Verify.
265 model->reset_state(
rand(), reset_fraction);
272 std::string termination_criterion =
"";
273 table.print_row(
"FINAL", progress_time(), rmse);
274 table.print_footer();
275 if (iter == max_iters){
276 termination_criterion =
"Optimization Complete: Iteration limit reached.";
281 std::map<std::string, variant_type> training_stats;
282 training_stats[
"training_time"] = progress_time().elapsed_seconds;
283 training_stats[
"num_iterations"] =
to_variant(iter);
284 training_stats[
"final_objective_value"] =
to_variant(rmse);
285 model->_training_stats = training_stats;
// implicit_als: confidence-weighted ALS for implicit feedback. The A/b
// updates below appear to match the formulation of Hu, Koren & Volinsky
// (2008), with confidence c = 1 + scaling and preference 1 for every
// observed pair. Presumably intentional; verify.
//
// NOTE(review): this listing omits many lines of the body (loop headers,
// else branches, closing braces, the declarations of `iter`, `it` and
// `eps`, and the return statement). The notes below describe only the
// visible lines.
//
// Visible behaviour:
//  * Reads "regularization", "max_iterations", "random_seed",
//    "init_random_sigma", "ials_confidence_scaling_factor" (alpha) and
//    "ials_confidence_scaling_type" (log scaling when it equals "log").
//  * User pass: A starts as lambda*I + V_items^T V_items (upper triangle).
//    For each observed item j it adds scaling * v_j^T v_j and
//    b += (1 + scaling) * v_j, then solves for the user's row.
//  * Item pass: the same with the user rows (topRows(num_users)).
//  * Divergence handling and training_stats match als().
//
// @param training_data_by_user  ml_data with column 0 = user, column 1 = item.
// @param training_data_by_item  ml_data with column 0 = item, column 1 = user.
// @param options                training options (keys listed above).
329 inline std::shared_ptr<factorization::factorization_model> implicit_als(
330 const v2::ml_data& training_data_by_user,
331 const v2::ml_data& training_data_by_item,
332 const std::map<std::string, flexible_type>& options){
336 std::shared_ptr<model_type> model(
new model_type);
337 model->setup(
"squared_error", training_data_by_user, options);
340 size_t num_users = training_data_by_user.metadata()->column_size(0);
341 size_t num_items = training_data_by_user.metadata()->column_size(1);
342 size_t num_factors = model->num_factors();
343 size_t num_ratings = training_data_by_user.num_rows();
// Regularisation is scaled by the number of ratings and floored at 1e-6.
344 double lambda = std::max<double>(1e-6,
345 num_ratings * (double) options.at(
"regularization"));
346 size_t max_iters = (size_t) options.at(
"max_iterations");
347 size_t seed = (size_t) options.at(
"random_seed");
348 double init_rand_sigma = (double) options.at(
"init_random_sigma");
351 std::vector<size_t> user_mapping;
352 std::vector<size_t> item_mapping;
353 get_common_user_item_local_index_mapping(training_data_by_user, training_data_by_item,
354 user_mapping, item_mapping);
358 DenseMatrix eye = DenseMatrix::Identity(num_factors, num_factors);
// NOTE(review): `rmse` starts uninitialised; see the best_rmse note below.
359 double rmse, best_rmse = 1e20;
360 std::vector<double>rmse_per_thread
364 table_printer table({
366 {
"Elapsed time", 12},
367 {
"Estimated Objective Value", 22}});
368 table.print_header();
369 table.print_row(
"Initial", progress_time(),
"NA");
370 table.print_line_break();
373 model->reset_state(seed, init_rand_sigma);
378 double alpha = options.at(
"ials_confidence_scaling_factor");
// NOTE(review): holds a boolean (scaling type == "log"); `bool` would state
// the intent more clearly.
379 size_t is_log_scaling = (options.at(
"ials_confidence_scaling_type") ==
"log");
// Divergence recovery: each restart passes reset_fraction as the new sigma,
// shrinking it by 100x every time.
381 double reset_fraction = 1;
382 double reset_fraction_reduction_rate = 1e-2;
387 for (iter = 0; iter < max_iters; iter++){
// Part of every user's system shared by all users:
// lambda*I + V_items^T V_items. Only the upper triangle is written; the
// solves read it via selfadjointView<Upper>. Assumes model->V holds exactly
// num_users user rows followed by num_items item rows. Verify.
390 DenseMatrix A_cached(num_factors, num_factors);
391 A_cached.triangularView<Eigen::Upper>() = lambda * eye +
392 model->V.bottomRows(num_items).transpose() * model->V.bottomRows(num_items);
// User pass over the user-sorted data, one block range per thread.
396 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
397 DenseMatrix A(num_factors, num_factors);
398 DenseVector b(num_factors);
400 std::vector<v2::ml_data_entry> x;
401 double scaling = 0.0;
// NOTE(review): user_id is left uninitialised; see the matching note in als().
402 size_t user_id, item_id = 0;
403 A.triangularView<Eigen::Upper>() = A_cached;
408 training_data_by_user.get_block_iterator(thread_idx, num_threads);
410 it.fill_observation(x);
411 user_id = x[0].index;
412 item_id = num_users + x[1].index;
// Confidence scaling: alpha * log(1 + target / eps) in "log" mode, otherwise
// alpha * target (the else branch is elided from this listing).
// NOTE(review): unlike the item pass below, the target is not clamped with
// std::max(..., 0.0) here. A negative target gives a negative scaling, and in
// log mode a target below -eps yields NaN. Presumably both passes should
// clamp. Verify.
413 if (is_log_scaling) {
414 scaling = alpha * log(1 + it.target_value() / eps);
416 scaling = alpha * it.target_value();
418 A.triangularView<Eigen::Upper>() += scaling *
419 model->V.row(item_id).transpose() * model->V.row(item_id);
420 b += (1 + scaling) * model->V.row(item_id);
// End of a user's block (or of this thread's data): solve A x = b for that
// user's factor row.
424 if(it.is_start_of_new_block() || it.done()){
425 model->V.row(user_id) =
426 (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
427 if (it.done())
break;
// Restore the shared part of A for the next user. The reset of b is not
// visible here and is presumably in an elided line.
430 A.triangularView<Eigen::Upper>() = A_cached;
// Item pass over the item-sorted data.
// NOTE(review): unlike the user pass, A_cached (lambda*I + V_users^T V_users)
// is recomputed inside every thread. Each thread repeats a
// num_users x num_factors^2 product that could be hoisted out of in_parallel.
438 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
439 DenseMatrix A(num_factors, num_factors);
440 DenseMatrix A_cached(num_factors, num_factors);
441 DenseVector b(num_factors);
443 std::vector<v2::ml_data_entry> x;
444 size_t user_id, item_id = 0;
445 double scaling = 0.0;
446 A_cached.triangularView<Eigen::Upper>() = lambda * eye +
447 model->V.topRows(num_users).transpose() * model->V.topRows(num_users);
448 A.triangularView<Eigen::Upper>() = A_cached;
452 training_data_by_item.get_block_iterator(thread_idx, num_threads);
455 it.fill_observation(x);
// In the item-sorted data x[1] is the user and x[0] the item. Translate both
// into the user-sorted index space that model->V uses.
456 user_id = user_mapping[x[1].index];
457 item_id = num_users + item_mapping[x[0].index];
// Same confidence scaling as the user pass, but negative targets are clamped to 0.
458 if (is_log_scaling) {
459 scaling = alpha * log(1 + std::max(it.target_value(), 0.0) / eps);
461 scaling = alpha * std::max(it.target_value(), 0.0);
463 A.triangularView<Eigen::Upper>() += scaling *
464 model->V.row(user_id).transpose() * model->V.row(user_id);
465 b += (1 + scaling) * model->V.row(user_id);
469 if(it.is_start_of_new_block() || it.done()){
470 model->V.row(item_id) =
471 (A.selfadjointView<Eigen::Upper>().ldlt().solve(b)).transpose();
472 if (it.done())
break;
475 A.triangularView<Eigen::Upper>() = A_cached;
// NOTE(review): likely bug, same as in als(). This keeps the *larger* value and
// reads the uninitialised `rmse` on the first iteration, so the divergence
// test below in practice fires only on NaN/inf. The intended code is probably
// `if (rmse < best_rmse) best_rmse = rmse;` placed after the loss computation.
482 if (best_rmse < rmse) best_rmse = rmse;
// NOTE(review): this reports the "squared_error" training loss from
// model->calculate_loss, not the confidence-weighted objective minimised
// above. Confirm this is the intended "Estimated Objective Value".
483 rmse = model->loss_model->reported_loss_value(
484 model->calculate_loss(training_data_by_user));
485 table.print_row(iter, progress_time(), rmse);
492 if (rmse > 10 * best_rmse || std::isnan(rmse) || std::isinf(rmse)) {
494 reset_fraction *= reset_fraction_reduction_rate;
// NOTE(review): rand() is not derived from `seed`; restarts are not controlled
// by "random_seed" unless the C PRNG is seeded elsewhere. Verify.
495 model->reset_state(
rand(), reset_fraction);
501 table.print_row(
"FINAL", progress_time(), rmse);
502 table.print_footer();
504 std::string termination_criterion =
"";
505 if (iter == max_iters){
506 termination_criterion =
"Optimization Complete: Iteration limit reached.";
511 std::map<std::string, variant_type> training_stats;
512 training_stats[
"training_time"] = progress_time().elapsed_seconds;
513 training_stats[
"num_iterations"] =
to_variant(iter);
514 training_stats[
"final_objective_value"] =
to_variant(rmse);
515 model->_training_stats = training_stats;
// NOTE(review): the four lines below are bare signatures with no bodies or
// terminating semicolons. They look like a cross-reference list of symbols
// (two of which, to_variant and in_parallel, are called above) rather than
// definitions belonging to this file. Verify against the real source. As
// written, the #define line would define logprogress_stream as an empty macro.
static thread_pool & get_instance()
#define logprogress_stream
variant_type to_variant(const T &f)
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)