Turi Create  4.0
quantile_sketch.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SKETCH_QUANTILE_SKETCH_HPP
7 #define TURI_SKETCH_QUANTILE_SKETCH_HPP
8 #include <cmath>
9 #include <iostream>
10 #include <vector>
11 #include <algorithm>
12 #include <tuple>
13 #include <core/logging/assertions.hpp>
14 #include <core/storage/serialization/serialization_includes.hpp>
15 #include <core/util/basic_types.hpp>
16 
17 namespace turi {
18 namespace sketches {
19 
20 //forward decl of streaming_quantile_sketch
21 template <typename T, typename Comparator = std::less<T>>
23 
24 /**
25  * \ingroup sketching
26  * This class implements a quantile sketching datastructure as described in
27  * Qi Zhang and Wei Wang. A Fast Algorithm for Approximate Quantiles in
28  * High Speed Data Streams. SSDBM 2007.
29  *
30  * It constructs a low memory sketch from a stream of known length.
31  * To summarize the basic idea, it draws samples from the stream and maintains
32  * for each sample a lower bound and an upper bound on its rank in the stream.
33  *
34  * The known length condition is not strictly true. While the sketch is
35  * constructed for particular stream length N, this datastructure does not
36  * limit the number of elements can be inserted, but then in which case the
37  * accuracy guarantee no longer applies.
38  *
39  * Usage is simple:
40  * \code
41  * // construct sketch at a particular size
42  * quantile_sketch<double> sketch(100000, 0.01);
43  * ...
44  * insert any number elements into the sketch using
45  * quality guarantees only apply when <100000 elements are inserted.
46  * sketch.add(value)
47  * ...
48  * sketch.finalize() // finalize must be called before queries can be made
49  * \endcode
50  *
51  * To query, two family of functions are provided.
52  *
53  * The regular query functions take linear time in the size of the sketch, and
54  * have accuracy guarantees when <=desired_n elements are inserted (100000
55  * elements in the above example)
56  * \code
57  * sketch.query(rank) // rank is between [0, N-1]
58  * sketch.query_quantile(quantile) // quantile is between [0, 1.0]
59  * \endcode
60  *
61  * The fast query functions take logarithmic time in the size of the sketch,
62  * and have no accuracy guarantees.
63  * \code
64  * sketch.fast_query(rank) // rank is between [0, N-1]
65  * sketch.fast_query_quantile(quantile) // quantile is between [0, 1.0]
66  * \endcode
67  *
68  * The sketch structure also support parallel sketching:
69  *
70  * Example:
71  * If I have 16 data streams, each of size N/16
72  * I can construct 16 quantile_sketches, each initialized with
73  * desired_n = N (note. N here! not N/16)
74  *
75  * Each sketch can then be ran on each data stream independently.
76  * Then all can be combined into one:
77  *
78  * \code
79  * quantile_sketch sketch(N)
80  * for each sub-stream sketch:
81  * sketch.combine(sub-stream-sketch)
82  * sketch.finalize()
83  * \endcode
84  *
85  * \tparam Type to contain in the sketch. type must be less-than-comparable,
86  * constructible, copy constructible, assignable.
87  *
88  * Technical Details
89  * -----------------
90  * The basic mechanic of the sketch lies in making a hierarchy of sketches, each
91  * having size m_b. The first sketch has error 0. The second sketch has error
92  * 1/b, the 3rd sketch has error 2/b and so on. These sketches are treated
93  * like a binary sequence:
94  * - When a buffer of size b has been accumulated, it gets sorted and
95  * inserted into sketch 1
96  * - If sketch 1 exists, the two are merged (no increase in error) and
97  * compressed (increase error by 1/b) and the result tries to get placed as
98  * sketch 2.
99  * - If sketch 2 exists, this repeats.
100  *
101  * The procedure thus seem like binary addition.
102  *
103  * By picking the size b appropriately for a given value of N and \epsilon, At
104  * the end, all the sketches can be merged to get a sketch of error \epsilon.
105  *
106  * Here is where we deviate from the paper slightly. The "compress" procedure
107  * has the advantage of decreasing the total amount of memory required,
108  * By finishing/finalizing with a simple merge, means that the resultant array
109  * could be much bigger than desired. The way the binary addition works, also
110  * means the resultant sketch size is somewhat unpredictable.
111  * Thus here lies the modification.
112  *
113  * We compute the sketch as usual, but sizing "b" to end up with a final merged
114  * error of \epsilon / 2. We then perform one addition compress(\epsilon/2)
115  * phase to compress the final sketch into requiring only O(1/epsilon) memory.
116  */
117 template <typename T, typename Comparator = std::less<T>>
119  public:
120 
121  /**
122  * Default constructor. Does nothing. init() MUST be called before
123  * any operations are performed.
124  */
126 
127  /**
128  * Constructs a quantile_sketch with a desired target number of elements,
129  * and a desired accuracy. See init() for details.
130  */
131  explicit quantile_sketch(size_t desired_n, double epsilon = 0.005, const Comparator& comparator = Comparator()) {
132  init(desired_n, epsilon, comparator);
133  }
134 
135  /**
136  * Initializes the quantile sketch, resetting it.
137  * epsilon is the desired accuracy of the quantiles.
138  * An \epsilon-approximate quantile is an element in the data stream
139  * whose approximate rank r' is within [r-\epsilon n and r + \epsilon n]
140  * i.e. a 0.0001 approximate median in a 1,000,000 element array may have
141  * true rank [499,000, and 501,000].
142  *
143  * The runtime memory requirements of this sketch is
144  * O((1/\epsilon) (log \epsilon N)^2)
145  * The final memory requirements after finalization is O(1/\epsilon).
146  *
147  * To give a sense of the runtime memory requirements,
148  * at \epsilon = 0.01
149  * \verbatim
150  * N = 100,000: sketch has size = 45528 bytes
151  * N = 1,000,000: sketch has size = 96144 bytes
152  * N = 10,000,000: sketch has size = 123504 bytes
153  * N = 100,000,000: sketch has size = 316536 bytes
154  * \endverbatim
155  *
156  * (Increasing N by 10 roughly increases the sketch size by 2. Note that
157  * the growth function is a little wierd. this is due to the structure of the
158  * algorithm which maintains multi-level sketches, then during the final
159  * phase, just merging all those sketches together. This results in somewhat
160  * oddly behaved memory utilization as a function of N.)
161  *
162  * At N = 1,000,000
163  * \verbatim
164  * epsilon = 0.1: sketch has size = 12480 bytes
165  * epsilon = 0.01: sketch has size = 96144 bytes
166  * epsilon = 0.001: sketch has size = 442776 bytes
167  * epsilon = 0.0001: sketch has size = 3271440 bytes
168  * \endverbatim
169  *
170  * (decreasing epsilon by 10 roughly increases the sketch size by 5)
171  *
172  *
173  * \param desired_n An UPPER BOUND on the number of elements to be inserted,
174  * to guarantee epsilon accuracy. The epsilon accuracy guarantee is only true
175  * if you do not insert more than n elements.
176  * \param epsilon The desired accuracy
177  */
178  void init(size_t desired_n, double epsilon, const Comparator& comparator = Comparator()) {
179  m_comparator = element_less_than(comparator);
180  m_n = desired_n;
181  size_t l = (epsilon * m_n);
182  // if m_n or epsilon is too small, l could be < 2 then log2(l) implodes
183  // wondrously.
184  if (l == 0) l = 2;
185  m_b = 2 * std::floor(std::log2(l) / epsilon);
186  /*
187  * Note that the m_b formula here is twice what was recommended in the
188  * paper. This is an extension here. By sketching with half the error,
189  * we can introduce an addition \epsilon/2 compression stage at the end
190  * after finalization (which the original paper did not do) which allows
191  * the final memory utilization to be substantially lower.
192  */
193  // too small n... Lets store everything
194  if (m_b == 0) m_b = desired_n;
195  m_epsilon = epsilon;
196  m_elements_inserted = 0;
197 
198  m_levels.clear();
199  m_levels.resize(1);
200  m_query.clear();
201  }
202 
203  /**
204  * Returns the number of elements stored in the sketch
205  */
206  size_t sketch_size() {
207  size_t sum = m_query.size();
208  for (size_t i = 0; i < m_levels.size(); ++i) {
209  sum += m_levels[i].size();
210  }
211  return sum;
212  }
213 
214 
215  /**
216  * Returns the approximate current memory usage of the sketch in bytes.
217  */
218  size_t memory_usage() {
219  return sketch_size() * sizeof(element);
220  }
221 
222  /**
223  * Inserts a value into the sketch. Not safe to use in parallel.
224  */
225  void add(T t) {
226  // finalize not yet called
227  DASSERT_EQ(m_query.size(), 0);
228  // insert into level 0
229  m_levels[0].emplace_back(t);
230  ++m_elements_inserted;
231 
232  if (m_levels[0].size() == m_b) {
233  // once level 0 is full, insert into the multilevel sketch
234  sort_level_0();
235  compress(m_levels[0], 1.0 / m_b);
236  std::vector<element> sc = std::move(m_levels[0]);
237  m_levels[0].clear();
238  compact(sc);
239  }
240  }
241 
242  /**
243  * Finalizes the sketch datastructure.
244  * Must be called before queries are made. Once finalize() is called,
245  * elements may no longer be inserted.
246  */
247  void finalize() {
248  sort_level_0();
249  m_query = recursive_merge_of_all_levels(0, m_levels.size());
250  // we perform one more compression at the end to further reduce memory
251  // utilization
252  compress(m_query, m_epsilon / 2.0);
253  // sort m_query by the center of the rank range [r_min, r_max]
254  std::sort(m_query.begin(),
255  m_query.end(),
256  rank_center_comparator);
257  m_levels.clear();
258  m_n = m_elements_inserted;
259  }
260 
261  /**
262  * Returns the number of elements inserted into the sketch.
263  */
264  size_t size() const {
265  return m_elements_inserted;
266  }
267 
268  /**
269  * Queries for the value at rank k of all the elements inserted.
270  * Assuming N elements are inserted. Rank 0 will be the minimum value,
271  * rank N-1, the maximum value.
272  *
273  * Note that this rank is in relation to the total number of elements
274  * inserted. For instance,
275  * \code
276  * quantile_sketch<double> sketch(1000);
277  * ...
278  * insert 10,000 elements into the sketch
279  * \endcode
280  * sketch.query(9999) will be the maximum.
281  *
282  * \note finalize() must be called prior to using this function.
283  *
284  * \note This function is guaranteed to provide an epsilon accurate value if
285  * only the desired number of elements (as set in the constructor) are
286  * inserted. In all other cases, this may not.
287  *
288  * \note The complexity of this function can be logarithmic in the sketch size
289  * in the best case (appears to be all the time, empirically), or linear in
290  * the worst case. Essentially, it first tests the result of the fast_query
291  * to see if that fits the bound requirements. If it does not, it takes a full
292  * linear search. The fast_query seems to work pretty much all the time on
293  * randomized inputs of varying distributions.
294  */
295  T query(size_t rank) const {
296  if (m_query.size() == 0) return T();
297  // finalize called. I have a queryable value
298  ++rank;
299  // special handling for min and max
300  if (rank <= 1) {
301  return m_query[0].val;
302  }
303  if (rank >= m_elements_inserted) {
304  return m_query[m_query.size() - 1].val;
305  }
306  int lower_index = (int)rank - m_elements_inserted * m_epsilon;
307  int upper_index = (int)rank + m_elements_inserted * m_epsilon;
308  lower_index = std::max<int>(lower_index, 0);
309  // see if fast_query can give us a good answer now
310  auto fast_query_iter = fast_query_iterator(rank - 1);
311  static_assert(std::is_same<size_t, decltype(fast_query_iter->rmin)>::value,
312  "rmin expected to have type size_t");
313  static_assert(std::is_same<size_t, decltype(fast_query_iter->rmax)>::value,
314  "rmax expected to have type size_t");
315  if (truncate_check<int64_t>(fast_query_iter->rmin) >= lower_index &&
316  truncate_check<int64_t>(fast_query_iter->rmax) <= upper_index) {
317  return fast_query_iter->val;
318  }
319 
320  size_t tightest = -1;
321  size_t tightest_range = -1;
322  // the algorithm provided in the paper tries to find the element such that
323  // lower_index <= rmin rmax <= upper_index
324  // However, it is not entirely obvious from the algorithm, that such an
325  // element always exists. Furthermore, in the event that I have inserted
326  // less than, or greater than the required number of elements, it is not
327  // obvious that this will succeed either.
328  for (size_t i = 0;i < m_query.size(); ++i) {
329  static_assert(std::is_same<size_t, decltype(m_query[i].rmin)>::value,
330  "rmin expected to have type size_t");
331  static_assert(std::is_same<size_t, decltype(m_query[i].rmax)>::value,
332  "rmax expected to have type size_t");
333  if (truncate_check<int64_t>(m_query[i].rmin) >= lower_index &&
334  truncate_check<int64_t>(m_query[i].rmax) <= upper_index){
335  size_t center = (m_query[i].rmax + m_query[i].rmin) / 2;
336  if (std::fabs(center - rank) < tightest_range) {
337  tightest = i;
338  tightest_range = std::fabs(center - rank);
339  }
340  }
341  }
342  if (tightest == (size_t)(-1)){
343  // in the event that we are unable to find an element, we fall back
344  // to the fast query method.
345  return fast_query_iter->val;
346  } else {
347  return m_query[tightest].val;
348  }
349  }
350 
351 
352  /**
353  * Returns the value at a quantile. For instance:
354  * \code
355  * sketch.query_quantile(0.01) // returns the 1% quantile
356  * sketch.query_quantile(0.50) // returns the median
357  * sketch.query_quantile(1.00) // returns the max
358  * \endcode
359  *
360  * quantile values below 0 will be interpreted as 0 (hence the minimum).
361  * quantile values above 1 will be interpreted as 1 (hence the maximum).
362  *
363  * \note finalize() must be called prior to using this function.
364  *
365  * \note This function is guaranteed to provide an epsilon accurate value if
366  * only the desired number of elements (as set in the constructor) are
367  * inserted. In all other cases, this may not.
368  *
369  * \note The complexity of this function can be logarithmic in the sketch size
370  * in the best case (appears to be all the time, empirically), or linear in
371  * the worst case. Essentially, it first tests the result of the fast_query
372  * to see if that fits the bound requirements. If it does not, it takes a full
373  * linear search. The fast_query seems to work pretty much all the time on
374  * randomized inputs of varying distributions.
375  */
376  T query_quantile(double quantile) const {
377  if (quantile < 0) quantile = 0;
378  if (quantile > 1) quantile = 1;
379  return query(quantile * m_elements_inserted);
380  }
381 
382  /**
383  * Quickly queries for the value at rank k of all the elements inserted.
384  * Assuming N elements are inserted. Rank 0 will be the minimum value,
385  * rank N-1, the maximum value.
386  *
387  * Note that this rank is in relation to the total number of elements
388  * inserted. For instance,
389  * \code
390  * quantile_sketch<double> sketch(1000);
391  * ...
392  * insert 10,000 elements into the sketch
393  * \endcode
394  * sketch.fast_query(9999) will be the maximum.
395  *
396  * Unlike query(), this function is never guaranteed to provide an
397  * epsilon error bound. Instead, this function assumes that each element in
398  * the sketch has a point estimate of its rank (the center of its rmin/rmax
399  * range), and looks for the element closest to the desired rank.
400  *
401  * \note finalize() must be called prior to using this function.
402  *
403  * \note The complexity of this function is logarithmic in the sketch size.
404  */
405  T fast_query(size_t rank) const {
406  return fast_query_iterator(rank)->val;
407  }
408 
409 
410  /**
411  * Quickly returns the value at a quantile. For instance:
412  * \code
413  * sketch.fast_query_quantile(0.01) // returns the 1% quantile
414  * sketch.fast_query_quantile(0.50) // returns the median
415  * sketch.fast_query_quantile(1.00) // returns the max
416  * \endcode
417  *
418  * quantile values below 0 will be interpreted as 0 (hence the minimum).
419  * quantile values above 1 will be interpreted as 1 (hence the maximum).
420  *
421  * Unlike query_quantile(), this function is never guaranteed to provide an
422  * epsilon error bound. Instead, this function assumes that each element in
423  * the sketch has a point estimate of its rank (the center of its rmin/rmax
424  * range), and looks for the element closest to the desired rank.
425  *
426  * \note finalize() must be called prior to using this function.
427  *
428  * \note The complexity of this function is logarithmic in the sketch size.
429  */
430  T fast_query_quantile(double quantile) const {
431  if (quantile < 0) quantile = 0;
432  if (quantile > 1) quantile = 1;
433  return fast_query(quantile * m_elements_inserted);
434  }
435 
436  /**
437  * Merges two unfinalized quantile_sketch into one quantile sketch.
438  * Both this and the other quantile sketch must not be finalized().
439  * This allows two quantile sketches to be generated on two disjoint streams
440  * of data, and then combined at the end. For quality guarantees to apply,
441  * BOTH sketches must be constructed with the final number of elements.
442  *
443  * Example:
444  * If I have 16 data streams, each of size N/16
445  * I can construct 16 quantile_sketches, each initialized with
446  * desired_n = N (note. N here! not N/16)
447  *
448  * Each sketch can then be ran on each data stream independently.
449  * Then all can be combined into one:
450  *
451  * \code
452  * quantile_sketch sketch(N)
453  * for each sub-stream sketch:
454  * sketch.combine(sub-stream-sketch)
455  * sketch.finalize()
456  * \endcode
457  *
458  * \note The predictions produced by the combined sketch is not generally
459  * going to be the same as the sequentially produced sketch
460  */
461  void combine(const quantile_sketch& other) {
462  ASSERT_EQ(m_query.size(), 0);
463  ASSERT_EQ(other.m_query.size(), 0);
464  if (m_levels.size() < other.m_levels.size()) {
465  m_levels.resize(other.m_levels.size());
466  }
467  if (other.m_levels.size() > 0) {
468  // merge all levels above level 0
469  for (size_t i = 1; i < other.m_levels.size(); ++i) {
470  if (other.m_levels[i].size() > 0) {
471  // compact modifies sc
472  std::vector<element> sc = other.m_levels[i];
473  compact(sc, i);
474  }
475  }
476  // insert level 0 the regular way
477  for (size_t i = 0; i < other.m_levels[0].size(); ++i) {
478  add(other.m_levels[0][i].val);
479  }
480  // update the number of elements inserted.
481  // (note we have to substract off the level 0 elements inserted the
482  // regular way)
483  m_elements_inserted += other.m_elements_inserted - other.m_levels[0].size();
484  }
485  }
486 
487 
488  void save(oarchive& oarc) const {
489  oarc << m_n << m_b << m_elements_inserted << m_epsilon
490  << m_levels << m_query;
491  }
492  void load(iarchive& iarc) {
493  iarc >> m_n >> m_b >> m_elements_inserted >> m_epsilon
494  >> m_levels >> m_query;
495  }
496 
497  private:
498  /**
499  * The storage for each value. Contains the value, the lower bound on the
500  * rank, and the upper bound of the rank.
501  */
502  struct element: public IS_POD_TYPE {
503  element() {}
504  explicit element(T val)
505  :val(val), rmin(-1), rmax(-1) {}
506  explicit element(T val, size_t rmin, size_t rmax)
507  :val(val), rmin(rmin), rmax(rmax){}
508 
509  // returns the center of the rmin/rmax range
510  float rcenter() const {
511  return ((float)(rmin) + rmax) / 2;
512  }
513 
514  T val; // The value of the element
515  size_t rmin = -1; // The lower bound on the rank of the element
516  size_t rmax = -1; // The upper bound on the rank of the element
517  };
518 
519  struct element_less_than {
520  element_less_than() {};
521  element_less_than(const Comparator& comparator) {
522  m_comparator = comparator;
523  }
524 
525  bool operator()(const element& e1, const element& e2) {
526  return m_comparator(e1.val, e2.val);
527  }
528 
529  Comparator m_comparator;
530  };
531 
532 
533  size_t m_n = 0; // total number of elements expected
534  size_t m_b = 0; // size of each bucket
535  size_t m_elements_inserted = 0; // number of elements inserted so far
536  double m_epsilon = 0.01; // desired accuracy
537 
538  /// The multi-level sketch. Used when inserting into the sketch structure.
539  std::vector<std::vector<element> > m_levels;
540 
541  /** After the last element is inserted, the final sketch is
542  * sorted and constructed here.
543  */
544  std::vector<element> m_query;
545 
546  /* The comparator to sort elements */
547  element_less_than m_comparator;
548 
549  /**
550  * Sort the initial insertion buffer (level 0) and sets up its rmin/rmax so
551  * that it can be used in the rest of the sketch.
552  */
553  void sort_level_0() {
554  std::sort(m_levels[0].begin(), m_levels[0].end(), m_comparator);
555  for (size_t i = 0;i < m_levels[0].size(); ++i) {
556  // paper uses ranks from 1 to N. We unfortunately need to keep to this
557  // to ensure that the r_min computation and r_max computation in
558  // merge() works correctly and matches the paper
559  m_levels[0][i].rmin = i + 1;
560  m_levels[0][i].rmax = i + 1;
561  }
562  }
563 
564  /**
565  * Compresses the vector by introducing some amount of additional error.
566  */
567  void compress(std::vector<element>& vec, double additional_error) {
568  double b = 1.0 / additional_error;
569  size_t num_values = std::ceil(2.0 * b) + 1;
570  if (num_values < 2) num_values = 2; // we must get at least min and max
571  compress_to_size(vec, num_values);
572  }
573 
574  /**
575  * Decreases the size of a sketch by selecting a subset of elements
576  */
577  void compress_to_size(std::vector<element>& vec, size_t selection_size) {
578  if (selection_size >= vec.size()) return;
579  double step_size = (double)vec.size() / selection_size;
580  for (size_t i = 0;i < selection_size - 1; ++i) {
581  size_t targ = step_size * i;
582  if (targ >= vec.size()) targ = vec.size() - 1;
583  vec[i] = vec[targ];
584  }
585  vec[selection_size - 1] = vec[vec.size() - 1];
586  vec.resize(selection_size);
587  }
588 
589  /**
590  * Performs a "merge" on two sorted sketches, updating r_min / r_max as
591  * we go along.
592  */
593  std::vector<element> merge(std::vector<element>& left,
594  std::vector<element>& right) {
595  // trivial cases
596  if (left.size() == 0) return right;
597  if (right.size() == 0) return left;
598 
599  // output array
600  std::vector<element> ret(left.size() + right.size());
601  // left iteration index
602  size_t leftidx = 0;
603  // right iteration index
604  size_t rightidx = 0;
605  // output index
606  size_t outidx = 0;
607  // parallel iteration over left and right
608  while(leftidx < left.size() && rightidx < right.size()) {
609  if (m_comparator(left[leftidx], right[rightidx]) ||
610  (left[leftidx].val == right[rightidx].val)) {
611  // left is smaller
612  ret[outidx] = element(
613  std::move(left[leftidx].val),
614  left[leftidx].rmin + (rightidx > 0 ? right[rightidx - 1].rmin : 0),
615  left[leftidx].rmax + (right[rightidx].rmax > 0 ? right[rightidx].rmax - 1 : 0));
616  ++leftidx;
617  } else {
618  // right is smaller
619  ret[outidx] = element(
620  std::move(right[rightidx].val),
621  right[rightidx].rmin + (leftidx > 0 ? left[leftidx - 1].rmin : 0),
622  right[rightidx].rmax + (left[leftidx].rmax > 0 ? left[leftidx].rmax - 1 : 0));
623  ++rightidx;
624  }
625  ++outidx;
626  }
627  // right is finished. left is remaining
628  while(leftidx < left.size()) {
629  ret[outidx] = element(
630  left[leftidx].val,
631  // note that the rmax formula here is different
632  left[leftidx].rmin + right[rightidx - 1].rmin,
633  left[leftidx].rmax + right[rightidx - 1].rmax);
634  ++leftidx;
635  ++outidx;
636  }
637  // left is finished. right is remaining
638  while(rightidx < right.size()) {
639  ret[outidx] = element(
640  right[rightidx].val,
641  // note that the rmax formula here is different
642  right[rightidx].rmin + left[leftidx - 1].rmin,
643  right[rightidx].rmax + left[leftidx - 1].rmax);
644  ++rightidx;
645  ++outidx;
646  }
647  return ret;
648  }
649 
650  /**
651  * Inserts level 0 into the remainder of the multilevel sketch.
652  * Corresponds to algorithm 1 in the paper. Called "update".
653  *
654  * Essentially, attempts to insert sc into the starting level.
655  * If there is no sketch at that level, moves it in and returns.
656  * If there is a sketch at that level, merge + compresses it with the sketch
657  * at that level, clears the sketch at that level then attempts to insert
658  * the new sketch at the next higher level.
659  * (it sort of resembles binary addition)
660  */
661  void compact(std::vector<element>& sc, size_t starting_level = 1) {
662  for (size_t i = starting_level;i < m_levels.size(); ++i) {
663  if (m_levels[i].size() == 0) {
664  m_levels[i] = std::move(sc);
665  return;
666  } else {
667  sc = merge(sc, m_levels[i]);
668  compress(sc, 1.0 / m_b);
669  m_levels[i].clear();
670  }
671  }
672  m_levels.push_back(sc);
673  }
674 
675  /**
676  * Constructs the query array through a binary recursive merge of all
677  * levels of the multilevel sketch.
678  */
679  std::vector<element> recursive_merge_of_all_levels(size_t start, size_t end) {
680  if (end - start == 1) return std::move(m_levels[start]);
681  else if (end - start == 2) return merge(m_levels[start], m_levels[start + 1]);
682  else {
683  size_t midpoint = start + (end - start) / 2;
684  std::vector<element> left = recursive_merge_of_all_levels(start, midpoint);
685  std::vector<element> right = recursive_merge_of_all_levels(midpoint, end);
686  return merge(left, right);
687  }
688  }
689 
690  /**
691  * Compares two element objects by the center of the rmin/rmax range,
692  * returning true if the center of the left range, is lesser than the
693  * center of the right range.
694  */
695  static bool rank_center_comparator(const element& left,
696  const element& right) {
697  float center_left = left.rcenter();
698  float center_right = right.rcenter();
699  return center_left < center_right;
700  }
701 
702 
703 
704  typename std::vector<element>::const_iterator fast_query_iterator(size_t rank) const {
705  ++rank;
706  // special handling for min and max
707  if (rank <= 1) {
708  return m_query.begin();
709  }
710  if (rank >= m_elements_inserted) {
711  return m_query.begin() + (m_query.size() - 1);
712  }
713  element search_elem(T(), rank, rank);
714  // This will find the first element >= to the query
715  auto iter = std::lower_bound(m_query.begin(), m_query.end(),
716  search_elem, rank_center_comparator);
717  if (iter == m_query.end()) {
718  // at the end, must be the last element
719  return m_query.begin() + (m_query.size() - 1);
720  } else if (iter == m_query.begin()) {
721  // at the start, must be the first element
722  return iter;
723  } else {
724  // closest could be current, or one left.
725  auto leftiter = iter - 1;
726  float left_distance = std::fabs(leftiter->rcenter() - rank);
727  float right_distance = std::fabs(iter->rcenter() - rank);
728  return left_distance < right_distance ? leftiter : iter;
729  }
730  }
731 
732  friend class streaming_quantile_sketch<T, Comparator>;
733 };
734 
735 } // namespace sketch
736 } // namespace turi
737 #endif
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
T query_quantile(double quantile) const
void combine(const quantile_sketch &other)
Inheriting from this type will force the serializer to treat the derived type as a POD type...
Definition: is_pod.hpp:16
quantile_sketch(size_t desired_n, double epsilon=0.005, const Comparator &comparator=Comparator())
T fast_query_quantile(double quantile) const
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
void init(size_t desired_n, double epsilon, const Comparator &comparator=Comparator())