Turi Create  4.0
unity_sketch.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #include <vector>
7 #include <limits>
8 #include <future>
9 #include <unordered_set>
10 #include <core/data/flexible_type/flexible_type.hpp>
11 #include <model_server/lib/api/unity_sketch_interface.hpp>
12 #include <core/storage/sframe_data/sarray.hpp>
13 #include <ml/sketches/hyperloglog.hpp>
14 #include <ml/sketches/countsketch.hpp>
15 #include <ml/sketches/quantile_sketch.hpp>
16 #include <ml/sketches/streaming_quantile_sketch.hpp>
17 #include <core/parallel/atomic.hpp>
18 #include <core/logging/logger.hpp>
19 namespace turi {
20 
namespace sketches {

// Forward declarations of sketch classes from ml/sketches. Default template
// arguments are intentionally not repeated in these redeclarations.
class hyperloglog;
class space_saving_flextype;

template <class T>
class countsketch;

template <class T, class Comparator>
class quantile_sketch;

template <class T, class Comparator>
class streaming_quantile_sketch;

}  // namespace sketches
34 
35 
36 /**
37  * Provides a query interface to a collection of statistics about an SArray
38  * accumulated via various sketching methods.
39  * The unity_sketch object contains a summary of a single SArray (a column of
40  * an SFrame). It contains sketched statistics about the Array which can be
41  * queried efficiently.
42  *
43  * The sketch computation is fast and has complexity approximately linear
44  * in the length of the Array. After which, all queryable functions in the
45  * sketch can be performed nearly instantly.
46  *
47  * The sketch's contents vary depending on whether it is a numeric array,
48  * or a non-numeric (string) array, or list type list vector/dict/recursive
49  * If numeric:
50 
51  * This is essentially a union among a collection of sketches, depend on value
52  * type of SArray, here is what's availble in the sketch for each sarray value type:
53  * numeric type (int, float):
54  * - m_numeric_sketch -- numeric summary like max/min/var/std/mean/quantile
55  * - m_discrete_sketch -- discrete summary like unique items/counts/frequent_items
56  * string type:
57  * - m_discrete_sketch -- discrete summary like unique items/counts/frequent_items
58  * dictionary type:
59  * - m_discrete_sketch -- discrete summary like unique items/counts/frequent_items
60  * - m_dict_key_sketch -- sketch summary of flattened dict keys, it is a
61  * sketch summary of string type where we treat each key as string
62  * - m_dict_value_sketch -- sketch summary of flattened dictionary values.
63  * we infer the type of dictionary value by peek into first 100 rows of
64  * of data and then decide whether or not to use numeric sketch.
65  * - m_element_sub_sketch -- optional. Available only if user explicitly asks for
66  * sketch summary for subset of dictionary keys. The sub sketch type is
67  * of the same type as m_dict_value_sketch
68  * vector(array) type:
69  * - m_discrete_sketch -- discrete summary like unique items/counts/frequent_items
70  * - m_element_sketch -- sketch summary for all values of the vector as if the
71  * values are flattened, and the element sketch is of type float (numeric sketch)
72  * - m_element_sub_sketch -- optional. If user asks for sketch summary for certain
73  * columns in the vector value, then this will be available. It is a collection
74  * of sketches for the corresponding columns and the sketch type is numeric.
75  * list(recursive) type:
76  * - m_discrete_sketch -- discrete summary like unique items/counts/frequent_items
77  * - m_element_sketch -- sketch summary for all values of the vector as if the
78  * values are flattened. The element sketch is of type string. We convert all
79  * list values to string and then do a sketch on it
80  *
81  * The following information is provided exactly:
82  * - length (\ref size())
83  * - # Missing Values (\ref num_undefined())
84  * - Min (\ref min())
85  * - Max (\ref max())
86  * - Mean (\ref mean())
87  * - Variance (\ref var())
88  *
89  * And the following information is provided approximately:
90  * - # Unique Values (\ref num_unique())
91  * - Quantiles (\ref get_quantile())
92  * - Frequent Items (\ref frequent_items())
93  * - Frequency Count of any Value (\ref frequency_count())
94 
95  * For SArray of type recursive/dict/array, additional sketch information is available:
96  * - element_length_summary() -- sketch summary of element length.
97 
98  * For SArray of type list, there is a sketch summary for all values inside the
99  * list element. Sketch summary flattens all list values and do a sketch summery
100  * over flattened values. Each value in list is casted to string for sketch summary.
101  * The summary can be retrieved by calling:
102  * - element_summary() -- sketch summary of all list elements
103 
104  * For SArray of type array(vector), there is a sketch summary for all values inside
105  * vector element. Sketch summary flattens all vector values and do a sketch summery
106  * over flattened values. The summary can be retrieved by calling:
107  * - element_summary() -- sketch summary of all vector elements
108 
109  * For SArray of type dict, additional sketch summary over the keys and values are
110  * provided. They can be retrieved by calling:
111  * - dict_key_summary() -- sketch summary of all keys in the dictionary
112  * - dict_value_summary() -- sketch summary of all values in the dictionary
113 
114  * For SArray of type dict, user can also pass in a list of dictionary keys to
115  * sketch_summary function, this would cause one sub sketch for each of the key.
116  * For example:
117  * >>> sketch = sa.sketch_summary(sub_sketch_keys=["a", "b"])
118  * Then the sub summary may be retrieved by:
119  * >>> sketch.element_sub_sketch()
120  * Or:
121  * >>> sketch.element_sub_sketch(["key1", "key2"])
122  * for subset of keys
123  *
124  * Similarly, for SArray of type vector(array), user can also pass in a list of
125  * integers which is the index into the vector to get sub sketch
126  * For example:
127  * >>> sketch = sa.sketch_summary(sub_sketch_keys=[1,3,5])
128  * Then the sub summary may be retrieved by:
129  * >>> sketch.element_sub_sketch()
130  * Or:
131  * >>> sketch.element_sub_sketch([1,3])
132  * for subset of keys
133  *
134  **/
135 
136 class unity_sketch: public unity_sketch_base {
137  public:
138 
139  static const double SKETCH_COMMIT_INTERVAL;
140 
141  inline unity_sketch() { }
142 
143  ~unity_sketch();
144 
145  /**
146  * Generates all the sketch statistics from an input SArray.
147  * If background is true, the sketch will be constructed in the background.
148  * While the sketch is being constructed in a background thread, queries can
149  * be executed on the sketch, but none of the quality guarantees will apply.
150  */
151  void construct_from_sarray(std::shared_ptr<unity_sarray_base> uarray, bool background = false, const std::vector<flexible_type>& keys = {});
152 
153  /**
154  * Returns true if the sketch is complete.
155  * If the sketch is constructed with background == false, this will always
156  * return true. If not the sketch is constructed using a background thread
157  * and this will return false until the sketch is ready.
158  */
159  bool sketch_ready();
160 
161  /**
162  * Returns the number of elements processed by the sketch is complete.
163  * If the sketch is constructed with background == false, this will always
164  * return the number of elements of the array. If the sketch is
165  * constructed using a background thread this may return a value between 0 and
166  * the length of the array.
167  */
168  size_t num_elements_processed();
169 
170  /**
171  * Returns a sketched estimate of the value at a particular quantile between
172  * 0.0 and 1.0. The quantile is guaranteed to be accurate within 1%:
173  * meaning that if you ask for the 0.55 quantile, the returned value is
174  * guaranteed to be between the true 0.54 quantile and the true
175  * 0.56 quantile. The quantiles are only defined for numeric arrays and
176  * this function will throw an exception if called on a sketch constructed
177  * for a non-numeric column.
178  */
179  double get_quantile(double quantile);
180 
181  /**
182  * Returns a sketched estimate of the number of occurances of a given
183  * element. This estimate is based on the count sketch. The element type
184  * must be of the same type as the input SArray; throws an exception otherwise.
185  */
186  double frequency_count(flexible_type value);
187 
188  /**
189  * Returns a sketched estimate of the most frequent elements in the SArray
190  * based on the SpaceSaving sketch. It is only guaranteed that all
191  * elements which appear in more than 0.01% (0.0001) rows of the array will
192  * appear in the set of returned elements. However, other elements may
193  * also appear in the result. The item counts are estimated using the
194  * CountSketch.
195  */
196  std::vector<std::pair<flexible_type, size_t> > frequent_items();
197 
198  /**
199  * Returns a sketched estimate of the number of unique values in the
200  * SArray based on the Hyperloglog sketch.
201  */
202  double num_unique();
203 
204  /**
205  ** Returns sketch summary for a given key in dictionary SArray sketch, or a given
206  ** index in SArray of vector
207  **
208  ** \param key is either an index into vector or a key in dictionary
209  **/
210  std::map<flexible_type, std::shared_ptr<unity_sketch_base>> element_sub_sketch(const std::vector<flexible_type>& keys);
211 
212  /**
213  * Returns element length sketch summary if the sarray is a list/vector/dict type
214  * raises exception otherwise
215  **/
216  std::shared_ptr<unity_sketch_base> element_length_summary();
217 
218  /**
219  * For SArray of array/list(recursive) type, returns the sketch summary for the list values
220  * the summary only works if element can be converted to string. Elements that cannot
221  * be converted to string will be ignored
222  **/
223  std::shared_ptr<unity_sketch_base> element_summary();
224 
225  /**
226  * For SArray of dictionary type, returns the sketch summary for the dictionary keys
227  * It only counts the keys if the key can be converted to string
228  **/
229  std::shared_ptr<unity_sketch_base> dict_key_summary();
230 
231  /**
232  * For SArray of dictionary type, returns the sketch summary for the dictionary values
233  * It only counts the values if the value can be converted to float
234  **/
235  std::shared_ptr<unity_sketch_base> dict_value_summary();
236 
237  /**
238  * Returns the mean of the values in the sarray. Returns 0 on an empty
239  * array. Throws an exception if called on an sarray with non-numeric
240  * type.
241  */
242  inline double mean() {
243  if (!m_is_numeric) log_and_throw("Mean value not available for a non-numeric column");
244  commit_global_if_out_of_date();
245  std::unique_lock<turi::mutex> global_lock(lock);
246  return m_numeric_sketch.mean;
247  }
248 
249  /**
250  * Returns the max of the values in the sarray. Returns NaN on an empty
251  * array. Throws an exception if called on an sarray with non-numeric
252  * type.
253  */
254  inline double max() {
255  if (!m_is_numeric) log_and_throw("Max value not available for a non-numeric column");
256  commit_global_if_out_of_date();
257  std::unique_lock<turi::mutex> global_lock(lock);
258  return m_numeric_sketch.max;
259  }
260 
261  /**
262  * Returns the min of the values in the sarray. Returns NaN on an empty
263  * array. Throws an exception if called on an sarray with non-numeric
264  * type.
265  */
266  inline double min() {
267  if (!m_is_numeric) log_and_throw("Min value not available for a non-numeric column");
268  commit_global_if_out_of_date();
269  std::unique_lock<turi::mutex> global_lock(lock);
270  return m_numeric_sketch.min;
271  }
272 
273  /**
274  * Returns the epsilon value used by the numeric sketch. Returns NaN on an
275  * empty array. Throws an exception if called on an sarray with non-numeric
276  * type.
277  */
278  inline double numeric_epsilon() {
279  if (!m_is_numeric) log_and_throw("Epsilon value not available for a non-numeric column");
280  commit_global_if_out_of_date();
281  std::unique_lock<turi::mutex> global_lock(lock);
282  return m_numeric_sketch.max;
283  }
284 
285  /**
286  * Returns the sum of the values in the sarray. Returns 0 on an empty
287  * array. Throws an exception if called on an sarray with non-numeric
288  * type.
289  */
290  inline double sum() {
291  if (!m_is_numeric) log_and_throw("Sum value not available for a non-numeric column");
292  commit_global_if_out_of_date();
293  std::unique_lock<turi::mutex> global_lock(lock);
294  return m_numeric_sketch.sum;
295  }
296  /**
297  * Returns the variance of the values in the sarray. Returns 0 on an empty
298  * array. Throws an exception if called on an sarray with non-numeric
299  * type.
300  */
301  inline double var() {
302  if (!m_is_numeric) log_and_throw("Sum value not available for a non-numeric column");
303  commit_global_if_out_of_date();
304  std::unique_lock<turi::mutex> global_lock(lock);
305  if (m_numeric_sketch.num_items == 0) return 0.0;
306  return m_numeric_sketch.m2/m_numeric_sketch.num_items;
307  }
308 
309  /**
310  * Returns the number of elements in the input SArray.
311  */
312  inline size_t size() {
313  return m_size;
314  }
315 
316  /**
317  * Returns the number of undefined elements in the input SArray.
318  */
319  inline size_t num_undefined() {
320  commit_global_if_out_of_date();
321  std::unique_lock<turi::mutex> global_lock(lock);
322  return m_undefined_count;
323  }
324 
325  /**
326  * Cancels any ongoing sketch computation.
327  */
328  void cancel();
329 
330  private:
331 
332  // formula from
333  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Incremental_Algorithm
334  struct numeric_sketch_struct {
335  std::shared_ptr<sketches::streaming_quantile_sketch<double>> quantiles;
336  double min = std::numeric_limits<double>::max();
337  double max = std::numeric_limits<double>::lowest();
338  double sum = 0.0;
339  double mean = 0.0;
340  size_t num_items = 0;
341  double m2;
342  double epsilon = -1;
343 
344  void reset();
345 
346  void combine(const numeric_sketch_struct& other);
347 
348  void accumulate(double dval);
349 
350  void finalize();
351  };
352 
353  struct discrete_sketch_struct {
354  std::shared_ptr<sketches::countsketch<flexible_type> > count;
355  std::shared_ptr<sketches::space_saving_flextype> frequent;
356  std::shared_ptr<sketches::hyperloglog> unique;
357 
358  void reset();
359 
360  void accumulate(const flexible_type& val);
361 
362  void combine(const discrete_sketch_struct& other);
363  };
364 
365  // lock on the global values
366  turi::mutex lock;
367 
368  bool m_is_child_sketch = false;
369  // for child sketch to remember whether parent sketch is ready
370  bool m_sketch_ready = false;
371 
372  // The values which are set on construction and never changed
373  bool m_is_numeric = false;
374  bool m_is_list = false;
375  double m_size = 0;
376  flex_type_enum m_stored_type;
377  flex_type_enum m_dict_value_sketch_type;
378 
379  // the sketches
380  discrete_sketch_struct m_discrete_sketch;
381 
382  // statistics
383  numeric_sketch_struct m_numeric_sketch;
384  size_t m_undefined_count = 0;
385  size_t m_num_elements_processed = 0;
386 
387  turi::atomic<size_t> m_rows_processed_by_threads;
388 
389  // make thread local version of all the sketches
390  struct thr_local_data {
391  discrete_sketch_struct discrete_sketch;
392  numeric_sketch_struct numeric_sketch;
393  size_t undefined_count = 0;
394  size_t num_elements_processed = 0;
395  };
396 
397  std::vector<thr_local_data> m_thrlocal;
398  std::vector<turi::mutex> m_thrlocks;
399 
400  atomic<bool> m_cancel = false;
401  std::future<void> m_background_future;
402  turi::timer m_commit_timer;
403 
404  // for vector/list/dict type, maintain a separate sketch for element length
405  std::shared_ptr<unity_sketch> m_element_len_sketch;
406  std::shared_ptr<unity_sketch> m_element_sketch;
407 
408  // special sketch for dictionary key and value
409  std::shared_ptr<unity_sketch> m_dict_key_sketch;
410  std::shared_ptr<unity_sketch> m_dict_value_sketch;
411 
412  // for vector/dict, allow user to specify a subset keys/index to grab subsketch
413  std::map<flexible_type, std::shared_ptr<unity_sketch>> m_element_sub_sketch;
414 
415  /*
416  * Resets the global sketches and statistics. This function does not
417  * acquire locks. The caller must acquire the global lock if necessary.
418  */
419  void reset_global_sketches_and_statistics();
420  void commit_global_if_out_of_date();
421 
422  // create a clone of current sketch object
423  unity_sketch(std::shared_ptr<unity_sketch> src, bool sketch_ready) {
424  m_sketch_ready = sketch_ready;
425  m_is_numeric = src->m_is_numeric;
426  m_stored_type = src->m_stored_type;
427  m_is_list = src->m_is_list;
428  m_dict_value_sketch_type = src->m_dict_value_sketch_type;
429  m_size = src->m_size;
430  m_numeric_sketch = src->m_numeric_sketch;
431  m_discrete_sketch = src->m_discrete_sketch;
432  m_undefined_count = src->m_undefined_count;
433  m_num_elements_processed = src->m_num_elements_processed;
434  m_rows_processed_by_threads = src->m_rows_processed_by_threads;
435  m_is_child_sketch = src->m_is_child_sketch;
436  DASSERT_TRUE(m_is_child_sketch);
437  }
438 
439  inline void init(
440  unity_sketch* parent,
441  flex_type_enum type,
442  const std::unordered_set<flexible_type>& keys = std::unordered_set<flexible_type>(),
443  std::shared_ptr<sarray<flexible_type>::reader_type> reader = NULL);
444 
445  inline void combine_global(std::vector<turi::mutex>& thr_locks);
446  inline void accumulate_dict_value(const flexible_type& dict_val, size_t thr, const std::unordered_set<flexible_type>& keys);
447  inline void accumulate_vector_value(const flexible_type& vect_val, size_t thr, const std::unordered_set<flexible_type>& keys);
448  inline void accumulate_list_value(const flexible_type& rec_val, size_t thr);
449  inline void accumulate_one_value(size_t thr, const flexible_type& val, const std::unordered_set<flexible_type>& keys = std::unordered_set<flexible_type>());
450  inline void accumulate_discrete_sketch(size_t thr, const flexible_type& val) {
451  m_thrlocal[thr].discrete_sketch.accumulate(val);
452  }
453 
454  inline void accumulate_numeric_sketch(size_t thr, const flexible_type& val) {
455  m_thrlocal[thr].numeric_sketch.accumulate((double)val);
456  }
457 
458  inline void empty_sketch() {
459  m_numeric_sketch.mean = 0;
460  m_numeric_sketch.min = NAN;
461  m_numeric_sketch.max = NAN;
462  m_numeric_sketch.sum = 0;
463  m_numeric_sketch.m2 = 0;
464  m_numeric_sketch.num_items = 0;
465  m_numeric_sketch.epsilon = NAN;
466  }
467 
468  inline void increase_nested_element_count(unity_sketch& nested_sketch, size_t thr, size_t count) {
469  nested_sketch.m_thrlocal[thr].num_elements_processed += count;
470  nested_sketch.m_rows_processed_by_threads.inc(count);
471  }
472 
473  inline flex_type_enum infer_dict_value_type(std::shared_ptr<sarray<flexible_type>::reader_type> reader);
474 };
475 } // namespace turi
std::set< Key > keys(const std::map< Key, T > &map)
Definition: stl_util.hpp:358
A simple class that can be used for benchmarking/timing up to microsecond resolution.
Definition: timer.hpp:59
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364