6 #ifndef TURI_FACTORIZATION_GENERAL_LINEAR_MODEL_SECOND_ORDER_H_ 7 #define TURI_FACTORIZATION_GENERAL_LINEAR_MODEL_SECOND_ORDER_H_ 15 #include <core/util/branch_hints.hpp> 16 #include <model_server/lib/variant_deep_serialize.hpp> 17 #include <core/data/flexible_type/flexible_type.hpp> 18 #include <toolkits/ml_data_2/ml_data.hpp> 19 #include <core/storage/serialization/serialization_includes.hpp> 20 #include <toolkits/ml_data_2/ml_data_iterators.hpp> 21 #include <toolkits/factorization/factorization_model.hpp> 22 #include <toolkits/factorization/factors_to_sframe.hpp> 23 #include <model_server/lib/extensions/model_base.hpp> 24 #include <core/util/fast_top_k.hpp> 26 namespace turi {
namespace factorization {
/** Which flavor of second-order model a factorization_model_impl
 *  instantiation compiles to.
 *
 *  - factorization_machine: pairwise factor interactions across all columns.
 *  - matrix_factorization:  user/item factor dot product plus linear side terms.
 *  - pure_linear_model:     linear terms only; no factor matrix.
 *
 *  NOTE(review): the latter two enumerators are reconstructed from the switch
 *  statements later in this file; confirm the declaration order against the
 *  original header before relying on the underlying integer values.
 */
enum class model_factor_mode {
  factorization_machine,
  matrix_factorization,
  pure_linear_model
};
// Templated implementation of a second-order (factorization) linear model.
// _factor_mode selects FM / MF / pure-linear behavior at compile time;
// _num_factors_if_known is either a compile-time factor count or
// Eigen::Dynamic when the count is only known at runtime.
38 template <model_factor_mode _factor_mode, flex_
int _num_factors_if_known>
39 class factorization_model_impl final :
public factorization_model {
// Compile-time copies of the template parameters.
45 static constexpr model_factor_mode factor_mode = _factor_mode;
46 static constexpr
flex_int num_factors_if_known = _num_factors_if_known;
// Eigen row / matrix / vector types. The factor dimension is baked in at
// compile time when num_factors_if_known != Eigen::Dynamic, which lets
// Eigen use fixed-size (allocation-free) operations.
48 typedef Eigen::Matrix<float, 1, num_factors_if_known, Eigen::RowMajor> factor_type;
49 typedef Eigen::Matrix<float, Eigen::Dynamic, num_factors_if_known, Eigen::RowMajor> factor_matrix_type;
50 typedef Eigen::Matrix<float, Eigen::Dynamic, 1> vector_type;
// Runtime factor count; only consulted when the compile-time count is Dynamic.
55 size_t _num_factors = 0;
56 inline size_t num_factors()
const {
// Debug-only check; the branch it belongs to appears elided in this excerpt.
58 DASSERT_TRUE(factor_mode == model_factor_mode::pure_linear_model);
60 return _num_factors_if_known == Eigen::Dynamic ? _num_factors : _num_factors_if_known;
// Number of rows of V that carry live factors (0 for the pure linear model).
63 size_t num_factor_dimensions = 0;
// NMF mode disables both the intercept and the linear terms; see internal_setup.
65 bool enable_intercept_term =
true;
66 bool enable_linear_features =
true;
67 bool nmf_mode =
false;
// Global intercept. NOTE(review): declared volatile — presumably written
// concurrently during training, but volatile gives no atomicity or ordering
// guarantees; confirm intent before changing.
72 volatile double w0 = NAN;
// Largest number of entries in any training row; sizes the per-thread XV buffers.
80 size_t max_row_size = 0;
// Per-thread scratch for calculate_fx: XV caches the x_j-scaled V rows of the
// current example; xv_accumulator holds their running sum.
82 struct calculate_fx_processing_buffer {
83 mutable factor_matrix_type XV;
84 mutable factor_type xv_accumulator;
// One scratch buffer per thread, indexed by thread_idx.
87 std::vector<calculate_fx_processing_buffer> buffers;
// Allocates per-thread scratch buffers for calculate_fx and the score cache
// used by _score_all_items_simple_mf. Call after num_factors() and
// max_row_size are known.
92 void setup_buffers() {
// A zero factor count is only legal for the pure linear model.
94 if(num_factors() == 0) {
95 DASSERT_TRUE(factor_mode == model_factor_mode::pure_linear_model);
// (n_threads comes from elided context — presumably thread::cpu_count(); confirm.)
102 buffers.resize(n_threads);
103 for(calculate_fx_processing_buffer& buffer : buffers) {
104 buffer.xv_accumulator.resize(num_factors());
105 buffer.XV.resize(max_row_size, num_factors());
// One cached user-vs-all-items score vector per thread.
108 recommend_cache.resize(n_threads);
115 std::shared_ptr<factorization_model> clone()
const {
116 return std::shared_ptr<factorization_model>(
new factorization_model_impl(*
this));
// Derives model sizes and hyperparameters from the training data and the
// `options` map, then allocates and randomly initializes the parameters.
125 void internal_setup(
const v2::ml_data& train_data) {
// Choose the factor count and the number of V rows that are active for this
// mode. (The per-case break statements fall in elided lines of this excerpt.)
127 switch(factor_mode) {
128 case model_factor_mode::factorization_machine:
129 _num_factors = options.at(
"num_factors");
130 num_factor_dimensions = n_total_dimensions;
132 case model_factor_mode::matrix_factorization:
133 _num_factors = options.at(
"num_factors");
// MF only gives factors to the first two (user/item) columns.
134 num_factor_dimensions = index_sizes[0] + index_sizes[1];
136 case model_factor_mode::pure_linear_model:
138 num_factor_dimensions = 0;
// NMF mode drops both the linear terms and the intercept.
142 nmf_mode = options.at(
"nmf");
143 enable_linear_features = !nmf_mode;
144 enable_intercept_term = !nmf_mode;
// When the factor count is fixed at compile time it must match the option.
146 if(num_factors_if_known != Eigen::Dynamic)
147 DASSERT_EQ(num_factors_if_known, _num_factors);
149 max_row_size = train_data.max_row_size();
// Allocate the linear weights (w) and the factor matrix (V), then seed them.
153 w.resize(n_total_dimensions);
154 V.resize(num_factor_dimensions, num_factors());
156 reset_state(options.at(
"random_seed"), 0.001);
// Re-initializes all parameters to small random values (or zeros), scaled by
// `sd`.  NOTE(review): thread_idx / num_threads below come from an elided
// in_parallel(...) lambda wrapper — each thread initializes a contiguous
// slice of w and V; confirm against the original file.
162 void reset_state(
size_t random_seed,
double sd) GL_HOT {
// Shrink the per-factor spread as the factor count grows, so the initial
// pairwise interaction magnitudes stay roughly comparable.
166 const double V_sd = sd / (1 + std::sqrt(num_factors()));
// Only the user/item rows of V get random init; any remaining (side-feature)
// rows are initialized to zero in the second stage below.
168 size_t num_factor_init_random = index_sizes[0] + index_sizes[1];
169 size_t num_factor_init_zero = num_factor_dimensions - num_factor_init_random;
175 if(enable_linear_features) {
// This thread's slice of the linear weights.
177 size_t start_w_idx = (thread_idx * n_total_dimensions) / num_threads;
178 size_t end_w_idx = ((thread_idx + 1) * n_total_dimensions) / num_threads;
180 for(
size_t i = start_w_idx; i < end_w_idx; ++i)
181 w[i] = (sd > 0) ? random::fast_uniform<double>(-sd/2, sd/2) : 0;
// This thread's slice of the randomly initialized V rows.
188 size_t start_V_idx = (thread_idx * num_factor_init_random) / num_threads;
189 size_t end_V_idx = ((thread_idx + 1) * num_factor_init_random) / num_threads;
191 for(
size_t i = start_V_idx; i < end_V_idx; ++i) {
192 for(
size_t j = 0; j < num_factors(); ++j) {
// NMF requires nonnegative factors, so the range is [0, V_sd] instead of
// the zero-centered [-V_sd/2, V_sd/2].
194 double lb = nmf_mode ? 0 : -V_sd / 2;
195 double ub = nmf_mode ? V_sd : V_sd / 2;
209 V(i, j) = (V_sd > 0) ? random::fast_uniform<double>(lb, ub) : 0;
// Sparsification heuristic; the statement it guards is elided in this excerpt.
211 if(random::fast_uniform<size_t>(0, num_factors()) > std::min<size_t>(4ULL, num_factors() / 2))
// Zero-initialize the remaining (side-feature) factor rows, if any.
218 if(num_factor_init_zero > 0) {
219 size_t start_V_idx = num_factor_init_random + (thread_idx * num_factor_init_zero) / num_threads;
220 size_t end_V_idx = num_factor_init_random + ((thread_idx + 1) * num_factor_init_zero) / num_threads;
222 for(
size_t i = start_V_idx; i < end_V_idx; ++i) {
223 for(
size_t j = 0; j < num_factors(); ++j) {
// Start the intercept at the target mean (0 in NMF mode, which has no intercept).
231 w0 = nmf_mode ? 0 : target_mean;
// Evaluates the model score f(x) for one observation row, using the scratch
// buffers of the calling thread.  The three cases share the shape
// f(x) = w0 + sum_j x_j * w_j (+ factor interaction terms), differing only in
// which interaction terms exist.
246 double calculate_fx(
size_t thread_idx,
const std::vector<v2::ml_data_entry>& x)
const GL_HOT_FLATTEN {
248 const size_t x_size = x.size();
252 switch(factor_mode) {
// Factorization machine: every pair of entries interacts through V.
257 case model_factor_mode::factorization_machine: {
259 factor_matrix_type& XV = buffers[thread_idx].XV;
261 DASSERT_GE(
size_t(XV.rows()), x_size);
263 factor_type& xv_accumulator = buffers[thread_idx].xv_accumulator;
264 xv_accumulator.setZero();
266 double fx_value = w0;
270 for(
size_t j = 0; j < x_size; ++j) {
271 const v2::ml_data_entry& v = x[j];
// Skip indices never seen in training (the skip statement is elided here).
276 if(
__unlikely__(v.index >= index_sizes[v.column_index]))
279 const size_t global_idx = index_offsets[v.column_index] + v.index;
// Apply the stored per-feature normalization before use.
281 double value_shift, value_scale;
282 std::tie(value_shift, value_scale) = this->column_shift_scales[global_idx];
284 double xv = value_scale * (x[j].value - value_shift);
// NOTE(review): `idx` is a counter of accepted entries declared in elided
// lines; it trails j when entries are skipped.
286 XV.row(idx) = xv * V.row(global_idx);
287 xv_accumulator += XV.row(idx);
289 fx_value += xv * w[global_idx];
// Pairwise interaction sum via the standard FM identity:
// sum_{a<b} <xv_a, xv_b> = 0.5 * (||sum||^2-ish expansion per row below).
294 for(
size_t j = 0; j < idx; ++j)
295 fx_value += 0.5*(xv_accumulator.dot(XV.row(j)) - XV.row(j).squaredNorm());
// Matrix factorization: only columns 0 (user) and 1 (item) carry factors.
303 case model_factor_mode::matrix_factorization: {
305 factor_matrix_type& XV = buffers[thread_idx].XV;
307 DASSERT_GE(
size_t(XV.rows()), x_size);
308 DASSERT_EQ(
size_t(XV.cols()), num_factors());
314 double fx_value = w0;
316 for(
size_t j : {0, 1}) {
317 const v2::ml_data_entry& v = x[j];
320 DASSERT_EQ(v.column_index, j);
// New user/item at prediction time: handled in elided lines of this block.
325 if(
__unlikely__(v.index >= index_sizes[v.column_index])) {
332 const size_t global_idx = index_offsets[j] + v.index;
// User/item columns are categorical, so the entry value is always 1.
335 DASSERT_EQ(v.value, 1);
337 XV.row(j) = V.row(global_idx);
340 fx_value += w[global_idx];
// The single MF interaction term: <user factors, item factors>.
348 fx_value += XV.row(0).dot(XV.row(1));
// Remaining (side) columns contribute linear terms only.
354 for(
size_t j = 2; j < x_size; ++j) {
355 const v2::ml_data_entry& v = x[j];
360 if(
__unlikely__(v.index >= index_sizes[v.column_index]))
364 const size_t global_idx = index_offsets[v.column_index] + v.index;
367 double value_shift, value_scale;
368 std::tie(value_shift, value_scale) = column_shift_scales[global_idx];
370 double xv = value_scale * (v.value - value_shift);
372 fx_value += xv * w[global_idx];
// Pure linear model: intercept plus linear terms, no factors at all.
381 case model_factor_mode::pure_linear_model: {
383 double fx_value = w0;
385 for(
size_t j = 0; j < x_size; ++j) {
386 const v2::ml_data_entry& v = x[j];
391 if(
__unlikely__(v.index >= index_sizes[v.column_index]))
395 const size_t global_idx = index_offsets[v.column_index] + v.index;
398 double value_shift, value_scale;
399 std::tie(value_shift, value_scale) = column_shift_scales[global_idx];
401 double xv = value_scale * (v.value - value_shift);
403 fx_value += xv * w[global_idx];
// Convenience overload: evaluates f(x) on the calling thread's buffer.
// NOTE(review): thread_idx is obtained in an elided line — presumably
// thread::thread_id(); confirm.
419 double calculate_fx(
const std::vector<v2::ml_data_entry>& x)
const GL_HOT_FLATTEN {
// Guard against a threading setup where thread ids exceed the buffer count.
422 ASSERT_MSG(thread_idx < buffers.size(),
423 "Threading set up in nonstandard way; thread_id() larger than cpu_count().");
425 return calculate_fx(thread_idx, x);
// Fills sim_scores (pairs of candidate item index -> score) with the cosine
// similarity between `item`'s factor row and each candidate's factor row.
// Column 1 is the item column in this layout.
431 void get_item_similarity_scores(
432 size_t item, std::vector<std::pair<size_t, double> >& sim_scores)
const {
434 switch(factor_mode) {
435 case model_factor_mode::factorization_machine:
436 case model_factor_mode::matrix_factorization:
// Reference item unseen in training: fallback handling is elided here.
439 if(item >= index_sizes[1]) {
440 for(
auto& p : sim_scores) {
446 auto base_row = V.row(index_offsets[1] + item);
448 float it_r = base_row.squaredNorm();
450 for(
auto& p : sim_scores) {
// Candidate unseen in training: handled in elided lines.
451 if(p.first >= index_sizes[1]) {
456 size_t idx = index_offsets[1] + p.first;
457 auto item_row = V.row(idx);
// Cosine similarity: <a,b> / (||a|| * ||b||).
458 p.second = item_row.dot(base_row) / std::sqrt(it_r * item_row.squaredNorm());
// The pure linear model has no factor rows to compare.
463 case model_factor_mode::pure_linear_model:
// Records which template instantiation produced this model (factor mode and
// compile-time factor count) so deserialization can reconstruct the matching
// factorization_model_impl<...> type.
474 std::map<std::string, variant_type> get_serialization_parameters()
const {
476 std::map<std::string, variant_type> save_parameters;
478 std::string factor_mode_str;
// (The per-case break statements fall in elided lines of this excerpt.)
480 switch(factor_mode) {
481 case model_factor_mode::factorization_machine:
482 factor_mode_str =
"factorization_machine";
484 case model_factor_mode::matrix_factorization:
485 factor_mode_str =
"matrix_factorization";
487 case model_factor_mode::pure_linear_model:
488 factor_mode_str =
"pure_linear_model";
492 save_parameters[
"factor_mode"] =
to_variant(factor_mode_str);
// Stored as size_t; Eigen::Dynamic round-trips through this conversion.
494 size_t __num_factors_if_known = num_factors_if_known;
496 save_parameters[
"num_factors_if_known"] =
to_variant(__num_factors_if_known);
498 return save_parameters;
505 size_t get_version()
const {
return 1; }
// Body of the save routine (its signature is elided in this excerpt): scalar
// metadata is serialized through a variant map keyed by member name, then the
// numeric parameters (intercept, w, V) are streamed directly to the archive.
511 std::map<std::string, variant_type> terms;
514 terms[
"_num_factors"] =
to_variant(num_factors());
515 terms[
"num_factor_dimensions"] =
to_variant(num_factor_dimensions);
516 terms[
"enable_intercept_term"] =
to_variant(enable_intercept_term);
517 terms[
"enable_linear_features"] =
to_variant(enable_linear_features);
519 terms[
"max_row_size"] =
to_variant(max_row_size);
// NOTE(review): _w0 is presumably a plain-double local copy of the volatile
// member w0, declared in elided lines — confirm against the original file.
525 oarc << _w0 << w << V;
// Body of the load routine (signature elided): must mirror the save routine
// exactly — variant map of scalar metadata first, then the streamed
// parameters. Only format version 1 is understood.
531 DASSERT_EQ(version, 1);
537 auto terms = variant_get_value<std::map<std::string, variant_type> >(terms_v);
539 #define __EXTRACT(varname) \ 540 varname = variant_get_value<decltype(varname)>(terms.at(#varname)); 542 __EXTRACT(_num_factors);
543 __EXTRACT(num_factor_dimensions);
544 __EXTRACT(enable_intercept_term);
545 __EXTRACT(enable_linear_features);
547 __EXTRACT(max_row_size);
// Restore the numeric parameters in the same order they were written.
553 iarc >> _w0 >> w >> V;
// Exports the learned coefficients as user-facing SFrames: the intercept,
// one SFrame per user/item column, and (below) one combined SFrame for any
// additional side columns.
562 std::map<std::string, variant_type> get_coefficients()
const {
564 std::map<std::string, variant_type> ret;
569 if(enable_intercept_term)
570 ret[
"intercept"] = double(w0);
// Which parameter blocks exist for the user/item columns in this mode.
// (The per-case break statements fall in elided lines.)
575 bool include_V_term =
true;
576 bool include_w_term = enable_linear_features;
578 switch(factor_mode) {
579 case model_factor_mode::factorization_machine:
580 case model_factor_mode::matrix_factorization:
581 include_V_term =
true;
583 case model_factor_mode::pure_linear_model:
584 include_V_term =
false;
// One output SFrame per primary (user/item) column.
588 for(
size_t col_idx : {0, 1} ) {
590 std::string k = metadata->column_name(col_idx);
// (Several arguments of this call are elided in this excerpt.)
592 sframe res = fill_linear_model_sframe_from_eigen_data(
596 index_sizes[col_idx],
599 index_offsets[col_idx],
604 index_offsets[col_idx],
608 std::shared_ptr<unity_sframe> lt_sf(
new unity_sframe);
609 lt_sf->construct_from_sframe(res);
// Side columns (index >= 2): collected into a single stacked SFrame.
// Note MF carries no factors for side columns, so include_V_term is false
// there, unlike the user/item block above.
621 std::vector<sframe> additional_columns;
622 bool include_V_term =
true;
623 bool include_w_term = enable_linear_features;
625 switch(factor_mode) {
626 case model_factor_mode::factorization_machine:
627 include_V_term =
true;
629 case model_factor_mode::pure_linear_model:
630 case model_factor_mode::matrix_factorization:
631 include_V_term =
false;
635 for(
size_t col_idx = 2; col_idx < metadata->num_columns(); ++col_idx) {
637 std::string k = metadata->column_name(col_idx);
639 sframe res = fill_linear_model_sframe_from_eigen_data(
643 index_sizes[col_idx],
646 index_offsets[col_idx],
651 index_offsets[col_idx],
// Rewrite the per-column value column into a generic "index" column so all
// side columns can be appended into one SFrame.
658 std::shared_ptr<sarray<flexible_type> > new_x(
new sarray<flexible_type>);
660 new_x->open_for_write();
662 auto it_out = new_x->get_output_iterator(0);
664 std::shared_ptr<sarray<flexible_type> > old_x = res.select_column(k);
665 auto reader = old_x->get_reader();
666 size_t num_segments = old_x->num_segments();
668 for(
size_t sidx = 0; sidx < num_segments; ++sidx) {
669 auto src_it = reader->begin(sidx);
670 auto src_it_end = reader->end(sidx);
// (The copy statement of this loop body is elided in this excerpt.)
672 for(; src_it != src_it_end; ++src_it, ++it_out)
678 res = res.remove_column(res.column_index(k));
679 res = res.add_column(new_x,
"index");
// Constant "feature" column carrying the originating column name.
681 std::shared_ptr<sarray<flexible_type> > name_column(
682 new sarray<flexible_type>(
683 flexible_type(k), res.num_rows()));
685 res = res.add_column(name_column,
"feature");
688 additional_columns.push_back(std::move(res));
// Stack all side-column SFrames and select a stable column order.
694 if(!additional_columns.empty()) {
695 sframe all_res = additional_columns[0];
697 for(
size_t i = 1; i < additional_columns.size(); ++i)
698 all_res = all_res.append(additional_columns[i]);
700 std::shared_ptr<unity_sframe> lt_sf(
new unity_sframe);
702 std::vector<std::string> names = {
"feature",
"index"};
705 names.push_back(
"linear_terms");
708 names.push_back(
"factors");
710 lt_sf->construct_from_sframe(all_res.select_columns(names));
// Scores every candidate item in `scores` for the user encoded in query_row,
// dispatching to the fast vectorized MF path when possible and to the general
// per-item path otherwise. (A top_k parameter line is elided in this excerpt.)
723 void score_all_items(
724 std::vector<std::pair<size_t, double> >& scores,
725 const std::vector<v2::ml_data_entry>& query_row,
727 const std::shared_ptr<v2::ml_data_side_features>& known_side_features)
const {
729 DASSERT_GE(query_row.size(), 2);
730 DASSERT_EQ(query_row[USER_COLUMN_INDEX].column_index, USER_COLUMN_INDEX);
731 DASSERT_EQ(query_row[ITEM_COLUMN_INDEX].column_index, ITEM_COLUMN_INDEX);
733 bool has_side_features = (known_side_features !=
nullptr);
734 bool has_additional_columns = has_side_features || (query_row.size() > 2);
736 size_t user = query_row[USER_COLUMN_INDEX].index;
// Fast path: plain MF, no extra columns, known user — a single matrix-vector
// product scores all items at once.
739 if(factor_mode == model_factor_mode::matrix_factorization
740 && !has_additional_columns
741 && user < index_sizes[USER_COLUMN_INDEX]) {
743 _score_all_items_simple_mf(scores, user, top_k);
// General path: evaluate calculate_fx per item, optionally splicing in
// item side features.
746 std::vector<v2::ml_data_entry> x = query_row;
748 if(has_side_features) {
749 _score_all_items_general_purpose<true>(scores, std::move(x), top_k, known_side_features);
751 _score_all_items_general_purpose<false>(scores, std::move(x), top_k,
nullptr);
// Per-thread cache of the user-vs-all-items score vector (sized in setup_buffers).
757 mutable std::vector<vector_type> recommend_cache;
// Fast MF scoring path: computes V_items * v_user + w_items in one Eigen
// product, then adds the user-specific constant terms. (The `user` parameter
// line and thread_idx acquisition are elided in this excerpt.)
763 void _score_all_items_simple_mf(
764 std::vector<std::pair<size_t, double> >& scores,
766 size_t top_k)
const GL_HOT {
769 size_t items_offset = index_offsets[ITEM_COLUMN_INDEX];
770 size_t num_items = index_sizes[ITEM_COLUMN_INDEX];
774 vector_type& cached_user_item_product = recommend_cache[thread_idx];
// score[item] = <V_item, V_user> + w_item, for all items at once.
776 cached_user_item_product.noalias() =
777 V.middleRows(items_offset, num_items) * V.row(user).transpose()
778 + w.segment(items_offset, num_items);
780 size_t user_global_index = index_offsets[USER_COLUMN_INDEX] + user;
782 DASSERT_LT(user_global_index, w.size());
// Constant per-user adjustment: intercept plus the user's linear weight.
784 double adjustment = (w0 + w[user_global_index]);
787 for(
size_t i = 0; i < scores.size(); ++i) {
789 size_t item = scores[i].first;
// Items never seen in training fall back to a default (elided) score.
794 double raw_score = (item < index_sizes[ITEM_COLUMN_INDEX]
795 ? cached_user_item_product[item]
798 scores[i].second = adjustment + raw_score;
// Comparator for the top-k extraction performed in elided lines.
801 auto p_less_than = [](
const std::pair<size_t, double>& p1,
802 const std::pair<size_t, double>& p2) {
803 return p1.second < p2.second;
// Map raw scores through the loss model's link when one applies.
810 if(loss_model->prediction_is_translated()) {
811 for(
size_t i = 0; i < scores.size(); ++i) {
812 scores[i].second = loss_model->translate_fx_to_prediction(scores[i].second);
// General scoring path: reuses the query row, swapping in each candidate
// item (plus its side features when has_side_features) and evaluating
// calculate_fx per item. The template flag lets the side-feature branches
// compile away. (top_k parameter and thread_idx acquisition are elided.)
822 template <
bool has_s
ide_features>
824 void _score_all_items_general_purpose(
825 std::vector<std::pair<size_t, double> >& scores,
826 std::vector<v2::ml_data_entry>&& x,
828 const std::shared_ptr<v2::ml_data_side_features>& known_side_features)
const {
835 const size_t x_base_size = x.size();
838 for(
size_t i = 0; i < scores.size(); ++i) {
840 size_t item = scores[i].first;
// Drop side-feature entries appended for the previous item.
842 if(has_side_features)
843 x.resize(x_base_size);
845 x[ITEM_COLUMN_INDEX].index = item;
850 if(has_side_features)
851 known_side_features->add_partial_side_features_to_row(x, ITEM_COLUMN_INDEX, item);
854 double raw_score = calculate_fx(thread_idx, x);
857 scores[i] = {item, raw_score};
// Comparator for the top-k extraction performed in elided lines.
861 auto p_less_than_2 = [](
const std::pair<size_t, double>& p1,
862 const std::pair<size_t, double>& p2) {
863 return p1.second < p2.second;
// Map raw scores through the loss model's link when one applies.
870 if(loss_model->prediction_is_translated()) {
871 for(
size_t i = 0; i < scores.size(); ++i) {
872 scores[i].second = loss_model->translate_fx_to_prediction(scores[i].second);
// Lazily computed cache of per-row norms of V, shared across const callers.
// NOTE(review): factor_norms_computed is a plain bool — reads must happen
// under factor_norm_lock to be race-free; it provides no cross-thread
// visibility guarantee on its own.
878 mutable mutex factor_norm_lock;
879 mutable bool factor_norms_computed =
false;
880 mutable vector_type factor_norms;
885 void calculate_intracolumn_similarity( vector_type& dest,
size_t column_index,
size_t ref_index)
const {
887 dest.resize(index_sizes[column_index]);
895 if(!factor_norms_computed) {
896 std::lock_guard<mutex> lg(factor_norm_lock);
898 if(!factor_norms_computed) {
899 factor_norms.noalias() = V.rowwise().norm();
900 factor_norms_computed =
true;
904 size_t start_idx = index_offsets[column_index];
905 size_t block_size = index_sizes[column_index];
907 dest.noalias() = V.middleRows(start_idx, block_size) * V.row(start_idx + ref_index).transpose();
908 dest.array() /= (V.row(start_idx + ref_index).norm() * factor_norms.segment(start_idx, block_size).array());
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
void extract_and_sort_top_k(std::vector< T > &v, size_t top_k, LessThan less_than)
void variant_deep_load(variant_type &v, iarchive &iarc)
void variant_deep_save(const variant_type &v, oarchive &oarc)
boost::make_recursive_variant< flexible_type, std::shared_ptr< unity_sgraph_base >, dataframe_t, std::shared_ptr< model_base >, std::shared_ptr< unity_sframe_base >, std::shared_ptr< unity_sarray_base >, std::map< std::string, boost::recursive_variant_ >, std::vector< boost::recursive_variant_ >, boost::recursive_wrapper< function_closure_info > >::type variant_type
static size_t cpu_count()
static uint64_t hash64(const char *s, size_t len)
variant_type to_variant(const T &f)
static size_t thread_id()
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
#define DASSERT_TRUE(cond)