6 #ifndef TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP 7 #define TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP 9 #include <ml/sketches/quantile_sketch.hpp> 10 #include <core/logging/assertions.hpp> 11 #include <core/storage/serialization/serialization_includes.hpp> 102 template <
typename T,
typename Comparator>
103 class streaming_quantile_sketch {
// NOTE(review): this listing is an extract with the original line numbers
// fused into the text and many lines (signatures, braces, else-branches)
// dropped. The comments below describe only what the visible fragments show.
//
// streaming_quantile_sketch: an approximate quantile summary built from a
// list of quantile_sketch<T, Comparator> levels (m_levels) plus a merged
// sketch, m_final, which every query method reads from.
//  - Insertion goes into the last level. The capacity check uses
//    m_initial_sketch_size << level, so capacity doubles with each level.
//  - Levels are initialized with epsilon / 3. The final merge compresses
//    with 2 * epsilon / 3. substream_finalize and combine each compress
//    with epsilon / 3.
//    NOTE(review): presumably chosen so the error terms sum to at most
//    epsilon -- confirm.
//
// Constructor body (its signature is not visible here): all setup is
// delegated to init().
112 init(epsilon, comparator);
// init(): resets the sketch. It stores the comparator, zeroes the element
// count, sizes level 0 and resets m_final. Some statements in between
// were omitted from this extract.
// NOTE(review): m_epsilon is read below but assigned on an omitted line;
// presumably it is set from `epsilon` -- confirm.
121 void init(
double epsilon,
const Comparator& comparator) {
122 m_comparator = comparator;
124 m_elements_inserted = 0;
// Base level capacity is about 1/epsilon (truncated), never less than 1.
// NOTE(review): 1.0 / m_epsilon is converted implicitly to size_t. If
// epsilon <= 0 or NaN, that floating->integral conversion is undefined
// behavior. Consider validating epsilon before this point.
127 m_initial_sketch_size = std::max<size_t>(1, 1.0 / m_epsilon);
// Level 0 gets one third of the error budget. Indexing m_levels[0]
// requires the omitted preceding lines to have made m_levels non-empty.
129 m_levels[0].init(m_initial_sketch_size, m_epsilon / 3.0, comparator);
// m_final starts out initialized with the full epsilon. The merge paths
// below re-initialize or overwrite it.
132 m_final.init(m_initial_sketch_size, m_epsilon, comparator);
// Sums sketch_size() over every level plus the final sketch. The
// enclosing signature and the `sum` declaration are not visible here.
140 for (
size_t i = 0; i < m_levels.size(); ++i) {
141 sum += m_levels[i].sketch_size();
143 sum += m_final.sketch_size();
// Same pattern for memory: memory_usage() of every level plus m_final.
153 for (
size_t i = 0; i < m_levels.size(); ++i) {
154 sum += m_levels[i].memory_usage();
156 sum += m_final.memory_usage();
// Insertion path (signature not visible): element `t` goes into the last
// level, and the inserted-element counter is incremented.
// NOTE(review): m_levels.size() - 1 wraps around if m_levels is empty.
166 size_t curlevel = m_levels.size() - 1;
// Once the last level holds (m_initial_sketch_size << curlevel) elements,
// m_levels[curlevel] is initialized with capacity
// m_initial_sketch_size << curlevel and an epsilon / 3 budget.
// NOTE(review): the omitted lines inside this branch presumably append a
// new level and advance curlevel first -- confirm.
167 if (m_levels[curlevel].
size() >= (m_initial_sketch_size << curlevel)) {
171 m_levels[curlevel].init(m_initial_sketch_size << curlevel, m_epsilon / 3.0, m_comparator);
173 m_levels[curlevel].add(t);
174 ++m_elements_inserted;
// Final merge (signature not visible; presumably finalize()):
//  1. re-initialize m_final, passing the total element count, with a
//     2 * epsilon / 3 budget;
//  2. finalize every level;
//  3. merge each level's m_query into m_final.m_query;
//  4. compress once with 2 * epsilon / 3;
//  5. record the total element count on m_final.
183 m_final.init(m_elements_inserted, 2 * m_epsilon / 3.0, m_comparator);
185 for (
size_t i = 0;i < m_levels.size(); ++i) {
186 m_levels[i].finalize();
191 for (
size_t i = 0;i < m_levels.size(); ++i) {
192 m_final.m_query = m_final.merge(m_final.m_query, m_levels[i].m_query);
197 m_final.compress(m_final.m_query, 2 * m_epsilon / 3.0);
199 m_final.m_elements_inserted = m_elements_inserted;
// Element count: items inserted one at a time, plus counts absorbed
// through combine().
208 return m_elements_inserted;
// Query entry points (rank / quantile, exact and fast variants). All of
// them delegate to m_final, so they reflect only what the merge paths have
// placed into m_final.
235 return m_final.query(rank);
255 return m_final.query_quantile(quantile);
287 return m_final.fast_query(rank);
312 return m_final.fast_query_quantile(quantile);
// Sub-stream merge (signature not visible; presumably substream_finalize()).
// It has the same steps as the final merge but uses an epsilon / 3 budget.
// NOTE(review): presumably this leaves error budget for a later combine(),
// which compresses again with epsilon / 3 -- confirm.
324 m_final.init(m_elements_inserted, m_epsilon / 3.0, m_comparator);
326 for (
size_t i = 0;i < m_levels.size(); ++i) {
327 m_levels[i].finalize();
332 for (
size_t i = 0;i < m_levels.size(); ++i) {
333 m_final.m_query = m_final.merge(m_final.m_query, m_levels[i].m_query);
335 m_final.compress(m_final.m_query, m_epsilon / 3.0);
337 m_final.m_elements_inserted = m_elements_inserted;
// Merging another sketch (signature not visible; presumably combine()).
// If this sketch holds no elements, other's m_final is copied wholesale.
// Otherwise (the else branch is elided here) other's m_final.m_query is
// merged into ours. Element counts are summed, the result is compressed
// with epsilon / 3, and m_final's count and epsilon are refreshed.
// NOTE(review): only m_final is touched here, never m_levels. Both sketches
// presumably need a sub-stream merge beforehand -- confirm.
378 if (m_elements_inserted == 0) {
379 m_final = other.m_final;
381 m_final.m_query = m_final.merge(m_final.m_query, other.m_final.m_query);
383 m_elements_inserted += other.m_elements_inserted;
391 m_final.compress(m_final.m_query, m_epsilon / 3.0);
392 m_final.m_elements_inserted = m_elements_inserted;
393 m_final.m_epsilon = m_epsilon;
// Error-bound accessor (body omitted from this extract).
396 double get_epsilon()
const {
// Serialization: save and load must use the same field order.
// NOTE(review): m_comparator is not archived, so a loaded sketch keeps
// whatever comparator it already had.
401 oarc << m_epsilon << m_elements_inserted
402 << m_initial_sketch_size << m_levels << m_final;
405 iarc >> m_epsilon >> m_elements_inserted
406 >> m_initial_sketch_size >> m_levels >> m_final;
// Requested error bound; read by init() and every merge/compress call.
410 double m_epsilon = 0.01;
// Total element count: per-element inserts plus combine() merges.
411 size_t m_elements_inserted = 0;
// Capacity of level 0. init() overwrites this default with max(1, 1/epsilon).
415 size_t m_initial_sketch_size = 16;
// Insertion levels. The capacity check uses m_initial_sketch_size << index.
416 std::vector<quantile_sketch<T, Comparator> > m_levels;
// Ordering used to initialize every level and m_final.
419 Comparator m_comparator;
424 #endif // TURI_SKETCH_STREAMING_QUANTILE_SKETCH_HPP
T fast_query(size_t rank)
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
void combine(streaming_quantile_sketch other)
T query_quantile(double quantile)
void init(double epsilon, const Comparator &comparator)
void substream_finalize()
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
streaming_quantile_sketch(double epsilon=0.005, const Comparator &comparator=Comparator())
T fast_query_quantile(double quantile)