Turi Create  4.0
item_processing.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_SPARSE_SIMILARITY_ITEM_STATISTICS_H
7 #define TURI_UNITY_SPARSE_SIMILARITY_ITEM_STATISTICS_H
8 
9 #include <toolkits/sparse_similarity/similarities.hpp>
10 #include <toolkits/sparse_similarity/utilities.hpp>
11 #include <core/parallel/pthread_tools.hpp>
12 #include <core/util/dense_bitset.hpp>
13 #include <core/parallel/atomic_ops.hpp>
14 #include <core/logging/table_printer/table_printer.hpp>
15 #include <core/storage/sframe_data/sarray.hpp>
16 #include <core/data/flexible_type/flexible_type.hpp>
17 #include <map>
18 #include <vector>
19 #include <string>
20 
21 namespace turi { namespace sparse_sim {
22 
23 ////////////////////////////////////////////////////////////////////////////////
24 
/* Per-item bookkeeping used during processing.  Every item carries
 * three fields: a count of users, an intermediate item_data_type
 * value, and a final_item_data_type value.  The two data types are
 * supplied by the SimilarityType structure; all fields start out
 * value-initialized.
 */
template <typename SimilarityType>
struct item_processing_info {
  using item_data_type = typename SimilarityType::item_data_type;
  using final_item_data_type = typename SimilarityType::final_item_data_type;

  // Number of users recorded for this item.
  size_t num_users{0};

  // Intermediate statistic updated while passing over the data.
  item_data_type item_data{};

  // Value produced from item_data by the similarity's finalize step.
  final_item_data_type final_item_data{};
};
39 
40 
41 /** Creates an array of item_processing_info and populates it with
42  * the appropriate item statistics. Accepts as input a sparse
43  * SArray in which each row represents a "user" and each column an
44  * item. This takes an sarray of vectors of (index, value) pairs.
45  *
46  * It's expensive if the number of items is not known ahead of time,
47  * and this is typically known, so we require it as a parameter.
48  *
49  * items_per_user, if not null, is set to a vector recording the
50  * number of items each user rates.
51  */
52 template <typename SimilarityType>
53 void calculate_item_processing_colwise(
54  std::vector<item_processing_info<SimilarityType> >& item_info,
55  const SimilarityType& similarity,
56  const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data,
57  size_t num_items,
58  std::vector<size_t>* items_per_user = nullptr) {
59 
60  item_info.resize(num_items);
61 
62  const size_t n = data->size();
63 
64  ////////////////////////////////////////////////////////////////////////////////
65  // Setup all the containers.
66 
67  static constexpr bool use_item_locking = SimilarityType::require_item_locking();
68 
69  std::vector<simple_spinlock> item_locks;
70 
71  if(use_item_locking) {
72  item_locks.resize(num_items);
73  }
74 
75  if(items_per_user != nullptr) {
76  items_per_user->assign(n, 0);
77  }
78 
79  logprogress_stream << "Gathering per-item and per-user statistics." << std::endl;
80 
81  table_printer table( { {"Elapsed Time (Item Statistics)", 0}, {"% Complete", 0} } );
82 
83  table.print_header();
84 
85  atomic<size_t> rows_processed_total = 0;
86  mutex print_lock;
87 
88  ////////////////////////////////////////////////////////////////////////////////
89  // Now, iterate through the data in parallel.
90 
91  auto process_row_f = [&](size_t thread_idx, size_t row_idx,
92  const std::vector<std::pair<size_t, double> >& item_list)
93  GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
94 
95  if(items_per_user != nullptr) {
96  (*items_per_user)[row_idx] = item_list.size();
97  }
98 
99  for(size_t idx_a = 0; idx_a < item_list.size(); idx_a++) {
100 
101  const size_t item_a = item_list[idx_a].first;
102  const auto& value_a = item_list[idx_a].second;
103 
104  ////////////////////////////////////////////////////////////////////////////////
105  // Apply the vertex function of the similarity.
106  DASSERT_LT(item_a, item_info.size());
107 
108  if(use_item_locking) {
109  DASSERT_LT(item_a, item_locks.size());
110  std::lock_guard<simple_spinlock> lg(item_locks[item_a]);
111  similarity.update_item(item_info[item_a].item_data, value_a);
112  } else {
113  similarity.update_item(item_info[item_a].item_data, value_a);
114  }
115 
116  atomic_increment(item_info[item_a].num_users);
117  }
118 
119  size_t local_rows_processed_total = ++rows_processed_total;
120 
121  if(local_rows_processed_total % 1000 == 0) {
122  double percent_complete = double((400 * local_rows_processed_total) / n) / 4;
123  table.print_timed_progress_row(progress_time(), percent_complete);
124  }
125  };
126 
127  // Now, just do the iteration.
128  iterate_through_sparse_item_array(data, process_row_f);
129 
130  ////////////////////////////////////////////////////////////////////////////////
131  // Now, finalize the vertices.
132 
133  in_parallel([&](size_t thread_idx, size_t num_threads) {
134  size_t start_idx = (thread_idx * num_items) / num_threads;
135  size_t end_idx = ((thread_idx + 1) * num_items) / num_threads;
136 
137  for(size_t i = start_idx; i < end_idx; ++i) {
138  similarity.finalize_item(item_info[i].final_item_data, item_info[i].item_data);
139  }
140  });
141 
142  table.print_row(progress_time(), 100);
143  table.print_footer();
144 }
145 
146 /** A version like the previous one, but has the columns and rows
147  * reversed. That is, each row is an item and each column is a
148  * user.
149  *
150  * Currently, this function does not calculate the item_counts and
151  * user_counts.
152  *
153  * Returns the total number of users.
154  */
155 template <typename SimilarityType>
156 size_t calculate_item_processing_rowwise(
157  std::vector<item_processing_info<SimilarityType> >& item_info,
158  const SimilarityType& similarity,
159  const std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > >& data) {
160 
161  ////////////////////////////////////////////////////////////////////////////////
162  // Setup all the containers.
163 
164  const size_t n = data->size();
165  size_t num_users = 0;
166 
167  item_info.resize(n);
168 
169  // Do a single pass through the data to build all of the vertex
170  // statistics.
171  const size_t max_num_threads = thread::cpu_count();
172  auto reader = data->get_reader(max_num_threads);
173 
174  // Comparing the indices in the row to verify it is indeed sorted.
175  TURI_ATTRIBUTE_UNUSED_NDEBUG auto idx_cmp_f = [](const std::pair<size_t, double>& p1,
176  const std::pair<size_t, double>& p2) {
177  return p1.first < p2.first;
178  };
179 
180  in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
181 
182  size_t start_idx = (thread_idx * n) / num_threads;
183  size_t end_idx = ((thread_idx+1) * n) / num_threads;
184 
185  std::vector<std::vector< std::pair<size_t, double> > > row_buffer_v(1);
186 
187  for(size_t row_idx = start_idx; row_idx < end_idx; ++row_idx) {
188  reader->read_rows(row_idx, row_idx+1, row_buffer_v);
189 
190  const auto& row = row_buffer_v[0];
191 
192  DASSERT_TRUE(std::is_sorted(row.begin(), row.end(), idx_cmp_f));
193 
194  // Update the number of dimensions with the largest one here.
195  if(!row.empty()) {
196  atomic_set_max(num_users, row.back().first + 1);
197  }
198 
199  for(const auto& p : row) {
200 
201  ////////////////////////////////////////////////////////////////////////////////
202  // Apply the vertex function of the similarity. Can use the
203  // unsafe version as each row is isolated to a thread (this
204  // version does not do columns).
205 
206  similarity.update_item_unsafe(item_info[row_idx].item_data, p.second);
207  }
208 
209  similarity.finalize_item(item_info[row_idx].final_item_data, item_info[row_idx].item_data);
210  }
211  });
212 
213  return num_users;
214 }
215 
216 }}
217 
218 #endif
static T atomic_increment(T &value, const U &increment=1, typename std::enable_if< std::is_integral< T >::value &&std::is_integral< U >::value >::type *=0)
Definition: atomic_ops.hpp:292
static size_t cpu_count()
#define logprogress_stream
Definition: logger.hpp:325
#define GL_HOT_INLINE_FLATTEN
void iterate_through_sparse_item_array(const std::shared_ptr< sarray< std::vector< std::pair< size_t, T > > > > &data, RowProcessFunction &&process_row)
Definition: utilities.hpp:279
T atomic_set_max(T &max_value, T new_value)
Definition: atomic_ops.hpp:176
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
Definition: lambda_omp.hpp:35
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364