6 #ifndef TURI_UNITY_ITEM_SIMILARITY_LOOKUP_CONSTRUCTION_H_ 7 #define TURI_UNITY_ITEM_SIMILARITY_LOOKUP_CONSTRUCTION_H_ 9 #include <toolkits/sparse_similarity/item_processing.hpp> 10 #include <toolkits/sparse_similarity/index_mapper.hpp> 11 #include <toolkits/sparse_similarity/sparse_similarity_lookup.hpp> 12 #include <toolkits/sparse_similarity/similarities.hpp> 13 #include <toolkits/sparse_similarity/utilities.hpp> 14 #include <toolkits/sparse_similarity/neighbor_search.hpp> 15 #include <toolkits/sparse_similarity/sliced_itemitem_matrix.hpp> 16 #include <core/logging/table_printer/table_printer.hpp> 17 #include <core/generics/sparse_parallel_2d_array.hpp> 18 #include <core/storage/sframe_data/sframe.hpp> 19 #include <core/storage/sframe_data/sframe_iterators.hpp> 20 #include <core/util/logit_math.hpp> 21 #include <core/util/cityhash_tc.hpp> 22 #include <core/util/dense_bitset.hpp> 23 #include <core/util/sys_util.hpp> 24 #include <core/parallel/pthread_tools.hpp> 26 namespace turi {
namespace sparse_sim {
// NOTE(review): garbled extraction -- the numbers fused into the text (33,
// 42, ...) are original source line numbers, and the class declaration that
// originally sat between lines 33 and 42 is missing here.  Comments below
// only describe what the visible fragments demonstrably do.
33 template <
typename SimilarityType>
// Constructor fragment: stores the similarity functor and initializes
// max_item_neighborhood_size from the "max_item_neighborhood_size" option.
42 const std::map<std::string, flexible_type>& _options)
43 : similarity(_similarity)
44 , max_item_neighborhood_size(_options.at(
"max_item_neighborhood_size"))
// Fragment of a name-reporting accessor: forwards to SimilarityType::name().
// (Enclosing signature not visible -- presumably the lookup's similarity
// name; TODO confirm against the full source.)
51 return SimilarityType::name();
// Type aliases pulled from the SimilarityType policy class.  Each similarity
// (jaccard, cosine, pearson, ...) defines its own per-item and per-pair
// accumulation/finalized types; everything below is written against these.
59 typedef typename SimilarityType::item_data_type item_data_type;
60 typedef typename SimilarityType::interaction_data_type interaction_data_type;
61 typedef typename SimilarityType::final_item_data_type final_item_data_type;
62 typedef typename SimilarityType::final_interaction_data_type final_interaction_data_type;
63 typedef typename SimilarityType::prediction_accumulation_type prediction_accumulation_type;
64 typedef std::vector<item_processing_info<SimilarityType> > item_info_vector;
// Compile-time flag: does this similarity carry per-item "final" data that
// must be stored alongside the lookup?  Forwards to the free-function trait.
67 static constexpr
bool use_final_item_data() {
68 return sparse_sim::use_final_item_data<SimilarityType>();
// The similarity functor and the number of items in the lookup.
75 SimilarityType similarity;
76 size_t total_num_items = 0;
// One (neighbor-index, finalized-similarity) entry per stored neighbor.
82 typedef std::pair<size_t, final_interaction_data_type> interaction_info_type;
// CSR-style layout: item i's neighbors live in item_interaction_data in the
// half-open range [item_neighbor_boundaries[i], item_neighbor_boundaries[i+1]).
84 std::vector<size_t> item_neighbor_boundaries;
85 std::vector<interaction_info_type> item_interaction_data;
// Cap on neighbors retained per item (may be lowered on allocation failure
// in init_item_lookups).
87 size_t max_item_neighborhood_size = 0;
// Per-item finalized data; only populated when use_final_item_data() is true.
91 std::vector<final_item_data_type> final_item_data;
// Thread-safe progress reporter for lookup construction.  Progress is
// tracked as a count of item pairs processed out of num_items^2, and printed
// through a three-column table_printer.  (Extraction gaps: some statement
// lines between the visible original line numbers are missing.)
99 struct _progress_tracker {
101 _progress_tracker(
size_t _num_items)
102 : num_items(_num_items)
103 , table({{
"Elapsed Time (Constructing Lookups)", 0},
104 {
"Total % Complete", 0},
105 {
"Items Processed", 0}})
109 void print_header() { table.print_header(); }
110 void print_break() { table.print_line_break(); }
// Footer forces the counter to 100%: pair count is snapped to num_items^2.
111 void print_footer() {
112 item_pair_count = num_items * num_items;
113 double percent_complete = 100.0;
120 table.print_footer();
// Hot path: bump the atomic pair counter; only occasionally (and only if no
// other thread is already printing) fall into the row-printing slow path.
124 void increment_item_counter(
size_t counter = 1) {
125 item_pair_count += counter;
127 if(UNLIKELY(table.time_for_next_row() && !in_print_next_row)) {
// in_print_next_row acts as a re-entrancy/busy flag around _print_next_row.
136 in_print_next_row =
true;
138 in_print_next_row =
false;
145 void _print_next_row() {
// try_lock so that a contended printer simply skips the row instead of
// blocking the worker thread.
149 std::unique_lock<simple_spinlock> lg(_print_next_row_counter_lock, std::defer_lock);
151 bool acquired_lock = lg.try_lock();
157 size_t _item_pair_count = item_pair_count;
158 size_t items_processed = _item_pair_count / num_items;
160 double n_total_items = double(num_items) * num_items;
161 double prop_complete = std::min(1.0,
double(_item_pair_count) / n_total_items);
// Quantize the displayed percentage to quarter-percent steps.
164 double percent_complete = std::floor(4 * 100.0 * prop_complete) / 4;
166 table.print_timed_progress_row(
172 size_t num_items = 0;
173 atomic<size_t> item_pair_count;
174 atomic<int> in_print_next_row;
// Per-item neighbor counts (used only during construction) and per-item
// spinlocks guarding concurrent inserts into each item's neighbor heap.
183 std::vector<uint32_t> item_neighbor_counts;
184 std::vector<simple_spinlock> item_interaction_locks;
// Allocate the construction-time lookup structures for num_items items.
// On std::bad_alloc, retries with a halved max_item_neighborhood_size
// (down to a floor of 16 before giving up) via the ALLOCATION_RETRY label;
// the label itself is in a line lost to the extraction.
189 void init_item_lookups(
size_t num_items,
const std::vector<final_item_data_type>& _final_item_data) {
192 total_num_items = num_items;
193 item_neighbor_counts.assign(total_num_items, 0);
194 item_interaction_locks.resize(total_num_items);
// The big allocation: a flat num_items x max_item_neighborhood_size table.
198 item_interaction_data.reserve(total_num_items * max_item_neighborhood_size);
200 }
catch(
const std::bad_alloc& e) {
206 if(max_item_neighborhood_size >= 16) {
// Halve the neighborhood size (also clamped to at most 64) and retry.
208 size_t new_max_item_neighborhood_size = std::min<size_t>(64, max_item_neighborhood_size / 2);
211 <<
"Error allocating proper lookup tables with max_item_neighborhood_size = " 212 << max_item_neighborhood_size <<
"; reattempting with max_item_neighborhood_size = " 213 << new_max_item_neighborhood_size <<
"." << std::endl;
215 max_item_neighborhood_size = new_max_item_neighborhood_size;
// Keep the recorded option in sync with the value actually used.
216 options.at(
"max_item_neighborhood_size") = max_item_neighborhood_size;
217 goto ALLOCATION_RETRY;
// Too small to shrink further: report the required table size and abort.
220 std::ostringstream ss;
221 ss <<
"Out-of-Memory error allocating proper lookup tables with max_item_neighborhood_size = " 222 << max_item_neighborhood_size <<
". This currently requires a lookup table of " 223 << (max_item_neighborhood_size * total_num_items * 16)
224 <<
" bytes. Please attempt with fewer items or use a machine with more memory." 226 log_and_throw(ss.str().c_str());
// Zero-initialize every slot of the flat neighbor table.
230 item_interaction_data.assign(total_num_items * max_item_neighborhood_size,
231 {0, final_interaction_data_type()});
// Copy in the per-item final data when the similarity needs it.
234 if(use_final_item_data()) {
235 ASSERT_EQ(_final_item_data.size(), num_items);
236 final_item_data = _final_item_data;
// Thread-safe insert of (item_b, value) into item_a's bounded neighbor set.
// Each item's row of item_interaction_data is maintained as a max-size
// max_item_neighborhood_size heap keyed by similarity; the cheap unlocked
// pre-check below rejects values that cannot beat the current heap root.
245 void insert_into_lookup(
246 size_t item_a,
size_t item_b,
247 const final_interaction_data_type& value) {
249 std::pair<size_t, final_interaction_data_type> p = {item_b, value};
250 auto& count = item_neighbor_counts[item_a];
252 final_item_data_type _unused;
// Comparator delegating to the similarity's compare_interaction_values.
// (Some of its argument lines are missing from this extraction -- presumably
// p1.second / p2.second; TODO confirm against the full source.)
254 auto item_comparitor = [&](
const interaction_info_type& p1,
const interaction_info_type& p2)
257 DASSERT_LT(item_a, total_num_items);
258 DASSERT_LT(p1.first, total_num_items);
259 DASSERT_LT(p2.first, total_num_items);
261 return similarity.compare_interaction_values(
264 use_final_item_data() ? final_item_data[item_a] : _unused,
265 use_final_item_data() ? final_item_data[p1.first] : _unused,
266 use_final_item_data() ? final_item_data[p2.first] : _unused);
// Fast reject: row is full and the candidate does not beat the heap root
// (stored at the last slot checked here).  Racy by design; the locked path
// re-checks before mutating.
271 if(count == max_item_neighborhood_size
272 && !(item_comparitor(p, item_interaction_data[item_a * max_item_neighborhood_size + count - 1]))) {
// Slow path, taken under item_a's spinlock.
278 auto insert_on_heap = [&]() GL_GCC_ONLY(GL_HOT_NOINLINE) {
281 std::lock_guard<simple_spinlock> lg(item_interaction_locks[item_a]);
// Debug-only: no duplicate neighbor ids may be inserted for one item.
286 for(
size_t i = 0; i < count; ++i) {
287 DASSERT_NE(item_interaction_data[item_a*max_item_neighborhood_size + i].first, p.first);
// Row not yet full: append, then heapify once it becomes full.
292 if(count < max_item_neighborhood_size) {
293 item_interaction_data[item_a*max_item_neighborhood_size + count] = p;
296 if(count == max_item_neighborhood_size) {
297 std::make_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
298 item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
// Row full: replace the weakest entry if the candidate beats it, then
// restore the heap invariant with push_heap/pop_heap.
302 if(LIKELY(item_comparitor(p, item_interaction_data[item_a * max_item_neighborhood_size + count - 1]))) {
304 item_interaction_data[item_a * max_item_neighborhood_size + count - 1] = p;
305 std::push_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
306 item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
308 std::pop_heap(item_interaction_data.begin() + (item_a*max_item_neighborhood_size),
309 item_interaction_data.begin() + ((item_a + 1)*max_item_neighborhood_size),
313 DASSERT_FALSE(item_comparitor(item_interaction_data[item_a * max_item_neighborhood_size + count - 1], p));
// Compact the construction-time per-item heaps into the final CSR layout:
// entries whose similarity exceeds the "threshold" option are packed
// contiguously, boundaries are recorded, then each item's neighbor list is
// sorted by neighbor index (in parallel) for fast merge-style scoring.
325 GL_HOT_NOINLINE_FLATTEN
326 void finalize_lookups() {
327 double threshold =
options.at(
"threshold");
330 size_t current_position = 0;
331 item_neighbor_boundaries.resize(total_num_items + 1);
332 for(
size_t i = 0; i < total_num_items; ++i) {
333 item_neighbor_boundaries[i] = current_position;
// In-place compaction: current_position <= i*max_item_neighborhood_size, so
// writes never overtake unread entries.  (The ++write_pos statement sits on
// a line lost to the extraction -- presumably inside the if below.)
336 size_t write_pos = 0;
337 for(
size_t j = 0; j < item_neighbor_counts[i]; ++j) {
338 if(item_interaction_data[i * max_item_neighborhood_size + j].second > threshold) {
339 item_interaction_data[current_position + write_pos]
340 = item_interaction_data[i * max_item_neighborhood_size + j];
345 current_position += write_pos;
348 item_neighbor_boundaries[total_num_items] = current_position;
// Shrink to the compacted size; the construction-only counts are dropped.
349 item_interaction_data.resize(current_position);
350 item_neighbor_counts.clear();
351 item_neighbor_counts.shrink_to_fit();
// Parallel per-item sort; items are handed out via an atomic work index.
354 atomic<size_t> current_idx = 0;
356 in_parallel([&](
size_t thread_idx,
size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
358 size_t idx = (++current_idx) - 1;
359 if(idx >= total_num_items) {
363 std::sort(item_interaction_data.begin() + item_neighbor_boundaries[idx],
364 item_interaction_data.begin() + item_neighbor_boundaries[idx + 1],
365 [](
const std::pair<size_t, final_interaction_data_type>& p1,
366 const std::pair<size_t, final_interaction_data_type>& p2) {
367 return p1.first < p2.first;
// Build the lookup directly from a user-supplied sframe of precomputed
// similarities (item, similar-item, similarity columns).  When add_reverse
// is true each edge is also inserted in the (item_b, item_a) direction.
// Item indices must lie in [0, num_items).
378 void setup_by_raw_similarity(
381 const sframe& _interaction_data,
382 const std::string& item_column,
383 const std::string& similar_item_column,
384 const std::string& similarity_column,
385 bool add_reverse =
false) {
387 total_num_items = num_items;
// Import per-item final data if the similarity requires it.
390 std::vector<final_item_data_type> _final_item_data;
392 if(use_final_item_data()) {
393 ASSERT_EQ(item_data.size(), total_num_items);
395 _final_item_data.resize(total_num_items);
396 for(
size_t i = 0; i < total_num_items; ++i) {
397 similarity.import_final_item_value(_final_item_data[i], item_data[i]);
401 init_item_lookups(num_items, _final_item_data);
// Select the three relevant columns; the column-type checks around these
// throws are partially lost to the extraction.
408 {item_column, similar_item_column, similarity_column});
411 log_and_throw(
"Items in provided data must be integers in the set {0, ..., num_items}.");
415 log_and_throw(
"Similar items in provided data must be integers in the set {0, ..., num_items}.");
// Parallel scan over the sframe rows, inserting each edge.
420 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
422 final_interaction_data_type final_interaction_data = final_interaction_data_type();
425 size_t item_a = it.value(0);
426 size_t item_b = it.value(1);
// Self-similarities are silently skipped.
427 if(item_a == item_b)
continue;
// Out-of-range indices produce a descriptive error (cold path).
431 if(item_a >= num_items || item_b >= num_items) {
432 auto raise_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
433 std::ostringstream ss;
434 ss <<
"Out of range item index encountered in row " << it.row_index()
435 <<
"; item index = " << std::max(item_a, item_b) <<
" >= " << num_items <<
" = num_items." 437 log_and_throw(ss.str().c_str());
443 similarity.import_final_interaction_value(final_interaction_data, sim_value);
444 insert_into_lookup(item_a, item_b, final_interaction_data);
// Reverse edge -- guarded by add_reverse (the if itself is on a lost line).
447 insert_into_lookup(item_b, item_a, final_interaction_data);
// Monte-Carlo estimate of the density of the item-item co-occurrence matrix:
// the probability that a uniformly random item pair (i, j) co-occurs in at
// least one user's item list.  Used to decide between sparse and dense
// accumulation containers.
462 double estimate_sparse_matrix_density(
463 const item_info_vector& item_info,
464 const std::vector<size_t>& items_per_user) {
468 size_t degree_approximation_threshold =
options.at(
"degree_approximation_threshold");
469 size_t num_items = item_info.size();
// item_log1mp[i] = log(1 - P(a random interaction hits item i)).
519 std::vector<double> item_log1mp(num_items);
521 size_t total_item_counts = 0;
523 for(
const auto& ii : item_info) {
524 total_item_counts += ii.num_users;
529 DASSERT_GT(item_info[i].num_users, 0);
530 double hit_p = double(item_info[i].num_users) / total_item_counts;
531 item_log1mp[i] = std::log1p(-hit_p);
// log P(pair (i,j) is NOT produced by one user with user_item_count items).
// The count is clipped to degree_approximation_threshold, mirroring the
// sampling approximation applied during training.  The -1e-16 / 1e-16 clamps
// keep log1me()'s argument strictly in its valid domain.
539 auto calc_log_hit_prob_accumulation = [&](
540 size_t i,
size_t j,
size_t user_item_count) {
542 size_t clipped_user_item_count
543 = std::min(user_item_count, degree_approximation_threshold);
546 log1me(std::min<double>(-1e-16, clipped_user_item_count * item_log1mp[i]))
547 +
log1me(std::min<double>(-1e-16, clipped_user_item_count * item_log1mp[j])));
558 return log1me(std::min<double>(1e-16, log_riju));
// Per-sample accumulators (fields of the local _sample struct, whose
// declaration is partially lost to the extraction).
563 double log_1_m_q = 0;
564 double estimated_prob = 0;
567 size_t num_samples =
options.at(
"sparse_density_estimation_sample_size");
571 num_samples = std::min(num_samples, num_items * num_items);
574 std::vector<_sample> samples(num_samples);
// Each thread owns a contiguous range of samples.
578 [&](
size_t thread_idx,
size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
580 size_t sample_start_idx = (thread_idx * samples.size()) / num_threads;
581 size_t sample_end_idx = ((thread_idx + 1) * samples.size()) / num_threads;
583 for(
size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
584 auto& s = samples[s_idx];
585 s.i = random::fast_uniform<size_t>(0, num_items - 1);
586 s.j = random::fast_uniform<size_t>(0, num_items - 1);
// If there are many users, subsample their item counts (5000 of them) and
// scale the accumulated contribution up by mult_factor to compensate.
590 double mult_factor = 1;
592 std::vector<size_t> item_count_distribution;
594 static constexpr
size_t user_count_dist_sample_size = 5000;
597 if(items_per_user.size() > user_count_dist_sample_size) {
599 item_count_distribution.resize(user_count_dist_sample_size);
600 for(
size_t i = 0; i < user_count_dist_sample_size; ++i) {
601 size_t idx = random::fast_uniform<size_t>(0, items_per_user.size() - 1);
602 item_count_distribution[i] = items_per_user[idx];
604 mult_factor = double(items_per_user.size()) / item_count_distribution.size();
606 item_count_distribution = items_per_user;
// Accumulate log(1 - q) over (sampled) users for every sample pair.
609 for(
size_t idx = 0; idx < item_count_distribution.size(); ++idx) {
610 for(
size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
611 auto& s = samples[s_idx];
612 size_t count = item_count_distribution[idx];
613 s.log_1_m_q += mult_factor * calc_log_hit_prob_accumulation(s.i, s.j, count);
// P(pair occurs at least once) = 1 - exp(log_1_m_q), via expm1 for accuracy.
617 for(
size_t s_idx = sample_start_idx; s_idx < sample_end_idx; ++s_idx) {
618 auto& s = samples[s_idx];
619 s.estimated_prob = -std::expm1(s.log_1_m_q);
620 DASSERT_LE(s.estimated_prob, 1.0 + 1e-6);
621 DASSERT_GE(s.estimated_prob, 0.0 - 1e-6);
// Density estimate = mean per-pair occupancy probability over all samples.
625 double total_prob = 0;
627 for(
const _sample& s : samples) {
628 total_prob += s.estimated_prob;
631 double estimated_density = total_prob / samples.size();
633 return estimated_density;
// Partition the upper-triangular item-item matrix into row slices such that
// each slice's accumulation container fits in target_memory_usage bytes,
// given the per-entry cost.  Returns the slice boundaries (empty means no
// feasible slicing within max_slices passes).
639 std::vector<size_t> calculate_slice_structure(
640 size_t num_items,
size_t max_slices,
double bytes_per_interaction) {
642 size_t target_memory_usage =
options.at(
"target_memory_usage");
// Budget expressed as a number of interaction entries per slice.
644 size_t target_num_items_per_slice
645 = std::ceil(target_memory_usage / bytes_per_interaction);
// A slice must hold at least one full matrix row (up to num_items entries),
// so the per-slice budget is raised to num_items when it is smaller.
648 target_num_items_per_slice = std::max(num_items, target_num_items_per_slice);
650 auto slice_boundaries = calculate_upper_triangular_slice_structure(
651 num_items, target_num_items_per_slice, max_slices);
653 return slice_boundaries;
658 double bytes_per_item_dense() {
659 return sizeof(interaction_data_type);
// Estimated memory footprint, in bytes, of one item-item entry when the
// sparse accumulation container is used: estimated matrix density times the
// per-nonzero cost (index + value, with a 1.7x hash-container overhead
// factor).  Logs the density estimate along the way.
664 double bytes_per_item_sparse(
const item_info_vector& item_info,
665 const std::vector<size_t>& items_per_user) {
667 logstream(
LOG_INFO) <<
"Estimating relative cost of doing sparse lookups vs. dense lookups." 670 double estimated_density = estimate_sparse_matrix_density(item_info, items_per_user);
673 << estimated_density <<
". " << std::endl;
// 1.7 is an empirical overhead multiplier for the sparse container --
// presumably hash-table load-factor/bookkeeping; TODO confirm.
677 double estimated_memory_usage_per_element
678 = estimated_density * (1.7 * (
sizeof(size_t) +
sizeof(interaction_data_type) ));
680 return estimated_memory_usage_per_element;
// For a user whose item list exceeds degree_approximation_threshold, pick a
// popularity cutoff: the num_users count of the (threshold)-th least-popular
// item in the list.  Items more popular than this are skipped for this user
// during training, bounding per-user work.  Requires
// item_list.size() > degree_approximation_threshold.
689 size_t get_item_count_threshold_for_user(
690 const item_info_vector& item_info,
691 const std::vector<std::pair<size_t, double> >& item_list) {
693 size_t degree_approximation_threshold =
694 options.at(
"degree_approximation_threshold");
696 DASSERT_GT(item_list.size(), degree_approximation_threshold);
// Copy out the item indices so nth_element can reorder them freely.
698 std::vector<size_t> items(item_list.size());
699 for(
size_t i = 0; i < item_list.size(); ++i) {
700 items[i] = item_list[i].first;
// Partial selection by ascending popularity (std::nth_element; its first
// arguments sit on lines lost to the extraction).
709 items.begin() + degree_approximation_threshold,
711 [&](
size_t i,
size_t j) {
return item_info[i].num_users < item_info[j].num_users; });
713 size_t item_count_threshold = item_info[items[degree_approximation_threshold]].num_users;
// Debug accounting (visible uses of n1/n2 were lost to the extraction):
// n1 = #items at-or-below the threshold, n2 = #items strictly below it.
732 size_t n1 = std::count_if(
733 item_list.begin(), item_list.end(),
734 [&](std::pair<size_t, double> p) {
735 return item_info[p.first].num_users <= item_count_threshold;
740 size_t n2 = std::count_if(
741 item_list.begin(), item_list.end(),
742 [&](std::pair<size_t, double> p) {
743 return item_info[p.first].num_users < item_count_threshold;
750 return item_count_threshold;
// Select the "head" items (those interacted with by many users) and process
// all pairs involving them with an exact brute-force similarity search,
// marking them in item_in_nearest_neighbors.  The remaining "tail" items are
// left for the matrix-based passes.  Returns the number of items handled
// here.  Requires training_method to be "auto" or an "nn*" mode.
758 size_t preprocess_nearest_neighbors(
760 const std::shared_ptr<
sarray<std::vector<std::pair<size_t, double> > > >& data,
761 const item_info_vector& item_info,
762 const std::vector<size_t>& items_per_user,
763 std::shared_ptr<_progress_tracker> progress_tracker) {
766 DASSERT_EQ(item_in_nearest_neighbors.
size(), item_info.size());
767 DASSERT_EQ(item_in_nearest_neighbors.popcount(), 0);
769 size_t num_items = item_info.size();
770 size_t num_users = items_per_user.size();
772 std::string force_mode =
options.at(
"training_method");
773 DASSERT_TRUE(force_mode ==
"auto" || force_mode.substr(0,2) ==
"nn");
// Heuristic constants: an item is "popular" if more than 1/32 of all users
// touched it; and at least 32 items (or all of them) go through NN.
779 size_t nearest_neighbors_user_count_ratio_threshhold = 32;
785 size_t nearest_neighbors_min_num_items = 32;
786 nearest_neighbors_min_num_items = std::min(
787 item_info.size(), nearest_neighbors_min_num_items);
793 size_t n_in_nearest_neighbors = 0;
// First pass: mark items whose user count exceeds num_users / 32.
796 size_t user_count_threshold
797 = num_users / nearest_neighbors_user_count_ratio_threshhold;
799 for(
size_t i = 0; i < num_items; ++i) {
800 if(item_info[i].num_users > user_count_threshold) {
801 item_in_nearest_neighbors.
set_bit(i);
802 ++n_in_nearest_neighbors;
// Mode-specific adjustments (several branches have lines lost to the
// extraction; comments below describe only the visible logic).
810 if(force_mode ==
"auto") {
814 if(n_in_nearest_neighbors == 0) {
// Tiny datasets: if the minimum covers everything, just NN everything.
820 if(nearest_neighbors_min_num_items == item_info.size()) {
821 item_in_nearest_neighbors.
fill();
822 n_in_nearest_neighbors = item_info.size();
// Pure "nn" mode: force every item through nearest neighbors.
827 if(force_mode ==
"nn") {
829 item_in_nearest_neighbors.
fill();
830 n_in_nearest_neighbors = item_info.size();
831 }
else if(force_mode.substr(0, 2) ==
"nn") {
// "nn:dense"/"nn:sparse": make sure some items remain for the second stage.
839 if(nearest_neighbors_min_num_items == item_info.size()) {
840 nearest_neighbors_min_num_items /= 2;
845 if(n_in_nearest_neighbors == 0) {
852 if(n_in_nearest_neighbors == item_info.size()) {
853 item_in_nearest_neighbors.
clear();
856 n_in_nearest_neighbors = nearest_neighbors_min_num_items - 1;
// Top-up: if fewer than the minimum were selected, take the most common
// nearest_neighbors_min_num_items items by user count (descending
// nth_element), rebuilding the bitset from scratch.
868 if(n_in_nearest_neighbors < nearest_neighbors_min_num_items) {
873 std::vector<std::pair<size_t, size_t> > count_buffer(item_info.size());
874 for(
size_t i = 0; i < item_info.size(); ++i) {
875 count_buffer[i] = {item_info[i].num_users, i};
878 std::nth_element(count_buffer.begin(),
879 count_buffer.begin() + nearest_neighbors_min_num_items,
881 [](
const std::pair<size_t, size_t>& p1,
882 const std::pair<size_t, size_t>& p2) {
883 return p1.first > p2.first;
886 DASSERT_GE(count_buffer.at(nearest_neighbors_min_num_items - 1).first,
887 count_buffer.at(nearest_neighbors_min_num_items).first);
890 for(
size_t i = 0; i < nearest_neighbors_min_num_items; ++i) {
891 item_in_nearest_neighbors.
set_bit(count_buffer[i].second);
894 n_in_nearest_neighbors = nearest_neighbors_min_num_items;
897 DASSERT_EQ(item_in_nearest_neighbors.popcount(), n_in_nearest_neighbors);
900 <<
" most common items by brute force search." << std::endl;
// Transpose the user-major data to item-major for the brute-force search.
906 std::vector<size_t> users_per_item(item_info.size());
907 for(
size_t i = 0; i < item_info.size(); ++i) {
908 users_per_item[i] = item_info[i].num_users;
911 std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > > transposed_data
913 data, users_per_item,
options.at(
"target_memory_usage"));
// Each exact pair similarity is inserted in both directions.
920 auto process_item_pair = [&](
size_t item_a,
size_t item_b,
const final_interaction_data_type& value)
923 DASSERT_NE(item_a, item_b);
925 insert_into_lookup(item_a, item_b, value);
926 insert_into_lookup(item_b, item_a, value);
929 progress_tracker->increment_item_counter();
// Pair filter fragment: skip self-pairs; for query < ref only process pairs
// whose reference item is in the NN set (the other branch is lost).
938 if(query_idx == ref_idx)
941 if(query_idx < ref_idx) {
942 return item_in_nearest_neighbors.
get(ref_idx);
948 progress_tracker->print_header();
// Run the exact all-pairs search restricted to the selected items.
952 brute_force_all_pairs_similarity_with_vector_reference(
969 options.at(
"target_memory_usage"),
975 &item_in_nearest_neighbors);
978 return n_in_nearest_neighbors;
// Core training loop: accumulate pairwise interaction statistics into the
// supplied container (dense matrix or sparse 2d array), one upper-triangular
// row-slice at a time, then finalize each slice's entries into the lookup.
// Degree-based pruning and hash-based sampling bound the per-user work.
983 template <
typename ItemItemContainer>
984 void _train_with_sparse_matrix_sarray(
985 ItemItemContainer&& interaction_data,
986 const std::vector<size_t>& slice_boundaries,
987 const item_info_vector& item_info,
988 const std::vector<size_t>& items_per_row,
990 std::shared_ptr<_progress_tracker> progress_tracker,
991 const std::shared_ptr<
sarray<std::vector<std::pair<size_t, double> > > >& data) {
// Lock striping is only needed when the similarity's update is not
// atomic-safe AND the container does not lock internally (the excluded
// container type is on a line lost to the extraction).
998 static constexpr
bool use_interaction_locks = (
1000 SimilarityType::require_interaction_locking()
1003 && !std::is_same<ItemItemContainer,
1006 static constexpr
size_t n_interaction_locks = (use_interaction_locks ? 1024 : 1);
1007 std::array<simple_spinlock, n_interaction_locks> interaction_locks;
1010 const size_t num_items = item_info.size();
1011 TURI_ATTRIBUTE_UNUSED_NDEBUG
const size_t n = data->size();
1012 DASSERT_EQ(items_per_row.size(), n);
// Seed is optional; the fallback value is on a lost line.
1014 const size_t random_seed = (
options.count(
"random_seed")
1015 ? size_t(
options.at(
"random_seed"))
1018 size_t degree_approximation_threshold =
options.at(
"degree_approximation_threshold");
// Cache of per-user popularity cutoffs (computed lazily, under a lock).
1021 std::map<size_t, size_t> pruned_user_item_count_thresholds;
1026 progress_tracker->print_header();
// Estimate the total work so progress can be mapped onto num_items^2 units.
1031 size_t total_interactions_to_register = 0;
1035 for(
size_t m : items_per_row) {
1036 total_interactions_to_register += std::min(degree_approximation_threshold, m) * m;
1041 total_interactions_to_register *= (slice_boundaries.size() - 1);
1047 double progress_register_scale = (double(num_items) * num_items) / (total_interactions_to_register);
// Per-slice setup: size the container as slice_height x (num_items - start),
// i.e. the upper-triangular band starting at this slice's first row.
1053 auto init_slice = [&](
size_t slice_idx,
size_t item_idx_start,
size_t item_idx_end) {
1056 size_t slice_height = item_idx_end - item_idx_start;
1057 size_t slice_width = num_items - item_idx_start;
1058 DASSERT_GE(slice_height, 1);
1061 interaction_data.clear();
1062 interaction_data.resize(slice_height, slice_width);
// Per-user (row) processing within the current slice.
1072 auto process_row = [&](
size_t thread_idx,
size_t row_idx,
1073 size_t item_idx_start,
size_t item_idx_end,
1074 std::vector<std::pair<size_t, double> >& item_list)
1084 size_t m = item_list.size();
1085 size_t n_interactions_to_register
1086 = size_t(progress_register_scale * std::min(m, degree_approximation_threshold) * m);
1088 progress_tracker->increment_item_counter(n_interactions_to_register);
1094 if(item_list.empty()) {
// Approximation state: by default nothing is pruned or sampled out.
1102 size_t item_count_threshold = std::numeric_limits<size_t>::max();
1108 size_t rng_gen_value = 0;
1111 size_t rng_64bit_threshhold = std::numeric_limits<size_t>::max();
// Deterministic hash-chain sampler: keeps ~degree_approximation_threshold
// of the item_list entries in expectation.
1114 auto exclude_item_by_sampling = [&](
size_t idx) {
1115 rng_gen_value =
hash64(rng_gen_value, idx);
1116 return rng_gen_value >= rng_64bit_threshhold;
1120 bool approximation_enabled = (item_list.size() > degree_approximation_threshold);
1122 if(approximation_enabled) {
// Look up (or compute once) this user's popularity cutoff.
1145 std::lock_guard<simple_spinlock> lg(pruned_user_item_count_thresholds_lock);
1147 auto it = pruned_user_item_count_thresholds.find(row_idx);
1149 if(it == pruned_user_item_count_thresholds.end()) {
1150 item_count_threshold
1151 = get_item_count_threshold_for_user(item_info, item_list);
1152 pruned_user_item_count_thresholds[row_idx] = item_count_threshold;
1154 item_count_threshold = it->second;
// Seed the sampler per (seed, user) so runs are reproducible.
1161 rng_gen_value =
hash64(random_seed, row_idx);
1162 rng_64bit_threshhold = ((std::numeric_limits<size_t>::max() / item_list.size())
1163 * degree_approximation_threshold);
// Restrict the "a" side of each pair to items inside this slice's rows
// (the range computation on line 1172-1174 is lost to the extraction).
1169 size_t list_idx_start, list_idx_end;
1171 std::tie(list_idx_start, list_idx_end)
1175 if(UNLIKELY(list_idx_start == list_idx_end)) {
1180 DASSERT_LT(item_list[list_idx_end-1].first, item_idx_end);
1181 DASSERT_GE(item_list[list_idx_end-1].first, item_idx_start);
1182 DASSERT_LT(item_list[list_idx_start].first, item_idx_end);
1183 DASSERT_GE(item_list[list_idx_start].first, item_idx_start);
// Main pair loop: "a" ranges over in-slice items, "b" over everything after.
1188 for(
size_t idx_a = list_idx_start; idx_a < list_idx_end; ++idx_a) {
1190 size_t item_a = item_list[idx_a].first;
1191 const auto& value_a = item_list[idx_a].second;
// Degree pruning: too-popular items are skipped for this user.
1195 if(item_info[item_a].num_users > item_count_threshold)
1200 for(
size_t idx_b = idx_a + 1; idx_b < item_list.size(); ++idx_b) {
1201 if(exclude_item_by_sampling(idx_b))
1204 size_t item_b = item_list[idx_b].first;
1205 const auto& value_b = item_list[idx_b].second;
// Coordinates relative to the slice origin; strictly upper-triangular.
1207 size_t row_idx = item_a - item_idx_start;
1208 size_t col_idx = item_b - item_idx_start;
1210 DASSERT_LT(row_idx, col_idx);
1212 auto update_interaction_f = [&](interaction_data_type& edge)
1216 similarity.update_interaction(
1218 item_info[item_a].item_data, item_info[item_b].item_data,
// Striped-lock path vs. lock-free path, chosen at compile time.
1222 if(use_interaction_locks) {
1223 std::lock_guard<simple_spinlock> lg(
1224 interaction_locks[
hash64(row_idx, col_idx) % n_interaction_locks]);
1226 interaction_data.apply(row_idx, col_idx, update_interaction_f);
1228 interaction_data.apply(row_idx, col_idx, update_interaction_f);
// No per-element processing is needed by this caller.
1245 auto empty_process_element = [](
size_t thread_idx,
size_t row_idx,
1246 size_t item_idx_start,
size_t item_idx_end,
1247 size_t item_idx,
double value)
// After a slice's accumulation completes, finalize every accumulated edge
// and insert it (both directions) into the neighbor lookup.
1257 auto finalize_slice = [&](
size_t slice_idx,
size_t item_idx_start,
size_t item_idx_end) {
1264 interaction_data.apply_all(
1265 [&](
size_t row_idx,
size_t col_idx,
const interaction_data_type& edge)
1268 final_interaction_data_type final_interaction_data = final_interaction_data_type();
1270 size_t item_a = item_idx_start + row_idx;
1271 size_t item_b = item_idx_start + col_idx;
1273 DASSERT_LT(item_a, item_b);
1275 this->similarity.finalize_interaction(
1276 final_interaction_data,
1277 item_info[item_a].final_item_data,
1278 item_info[item_b].final_item_data,
1280 item_info[item_a].item_data,
1281 item_info[item_b].item_data);
// abs_item_* presumably map slice-local ids back through the index mapper;
// the mapping lines are lost to the extraction -- TODO confirm.
1288 insert_into_lookup(abs_item_a, abs_item_b, final_interaction_data);
1289 insert_into_lookup(abs_item_b, abs_item_a, final_interaction_data);
// Drive the sliced iteration (the driver call's other arguments are lost).
1299 empty_process_element,
// Top-level training driver.  Chooses among strategies based on the
// "training_method" option ("auto", "dense", "sparse", "nn", "nn:dense",
// "nn:sparse"): optionally handle popular items with exact nearest-neighbor
// search, then cover remaining pairs with dense or sparse matrix passes,
// relaxing degree_approximation_threshold if nothing fits.  Returns training
// info/statistics; records the method actually used back into options.
1308 std::map<std::string, flexible_type>
1309 train_from_sparse_matrix_sarray(
1311 const std::shared_ptr<
sarray<std::vector<std::pair<size_t, double> > > >& data)
1314 total_timer.
start();
1315 auto progress_tracker = std::make_shared<_progress_tracker>(num_items);
1317 std::map<std::string, flexible_type> ret;
1320 std::string force_mode =
options.at(
"training_method");
// One column-wise pass over the data computes per-item statistics and the
// per-user item counts.
1328 item_info_vector item_info;
1330 std::vector<size_t> items_per_user;
1331 bool nearest_neighbors_run =
false;
1333 calculate_item_processing_colwise(
1334 item_info, similarity, data, num_items, &items_per_user);
1336 size_t num_items_remaining = item_info.size();
1340 std::vector<final_item_data_type> _final_item_data(num_items, final_item_data_type());
1342 for(
size_t i = 0; i < item_info.size(); ++i) {
1343 _final_item_data[i] = item_info[i].final_item_data;
1346 init_item_lookups(num_items, _final_item_data);
// Attempt a dense run within pass_limit passes; returns success (the
// early returns around lines 1358/1366 are lost to the extraction).
1352 auto attempt_dense_pass = [&](
size_t pass_limit) {
1354 std::vector<size_t> dense_slice_structure
1355 = calculate_slice_structure(num_items_remaining, pass_limit, bytes_per_item_dense());
1357 if(dense_slice_structure.empty()) {
1361 size_t num_dense_passes = dense_slice_structure.size() - 1;
1365 if(num_dense_passes <= pass_limit) {
1367 if(num_dense_passes == 1) {
1369 <<
"Processing data in one pass using dense lookup tables." 1373 <<
"Processing data in " << num_dense_passes <<
" passes using dense lookup tables." 1380 matrix_type dense_container;
// Pre-reserve only for multi-pass runs so each slice resize reuses storage.
1383 if(num_dense_passes != 1) {
1384 size_t target_memory_usage =
options.at(
"target_memory_usage");
1385 dense_container.reserve(target_memory_usage /
sizeof(interaction_data_type));
1389 _train_with_sparse_matrix_sarray(
1390 dense_container, dense_slice_structure, item_info,
1391 items_per_user, index_mapper, progress_tracker, data);
1402 size_t max_data_passes =
options.at(
"max_data_passes");
// "auto": try a cheap dense run first (at most 4 passes).
1404 if(force_mode ==
"auto") {
1405 bool success = attempt_dense_pass(4);
1409 options[
"training_method"] =
"dense";
// "dense": dense is mandatory; failure is a hard error.
1414 if(force_mode ==
"dense") {
1415 bool success = attempt_dense_pass(max_data_passes);
1419 std::ostringstream ss;
1420 ss <<
"Not enough allowed memory to use training_method = \"dense\" with " 1421 <<
"max_data_passes = " << max_data_passes
1422 <<
"; consider increasing target_memory_usage " 1423 <<
" or max_data_passes." << std::endl;
1424 log_and_throw(ss.str().c_str());
// Nearest-neighbor stage for "auto" and all "nn*" modes.
1432 if(force_mode ==
"auto" || force_mode.substr(0, 2) ==
"nn") {
1436 size_t n_in_nearest_neighbors = preprocess_nearest_neighbors(
1437 item_in_nearest_neighbors, data, item_info, items_per_user, progress_tracker);
1439 if(n_in_nearest_neighbors == num_items) {
// Everything handled exactly -- record and finish with "nn".
1444 options[
"training_method"] =
"nn";
1447 }
else if(n_in_nearest_neighbors == 0) {
1450 nearest_neighbors_run =
false;
// Remaining (tail) items continue to the matrix passes; the bitset is
// inverted so it now marks the items still to be processed.
1454 item_in_nearest_neighbors.invert();
1457 DASSERT_EQ(num_items_remaining, item_info.size());
1458 nearest_neighbors_run =
true;
1461 progress_tracker->print_break();
// "nn:dense": dense is mandatory for the tail; failure is a hard error.
1468 if(force_mode ==
"nn:dense") {
1469 bool success = attempt_dense_pass(max_data_passes);
1473 std::ostringstream ss;
1474 ss <<
"Not enough allowed memory to use training_method = \"nn:dense\" with " 1475 <<
"max_data_passes = " << max_data_passes
1476 <<
"; consider increasing target_memory_usage " 1477 <<
" or max_data_passes." << std::endl;
1478 log_and_throw(ss.str().c_str());
// Fallback loop: try sparse/dense with progressively more aggressive
// degree approximation until something fits or we error out.
1488 for(
size_t attempt = 0; ; ++attempt) {
1490 auto error_out = []() {
1491 log_and_throw(
"Unable to determine reasonable way to run " 1492 "item_similarity given constrained running parameters. " 1493 "To fix, try: (1) increasing target_memory_usage, " 1494 "(2) increasing max_data_passes, or (3) forcing nearest " 1495 "neighbors mode with training_method='nn'.");
1504 double bpi_sparse = bytes_per_item_sparse(item_info, items_per_user);
1506 logstream(
LOG_INFO) <<
"Bytes per item in sparse matrix = " << bpi_sparse << std::endl;
1508 std::vector<size_t> sparse_slice_structure
1509 = calculate_slice_structure(num_items_remaining, max_data_passes, bpi_sparse);
1511 bool sparse_possible = !sparse_slice_structure.empty();
1513 size_t num_sparse_passes = (sparse_possible
1514 ? sparse_slice_structure.size() - 1
1515 : std::numeric_limits<size_t>::max());
1517 if(sparse_possible) {
1519 <<
" passes needed for sparse matrix." << std::endl;
1521 logstream(
LOG_INFO) <<
"Number of data passes too high for sparse matrix. " << std::endl;
1526 bool disable_dense = (force_mode ==
"sparse" || force_mode ==
"nn:sparse");
// Prefer dense when it needs at most 8x the sparse pass count -- dense
// passes are much cheaper per pass.
1530 if(!disable_dense) {
1532 size_t dense_mode_allowed_passes = max_data_passes;
1542 if(sparse_possible) {
1543 dense_mode_allowed_passes = std::min(8*num_sparse_passes, max_data_passes);
1546 bool success = attempt_dense_pass(dense_mode_allowed_passes);
1549 if(nearest_neighbors_run) {
1550 options[
"training_method"] =
"nn:dense";
1552 options[
"training_method"] =
"dense";
// Sparse run.
1559 if(sparse_possible) {
1561 if(num_sparse_passes == 1) {
1563 <<
"Processing data in one pass using sparse lookup tables." 1567 <<
"Processing data in " << num_sparse_passes
1568 <<
" passes using sparse lookup tables." 1574 _train_with_sparse_matrix_sarray(
1575 matrix_type(), sparse_slice_structure, item_info,
1576 items_per_user, index_mapper, progress_tracker, data);
1579 if(nearest_neighbors_run) {
1580 options[
"training_method"] =
"nn:sparse";
1582 options[
"training_method"] =
"sparse";
// Nothing fit: halve degree_approximation_threshold (floor 8) and retry,
// warning the user; below 8 the error_out above fires (on a lost line).
1596 size_t degree_approximation_threshold =
options.at(
"degree_approximation_threshold");
1599 if(degree_approximation_threshold > 8) {
1601 <<
"Unable to fit dataset processing into limit of max_data_passes=" 1602 << size_t(
options.at(
"max_data_passes")) <<
" and target_memory_usage=" 1603 <<
size_t(
options.at(
"target_memory_usage")) <<
" " 1604 <<
"bytes. Employing more aggressive approximations; " 1605 <<
"increase target_memory_usage, " 1606 <<
"nearest_neighbors_interaction_proportion_threshold, " 1607 <<
"or max_data_passes to avoid this. " 1611 <<
" Setting degree_approximation_threshold=" 1612 << degree_approximation_threshold << std::endl;
1614 options[
"degree_approximation_threshold"] /= 2;
1625 progress_tracker->print_footer();
// Per-thread scratch buffers for prediction accumulation during scoring;
// mutable because scoring is a const operation on the lookup.
1639 mutable std::vector<std::vector<prediction_accumulation_type> >
1640 item_prediction_buffers_by_thread;
// score_items: given a user's observed (item id, score) pairs in
// user_item_data, accumulate similarity contributions from each observed
// item's neighbor list into a dense per-item buffer, then finalize a
// prediction for every requested entry of item_predictions (in/out:
// .first = item id, .second receives the predicted score).  Returns the
// number of similarity updates performed.
//
// NOTE(review): this is an extraction of a rendered source page -- the
// numeric tokens starting many lines are fused original line numbers, and
// code is missing wherever those numbers jump (closing braces, the
// `continue;` presumably at 1671-1673, the increments of num_updates, and
// the trailing lambda arguments at 1685/1692 which likely consume `score`).
1645 size_t score_items(std::vector<std::pair<size_t, double> >& item_predictions,
1646 const std::vector<std::pair<size_t, double> >& user_item_data)
const {
// Placeholder passed where the similarity type keeps no per-item data.
1648 final_item_data_type _unused;
1653 DASSERT_LT(outer_thread_idx, item_prediction_buffers_by_thread.size());
// Dense per-item accumulator, reset on every call; one buffer per thread.
1656 auto& item_prediction_buffer = item_prediction_buffers_by_thread[outer_thread_idx];
1657 item_prediction_buffer.assign(total_num_items, prediction_accumulation_type());
1659 atomic<size_t> num_updates = 0;
// Scores the half-open range [user_item_data_start, user_item_data_end).
// A third boolean parameter was lost in extraction; from the call sites
// below (false under the parallel split, true in the serial path) it is
// presumably `use_unsafe_update_method` -- TODO confirm.
1662 auto _run_scoring = [&](
1663 size_t user_item_data_start,
size_t user_item_data_end,
1666 for(
size_t i = user_item_data_start; i < user_item_data_end; ++i) {
1667 size_t item = user_item_data[i].first;
1668 double score = user_item_data[i].second;
// Items outside the table contribute nothing (skip logic lost in extraction).
1670 if(item >= total_num_items) {
// Walk this item's neighbor list: item_neighbor_boundaries delimits the
// CSR-style slice of item_interaction_data belonging to `item`.
1674 for(
size_t i = item_neighbor_boundaries[item]; i < item_neighbor_boundaries[item+1]; ++i) {
1675 const auto& item_neighbor = item_interaction_data[i];
// Unsafe path: presumably skips synchronization on the accumulator,
// valid only when this thread owns the whole buffer -- TODO confirm.
1679 if(use_unsafe_update_method) {
1680 similarity.update_prediction_unsafe(
1681 item_prediction_buffer[item_neighbor.first],
1682 item_neighbor.second,
1683 use_final_item_data() ? final_item_data[item] : _unused,
1684 use_final_item_data() ? final_item_data[item_neighbor.first] : _unused,
1687 similarity.update_prediction(
1688 item_prediction_buffer[item_neighbor.first],
1689 item_neighbor.second,
1690 use_final_item_data() ? final_item_data[item] : _unused,
1691 use_final_item_data() ? final_item_data[item_neighbor.first] : _unused,
// Phase 1: split the user's interaction list evenly across threads; the
// safe update path (false) is used when actually running in parallel.
1700 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
1701 bool parallel_here = (num_threads != 1);
1706 size_t user_item_data_start = (thread_idx * user_item_data.size()) / num_threads;
1707 size_t user_item_data_end = ((thread_idx+1) * user_item_data.size()) / num_threads;
1709 _run_scoring(user_item_data_start, user_item_data_end,
false);
// Serial fallback: one pass over everything with the unsafe (true) path.
1712 _run_scoring(0, user_item_data.size(),
true);
// Phase 2: finalize a prediction for each requested item, again split
// evenly across threads.
1716 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
1717 size_t item_index_start = (thread_idx * item_predictions.size()) / num_threads;
1718 size_t item_index_end = ((thread_idx+1) * item_predictions.size()) / num_threads;
1720 for(
size_t i = item_index_start; i < item_index_end; ++i) {
1722 auto& p = item_predictions[i];
1723 size_t item = p.first;
// Unknown items get no finalized prediction (handling lost in extraction).
1725 if(item >= total_num_items) {
1730 p.second = similarity.finalize_prediction(
1731 item_prediction_buffer[item],
1732 use_final_item_data() ? final_item_data[item] : _unused,
1733 user_item_data.size());
1740 return size_t(num_updates);
1746 size_t get_version()
const {
return 1; }
// ---- Serialization ----
// NOTE(review): the enclosing save(oarc)/load(iarc) member signatures were
// lost in extraction; the bodies below remain.  save() writes a version tag
// followed by the lookup payload (item count, CSR boundaries, neighbor data).
1750 oarc << get_version();
1752 oarc << total_num_items
1754 << item_neighbor_boundaries
1755 << item_interaction_data;
// load(): refuse any serialized version other than the current one, then
// read back the same three fields in the same order.
1765 ASSERT_MSG(version == 1,
1766 "Item similarity lookup does not support loading from this version.");
1768 iarc >> total_num_items
1770 >> item_neighbor_boundaries
1771 >> item_interaction_data;
// get_similar_items: fill `similar_items` with up to `top_k`
// (neighbor id, exported similarity score) pairs for a query item,
// best matches first.
// NOTE(review): the query-item parameter (used below as `item`, original
// line 1782) was dropped in extraction, as were several closing braces and
// the comparator argument of std::nth_element (original lines 1810-1811,
// presumably item_comparitor).
1780 void get_similar_items(
1781 std::vector<std::pair<size_t, flexible_type> >& similar_items,
1783 size_t top_k)
const {
// Placeholder passed where the similarity type keeps no per-item data.
1785 final_item_data_type _unused;
// Orders two neighbor entries for the query item by delegating to the
// similarity type's comparison of their interaction values.
1787 auto item_comparitor = [&](
const interaction_info_type& p1,
const interaction_info_type& p2) ->
bool {
1789 return similarity.compare_interaction_values(
1792 use_final_item_data() ? final_item_data[item] : _unused,
1793 use_final_item_data() ? final_item_data[p1.first] : _unused,
1794 use_final_item_data() ? final_item_data[p2.first] : _unused);
// Out-of-range query item: empty result (early return lost in extraction).
1797 if(item >= total_num_items) {
1798 similar_items.clear();
// Copy the query item's CSR neighbor slice into a local working buffer.
1803 std::vector<std::pair<size_t, final_interaction_data_type> > item_buffer(
1804 item_interaction_data.begin() + item_neighbor_boundaries[item],
1805 item_interaction_data.begin() + item_neighbor_boundaries[item + 1]);
// Top-k selection: partition so the best top_k entries come first, then
// truncate the rest.
1807 if(top_k < item_buffer.size()) {
1808 std::nth_element(item_buffer.begin(),
1809 item_buffer.begin() + top_k,
1812 item_buffer.resize(top_k);
// Full sort of the survivors so the output is in ranked order.
1815 std::sort(item_buffer.begin(), item_buffer.end(), item_comparitor);
// Export: convert each internal interaction value to the user-facing score.
1817 similar_items.resize(item_buffer.size());
1818 for(
size_t i = 0; i < item_buffer.size(); ++i) {
1819 similar_items[i].first = item_buffer[i].first;
1820 similar_items[i].second = similarity.export_similarity_score(item_buffer[i].second);
// Fragment of a debug equality check against another lookup table (`other`);
// the enclosing signature was lost in extraction and the block is truncated
// at the end.  Strategy: materialize both tables' CSR neighbor lists as
// (item, neighbor) -> value maps, then report edges present in only one
// table and edges whose values differ beyond a small tolerance.
1830 std::map<std::pair<size_t, size_t>, final_interaction_data_type> edges_this, edges_other;
// Flatten this table's CSR structure into an edge map.
1832 for(
size_t i = 0; i < total_num_items; ++i) {
1833 for(
size_t j = item_neighbor_boundaries[i]; j < item_neighbor_boundaries[i + 1]; ++j) {
1834 edges_this[{i, item_interaction_data[j].first}] = item_interaction_data[j].second;
// Same flattening for the other table.
1838 for(
size_t i = 0; i < total_num_items; ++i) {
1839 for(
size_t j = other->item_neighbor_boundaries[i];
1840 j < other->item_neighbor_boundaries[i + 1]; ++j) {
1842 edges_other[{i, other->item_interaction_data[j].first}] = other->item_interaction_data[j].second;
// Collected discrepancies: one-sided edges and value mismatches.
1846 std::vector<std::pair<std::pair<size_t, size_t>, final_interaction_data_type> > in_this_not_that;
1847 std::vector<std::pair<std::pair<size_t, size_t>, final_interaction_data_type> > in_that_not_this;
1848 std::vector<std::pair<std::pair<size_t, size_t>,
1849 std::pair<final_interaction_data_type, final_interaction_data_type> > > diff_values;
// Edges in this table but not the other, plus values differing by > 1e-6.
1851 for(
const auto& p : edges_this) {
1852 if(edges_other.count(p.first) == 0) {
1853 in_this_not_that.push_back(p);
1854 }
else if(std::abs(edges_other.at(p.first) - p.second) > 1e-6) {
1855 diff_values.push_back( {p.first, {p.second, edges_other.at(p.first)} } );
// Edges in the other table but not this one.
1859 for(
const auto& p : edges_other) {
1860 if(edges_this.count(p.first) == 0) {
1861 in_that_not_this.push_back(p);
// NOTE(review): `failed` is initialized here but the code that sets and
// returns it is lost in the truncation below.
1865 bool failed =
false;
// Report each class of discrepancy to stdout.
1867 if(!in_this_not_that.empty()) {
1868 std::cout <<
"IN THIS, NOT OTHER: " << std::endl;
1869 for(
const auto& p : in_this_not_that) {
1870 std::cout <<
" (" << p.first.first <<
", " << p.first.second <<
"): " 1871 << p.second << std::endl;
1876 if(!in_that_not_this.empty()) {
1877 std::cout <<
"IN OTHER, NOT THIS: " << std::endl;
1878 for(
const auto& p : in_that_not_this) {
1879 std::cout <<
" (" << p.first.first <<
", " << p.first.second <<
"): " 1880 << p.second << std::endl;
1885 if(!diff_values.empty()) {
1886 std::cout <<
"Differing Values: " << std::endl;
1887 for(
const auto& p : diff_values) {
1888 std::cout <<
" (" << p.first.first <<
", " << p.first.second <<
"): " 1889 <<
"(this = " << p.second.first <<
", other = " << p.second.second <<
")" void start()
Reset the timer.
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
std::string similarity_name() const
GL_HOT_INLINE std::pair< size_t, size_t > find_slice_boundary_indices(const std::vector< std::pair< size_t, T > > &v, size_t item_index_lb, size_t item_index_ub)
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
GL_HOT_FLATTEN void remap_sparse_vector(std::vector< std::pair< size_t, T > > &data_vect) const
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
flex_type_enum column_type(size_t i) const
size_t map_internal_index_to_data_index(size_t internal_idx) const GL_HOT_INLINE_FLATTEN
std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > transpose_sparse_sarray(std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > data, const std::vector< size_t > &item_counts, size_t max_memory_usage)
void iterate_through_sparse_item_array_by_slice(const std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > &data, const std::vector< size_t > &slice_delimiters, SliceInitFunction &&init_slice, RowProcessFunction &&preprocess_row, ElementProcessFunction &&process_element, SliceFinalizeFunction &&finalize_slice)
bool get(size_t b) const
Returns the value of the bit b.
void fill()
Sets all bits to 1.
std::map< std::string, flexible_type > options
static size_t cpu_count()
static GL_HOT_INLINE_FLATTEN double log1me(double x)
double current_time() const
Returns the elapsed time in seconds since turi::timer::start was last called.
#define logprogress_stream
static uint64_t hash64(const char *s, size_t len)
#define DASSERT_FALSE(cond)
sframe select_columns(const std::vector< std::string > &names) const
size_t set_index_mapping_from_mask(const dense_bitset &is_active_entry)
void clear()
Sets all bits to 0.
#define GL_HOT_INLINE_FLATTEN
static size_t thread_id()
size_t size() const
Returns the number of bits in this bitset.
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
bool set_bit(size_t b)
Atomically sets the bit at position b to true returning the old value.
GL_HOT_FLATTEN void remap_vector(std::vector< T > &data_vect) const
A simple class that can be used for benchmarking/timing up to microsecond resolution.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)