6 #ifndef TURI_SGD_RANKING_SGD_SOLVER_BASE_CLASS_H_ 7 #define TURI_SGD_RANKING_SGD_SOLVER_BASE_CLASS_H_ 11 #include <type_traits> 12 #include <core/util/code_optimization.hpp> 13 #include <toolkits/ml_data_2/ml_data.hpp> 14 #include <toolkits/ml_data_2/ml_data_iterators.hpp> 15 #include <toolkits/sgd/sgd_solver_base.hpp> 16 #include <toolkits/factorization/loss_model_profiles.hpp> 18 namespace turi {
namespace factorization {
// Class template parameterised on the concrete SGD interface type.
// NOTE(review): the class head and base-class list are not visible in this
// listing; only the template header and the data members below survive.
32 template <
class SGDInterface>
// Thread budget. The constructor initialises it from thread::cpu_count(), and
// it sizes the per-thread loss accumulators in run_iteration and
// calculate_objective.
36 const size_t max_n_threads;
// Value of the "num_sampled_negative_examples" option. It is stored as a
// double but is used as a count (resize sizes and loop bounds) in
// choose_negative_example.
37 double num_sampled_negative_examples;
// Seed value; the constructor overwrites the default with
// hash64(options.at("random_seed")).
38 size_t random_seed = 0;
// Constructor (parameter list and member initialisers only; the declaring
// line and the first initialiser are not visible in this listing).
//
// main_interface: shared pointer to the model's SGD interface.
// train_data:     the training data set.
// options:        option name -> value map. Both lookups below use
//                 std::map::at, which throws std::out_of_range when the key
//                 is missing.
46 const std::shared_ptr<sgd::sgd_interface_base>& main_interface,
47 const v2::ml_data& train_data,
48 const std::map<std::string, flexible_type>&
options)
// Thread budget comes from the CPU count reported by thread::cpu_count().
51 , max_n_threads(
thread::cpu_count())
52 , num_sampled_negative_examples(options.at(
"num_sampled_negative_examples"))
// The stored seed is the 64-bit hash of the option value, not the raw value.
53 , random_seed(
hash64(options.at(
"random_seed")))
// Sanity check: at least one negative example must be sampled per positive.
55 DASSERT_GE(num_sampled_negative_examples, 1);
// Pure virtual hook (tail of the declaration; the return type, name and
// leading parameters are not visible in this listing).
// thread_idx / num_threads: index of the calling worker and the worker count.
// block_idx / num_blocks:   which data block to process, out of how many.
// data:                     the data set being iterated over.
// error_detected:           flag shared between workers; run_iteration's
//                           worker loop stops claiming blocks once it is set.
// NOTE(review): presumably this is the call whose two results run_iteration
// accumulates into loss_values / rank_loss_values; confirm against the
// full source.
98 size_t thread_idx,
size_t num_threads,
99 size_t block_idx,
size_t num_blocks,
100 const v2::ml_data& data,
103 volatile bool& error_detected) = 0;
// Pure virtual, const hook (tail of the declaration; the return type and name
// are not visible in this listing).
// thread_idx / num_threads: index of the calling worker and the worker count.
// data:                     the data set to evaluate.
// iface:                    the concrete SGD interface for the model.
// NOTE(review): presumably this is what calculate_objective calls per thread
// with (thread_idx, num_threads, data, iface); confirm against the full source.
127 size_t thread_idx,
size_t num_threads,
128 const v2::ml_data& data,
129 SGDInterface* iface)
const = 0;
// Runs one SGD pass over `data` in parallel and returns a pair
// {objective_value_estimate, reported_training_loss}.
// NOTE(review): the listing is missing lines inside this body (function head,
// the definition of num_blocks, several branch bodies and closing braces).
// The comments below describe only what is visible.
161 const v2::ml_data& data,
// Downcast the generic interface to the concrete type.
// NOTE(review): the result of dynamic_cast is not null-checked in the visible
// lines.
167 SGDInterface* iface =
dynamic_cast<SGDInterface*
>(model_iface);
169 const size_t data_size = data.size();
// One accumulator slot per thread so workers never write to the same element.
179 std::vector<double> loss_values(max_n_threads, 0);
181 std::vector<double> rank_loss_values(max_n_threads, 0);
// Flag shared by all workers and passed by reference into the per-block call.
// NOTE(review): `volatile` is not a synchronization primitive in C++; an
// atomic flag would be the conforming choice, but the type appears in the
// run_sgd_thread signature.
186 volatile bool error_detected =
false;
188 iface->setup_iteration(iteration, step_size);
// Block processing order: starts as 0, 1, ..., num_blocks-1.
195 std::vector<size_t> blocks_to_use(num_blocks);
196 std::iota(blocks_to_use.begin(), blocks_to_use.end(), 0);
// Shared counter from which each worker claims the next unprocessed block.
199 atomic<size_t> current_block = 0;
201 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
// Keep claiming blocks until an error is flagged or the blocks run out.
204 while(!error_detected) {
// Pre-increment then subtract one: yields the value the counter held before
// this worker's increment, i.e. this worker's claimed slot.
205 size_t block_lookup_idx = (++current_block) - 1;
// Claimed slot is past the last block (branch body not visible here).
208 if(block_lookup_idx >= num_blocks)
216 thread_idx, num_threads,
217 blocks_to_use[block_lookup_idx], num_blocks,
218 data, iface, step_size, error_detected);
// lv / rlv are the per-block loss and rank-loss results (their declaration is
// not visible here); add them to this thread's private slots.
220 loss_values[thread_idx] += lv;
221 rank_loss_values[thread_idx] += rlv;
// Sentinel result {max, max}; the guarding condition is not visible here.
229 return {std::numeric_limits<double>::max(), std::numeric_limits<double>::max()};
235 iface->finalize_iteration();
// Mean loss per data row; max(1, data_size) avoids division by zero on an
// empty data set.
237 double loss_no_regularization
238 = (std::accumulate(loss_values.begin(), loss_values.end(), double(0.0))
239 / std::max(
size_t(1), data_size));
// Same averaging for the rank loss (the declaration line is not visible here).
242 = (std::accumulate(rank_loss_values.begin(), rank_loss_values.end(), double(0.0))
243 / std::max(
size_t(1), data_size));
245 double regularization_penalty = iface->current_regularization_penalty();
246 double objective_value_estimate = loss_no_regularization + rank_loss + regularization_penalty;
// Special handling when the objective is effectively zero (body not visible
// here).
249 if(objective_value_estimate <= 1e-16) {
// Convert the raw mean loss into the value reported for this loss model.
253 double reported_training_loss = iface->loss_model.reported_loss_value(loss_no_regularization);
258 return {objective_value_estimate, reported_training_loss};
// Evaluates the objective on `data` in parallel and returns a pair
// {objective_value_estimate, reported_training_loss}. It is a const member
// and, in the visible lines, writes only to its own local accumulators.
// NOTE(review): the listing is missing lines inside this body (function head,
// the per-thread call, closing braces). The comments below describe only what
// is visible.
279 const v2::ml_data& data,
280 size_t iteration)
const GL_HOT {
// Downcast to the concrete interface type.
// NOTE(review): the result is not null-checked in the visible lines.
285 SGDInterface* iface =
dynamic_cast<SGDInterface*
>(model_iface);
287 const size_t data_size = data.size();
// One slot per thread; each worker writes only its own element.
293 std::vector<double> loss_values(max_n_threads, 0);
294 std::vector<double> rank_loss_values(max_n_threads, 0);
// Shared error flag set by any worker that sees an invalid loss.
// NOTE(review): `volatile` does not provide inter-thread synchronization in
// C++.
296 volatile bool error_detected =
false;
298 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
301 double loss = 0, rank_loss = 0;
// Arguments of the per-thread loss computation (the call head is not visible
// here).
305 thread_idx, num_threads, data, iface);
// A non-finite loss or the max() sentinel is treated as a numerical failure.
307 if(!std::isfinite(loss) || loss == std::numeric_limits<double>::max()) {
308 error_detected =
true;
// Assigned, not accumulated: each thread reports a single total.
310 loss_values[thread_idx] = loss;
311 rank_loss_values[thread_idx] = rank_loss;
// Sentinel result {max, max}; the guarding condition is not visible here.
316 return {std::numeric_limits<double>::max(), std::numeric_limits<double>::max()};
// Mean loss per data row; max(1, data_size) avoids division by zero.
322 double loss_no_regularization
323 = (std::accumulate(loss_values.begin(), loss_values.end(), double(0.0))
324 / std::max(
size_t(1), data_size));
// Same averaging for the rank loss (the declaration line is not visible here).
327 = (std::accumulate(rank_loss_values.begin(), rank_loss_values.end(), double(0.0))
328 / std::max(
size_t(1), data_size));
330 double regularization_penalty = iface->current_regularization_penalty();
331 double objective_value_estimate = loss_no_regularization + rank_loss + regularization_penalty;
// Convert the raw mean loss into the value reported for this loss model.
333 double reported_training_loss = iface->loss_model.reported_loss_value(loss_no_regularization);
335 return {objective_value_estimate, reported_training_loss};
// Reads consecutive observations from the block iterator `it` into x_buffer,
// marking each observation's item in the `item_observed` bitset, and stops
// when the iterator is done or reaches the start of a new block.
// Returns {n_rows, n_rated_items}.
// NOTE(review): the listing is missing lines here (function name, remaining
// parameters, the declaration of n_rows, the loop head and the counter
// updates). Presumably n_rows counts rows read and n_rated_items counts
// distinct items; confirm against the full source.
359 inline std::pair<size_t, size_t>
// x_buffer: output; each element pairs an observation row with its target.
361 std::vector<std::pair<std::vector<v2::ml_data_entry>,
double> >& x_buffer,
367 size_t n_rated_items = 0;
// Next free slot in x_buffer.
370 size_t index = n_rows;
// Grow by doubling when the slot is past the end.
// NOTE(review): if index == 0 and x_buffer is empty this resizes to 0 and the
// access below would be out of range; presumably callers pass a non-empty
// buffer. Confirm.
372 if(x_buffer.size() <= index)
373 x_buffer.resize(2*index);
// Fill the observation row in place, reusing the vector already in the slot.
375 auto& x = x_buffer[index].first;
376 it.fill_observation(x);
378 x_buffer[index].second = it.target_value();
// Entry 1 of the observation row holds the item index.
380 size_t item = x[1].index;
381 DASSERT_LT(item, n_items);
// Mark the item as seen; the value returned is kept as old_bit (its use is
// not visible here).
383 bool old_bit = item_observed.set_bit_unsync(item);
388 }
// Continue while the iterator has data and has not reached a new block.
while(!it.done() && !it.is_start_of_new_block());
390 return std::make_pair(n_rows, n_rated_items);
// Scratch fields accessed as proc_buf.<field> in choose_negative_example, so
// the vectors are reused instead of being reallocated for every sample.
// NOTE(review): the enclosing struct head is not visible in this listing.

// Working copy of the positive example with the item entry swapped out.
409 std::vector<v2::ml_data_entry> candidate_x;
// Item indices selected as negative candidates for the current sample.
410 std::vector<size_t> chosen_negative_items;
// Raw random draws before filtering against the observed-item bitset.
411 std::vector<size_t> candidate_negative_items;
// User whose unobserved items are currently cached in available_item_list.
// NOTE(review): no initialiser is visible on this line.
423 size_t user_of_available_item_list;
// Cached list of items not yet observed for that user.
424 std::vector<size_t> available_item_list;
// For each chosen negative item, its position within available_item_list.
425 std::vector<size_t> available_item_list_chosen_indices;
// Picks a negative (unobserved) item for the user of
// `current_positive_example`: samples num_sampled_negative_examples candidate
// items not marked in item_observed, scores each with iface->calculate_fx,
// and copies the highest-scoring candidate row into `negative_example_x`.
// NOTE(review): the listing is missing lines here (some parameters, loop
// heads, branch bodies, the return statements). What the returned double
// carries is not visible; presumably it is the winning score. Confirm against
// the full source.
434 inline double choose_negative_example(
436 const v2::ml_data& data,
438 std::vector<v2::ml_data_entry>& negative_example_x,
439 const std::vector<v2::ml_data_entry>& current_positive_example,
441 size_t n_rows,
size_t n_items,
442 size_t n_rated_items,
// Position of the item entry within an observation row.
445 const size_t ITEM_COLUMN_INDEX = 1;
// Reuse the per-thread scratch vectors, sized to the sample count.
454 std::vector<size_t>& chosen_negative_items = proc_buf.chosen_negative_items;
455 chosen_negative_items.resize(num_sampled_negative_examples);
457 std::vector<size_t>& candidate_negative_items = proc_buf.candidate_negative_items;
458 candidate_negative_items.resize(num_sampled_negative_examples);
460 size_t n_points_picked = 0;
// Set only when candidates are drawn from the cached available-item list.
461 bool remove_from_available_item_list =
false;
// Strategy 1: when more than one eighth of all items are unrated, draw
// random items and reject those already observed.
477 if(8 * (n_items - n_rated_items) > n_items) {
478 while(n_points_picked < num_sampled_negative_examples) {
// Pass 1: draw a batch of random item indices and prefetch their bitset
// words so the lookups in pass 2 are less likely to stall.
481 for(
size_t i = 0; i < num_sampled_negative_examples; ++i) {
// NOTE(review): assumes fast_uniform's bounds are inclusive; confirm.
482 size_t candidate_item = random::fast_uniform<size_t>(0, n_items - 1);
483 item_observed.
prefetch(candidate_item);
484 candidate_negative_items[i] = candidate_item;
// Pass 2: keep the draws that are not marked as observed, until enough are
// picked.
489 i < num_sampled_negative_examples && n_points_picked < num_sampled_negative_examples;
492 size_t candidate_item = candidate_negative_items[i];
493 if(!item_observed.
get(candidate_item))
494 chosen_negative_items[n_points_picked++] = candidate_item;
// Strategy 2 (presumably the else branch): list the unobserved items once per
// user and sample from that list.
// Entry 0 of the observation row holds the user index.
508 size_t user = current_positive_example[0].index;
// Rebuild the cache when it belongs to another user or has been emptied.
513 if(proc_buf.user_of_available_item_list != user
514 || proc_buf.available_item_list.empty() ) {
517 DASSERT_LT(n_rated_items, n_items);
519 proc_buf.available_item_list.resize(n_items - n_rated_items);
521 size_t current_position = 0;
// First unobserved item (the lookup that positions current_position is not
// visible here).
531 proc_buf.available_item_list[0] = current_position;
533 size_t index_count = 1;
// Step through the remaining zero bits; next_zero_bit takes
// current_position by reference.
536 bool found_index = item_observed.
next_zero_bit(current_position);
539 proc_buf.available_item_list[index_count++] = current_position;
545 DASSERT_EQ(index_count, n_items - n_rated_items);
548 proc_buf.user_of_available_item_list = user;
549 proc_buf.available_item_list.resize(index_count);
// Sample positions within the cached list, recording each position so the
// winning item can be removed from the list afterwards.
555 proc_buf.available_item_list_chosen_indices.resize(num_sampled_negative_examples);
556 for(
size_t i = 0; i < num_sampled_negative_examples; ++i) {
557 size_t idx = random::fast_uniform<size_t>(0, proc_buf.available_item_list.size()-1);
558 chosen_negative_items[i] = proc_buf.available_item_list[idx];
559 proc_buf.available_item_list_chosen_indices[i] = idx;
563 remove_from_available_item_list =
true;
564 n_points_picked = num_sampled_negative_examples;
572 DASSERT_EQ(n_points_picked, num_sampled_negative_examples);
// Loop over the picked items (body not visible here).
574 for(
size_t i = 0; i < n_points_picked; ++i) {
// Start from a copy of the positive example; only the item entry and any
// item side features differ between candidates.
584 std::vector<v2::ml_data_entry>& candidate_x = proc_buf.candidate_x;
585 candidate_x = current_positive_example;
586 size_t trim_size = candidate_x.size();
588 if(data.has_side_features()) {
// Remove the positive item's side-feature entries: those whose column_index
// lies in [lb, ub), the column range looked up for the item column.
594 data.get_side_features()->column_indices_of_side_information_block(ITEM_COLUMN_INDEX);
596 auto new_end = std::remove_if(candidate_x.begin(), candidate_x.end(),
598 return (lb <= v.column_index) && (v.column_index < ub);
// Length of the row without item side features.
601 trim_size = new_end - candidate_x.begin();
// Running maximum of the candidate scores, starting from the lowest double.
604 double highest_fx = std::numeric_limits<double>::lowest();
605 size_t chosen_idx = 0;
// Score each candidate.
608 for(
size_t i = 0; i < n_points_picked; ++i) {
609 candidate_x[1].index = chosen_negative_items[i];
// Cut back to the trimmed length, then append this item's side features.
612 if(data.has_side_features()) {
613 candidate_x.resize(trim_size);
614 data.get_side_features()->add_partial_side_features_to_row(candidate_x, ITEM_COLUMN_INDEX);
617 double fx_hat = iface->calculate_fx(thread_idx, candidate_x);
// Non-finite score (handling not visible here).
620 if(!std::isfinite(fx_hat))
// Keep the highest-scoring candidate row.
623 if(fx_hat > highest_fx) {
625 negative_example_x = candidate_x;
// highest_fx still at its initial value: no candidate replaced it (handling
// not visible here).
631 if(highest_fx == std::numeric_limits<double>::lowest())
// Remove the chosen item from the cached list by swapping it with the last
// element and popping. Order is not preserved.
634 if(remove_from_available_item_list) {
635 DASSERT_LT(chosen_idx, proc_buf.available_item_list_chosen_indices.size());
636 size_t remove_idx = proc_buf.available_item_list_chosen_indices[chosen_idx];
637 DASSERT_LT(remove_idx, proc_buf.available_item_list.size());
638 std::swap(proc_buf.available_item_list[remove_idx], proc_buf.available_item_list.back());
639 proc_buf.available_item_list.pop_back();
// Resets the `item_observed` bitset after use, choosing between a per-row
// path and a full clear() based on n_rows relative to n_items.
// map_index(i) maps buffer row i to the index kept in `index`.
// NOTE(review): the listing is missing lines here (function name line, the
// per-row clearing call, the else keyword and closing braces). Presumably the
// small-n_rows branch clears only the bitset words touched by each mapped
// index; confirm against the full source.
654 template <
typename BufferIndexToItemIndexMapper>
657 dense_bitset& item_observed,
size_t n_rows,
size_t n_items,
658 const BufferIndexToItemIndexMapper& map_index)
const {
// Threshold is n_items / (8 * sizeof(size_t) * 8): below it, iterate over
// the rows individually.
665 if(n_rows < n_items / ( 8*
sizeof(
size_t) * 8) ) {
666 for(
size_t i = 0; i < n_rows; ++i) {
667 size_t index = map_index(i);
// Otherwise (presumably the else branch) zero the entire bitset.
671 item_observed.
clear();
ranking_sgd_solver_base(const std::shared_ptr< sgd::sgd_interface_base > &main_interface, const v2::ml_data &train_data, const std::map< std::string, flexible_type > &options)
bool next_zero_bit(size_t &b) const
GL_HOT_INLINE_FLATTEN void clear_item_observed_buffer(dense_bitset &item_observed, size_t n_rows, size_t n_items, const BufferIndexToItemIndexMapper &map_index) const
std::pair< double, double > calculate_objective(sgd::sgd_interface_base *model_iface, const v2::ml_data &data, size_t iteration) const GL_HOT
std::pair< double, double > run_iteration(size_t iteration, sgd::sgd_interface_base *model_iface, const v2::ml_data &data, double step_size)
void prefetch(size_t b) const
Prefetches the word containing the bit b.
void clear_word_unsync(size_t b)
bool get(size_t b) const
Returns the value of the bit b.
void shuffle(std::vector< T > &vec)
static size_t cpu_count()
bool first_zero_bit(size_t &b) const
static uint64_t hash64(const char *s, size_t len)
#define DASSERT_FALSE(cond)
void clear()
Sets all bits to 0.
#define GL_HOT_INLINE_FLATTEN
std::pair< size_t, size_t > fill_x_buffer_with_users_items(std::vector< std::pair< std::vector< v2::ml_data_entry >, double > > &x_buffer, v2::ml_data_block_iterator &it, size_t n_items, dense_bitset &item_observed) const GL_HOT_INLINE_FLATTEN
sgd_solver_base(const std::shared_ptr< sgd_interface_base > &model_interface, const v2::ml_data &_train_data, const std::map< std::string, flexible_type > &_options)
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
const std::map< std::string, flexible_type > options
virtual std::pair< double, double > run_loss_calculation_thread(size_t thread_idx, size_t num_threads, const v2::ml_data &data, SGDInterface *iface) const =0
#define DASSERT_TRUE(cond)
virtual std::pair< double, double > run_sgd_thread(size_t iteration, size_t thread_idx, size_t num_threads, size_t block_idx, size_t num_blocks, const v2::ml_data &data, SGDInterface *iface, double step_size, volatile bool &error_detected)=0