6 #ifndef TURI_UNITY_SPARSE_SIMILARITY_ITEM_STATISTICS_H 7 #define TURI_UNITY_SPARSE_SIMILARITY_ITEM_STATISTICS_H 9 #include <toolkits/sparse_similarity/similarities.hpp> 10 #include <toolkits/sparse_similarity/utilities.hpp> 11 #include <core/parallel/pthread_tools.hpp> 12 #include <core/util/dense_bitset.hpp> 13 #include <core/parallel/atomic_ops.hpp> 14 #include <core/logging/table_printer/table_printer.hpp> 15 #include <core/storage/sframe_data/sarray.hpp> 16 #include <core/data/flexible_type/flexible_type.hpp> 21 namespace turi {
namespace sparse_sim {
/// Per-item processing record, parameterized on the similarity type.
///
/// Holds two values whose types are supplied by SimilarityType: the
/// intermediate per-item state (item_data) and the finalized per-item
/// state (final_item_data).  Both are value-initialized by default.
///
/// NOTE(review): this listing skips source lines (the embedded line numbers
/// jump after 37), so any further members and the closing brace of this
/// struct are not visible here.
30 template <
typename SimilarityType>
31 struct item_processing_info {
// Both member types come straight from the similarity implementation.
32 typedef typename SimilarityType::item_data_type item_data_type;
33 typedef typename SimilarityType::final_item_data_type final_item_data_type;
// Intermediate state; the processing functions in this file pass it to
// SimilarityType::update_item / update_item_unsafe.
36 item_data_type item_data = item_data_type();
// Finalized state; the processing functions in this file fill it in through
// SimilarityType::finalize_item, using item_data as the input.
37 final_item_data_type final_item_data = final_item_data_type();
/// Builds the per-item statistics in item_info from `data`, where each row
/// is a list of (item index, value) pairs.
///
/// For every entry of every row, the entry's value is folded into
/// item_info[item].item_data with similarity.update_item; afterwards
/// similarity.finalize_item is run for every item in parallel.
///
/// \param item_info       Output vector; resized to num_items.
/// \param similarity      Supplies update_item / finalize_item and the
///                        compile-time require_item_locking() flag.
/// \param data            Sparse rows of (item index, value) pairs.
/// \param items_per_user  Optional output.  When non-null it is assigned
///                        data->size() entries and entry row_idx receives
///                        the number of entries in that row.
///
/// NOTE(review): this listing skips source lines.  `num_items` is used below
/// but its declaration is not visible (presumably a parameter on the elided
/// line 57); the call that drives process_row_f over `data` and the closing
/// braces are likewise not shown.
52 template <
typename SimilarityType>
53 void calculate_item_processing_colwise(
54 std::vector<item_processing_info<SimilarityType> >& item_info,
55 const SimilarityType& similarity,
56 const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data,
58 std::vector<size_t>* items_per_user =
nullptr) {
60 item_info.resize(num_items);
// n is the number of rows in data; it sizes items_per_user and is the
// denominator of the progress percentage.
62 const size_t n = data->size();
// Compile-time switch: the similarity type states whether update_item has
// to be serialized per item.
67 static constexpr
bool use_item_locking = SimilarityType::require_item_locking();
// One spinlock per item; the vector stays empty when locking is not required.
69 std::vector<simple_spinlock> item_locks;
71 if(use_item_locking) {
72 item_locks.resize(num_items);
// Reset the optional per-row counts to zero before rows are processed.
75 if(items_per_user !=
nullptr) {
76 items_per_user->assign(n, 0);
// Two-column progress table: elapsed time and percent complete.
81 table_printer table( { {
"Elapsed Time (Item Statistics)", 0}, {
"% Complete", 0} } );
// Row counter incremented from inside process_row_f; atomic, presumably
// because rows are processed concurrently -- confirm against the elided
// call site.
85 atomic<size_t> rows_processed_total = 0;
// Per-row callback: records the row length (if requested), updates the state
// of every item appearing in the row, then reports progress.
91 auto process_row_f = [&](
size_t thread_idx,
size_t row_idx,
92 const std::vector<std::pair<size_t, double> >& item_list)
95 if(items_per_user !=
nullptr) {
96 (*items_per_user)[row_idx] = item_list.size();
// Walk the (item index, value) entries of this row.
99 for(
size_t idx_a = 0; idx_a < item_list.size(); idx_a++) {
101 const size_t item_a = item_list[idx_a].first;
102 const auto& value_a = item_list[idx_a].second;
// Debug-only bounds check on the item index.
106 DASSERT_LT(item_a, item_info.size());
// With locking enabled, the update runs while holding this item's spinlock.
108 if(use_item_locking) {
109 DASSERT_LT(item_a, item_locks.size());
110 std::lock_guard<simple_spinlock> lg(item_locks[item_a]);
111 similarity.update_item(item_info[item_a].item_data, value_a);
// NOTE(review): the next call is presumably the non-locking else-branch; the
// `} else {` line itself is elided from this listing.
113 similarity.update_item(item_info[item_a].item_data, value_a);
// Pre-increment, so the local value is the count including this row.
119 size_t local_rows_processed_total = ++rows_processed_total;
// Report progress once every 1000 rows.
121 if(local_rows_processed_total % 1000 == 0) {
// Integer division before the /4 rounds the percentage down to a multiple
// of 0.25.
122 double percent_complete = double((400 * local_rows_processed_total) / n) / 4;
123 table.print_timed_progress_row(progress_time(), percent_complete);
// Finalization pass: each thread takes the contiguous item range
// [thread_idx * num_items / num_threads, (thread_idx + 1) * num_items / num_threads).
133 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
134 size_t start_idx = (thread_idx * num_items) / num_threads;
135 size_t end_idx = ((thread_idx + 1) * num_items) / num_threads;
137 for(
size_t i = start_idx; i < end_idx; ++i) {
138 similarity.finalize_item(item_info[i].final_item_data, item_info[i].item_data);
// Emit a final 100% row and close the table.
142 table.print_row(progress_time(), 100);
143 table.print_footer();
/// Builds the per-item statistics in item_info from `data`, where row
/// row_idx holds the (index, value) entries belonging to item row_idx.
///
/// The rows are split into contiguous ranges, one per thread.  For each row,
/// every entry's value is folded into item_info[row_idx].item_data with
/// similarity.update_item_unsafe, and the row's item is then finalized with
/// similarity.finalize_item.
///
/// \param item_info   Per-item records, indexed by row index here.
/// \param similarity  Supplies update_item_unsafe / finalize_item.
/// \param data        Sparse rows of (index, value) pairs, one row per item.
/// \return            A size_t.  NOTE(review): presumably num_users; the
///                    return statement and whatever updates num_users are
///                    elided from this listing -- confirm in the full source.
///
/// NOTE(review): this listing skips source lines.  `max_num_threads` is used
/// below without a visible definition, and the lines between the entry loop
/// header and the update call, plus the function tail, are not shown.
155 template <
typename SimilarityType>
156 size_t calculate_item_processing_rowwise(
157 std::vector<item_processing_info<SimilarityType> >& item_info,
158 const SimilarityType& similarity,
159 const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data) {
// n is the number of rows in data; it bounds the per-thread row ranges.
164 const size_t n = data->size();
165 size_t num_users = 0;
// One reader shared by all worker threads.
172 auto reader = data->get_reader(max_num_threads);
// Orders entries by their index (pair.first).  It is only used in the
// DASSERT_TRUE below, which presumably is why it carries the
// unused-in-NDEBUG attribute -- confirm against the macro definition.
175 TURI_ATTRIBUTE_UNUSED_NDEBUG
auto idx_cmp_f = [](
const std::pair<size_t, double>& p1,
176 const std::pair<size_t, double>& p2) {
177 return p1.first < p2.first;
// Each thread handles the contiguous row range
// [thread_idx * n / num_threads, (thread_idx + 1) * n / num_threads).
180 in_parallel([&](
size_t thread_idx,
size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
182 size_t start_idx = (thread_idx * n) / num_threads;
183 size_t end_idx = ((thread_idx+1) * n) / num_threads;
// Single-row buffer, reused for every read by this thread.
185 std::vector<std::vector< std::pair<size_t, double> > > row_buffer_v(1);
187 for(
size_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
// Read exactly one row, [row_idx, row_idx + 1), into the buffer.
188 reader->read_rows(row_idx, row_idx+1, row_buffer_v);
190 const auto& row = row_buffer_v[0];
// Debug-only check that the row's entries are sorted by index.
192 DASSERT_TRUE(std::is_sorted(row.begin(), row.end(), idx_cmp_f));
199 for(
const auto& p : row) {
// The per-thread row ranges are disjoint, so item_info[row_idx] is written
// by a single thread here; presumably that is why the unlocked update
// variant is used -- confirm.
206 similarity.update_item_unsafe(item_info[row_idx].item_data, p.second);
// Finalize this row's item from its accumulated item_data.
209 similarity.finalize_item(item_info[row_idx].final_item_data, item_info[row_idx].item_data);
static T atomic_increment(T &value, const U &increment=1, typename std::enable_if< std::is_integral< T >::value &&std::is_integral< U >::value >::type *=0)
static size_t cpu_count()
#define logprogress_stream
#define GL_HOT_INLINE_FLATTEN
void iterate_through_sparse_item_array(const std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > &data, RowProcessFunction &&process_row)
T atomic_set_max(T &max_value, T new_value)
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
#define DASSERT_TRUE(cond)