Turi Create  4.0
streaming_quantile_sketch.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP
7 #define TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP
8 #include <vector>
9 #include <ml/sketches/quantile_sketch.hpp>
10 #include <core/logging/assertions.hpp>
11 #include <core/storage/serialization/serialization_includes.hpp>
12 namespace turi {
13 namespace sketches {
14 
15 /**
16  * \ingroup sketching
17  * This class implements the *streaming* quantile sketching datastructure as
18  * described in Qi Zhang and Wei Wang. A Fast Algorithm for Approximate
19  * Quantiles in High Speed Data Streams. SSDBM 2007.
20  *
21  * This sketch is basically a streaming version of the \ref quantile_sketch
22  *
23  * It constructs a low memory sketch from a stream of unknown length.
24  * To summarize the basic idea, it draws samples from the stream and maintains
25  * for each sample a lower bound and an upper bound on its rank in the stream.
26  *
27  * Usage:
28  * \code
29  * // construct a sketch at a particular accuracy
30  * quantile_sketch<double> sketch(0.01);
31  * ...
32  * insert any number elements into the sketch
33  * sketch.add(value)
34  * ...
35  * sketch.finalize() // finalize must be called before queries can be made
36  * \endcode
37  *
38  * To query, two family of functions are provided.
39  *
40  * The regular query functions take linear time in the size of the sketch, and
41  * have accuracy guarantees
42  * \code
43  * sketch.query(rank) // rank is between [0, N-1]
44  * sketch.query_quantile(quantile) // quantile is between [0, 1.0]
45  * \endcode
46  *
47  * The fast query functions take logarithmic time in the size of the sketch,
48  * and have no accuracy guarantees.
49  * \code
50  * sketch.fast_query(rank) // rank is between [0, N-1]
51  * sketch.fast_query_quantile(quantile) // quantile is between [0, 1.0]
52  * \endcode
53  *
54  * The sketch structure also support parallel sketching:
55  *
56  * Example:
57  * If I have 16 data streams, I can construct 16 streaming_quantile_sketches:
58  *
59  * Each sketch can then be ran on each data stream independently.
60  * Then all can be combined into one. Note that the process for doing this on
61  * the streaming_quantile_sketch is somewhat more complicated. It involves a
62  * partial finalization of each substream (\ref sub_stream_finalize() ), before
63  * combining. And after combining, a different finalization call must be made
64  * (\ref combine_finalize())
65  *
66  * \code
67  * // add stuff to each sub stream
68  * for each sub-stream sketch:
69  * sub-stream-sketch.add(...)
70  *
71  * // pre-finalize each sub stream
72  * for each sub-stream sketch:
73  * sub-stream-sketch.substream_finalize();
74  *
75  * // combine into one sketch
76  * quantile_sketch sketch(0.01)
77  * for each sub-stream sketch:
78  * sketch.combine(sub-stream-sketch)
79  *
80  * // finalize the main sketch
81  * sketch.combine_finalize()
82  * \endcode
83  *
84  * \tparam Type to contain in the sketch. type must be less-than-comparable,
85  * constructible, copy constructible, assignable.
86  *
87  * Technical Details
88  * -----------------
89  * We create a sequence of fixed length sketches, with each subsequent sketch
90  * sized to sketch twice the amount of data as the previous one.
91  * Each sketch is built to have a final finalized error of \epsilon / 3.
92  *
93  * If no sub-streams are used, the finalize() phase will merge all sketches
94  * and recompress to introduce an additional error of (2 * \epsilon / 3)
95  *
96  * If sub-streams are used, the sub-stream finalize will merge all the
97  * sub-stream sketches to introduce an additional error of (\epsilon / 3).
98  * Then each of the substream sketches are merged into the final sketch and
99  * compressed again to introduce an additional error of (\epsilon / 3)
100  *
101  */
102 template <typename T, typename Comparator>
103 class streaming_quantile_sketch {
104  public:
105 
106  /**
107  * Constructs a quantile_sketch with a desired target number of elements,
108  * and a desired accuracy. See init() for details.
109  */
110  explicit streaming_quantile_sketch(double epsilon = 0.005, const Comparator& comparator = Comparator())
111  {
112  init(epsilon, comparator);
113  }
114 
115  /**
116  * Reinitializes the quantile sketch, resetting it.
117  * epsilon is the desired accuracy of the quantiles.
118  *
119  * \param epsilon The desired accuracy
120  */
121  void init(double epsilon, const Comparator& comparator) {
122  m_comparator = comparator;
123  m_epsilon = epsilon;
124  m_elements_inserted = 0;
125  m_levels.clear();
126  m_levels.resize(1);
127  m_initial_sketch_size = std::max<size_t>(1, 1.0 / m_epsilon);
128  // we construct each intermediate sketch with epsilon / 3.0 error
129  m_levels[0].init(m_initial_sketch_size, m_epsilon / 3.0, comparator);
130  // init the final sketch to clear it. Really doesn't matter what
131  // size we init it with at this point. We just need it to be initted
132  m_final.init(m_initial_sketch_size, m_epsilon, comparator);
133  }
134 
135  /**
136  * Returns the number of elements stored in the sketch
137  */
138  size_t sketch_size() {
139  size_t sum = 0;
140  for (size_t i = 0; i < m_levels.size(); ++i) {
141  sum += m_levels[i].sketch_size();
142  }
143  sum += m_final.sketch_size();
144  return sum;
145  }
146 
147 
148  /**
149  * Returns the approximate current memory usage of the sketch in bytes.
150  */
151  size_t memory_usage() {
152  size_t sum = 0;
153  for (size_t i = 0; i < m_levels.size(); ++i) {
154  sum += m_levels[i].memory_usage();
155  }
156  sum += m_final.memory_usage();
157  return sum;
158  }
159 
160  /**
161  * Inserts a value into the sketch. Not safe to use in parallel.
162  */
163  void add(T t) {
164  // maximum size of level i is m_initial_sketch_size * 2^i
165  // are we full?
166  size_t curlevel = m_levels.size() - 1;
167  if (m_levels[curlevel].size() >= (m_initial_sketch_size << curlevel)) {
168  // we are full. make a new level
169  m_levels.push_back(quantile_sketch<T, Comparator>());
170  ++curlevel;
171  m_levels[curlevel].init(m_initial_sketch_size << curlevel, m_epsilon / 3.0, m_comparator);
172  }
173  m_levels[curlevel].add(t);
174  ++m_elements_inserted;
175  }
176 
177  /**
178  * Finalizes the sketch datastructure.
179  * Must be called before queries are made. Once finalize() is called,
180  * elements may no longer be inserted.
181  */
182  void finalize() {
183  m_final.init(m_elements_inserted, 2 * m_epsilon / 3.0, m_comparator);
184  m_final.finalize();
185  for (size_t i = 0;i < m_levels.size(); ++i) {
186  m_levels[i].finalize();
187  }
188  // we need intrusive access to the sketch. Basically, we will merge
189  // all the sketch data into one.
190 
191  for (size_t i = 0;i < m_levels.size(); ++i) {
192  m_final.m_query = m_final.merge(m_final.m_query, m_levels[i].m_query);
193  }
194  // since each intermediate sketch was constructed with epsilon / 2.0 error,
195  // we can compress the final sketch with epsilon / 2.0 error to
196  // make up a sketch with epsilon error.
197  m_final.compress(m_final.m_query, 2 * m_epsilon / 3.0);
198 
199  m_final.m_elements_inserted = m_elements_inserted;
200 
201  m_levels.clear();
202  }
203 
204  /**
205  * Returns the number of elements inserted into the sketch.
206  */
207  size_t size() const {
208  return m_elements_inserted;
209  }
210 
211  /**
212  * Queries for the value at rank k of all the elements inserted.
213  * Assuming N elements are inserted. Rank 0 will be the minimum value,
214  * rank N-1, the maximum value.
215  *
216  * Note that this rank is in relation to the total number of elements
217  * inserted. For instance,
218  * \code
219  * quantile_sketch<double> sketch(1000);
220  * ...
221  * insert 10,000 elements into the sketch
222  * \endcode
223  * sketch.query(9999) will be the maximum.
224  *
225  * \note finalize() must be called prior to using this function.
226  *
227  * \note The complexity of this function can be logarithmic in the sketch size
228  * in the best case (appears to be all the time, empirically), or linear in
229  * the worst case. Essentially, it first tests the result of the fast_query
230  * to see if that fits the bound requirements. If it does not, it takes a full
231  * linear search. The fast_query seems to work pretty much all the time on
232  * randomized inputs of varying distributions.
233  */
234  T query(size_t rank) {
235  return m_final.query(rank);
236  }
237 
238 
239  /**
240  * Returns the value at a quantile. For instance:
241  * \code
242  * sketch.query_quantile(0.01) // returns the 1% quantile
243  * sketch.query_quantile(0.50) // returns the median
244  * sketch.query_quantile(1.00) // returns the max
245  * \endcode
246  *
247  * quantile values below 0 will be interpreted as 0 (hence the minimum).
248  * quantile values above 1 will be interpreted as 1 (hence the maximum).
249  *
250  * \note finalize() must be called prior to using this function.
251  *
252  * \note The complexity of this function is linear in the sketch size.
253  */
254  T query_quantile(double quantile) {
255  return m_final.query_quantile(quantile);
256  }
257 
258  /**
259  * Quickly queries for the value at rank k of all the elements inserted.
260  * Assuming N elements are inserted. Rank 0 will be the minimum value,
261  * rank N-1, the maximum value.
262  *
263  * Note that this rank is in relation to the total number of elements
264  * inserted. For instance,
265  * \code
266  * quantile_sketch<double> sketch(1000);
267  * ...
268  * insert 10,000 elements into the sketch
269  * \endcode
270  * sketch.fast_query(9999) will be the maximum.
271  *
272  * Unlike query(), this function is never guaranteed to provide an
273  * epsilon error bound. Instead, this function assumes that each element in
274  * the sketch has a point estimate of its rank (the center of its rmin/rmax
275  * range), and looks for the element closest to the desired rank.
276  *
277  * \note finalize() must be called prior to using this function.
278  *
279  * \note The complexity of this function can be logarithmic in the sketch size
280  * in the best case (appears to be all the time, empirically), or linear in
281  * the worst case. Essentially, it first tests the result of the fast_query
282  * to see if that fits the bound requirements. If it does not, it takes a full
283  * linear search. The fast_query seems to work pretty much all the time on
284  * randomized inputs of varying distributions.
285  */
286  T fast_query(size_t rank) {
287  return m_final.fast_query(rank);
288  }
289 
290 
291  /**
292  * Quickly returns the value at a quantile. For instance:
293  * \code
294  * sketch.fast_query_quantile(0.01) // returns the 1% quantile
295  * sketch.fast_query_quantile(0.50) // returns the median
296  * sketch.fast_query_quantile(1.00) // returns the max
297  * \endcode
298  *
299  * quantile values below 0 will be interpreted as 0 (hence the minimum).
300  * quantile values above 1 will be interpreted as 1 (hence the maximum).
301  *
302  * Unlike query_quantile(), this function is never guaranteed to provide an
303  * epsilon error bound. Instead, this function assumes that each element in
304  * the sketch has a point estimate of its rank (the center of its rmin/rmax
305  * range), and looks for the element closest to the desired rank.
306  *
307  * \note finalize() must be called prior to using this function.
308  *
309  * \note The complexity of this function is logarithmic in the sketch size.
310  */
311  T fast_query_quantile(double quantile) {
312  return m_final.fast_query_quantile(quantile);
313  }
314 
315 
316  /**
317  * Finalizes the sketch datastructure, in preparation for combining
318  * with other streaming quantile sketches.
319  * Once substream_finalize() is called, elements may no longer be inserted.
320  *
321  * \see combine_finalize() combine()
322  */
324  m_final.init(m_elements_inserted, m_epsilon / 3.0, m_comparator);
325  m_final.finalize();
326  for (size_t i = 0;i < m_levels.size(); ++i) {
327  m_levels[i].finalize();
328  }
329  // we need intrusive access to the sketch. Basically, we will merge
330  // all the sketch data into one.
331 
332  for (size_t i = 0;i < m_levels.size(); ++i) {
333  m_final.m_query = m_final.merge(m_final.m_query, m_levels[i].m_query);
334  }
335  m_final.compress(m_final.m_query, m_epsilon / 3.0);
336 
337  m_final.m_elements_inserted = m_elements_inserted;
338 
339  m_levels.clear();
340  }
341 
342  /**
343  * Merges two substream_finalized quantile_sketch into one quantile sketch.
344  * Both this and the other quantile sketch must be substream_finalized().
345  * This allows two quantile sketches to be generated on two disjoint streams
346  * of data, and then combined at the end.
347  *
348  * Example:
349  * If I have 16 data streams, I can construct 16 quantile_sketches,
350  *
351  * Each sketch can then be ran on each data stream independently.
352  * Then all can be combined into one:
353  *
354  * \code
355  * // add stuff to each sub stream
356  * for each sub-stream sketch:
357  * sub-stream-sketch.add(...)
358  *
359  * // pre-finalize each sub stream
360  * for each sub-stream sketch:
361  * sub-stream-sketch.substream_finalize();
362  *
363  * // combine into one sketch
364  * quantile_sketch sketch(0.01)
365  * for each sub-stream sketch:
366  * sketch.combine(sub-stream-sketch)
367  *
368  * // finalize the main sketch
369  * sketch.combine_finalize()
370  * \endcode
371  *
372  * \note The predictions produced by the combined sketch is not generally
373  * going to be the same as the sequentially produced sketch
374  *
375  * \see combine_finalize() substream_finalize()
376  */
378  if (m_elements_inserted == 0) {
379  m_final = other.m_final;
380  } else {
381  m_final.m_query = m_final.merge(m_final.m_query, other.m_final.m_query);
382  }
383  m_elements_inserted += other.m_elements_inserted;
384  }
385 
386  /**
387  * Stops combining, and finishes the final sketch making it available for
388  * querying. \see combine() substream_finalize()
389  */
391  m_final.compress(m_final.m_query, m_epsilon / 3.0);
392  m_final.m_elements_inserted = m_elements_inserted;
393  m_final.m_epsilon = m_epsilon;
394  }
395 
396  double get_epsilon() const {
397  return m_epsilon;
398  }
399 
400  void save(oarchive& oarc) const {
401  oarc << m_epsilon << m_elements_inserted
402  << m_initial_sketch_size << m_levels << m_final;
403  }
404  void load(iarchive& iarc) {
405  iarc >> m_epsilon >> m_elements_inserted
406  >> m_initial_sketch_size >> m_levels >> m_final;
407  }
408 
409  private:
410  double m_epsilon = 0.01; // desired accuracy
411  size_t m_elements_inserted = 0;
412 
413  // basically, we create a series of exponentially larger sketches.
414  // starting at something sane... like 16.
415  size_t m_initial_sketch_size = 16;
416  std::vector<quantile_sketch<T, Comparator> > m_levels;
418 
419  Comparator m_comparator;
420 };
421 
422 } // namespace sketch
423 } // namespace turi
424 #endif // TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
void combine(streaming_quantile_sketch other)
void init(double epsilon, const Comparator &comparator)
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
streaming_quantile_sketch(double epsilon=0.005, const Comparator &comparator=Comparator())