Turi Create  4.0
basic_column_statistics.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_ML2_DATA_BASIC_COLUMN_STATISTICS_H_
#define TURI_ML2_DATA_BASIC_COLUMN_STATISTICS_H_

#include <core/data/flexible_type/flexible_type.hpp>
#include <core/logging/assertions.hpp>
#include <core/storage/serialization/serialization_includes.hpp>
#include <toolkits/ml_data_2/ml_data_column_modes.hpp>
#include <core/parallel/pthread_tools.hpp>
#include <boost/thread/lock_algorithms.hpp>
#include <mutex>

namespace turi { namespace v2 { namespace ml_data_internal {

extern size_t ML_DATA_STATS_PARALLEL_ACCESS_THRESHOLD;

/**
 * basic_column_statistics tracks the basic statistics -- counts, means,
 * and standard deviations -- of a single column of an SFrame. A
 * collection of these objects, one per column, holds all the statistics
 * tracked by the ml_data container.
 */
class basic_column_statistics : public column_statistics {

 public:

  ////////////////////////////////////////////////////////////
  // Functions to access the statistics

  /** Returns the number of observations seen by the methods collecting
   *  the statistics.
   */
  size_t num_observations() const {
    return total_row_count;
  }

  /** The count; index here is the index obtained by one of the
   *  map_value_to_index functions previously.
   */
  size_t count(size_t index) const {
    // For numeric columns, every row contributes a value, so the count
    // is simply the total number of rows seen.
    if(mode == ml_column_mode::NUMERIC
       || mode == ml_column_mode::NUMERIC_VECTOR) {

      return total_row_count;
    } else {
      return index < counts.size() ? counts[index] : 0;
    }
  }

  /** The mean; index here is the index obtained by one of the
   *  map_value_to_index functions previously.
   */
  double mean(size_t index) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double p = double(count(index)) / (std::max(1.0, double(total_row_count)));
      return p;

    } else {

      if(total_row_count)
        DASSERT_TRUE(!statistics.empty());

      return index < statistics.size() ? statistics[index].mean : 0;
    }
  }

  /** The standard deviation; index here is the index obtained by one of
   *  the map_value_to_index functions previously.
   */
  double stdev(size_t index) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double stdev = 0;
      double p = mean(index);

      if (total_row_count > 1) {
        stdev = std::sqrt(total_row_count * p * (1 - p) / (total_row_count - 1));
      } else {
        stdev = 0;
      }

      return stdev;

    } else {

      if(total_row_count)
        DASSERT_TRUE(!statistics.empty());

      return index < statistics.size() ? statistics[index].stdev : 0;
    }
  }
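
  // Note on the categorical branch above: for an index seen count(index)
  // times over n = total_row_count rows, the implied 0/1 indicator column
  // has empirical frequency p = count(index) / n, so mean(index) == p and
  // the sample standard deviation is sqrt(n * p * (1 - p) / (n - 1)),
  // which is exactly the expression used in stdev(). For example, an
  // index seen 30 times in 100 rows gives mean 0.3 and stdev ~0.46.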

  ////////////////////////////////////////////////////////////
  // Routines for updating the statistics. This is done online, while
  // new categories are being added, etc., so we have to be careful
  // about concurrent access and resizing.

  /// Initialize the statistics -- counting, mean, and stdev.
  void initialize();

  /// Update categorical statistics for a batch of categorical indices.
  void update_categorical_statistics(size_t thread_idx, const std::vector<size_t>& cat_index_vect) GL_HOT_FLATTEN;

  /// Update numeric statistics for a batch of real values.
  void update_numeric_statistics(size_t thread_idx, const std::vector<double>& value_vect) GL_HOT_FLATTEN;

  /// Update statistics after observing a dictionary.
  void update_dict_statistics(size_t thread_idx, const std::vector<std::pair<size_t, double> >& dict) GL_HOT_FLATTEN;

  /// Perform final computations on the different statistics. Must be
  /// called after all the data is filled.
  void finalize();
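
  // A minimal usage sketch of the protocol above (hypothetical data; in
  // practice ml_data drives these calls, one thread_idx per worker):
  //
  //   stats.initialize();
  //   // ... each worker thread feeds in its rows:
  //   stats.update_categorical_statistics(thread_idx, {0, 3, 3, 7});
  //   stats.update_numeric_statistics(thread_idx, {1.5, -0.25});
  //   // ... once every row has been processed:
  //   stats.finalize();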

 private:
  /// Perform finalizations; split between the threadlocal stuff and the global stuff.
  void _finalize_threadlocal(size_t, bool, bool);
  void _finalize_global(size_t, bool, bool);

  ////////////////////////////////////////////////////////////////////////////////
  // Stuff for the saving and loading

  /** Returns the current serialization version of this model.
   */
  size_t get_version() const { return 2; }

  /**
   * Serialize the object (save).
   */
  void save_impl(turi::oarchive& oarc) const;

  /**
   * Load the object.
   */
  void load_version(turi::iarchive& iarc, size_t version);

  /** For debugging purposes.
   */
  bool is_equal(const column_statistics* other_ptr) const;

  /** Create a copy with the index cleared.
   */
  std::shared_ptr<column_statistics> create_cleared_copy() const;

 private:

  ////////////////////////////////////////////////////////////////////////////////
  // Data members holding the accumulated statistics

  // Put all of these in one single structure, and store a vector of
  // these structures. Since these are always accessed together, this
  // reduces cache misses on the lookups -- we now only have one fetch
  // instead of three.

  std::vector<size_t> counts;

  struct element_statistics : public turi::IS_POD_TYPE {
    double mean = 0;   /**< Mean of column. */
    double stdev = 0;  /**< Stdev of column. */
  };

  std::vector<element_statistics> statistics;
  size_t total_row_count = 0;

  // The issue with having separate accumulators for each thread is
  // that it can take an inordinate amount of memory. Thus we use
  // parallel access for the first million or so items, which are
  // likely to be the most common. For the rest, we use a larger one
  // with locking.

  size_t parallel_threshhold = 1024*1024;

  std::vector<size_t> by_thread_row_counts;

  struct element_statistics_accumulator : public turi::IS_POD_TYPE {
    double mean = 0;     /**< Mean of column. */
    double var_sum = 0;  /**< Accumulated variance term used to compute the stdev. */
  };

  std::vector<std::vector<size_t> > by_thread_element_counts;
  std::vector<std::vector<element_statistics_accumulator> > by_thread_mean_var_acc;

  // Accesses to the global accumulators are guarded by a bank of
  // spinlocks, striped by element index (see get_lock_index below).
  static constexpr size_t n_locks = 64;  // Should be power of 2

  std::array<simple_spinlock, n_locks> global_element_locks;
  std::vector<size_t> global_element_counts;
  std::vector<element_statistics_accumulator> global_mean_var_acc;

  volatile size_t global_size = 0;
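
  // Sketch of the two-tier update pattern implied by the members above.
  // The actual logic lives in the update_*() implementations; this is
  // only an illustration, assuming an increment of element `idx` from
  // thread `thread_idx` (the exact index mapping into the global arrays
  // is an implementation detail):
  //
  //   if(idx < parallel_threshhold) {
  //     // Hot, low-index elements: lock-free per-thread accumulators,
  //     // merged later in finalize().
  //     check_local_array_size(idx, by_thread_element_counts[thread_idx]);
  //     ++by_thread_element_counts[thread_idx][idx];
  //   } else {
  //     // Rare, high-index elements: shared accumulators guarded by a
  //     // striped spinlock.
  //     check_global_array_size(idx, global_element_counts);
  //     std::lock_guard<simple_spinlock> lg(
  //         global_element_locks[get_lock_index(idx)]);
  //     ++global_element_counts[idx];
  //   }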

  /** Return the index of the appropriate lock.
   */
  inline size_t get_lock_index(size_t idx) const {
    size_t lock_idx = (idx % n_locks);
    return lock_idx;
  }

  /** Possibly resize the local array.
   */
  template <typename T>
  inline void check_local_array_size(size_t idx, std::vector<T>& v) {
    DASSERT_LT(idx, parallel_threshhold);

    // See if a resize is needed.
    if(idx >= v.size()) {

      if(UNLIKELY(v.capacity() < idx + 1)) {
        size_t new_capacity = std::min(parallel_threshhold, 3 * (idx + 1) / 2);

        // If it's likely to go to max capacity, just make it that to avoid future resizes.
        if(new_capacity > parallel_threshhold / 2)
          new_capacity = parallel_threshhold;

        v.reserve(new_capacity);
      }

      v.resize(idx + 1);
    }
  }

  /** Check global array size. Possibly resize it.
   */
  template <typename T>
  inline void check_global_array_size(size_t idx, std::vector<T>& v) {

    // If needed, increase the value of global_size, retrying the
    // compare-and-swap until it succeeds or another thread has already
    // raised global_size past idx.
    while(global_size <= idx) {
      size_t g = global_size;

      if(g > idx)
        break;

      if(atomic_compare_and_swap(global_size, g, idx + 1))
        break;
    }

    // See if a resize is needed.
    if(UNLIKELY(idx >= v.size())) {

      // Grow aggressively, since a resize is really expensive.
      size_t new_size = 2 * (parallel_threshhold + idx + 1);

      {
        std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;
        for(size_t i = 0; i < n_locks; ++i)
          all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);

        // Ensure nothing is happening with the vector by locking all
        // locks in a thread safe way. This prevents any thread from
        // accessing it while we resize it.
        boost::lock(all_locks.begin(), all_locks.end());

        // It's possible that another thread beat us to it.
        if(v.size() < idx + 1)
          v.resize(new_size);

        // The destructor of all_locks takes care of the unlocking.
      }
    }
  }

  /** One way to set the statistics. Used by the serialization converters.
   *
   *  "counts" -- std::vector<size_t>. Counts.
   *
   *  "mean" -- std::vector<double>. Mean.
   *
   *  "stdev" -- std::vector<double>. Standard deviation.
   *
   *  "total_row_count" -- size_t. Total row count.
   */
  void set_data(const std::map<std::string, variant_type>& params);
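
  // Hypothetical example of the map expected by set_data() (values are
  // made up, and the to_variant() helper for building variant_type
  // values is assumed):
  //
  //   std::map<std::string, variant_type> p;
  //   p["counts"]          = to_variant(std::vector<size_t>{10, 5});
  //   p["mean"]            = to_variant(std::vector<double>{0.67, 0.33});
  //   p["stdev"]           = to_variant(std::vector<double>{0.49, 0.49});
  //   p["total_row_count"] = to_variant(size_t(15));
  //   stats.set_data(p);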

};

}}}

#endif