#ifndef TURI_SGD_EXPLICIT_RANKING_SGD_SOLVER_CLASS_H_
#define TURI_SGD_EXPLICIT_RANKING_SGD_SOLVER_CLASS_H_

#include <algorithm>
#include <cmath>
#include <limits>
#include <map>
#include <memory>
#include <numeric>
#include <random>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <core/util/code_optimization.hpp>
#include <toolkits/ml_data_2/ml_data.hpp>
#include <toolkits/factorization/ranking_sgd_solver_base.hpp>

namespace turi {
namespace factorization {
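/** SGD solver for factorization models with explicit ratings plus
 *  ranking regularization.
 *
 *  Each observed (user, item, rating) row is fit against its explicit
 *  target, while sampled unobserved items are treated as negative
 *  examples whose predicted score is pushed down toward the
 *  "unobserved rating value".  The strength of those negative-example
 *  updates is controlled by the ranking_regularization option.
 */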
template <class SGDInterface>
class explicit_ranking_sgd_solver
    : public ranking_sgd_solver_base<SGDInterface> {

 public:
  explicit_ranking_sgd_solver(
      const std::shared_ptr<sgd::sgd_interface_base>& main_interface,
      const v2::ml_data& train_data,
      const std::map<std::string, flexible_type>& options)
      : ranking_sgd_solver_base<SGDInterface>(main_interface, train_data, options)
      , ranking_regularization(options.at("ranking_regularization"))
      , _unobserved_rating_value(options.at("unobserved_rating_value"))
      , num_sampled_negative_examples(options.at("num_sampled_negative_examples"))
      , observation_block_size(options.at("sgd_sampling_block_size"))
  {
    DASSERT_GT(ranking_regularization, 0);

    // If no usable unobserved rating value was supplied, choose one
    // from the target statistics so it falls below most observed
    // ratings.
    if(!(_unobserved_rating_value > std::numeric_limits<float>::lowest())
       && train_data.has_target()) {

      const auto& target_stats = train_data.metadata()->target_statistics();

      if(target_stats->stdev(0) == 0) {
        _unobserved_rating_value = target_stats->mean(0) - 1;
      } else {
        _unobserved_rating_value = (target_stats->mean(0)
                                    - 1.96 * target_stats->stdev(0));
      }
    }
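    // Worked example of the fallback above (values illustrative): for
    // ratings with mean 3.5 and standard deviation 1.0, the default
    // becomes 3.5 - 1.96 * 1.0 = 1.54.  Since 1.96 is the 97.5%
    // quantile of the normal distribution, roughly 97.5% of observed
    // ratings then sit above the target used for negative examples.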
    // Size the per-thread working buffers.
    size_t max_n_threads = thread::cpu_count();

    thread_x_buffers.resize(max_n_threads);
    thread_candidate_pool.resize(max_n_threads);
    thread_order_index_buffers.resize(max_n_threads);
    thread_negative_example_flags.resize(max_n_threads);
    thread_item_observed.resize(max_n_threads);
  }
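  // The options map consumed by the constructor uses these keys (the
  // values shown here are illustrative only, not defaults):
  //
  //   std::map<std::string, flexible_type> options = {
  //     {"ranking_regularization",        0.25},
  //     {"unobserved_rating_value",       std::numeric_limits<float>::lowest()},
  //     {"num_sampled_negative_examples", 4},
  //     {"sgd_sampling_block_size",       64 * 1024}
  //   };
  //
  // Passing the float lowest() sentinel for "unobserved_rating_value"
  // triggers the statistics-based fallback above.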
 private:
  double ranking_regularization = 0;
  double _unobserved_rating_value = NAN;
  size_t num_sampled_negative_examples = 0;
  size_t observation_block_size = 0;
  static constexpr bool using_logistic_loss =
      std::is_same<typename SGDInterface::LossModelProfile, loss_logistic>::value;
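  // Under logistic loss, targets are treated as binary, so sampled
  // negative examples simply use 0 as their target and the ranking
  // scale collapses to 1 (see rr_scale in the descent loops below).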
  inline double unobserved_rating_value() const {
    return using_logistic_loss ? 0 : _unobserved_rating_value;
  }
  typedef std::pair<std::vector<v2::ml_data_entry>, double> x_buffer_row_type;
  typedef std::vector<x_buffer_row_type> x_buffer_type;

  std::vector<x_buffer_type> thread_x_buffers;
  std::vector<std::vector<std::pair<double, size_t> > > thread_candidate_pool;
  std::vector<std::vector<size_t> > thread_order_index_buffers;
  std::vector<dense_bitset> thread_negative_example_flags;
  std::vector<dense_bitset> thread_item_observed;
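  /** Runs one SGD pass over one block of the training data.
   *
   *  The body below proceeds in three stages per buffer: (1) copy a
   *  block of observed (positive) rows into x_buffer, tracking which
   *  items appear via the item_observed bitset; (2) for each positive
   *  row, sample an unobserved item as a negative example and append
   *  it to the buffer, flagging it in negative_example_flag; (3) apply
   *  SGD steps over the combined buffer in shuffled order.  Returns
   *  the accumulated (loss, ranking loss) pair for the block.
   */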
  std::pair<double, double> run_sgd_thread(
      size_t iteration,
      size_t thread_idx, size_t num_threads,
      size_t block_idx, size_t num_blocks,
      const v2::ml_data& data,
      SGDInterface* iface,
      double step_size,
      volatile bool& error_detected) GL_HOT {
    std::default_random_engine shuffle_g(
        static_cast<uint32_t>(hash64(iteration, block_idx)));

    static constexpr size_t ITEM_COLUMN_INDEX = 1;

    double loss_value = 0, rank_loss_value = 0;

    size_t n_items = data.metadata()->column_size(1);

    x_buffer_type& x_buffer = thread_x_buffers[thread_idx];
    dense_bitset& negative_example_flag = thread_negative_example_flags[thread_idx];

    // Initial allocation: the sampling block size plus 10% headroom.
    size_t min_buffer_size = (11 * observation_block_size) / 10;
    if(x_buffer.size() < min_buffer_size) {
      x_buffer.resize(min_buffer_size);
      negative_example_flag.resize(min_buffer_size);
    }

    auto it = data.get_block_iterator(block_idx, num_blocks);

    neg_sample_proc_buffer neg_exm_buffer;

    dense_bitset& item_observed = thread_item_observed[thread_idx];
    item_observed.resize(n_items);
    item_observed.clear();

    std::vector<std::pair<double, size_t> >& candidate_pool =
        thread_candidate_pool[thread_idx];

    if(candidate_pool.size() != n_items)
      candidate_pool.resize(n_items);
    while(!it.done() && !error_detected) {

      // Stage 1: fill the buffer with a block of positive examples.
      size_t n_items_in_buffer = 0;

      DASSERT_EQ(negative_example_flag.size(), x_buffer.size());

      while(!it.done() && !error_detected
            && n_items_in_buffer < observation_block_size) {

        size_t start_of_positive_examples = n_items_in_buffer;
        size_t n_taken_items = 0;

        size_t write_idx = start_of_positive_examples;

        while(true) {

          if(UNLIKELY(x_buffer.size() <= write_idx)) {
            auto resize = [&]() GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
              size_t new_size = (5*(write_idx + 4)) / 4;
              x_buffer.resize(new_size);
              negative_example_flag.resize(new_size);
            };
            resize();
          }

          auto& x = x_buffer[write_idx].first;
          it.fill_observation(x);
          x_buffer[write_idx].second = it.target_value();

          size_t item = x[ITEM_COLUMN_INDEX].index;
          DASSERT_LT(item, n_items);

          // ... (elided: record the item in item_observed, update
          // n_taken_items, and advance write_idx and the iterator) ...

          if(it.done() || it.is_start_of_new_block())
            break;
        }

        size_t n_rows = write_idx - start_of_positive_examples;

        n_items_in_buffer += n_rows;
        // Stage 2: sample up to one negative example per positive row.

        if(UNLIKELY(n_taken_items == n_items)) {
          // Every item occurs in this block; there is nothing
          // unobserved to sample.
          item_observed.clear();
          // ... (elided) ...
        }

        size_t target_num_negative_examples = n_rows;

        size_t negative_sample_start_index = n_items_in_buffer;
        size_t current_write_idx = negative_sample_start_index;

        size_t required_x_buffer_size = current_write_idx + target_num_negative_examples;

        if(UNLIKELY(x_buffer.size() <= required_x_buffer_size)) {
          x_buffer.resize(required_x_buffer_size);
          negative_example_flag.resize(required_x_buffer_size);
        }
        for(size_t pos_idx = start_of_positive_examples;
            pos_idx < n_items_in_buffer && !error_detected;
            ++pos_idx) {

          // No unobserved items remain to sample from.
          if(n_taken_items == n_items)
            break;

          double score = this->choose_negative_example(
              thread_idx, data, iface,
              x_buffer[current_write_idx].first,
              x_buffer[pos_idx].first,
              item_observed,
              n_rows, n_items, n_taken_items,
              neg_exm_buffer);

          if(UNLIKELY(!std::isfinite(score))) {
            error_detected = true;
            break;
          }

          // Keep the sampled example only if it would actually violate
          // the ranking constraint (or always, under logistic loss).
          if(using_logistic_loss || score > unobserved_rating_value()) {

            DASSERT_LT(current_write_idx, x_buffer.size());
            DASSERT_LT(ITEM_COLUMN_INDEX, x_buffer[current_write_idx].first.size());
            DASSERT_LT(x_buffer[current_write_idx].first[ITEM_COLUMN_INDEX].index,
                       item_observed.size());

            item_observed.set_bit_unsync(
                x_buffer[current_write_idx].first[ITEM_COLUMN_INDEX].index);

            // Assumed from context: flag this row as a negative
            // example and advance the write index.
            negative_example_flag.set_bit_unsync(current_write_idx);
            ++current_write_idx;
          }
        }
        if(UNLIKELY(error_detected))
          break;

        size_t num_negative_examples = current_write_idx - negative_sample_start_index;

        n_items_in_buffer = current_write_idx;

        // Reset only the bits we set, mapping buffer rows back to item
        // indices; cheaper than clearing the whole n_items bitset.
        this->clear_item_observed_buffer(
            item_observed, n_rows + num_negative_examples, n_items,
            [&](size_t i) {
              return x_buffer[i + start_of_positive_examples].first[ITEM_COLUMN_INDEX].index;
            });
      }
      // ... (elided) ...
      for(size_t i = 0; i < n_items_in_buffer; ++i) {
        // ... (elided) ...
      }

      if(UNLIKELY(error_detected))
        break;

      DASSERT_LE(n_items_in_buffer, x_buffer.size());
      // Stage 3: apply the SGD steps over the buffer in shuffled order.
      std::vector<size_t>& descent_order_indices = thread_order_index_buffers[thread_idx];
      descent_order_indices.resize(n_items_in_buffer);
      std::iota(descent_order_indices.begin(), descent_order_indices.end(), size_t(0));
      std::shuffle(descent_order_indices.begin(), descent_order_indices.end(), shuffle_g);
      for(size_t i = 0; i < n_items_in_buffer; ++i) {

        size_t index = descent_order_indices[i];
        DASSERT_LT(index, n_items_in_buffer);

        const auto& row = x_buffer[index];

        bool is_negative_example = negative_example_flag.get(index);

        double rr_scale = using_logistic_loss ? 1.0 : ranking_regularization;

        // Negative examples descend toward the unobserved rating value,
        // with their step size scaled by the ranking regularization.
        double r = (is_negative_example
                    ? unobserved_rating_value()
                    : row.second);

        double ss = (is_negative_example
                     ? step_size * rr_scale
                     : step_size);

        bool apply_regularization = !is_negative_example;

        double descent_loss = iface->apply_sgd_step(
            thread_idx, row.first, r, ss, apply_regularization);

        if(is_negative_example) {
          rank_loss_value += rr_scale * descent_loss;
        } else {
          loss_value += descent_loss;
        }
        if(!std::isfinite(loss_value + rank_loss_value)) {
          error_detected = true;
          break;
        }
      }

      negative_example_flag.clear();

      if(!iface->state_is_numerically_stable()) {
        error_detected = true;
      }
    }

    return {loss_value, rank_loss_value};
  }
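  /** Computes the current objective over one slice of the data,
   *  without applying any updates.
   *
   *  For each observed row this accumulates the primary loss against
   *  the explicit target; where unrated items exist, it also samples a
   *  negative example and accumulates the ranking loss against the
   *  unobserved rating value.  Returns (loss, ranking loss).
   */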
  std::pair<double, double> run_loss_calculation_thread(
      size_t thread_idx, size_t num_threads,
      const v2::ml_data& data,
      SGDInterface* iface) const {
    double loss_value = 0, rank_loss_value = 0;

    size_t n_items = data.metadata()->column_size(1);

    x_buffer_type x_buffer;

    // Start with a reasonably sized buffer; it grows as needed.
    x_buffer.resize(4*1024);

    std::vector<v2::ml_data_entry> negative_example_x;

    neg_sample_proc_buffer neg_exm_buffer;

    // Assumed from context: the bitset of items seen in the current
    // block, as required by the calls below.
    dense_bitset item_observed(n_items);

    for(auto it = data.get_block_iterator(thread_idx, num_threads); !it.done();) {

      size_t n_rows, n_rated_items;

      std::tie(n_rows, n_rated_items) = this->fill_x_buffer_with_users_items(
          x_buffer, it, n_items, item_observed);
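      // fill_x_buffer_with_users_items reports how many rows were read
      // for this block and how many distinct items they rate.  If every
      // item is rated, no negative example can be drawn and only the
      // primary loss applies; otherwise each row also contributes a
      // ranking loss term.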
      if(n_rated_items == n_items) {

        for(size_t i = 0; i < n_rows; ++i) {
          const std::vector<v2::ml_data_entry>& x = x_buffer[i].first;
          double y = x_buffer[i].second;

          double fx_hat = iface->calculate_fx(thread_idx, x);
          double loss = iface->loss_model.loss(fx_hat, y);

          loss_value += loss;
        }

      } else {
        for(size_t i = 0; i < n_rows; ++i) {

          const std::vector<v2::ml_data_entry>& x = x_buffer[i].first;
          double y = x_buffer[i].second;

          DASSERT_GE(x.size(), 2);

          double fx_hat = iface->calculate_fx(thread_idx, x);
          loss_value += iface->loss_model.loss(fx_hat, y);

          double negative_example_fx =
              this->choose_negative_example(
                  thread_idx, data, iface,
                  negative_example_x, x,
                  item_observed,
                  n_rows, n_items, n_rated_items,
                  neg_exm_buffer);

          if(!std::isfinite(negative_example_fx)
             || std::fabs(negative_example_fx) > 1e10) {
            // Assumed from context: skip numerically suspect samples
            // rather than accumulate a meaningless ranking loss.
            continue;
          }
          // Debug check: the sampled item must differ from every rated
          // item in this block.
          for(size_t x_check = 0; x_check < n_rows; ++x_check) {
            DASSERT_NE(negative_example_x[1].index, x_buffer[x_check].first[1].index);
          }

          if(using_logistic_loss || negative_example_fx > unobserved_rating_value()) {

            double loss = iface->loss_model.loss(
                negative_example_fx, unobserved_rating_value());

            ASSERT_GE(loss, 0.0);

            double rr_scale = using_logistic_loss ? 1.0 : ranking_regularization;

            rank_loss_value += (rr_scale * loss);
          }
        }
      }
      // Reset the item_observed bits set while filling the buffer.
      this->clear_item_observed_buffer(
          item_observed, n_rows, n_items,
          [&](size_t i) { return x_buffer[i].first[1].index; });
    }

    return {loss_value, rank_loss_value};
  }
};

}}

#endif