Turi Create 4.0
column_statistics.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_DML_DATA_COLUMN_STATISTICS_H_
#define TURI_DML_DATA_COLUMN_STATISTICS_H_

#include <core/data/flexible_type/flexible_type.hpp>
#include <core/logging/assertions.hpp>
#include <core/storage/serialization/serialization_includes.hpp>
#include <ml/ml_data/ml_data_column_modes.hpp>
#include <core/parallel/pthread_tools.hpp>
#include <boost/thread/lock_algorithms.hpp>
#include <algorithm>  // std::min, std::max, std::is_sorted
#include <array>      // std::array
#include <cmath>      // std::sqrt, std::isnan
#include <mutex>      // std::mutex, std::lock_guard, std::unique_lock
#include <set>

namespace turi { namespace ml_data_internal {

extern size_t ML_DATA_STATS_PARALLEL_ACCESS_THRESHOLD;

/**
 * \ingroup mldata
 * \internal
 * \addtogroup mldata ML Data Internal
 * \{
 */

/**
 * column_statistics tracks the summary statistics -- counts, means, and
 * standard deviations -- of a single column of an SFrame. A collection of
 * column_statistics objects, one per column, holds all of the statistics
 * tracked by the ml_data container.
 */
class column_statistics {
 public:

  column_statistics() {}  // Default constructor; needed by the shared_ptr deserializer below.

  /**
   * Main constructor; takes the column name, the column mode, and the
   * original type of the column.
   */
  column_statistics(std::string column_name,
                    ml_column_mode mode,
                    flex_type_enum original_column_type);

  ////////////////////////////////////////////////////////////
  // Functions to access the statistics

  /** Returns the number of observations seen by the methods
   *  collecting the statistics.
   */
  size_t num_observations() const {
    return total_row_count;
  }

  /* The count; index here is the index obtained by one of the
   * map_value_to_index functions previously.
   */
  size_t count(size_t index=0) const {
    // Numeric columns have a value in every row, so the count is
    // simply the total number of rows seen.
    if(mode == ml_column_mode::NUMERIC
       || mode == ml_column_mode::NUMERIC_VECTOR) {

      return total_row_count;
    } else {
      size_t ret = index < counts.size() ? counts[index] : 0;
      DASSERT_LE(ret, total_row_count);
      return ret;
    }
  }

  /* The mean; index here is the index obtained by one of the
   * map_value_to_index functions previously.
   */
  double mean(size_t index=0) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double p = double(count(index)) / (std::max(1.0, double(total_row_count)));

      DASSERT_LE(p, 1.0);
      DASSERT_GE(p, 0.0);

      return p;

    } else {

      if(total_row_count)
        DASSERT_TRUE(!statistics.empty());

      return index < statistics.size() ? statistics[index].mean : 0;
    }
  }

  /* The standard deviation; index here is the index obtained by one of
   * the map_value_to_index functions previously.
   */
  double stdev(size_t index=0) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double stdev = 0;
      double p = mean(index);

      DASSERT_LE(p, 1.0);
      DASSERT_GE(p, 0.0);

      if (total_row_count > 1) {
        stdev = std::sqrt(total_row_count * p * (1 - p) / (total_row_count - 1));
      } else {
        stdev = 0;
      }

      DASSERT_FALSE(std::isnan(stdev));

      return stdev;

    } else {

      if(total_row_count) {
        DASSERT_TRUE(!statistics.empty());
      }

      return index < statistics.size() ? statistics[index].stdev : 0;
    }
  }

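  // Worked example (editorial note, not in the original header): for a
  // categorical column, mean(index) is the empirical frequency p of that
  // category -- the mean of its 0/1 indicator column -- and the formula
  // above is the sample stdev of that indicator. With total_row_count = 4
  // and count(index) = 1:
  //
  //   p     = 1 / 4 = 0.25
  //   stdev = sqrt(4 * 0.25 * 0.75 / 3) = 0.5
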
  ////////////////////////////////////////////////////////////
  // Routines for updating the statistics. This is done online, while
  // new categories are being added, etc., so we have to be careful
  // about concurrent updates and about resizing the accumulators.

  /// Initialize the statistics -- counting, mean, and stdev
  void initialize();

  /// Update categorical statistics for a batch of categorical indices.
  void update_categorical_statistics(size_t thread_idx, const std::vector<size_t>& cat_index_vect) GL_HOT {

    DASSERT_TRUE(mode == ml_column_mode::CATEGORICAL
                 || mode == ml_column_mode::CATEGORICAL_VECTOR);

    std::vector<size_t>& counts = by_thread_element_counts[thread_idx];

    // The input indices are guaranteed to be sorted.
    DASSERT_TRUE(std::is_sorted(cat_index_vect.begin(), cat_index_vect.end()));

    size_t cv_idx = 0;

    for(cv_idx = 0;
        cv_idx < cat_index_vect.size()
          && cat_index_vect[cv_idx] < parallel_threshhold;
        ++cv_idx) {

      size_t idx = cat_index_vect[cv_idx];

      check_local_array_size(idx, counts);

      // Count each distinct index only once per row; duplicates are
      // adjacent since the vector is sorted.
      if(cv_idx == 0 || (idx != cat_index_vect[cv_idx - 1]) ) {
        ++(counts[idx]);
      }
    }

    if(cv_idx != cat_index_vect.size() ) {
      for(; cv_idx < cat_index_vect.size(); ++cv_idx) {
        size_t idx = cat_index_vect[cv_idx] - parallel_threshhold;

        check_global_array_sizes(idx, global_element_counts);

        // Compare the raw indices here: idx has had parallel_threshhold
        // subtracted from it, so comparing it against the raw previous
        // entry would never detect a duplicate.
        if(cv_idx == 0 || (cat_index_vect[cv_idx] != cat_index_vect[cv_idx - 1]) ) {
          std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);
          ++(global_element_counts[idx]);
        }
      }
    }

    ++by_thread_row_counts[thread_idx];
  }

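  // Design note (editorial): indices below parallel_threshhold are counted
  // in a per-thread array with no locking at all; rarer indices share a
  // single global array guarded by striped spinlocks. finalize() later
  // folds the per-thread and global counts into the `counts` vector read
  // by the accessors above.
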
  /// Update numeric statistics for a batch of real values.
  void update_numeric_statistics(size_t thread_idx, const std::vector<double>& value_vect) GL_HOT {

    DASSERT_TRUE(mode == ml_column_mode::NUMERIC
                 || mode == ml_column_mode::NUMERIC_VECTOR
                 || mode == ml_column_mode::NUMERIC_ND_VECTOR);

    // Silently ignore columns of empty vectors. Note that all the
    // vectors in a column must be empty for this to work.
    if(value_vect.empty()) {
      return;
    }

    // Numeric statistics are always accumulated on a thread-local basis
    // and ignore the parallel_threshhold parameter.
    size_t& count = by_thread_row_counts[thread_idx];
    auto& stats = by_thread_mean_var_acc[thread_idx];

    if(stats.empty()) {

      DASSERT_EQ(size_t(count), 0);

      stats.resize(value_vect.size());

      for(size_t i = 0; i < value_vect.size(); ++i) {
        stats[i].mean = value_vect[i];
        stats[i].var_sum = 0;
      }

    } else {
      DASSERT_EQ(stats.size(), value_vect.size());

      // Efficient and stable mean + stdev computation
      // --------------------------------------------------------------------
      // This way of computing stdev goes back to a 1962 paper by B. P.
      // Welford and is presented in Donald Knuth's Art of Computer
      // Programming, Vol 2, page 232, 3rd edition. It is far more accurate
      // and stable than computing it "directly".
      //
      // Online update rule:
      //
      //   M_k = M_{k-1} + (x_k - M_{k-1}) / k
      //   S_k = S_{k-1} + (x_k - M_{k-1}) * (x_k - M_k)
      //
      // Here M_k is the estimate of the mean and S_k / (k-1) is the
      // estimate of the variance.
      for(size_t i = 0; i < value_vect.size(); i++){
        double& mean = stats[i].mean;
        double& var_sum = stats[i].var_sum;

        double old_mean = mean;
        double v = value_vect[i];

        mean += (v - old_mean) / (count + 1);

        // var_sum here is actually the variance times the count; it is
        // converted to the stdev later, in finalize().
        var_sum += (v - old_mean) * (v - mean);
      }
    }

    ++count;
  }

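  // Worked example (editorial): feeding the one-element vectors {1}, {2},
  // {3} through the update above gives
  //
  //   after 1:  mean = 1.0,  var_sum = 0.0
  //   after 2:  mean = 1.5,  var_sum = 0.5
  //   after 3:  mean = 2.0,  var_sum = 2.0
  //
  // so var_sum / (count - 1) = 1.0, exactly the sample variance of
  // {1, 2, 3}; the final division and square root happen in finalize().
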
  /// Update statistics after observing a dictionary.
  void update_dict_statistics(size_t thread_idx, const std::vector<std::pair<size_t, double> >& dict) GL_HOT {

    DASSERT_TRUE(mode == ml_column_mode::DICTIONARY);

    // Does not compile on windows; comment out for now
    // // The input array is sorted here.
    // DASSERT_TRUE(std::is_sorted(dict.begin(), dict.end(),
    //                             [](std::pair<size_t, double> p1, std::pair<size_t, double> p2) {
    //                               return p1.first < p2.first;
    //                             }));

    size_t& row_count = by_thread_row_counts[thread_idx];

    // The update function used for every element below.
    auto update_f = [&](size_t& count, double& mean, double& var_sum, double v) {

      // Efficient and stable mean + stdev computation on sparse data
      // --------------------------------------------------------------------
      // We combine the means and variances of two samples as follows.
      // Sample a contains the m nonzero values; sample b contains the n
      // rows in which this element is implicitly zero:
      //
      //   M_{a+b} = m * M_a / (m + n)
      //   S_{a+b} = S_a + m * M_a^2 - (m + n) * M_{a+b}^2
      //
      // This makes the computation O(m) (instead of O(m + n)).

      if(count == 0) {
        count = 1;
        mean = v;
        var_sum = 0;

      } else {
        double old_mean = mean;

        ++count;
        mean += (v - old_mean) / count;
        var_sum += (v - old_mean) * (v - mean);
      }
    };

    // First, go through and work with the common elements.
    std::vector<size_t>& counts = by_thread_element_counts[thread_idx];
    std::vector<element_statistics_accumulator>& stats = by_thread_mean_var_acc[thread_idx];

    size_t d_idx = 0;

    for(; d_idx < dict.size() && dict[d_idx].first < parallel_threshhold; ++d_idx) {
      size_t idx = dict[d_idx].first;
      double v = dict[d_idx].second;

      check_local_array_size(idx, counts);
      check_local_array_size(idx, stats);

      update_f(counts[idx], stats[idx].mean, stats[idx].var_sum, v);
    }

    // There are infrequent elements left over, so take the slower,
    // locked path through the global arrays.
    if(d_idx != dict.size()) {

      for(; d_idx < dict.size(); ++d_idx) {

        size_t idx = dict[d_idx].first - parallel_threshhold;
        double v = dict[d_idx].second;

        check_global_array_sizes(idx, global_element_counts, global_mean_var_acc);

        std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);

        update_f(global_element_counts[idx],
                 global_mean_var_acc[idx].mean,
                 global_mean_var_acc[idx].var_sum, v);
      }
    }

    ++row_count;
  }

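  // Worked example (editorial): suppose one key was seen with the nonzero
  // values {2, 4} (m = 2, M_a = 3, S_a = 2) and was implicitly zero in
  // n = 2 other rows. The combination rule above gives
  //
  //   M_{a+b} = 2 * 3 / (2 + 2)                 = 1.5
  //   S_{a+b} = 2 + 2 * 3^2 - (2 + 2) * 1.5^2   = 11
  //
  // which matches the mean and sum of squared deviations of {2, 4, 0, 0}
  // computed directly. Only the m nonzero entries are ever touched per
  // row, which is what makes the update O(m).
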
  /// Perform final computations on the different statistics. Must be
  /// called after all the data is filled.
  void finalize();

  /** Reindex, typically according to a global reordering of the
   *  column indices.
   */
  void reindex(const std::vector<size_t>& new_index_map, size_t new_column_size);

  /** Merges in statistics from another column_statistics object.
   */
  void merge_in(const column_statistics& other);

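  // Typical lifecycle (editorial sketch based on the methods declared
  // above; the exact call sites live in the ml_data implementation):
  //
  //   column_statistics stats("col", ml_column_mode::NUMERIC,
  //                           flex_type_enum::FLOAT);
  //   stats.initialize();               // set up the accumulators
  //   // ... in parallel, one thread_idx per worker thread, where
  //   // row_values is that row's values (hypothetical variable):
  //   stats.update_numeric_statistics(thread_idx, row_values);
  //   // ... then, on a single thread:
  //   stats.finalize();                 // fold thread-local + global state
  //   double m = stats.mean(), s = stats.stdev();
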
 private:
  /// Perform finalizations; the work is split between the thread-local
  /// accumulators and the global arrays.
  std::pair<bool, bool> _get_using_flags() const;

  void _finalize_threadlocal(size_t, bool, bool);
  void _finalize_global(size_t, bool, bool);


 public:
  ////////////////////////////////////////////////////////////////////////////////
  // Saving and loading

  /** Returns the current serialization version of this model.
   */
  size_t get_version() const { return 3; }

  /**
   * Serialize the object (save).
   */
  void save_impl(turi::oarchive& oarc) const;

  /**
   * Load the object.
   */
  void load_version(turi::iarchive& iarc, size_t version);

  /** For debugging purposes.
   */
  bool is_equal(const column_statistics* other_ptr) const;

  /**
   * Equality testing -- slow! Use for debugging/testing only.
   */
  bool operator==(const column_statistics& other) const;

  /**
   * Inequality testing -- slow! Use for debugging/testing only.
   */
  bool operator!=(const column_statistics& other) const;

#ifndef NDEBUG
  void _debug_check_is_approx_equal(std::shared_ptr<column_statistics> other) const;
#else
  void _debug_check_is_approx_equal(std::shared_ptr<column_statistics> other) const {}
#endif

 private:

  // Store the basic column data. This allows us to do error checking
  // and error reporting intelligently.

  std::string column_name;
  ml_column_mode mode;
  flex_type_enum original_column_type;

  ////////////////////////////////////////////////////////////////////////////////
  // The categorical variables

  // Put all of these in one single structure, and store a vector of
  // these structures. Since these are always accessed together, this
  // reduces cache misses on the lookups -- we now only have one fetch
  // instead of three.

  std::vector<size_t> counts;

  struct element_statistics : public turi::IS_POD_TYPE {
    double mean = 0;   /**< Mean of this element. */
    double stdev = 0;  /**< Stdev of this element. */
  };

  std::vector<element_statistics> statistics;
  size_t total_row_count = 0;

  // The issue with having separate accumulators for each thread is
  // that they can take an inordinate amount of memory. Thus we use
  // parallel access for the first million or so items, which are
  // likely to be the most common. For the rest, we use a single
  // larger accumulator with locking.

  size_t parallel_threshhold = 1024*1024;

  std::vector<size_t> by_thread_row_counts;

  struct element_statistics_accumulator : public turi::IS_POD_TYPE {
    double mean = 0;     /**< Mean of this element. */
    double var_sum = 0;  /**< Accumulated sum of squared deviations. */
  };

  std::vector<std::vector<size_t> > by_thread_element_counts;
  std::vector<std::vector<element_statistics_accumulator> > by_thread_mean_var_acc;

  // Locking is done by taking the lock whose index is the write index
  // modulo this value.
  static constexpr size_t n_locks = 64;  // Should be a power of 2.

  std::array<simple_spinlock, n_locks> global_element_locks;
  std::vector<size_t> global_element_counts;
  std::vector<element_statistics_accumulator> global_mean_var_acc;

  volatile size_t global_size = 0;
  volatile size_t global_array_buffer_size = 0;
  std::mutex _array_resize_lock;

  /** Return the index of the appropriate lock.
   */
  inline size_t get_lock_index(size_t idx) const {
    size_t lock_idx = (idx % n_locks);
    return lock_idx;
  }

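  // Example (editorial): with n_locks = 64, global indices 3, 67, 131, ...
  // all map to lock 3. Because n_locks is a power of two, the modulo
  // compiles down to a cheap bit mask (idx & 63), which matters on this
  // hot path.
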
  /** Possibly resize the local array.
   */
  template <typename T>
  inline void check_local_array_size(size_t idx, std::vector<T>& v) {
    DASSERT_LT(idx, parallel_threshhold);

    // See if a resize is needed.
    if(idx >= v.size() ) {

      if(UNLIKELY(v.capacity() < idx + 1)) {
        size_t new_capacity = std::min(parallel_threshhold, 3 * (idx + 1) / 2);

        // If it's likely to go to max capacity, just make it that to avoid future resizes.
        if(new_capacity > parallel_threshhold / 2)
          new_capacity = parallel_threshhold;

        v.reserve(new_capacity);
      }

      v.resize(idx + 1);
    }
  }

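  // Example (editorial): starting from an empty vector, idx = 10 reserves
  // capacity 3 * 11 / 2 = 16 and then resizes to 11; once the requested
  // capacity exceeds parallel_threshhold / 2, it jumps straight to
  // parallel_threshhold so the array never reallocates again.
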
  /** Check the global array sizes; possibly resize them.
   */
  template <typename... V>
  inline void check_global_array_sizes(size_t idx, V&... vv) {

    // If needed, increase the value of global_size.
    if(UNLIKELY(idx >= global_size)) {
      atomic_set_max(global_size, idx + 1);
    }

    if(UNLIKELY(idx >= global_array_buffer_size)) {
      resize_global_arrays(idx, vv...);
    }
  }

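  // Note (editorial): this is effectively a double-checked size test --
  // the comparison against global_array_buffer_size here is cheap and
  // unlocked, and resize_global_arrays() re-checks it under
  // _array_resize_lock before actually resizing.
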

  template <typename T>
  inline void __resize_global_array(size_t new_size, std::vector<T>& v) {
    // This should always hold, as the global arrays are only resized
    // here, in lockstep with global_array_buffer_size.
    DASSERT_EQ(v.size(), global_array_buffer_size);
    v.resize(new_size);
  }

  template <typename V1, typename... VV>
  inline void __resize_global_array(size_t new_size, V1& v, VV&... other_v) {
    __resize_global_array(new_size, v);
    __resize_global_array(new_size, other_v...);
  }


  template <typename... V>
  void resize_global_arrays(size_t idx, V&... vv) {

    // Grow aggressively, since a resize is really expensive.
    size_t new_size = 2 * (parallel_threshhold + idx + 1);

    // First, take a global lock while the resize is happening, as many
    // threads are likely to hit this at once. This prevents multiple
    // threads from each trying to lock the full array of spinlocks when
    // there is contention.
    std::lock_guard<std::mutex> lg(_array_resize_lock);

    // Do we still need to resize, or has another thread done it already?
    if(global_array_buffer_size <= idx) {
      std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;

      for(size_t i = 0; i < n_locks; ++i) {
        all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);
      }

      // Ensure nothing is happening with the vectors by locking all the
      // locks in a thread-safe way. This prevents any thread from
      // accessing them while we resize.
      boost::lock(all_locks.begin(), all_locks.end());

      __resize_global_array(new_size, vv...);

      global_array_buffer_size = new_size;
    }
  }

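  // Note (editorial): boost::lock() acquires the whole set of deferred
  // locks with a deadlock-avoidance algorithm, so this is safe even though
  // the update paths take individual spinlocks in arbitrary order via
  // get_lock_index().
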
};

/// \}

}}

BEGIN_OUT_OF_PLACE_SAVE(arc, std::shared_ptr<ml_data_internal::column_statistics>, m) {
  if(m == nullptr) {
    arc << false;
  } else {
    arc << true;

    // Save the version number
    size_t version = m->get_version();
    arc << version;

    m->save_impl(arc);
  }

} END_OUT_OF_PLACE_SAVE()

BEGIN_OUT_OF_PLACE_LOAD(arc, std::shared_ptr<ml_data_internal::column_statistics>, m) {
  bool is_not_nullptr;
  arc >> is_not_nullptr;
  if(is_not_nullptr) {

    // Construct the object before loading into it.
    m.reset(new ml_data_internal::column_statistics);

    size_t version;
    arc >> version;

    m->load_version(arc, version);

  } else {
    m = std::shared_ptr<ml_data_internal::column_statistics>(nullptr);
  }
} END_OUT_OF_PLACE_LOAD()


#endif