6 #ifndef TURI_SKETCH_QUANTILE_SKETCH_HPP 7 #define TURI_SKETCH_QUANTILE_SKETCH_HPP 13 #include <core/logging/assertions.hpp> 14 #include <core/storage/serialization/serialization_includes.hpp> 15 #include <core/util/basic_types.hpp> 21 template <
typename T,
typename Comparator = std::less<T>>
117 template <
typename T,
typename Comparator = std::less<T>>
// Constructor. Forwards desired_n, epsilon and comparator unchanged to init().
// epsilon defaults to 0.005; comparator defaults to a default-constructed
// Comparator.
131 explicit quantile_sketch(
size_t desired_n,
double epsilon = 0.005,
const Comparator& comparator = Comparator()) {
132 init(desired_n, epsilon, comparator);
// (Re)initialises the sketch parameters.
//   desired_n  - used as the fallback value for m_b (see below).
//   epsilon    - error parameter used to derive the level buffer size m_b.
//   comparator - ordering on T; wrapped so it can compare `element`s.
// Several original lines of this function are not visible in this listing,
// so the comments below cover only the statements shown.
178 void init(
size_t desired_n,
double epsilon,
const Comparator& comparator = Comparator()) {
// Wrap the value comparator so it orders elements by their `val` field.
179 m_comparator = element_less_than(comparator);
// epsilon * m_n is computed in double and truncated to size_t.
181 size_t l = (epsilon * m_n);
// NOTE(review): if l is 0 here, std::log2 returns -infinity and the
// conversion of the result to an integer is undefined. The lines between
// 181 and 185 are not visible; confirm that l is clamped before this point.
185 m_b = 2 * std::floor(std::log2(l) / epsilon);
// Fall back to desired_n when the computed buffer size is zero.
194 if (m_b == 0) m_b = desired_n;
196 m_elements_inserted = 0;
// Fragment (enclosing signature not visible): totals the number of elements
// held in m_query plus every vector in m_levels. What is done with `sum`
// afterwards is not visible in this listing.
207 size_t sum = m_query.size();
208 for (
size_t i = 0; i < m_levels.size(); ++i) {
209 sum += m_levels[i].size();
// Fragment (enclosing signature not visible; presumably the add() that
// combine() calls -- TODO confirm). Inserts one value `t` into level 0.
// Checked with DASSERT_EQ: m_query must still be empty at this point.
227 DASSERT_EQ(m_query.size(), 0);
// Append the new value to level 0 and count it.
229 m_levels[0].emplace_back(t);
230 ++m_elements_inserted;
// Once level 0 holds exactly m_b elements it is compressed (additional
// error 1/m_b) and moved out into `sc`. Lines 233-234, between the test and
// the compress call, are not visible here, nor is what happens to `sc`.
232 if (m_levels[0].
size() == m_b) {
235 compress(m_levels[0], 1.0 / m_b);
236 std::vector<element> sc = std::move(m_levels[0]);
// Fragment (enclosing signature not visible): builds the query structure.
// Merge every level [0, m_levels.size()) into a single vector. The merge
// moves out of m_levels (see recursive_merge_of_all_levels).
249 m_query = recursive_merge_of_all_levels(0, m_levels.size());
// Shrink the merged summary, allowing m_epsilon / 2 of additional error.
252 compress(m_query, m_epsilon / 2.0);
// Tail of a call whose beginning is not visible; presumably a sort of
// m_query by rank center, which fast_query_iterator's binary search
// relies on -- TODO confirm.
256 rank_center_comparator);
// Record the number of inserted elements in m_n.
258 m_n = m_elements_inserted;
// Fragment (enclosing signature not visible): returns the running count of
// inserted elements.
265 return m_elements_inserted;
// Fragment: body of the rank query (the signature line is not visible; the
// member list at the end of this listing shows `T query(size_t rank) const`).
// Strategy, as far as the visible lines show:
//   1. handle empty / boundary ranks,
//   2. try the binary-search candidate from fast_query_iterator(),
//   3. otherwise linearly scan m_query for the in-window element whose rank
//      center is closest to `rank`,
//   4. fall back to the binary-search candidate if nothing is in the window.
// An empty query structure yields a default-constructed T.
296 if (m_query.size() == 0)
return T();
// The condition guarding this return is not visible in this listing.
301 return m_query[0].val;
// Ranks at or past the number of inserted elements map to the last element.
303 if (rank >= m_elements_inserted) {
304 return m_query[m_query.size() - 1].val;
// Acceptance window: rank -/+ (m_elements_inserted * m_epsilon), with the
// lower end clamped to 0.
// NOTE(review): rank is narrowed to int here, so ranks above INT_MAX would
// not be represented correctly.
306 int lower_index = (int)rank - m_elements_inserted * m_epsilon;
307 int upper_index = (int)rank + m_elements_inserted * m_epsilon;
308 lower_index = std::max<int>(lower_index, 0);
// Binary-search candidate, looked up at rank - 1.
310 auto fast_query_iter = fast_query_iterator(rank - 1);
311 static_assert(std::is_same<
size_t, decltype(fast_query_iter->rmin)>::value,
312 "rmin expected to have type size_t");
313 static_assert(std::is_same<
size_t, decltype(fast_query_iter->rmax)>::value,
314 "rmax expected to have type size_t");
// Accept the candidate if its whole [rmin, rmax] interval lies inside the
// window.
315 if (truncate_check<int64_t>(fast_query_iter->rmin) >= lower_index &&
316 truncate_check<int64_t>(fast_query_iter->rmax) <= upper_index) {
317 return fast_query_iter->val;
// size_t(-1) serves as the "nothing found yet" sentinel for both values.
320 size_t tightest = -1;
321 size_t tightest_range = -1;
// Linear scan over every element of the query structure.
328 for (
size_t i = 0;i < m_query.size(); ++i) {
329 static_assert(std::is_same<
size_t, decltype(m_query[i].rmin)>::value,
330 "rmin expected to have type size_t");
331 static_assert(std::is_same<
size_t, decltype(m_query[i].rmax)>::value,
332 "rmax expected to have type size_t");
333 if (truncate_check<int64_t>(m_query[i].rmin) >= lower_index &&
334 truncate_check<int64_t>(m_query[i].rmax) <= upper_index){
335 size_t center = (m_query[i].rmax + m_query[i].rmin) / 2;
// NOTE(review): `center` is size_t and `rank` appears to be size_t as well,
// so `center - rank` is an unsigned subtraction that wraps to a huge value
// whenever center < rank; std::fabs then sees that huge value rather than
// the true distance. Line 337 (presumably recording `tightest = i`) is not
// visible here.
336 if (std::fabs(center - rank) < tightest_range) {
338 tightest_range = std::fabs(center - rank);
// Sentinel untouched: no element was inside the window, so return the
// binary-search candidate instead.
342 if (tightest == (
size_t)(-1)){
345 return fast_query_iter->val;
347 return m_query[tightest].val;
// Fragment: body of the quantile query (the signature is not visible; the
// member list at the end of this listing shows
// `T query_quantile(double quantile) const`).
// Clamp the quantile to [0, 1], scale it by the number of inserted elements
// to obtain a rank, and delegate to query().
377 if (quantile < 0) quantile = 0;
378 if (quantile > 1) quantile = 1;
379 return query(quantile * m_elements_inserted);
// Fragment: body of the fast rank query (the signature is not visible; the
// member list at the end of this listing shows
// `T fast_query(size_t rank) const`). Returns the value of the element
// chosen by fast_query_iterator(), without the linear scan query() performs.
406 return fast_query_iterator(rank)->val;
// Fragment: body of the fast quantile query (the signature is not visible;
// the member list at the end of this listing shows
// `T fast_query_quantile(double quantile) const`).
// Clamp the quantile to [0, 1], scale it by the number of inserted elements
// to obtain a rank, and delegate to fast_query().
431 if (quantile < 0) quantile = 0;
432 if (quantile > 1) quantile = 1;
433 return fast_query(quantile * m_elements_inserted);
// Fragment: body of the sketch merge (the signature is not visible; the
// member list at the end of this listing shows
// `void combine(const quantile_sketch &other)`).
// Both sketches must have an empty query structure.
462 ASSERT_EQ(m_query.size(), 0);
463 ASSERT_EQ(other.m_query.size(), 0);
// Make sure this sketch has at least as many levels as `other`.
464 if (m_levels.size() < other.m_levels.size()) {
465 m_levels.resize(other.m_levels.size());
467 if (other.m_levels.size() > 0) {
// Levels 1 and up: each non-empty level of `other` is copied into `sc`.
// What is then done with `sc` is not visible in this listing.
469 for (
size_t i = 1; i < other.m_levels.size(); ++i) {
470 if (other.m_levels[i].size() > 0) {
472 std::vector<element> sc = other.m_levels[i];
// Level 0 of `other` is replayed value by value through add().
477 for (
size_t i = 0; i < other.m_levels[0].size(); ++i) {
478 add(other.m_levels[0][i].val);
// Add other's element count minus its level-0 size; presumably add() has
// already counted the replayed level-0 values -- TODO confirm.
// NOTE(review): this indexes other.m_levels[0]; whether the statement sits
// inside the `other.m_levels.size() > 0` guard cannot be determined from
// the visible lines.
483 m_elements_inserted += other.m_elements_inserted - other.m_levels[0].size();
// Fragment (enclosing signature not visible): writes the sketch state to the
// output archive `oarc`. The field order here is the same order the
// matching `iarc >>` statement reads them back in. m_comparator is not
// written.
489 oarc << m_n << m_b << m_elements_inserted << m_epsilon
490 << m_levels << m_query;
// Fragment (enclosing signature not visible): reads the sketch state from
// the input archive `iarc`, in the same field order the matching `oarc <<`
// statement writes them. m_comparator is not read.
493 iarc >> m_n >> m_b >> m_elements_inserted >> m_epsilon
494 >> m_levels >> m_query;
// Constructors of `element` (the enclosing struct header is not visible).
// Value-only form: rmin and rmax are initialised from -1. Since both fields
// are size_t (see the static_asserts in the query code), this stores the
// largest size_t value.
504 explicit element(T val)
505 :val(val), rmin(-1), rmax(-1) {}
// Full form: stores the value together with explicit rmin / rmax bounds.
506 explicit element(T val,
size_t rmin,
size_t rmax)
507 :val(val), rmin(rmin), rmax(rmax){}
// Midpoint of the element's [rmin, rmax] rank interval, computed in float.
// NOTE(review): float has a 24-bit significand, so for ranks above 2^24 the
// midpoint is only approximate.
510 float rcenter()
const {
511 return ((
float)(rmin) + rmax) / 2;
// Adapter that lets a Comparator over T order `element`s by their `val`.
519 struct element_less_than {
// Default form: m_comparator is default-constructed.
520 element_less_than() {};
// Stores a copy of the supplied comparator.
521 element_less_than(
const Comparator& comparator) {
522 m_comparator = comparator;
// Returns m_comparator(e1.val, e2.val).
// NOTE(review): this operator is not const-qualified, so it cannot be
// invoked on a const element_less_than.
525 bool operator()(
const element& e1,
const element& e2) {
526 return m_comparator(e1.val, e2.val);
// The wrapped value comparator.
529 Comparator m_comparator;
// Sketch state (the declarations of m_n and m_b are not visible here).
// Number of values inserted so far.
535 size_t m_elements_inserted = 0;
// Error parameter. The in-class default is 0.01, whereas the constructor's
// default epsilon argument is 0.005; the line in init() that would assign
// this member is not visible in this listing.
536 double m_epsilon = 0.01;
// Per-level element buffers; new values are appended to m_levels[0].
539 std::vector<std::vector<element> > m_levels;
// Merged summary used to answer queries; empty while values are being
// inserted or combined.
544 std::vector<element> m_query;
// Orders elements by value using the user-supplied Comparator.
547 element_less_than m_comparator;
// Sorts level 0 by value and assigns every element its exact 1-based rank:
// after the sort, the element at index i gets rmin = rmax = i + 1.
553 void sort_level_0() {
554 std::sort(m_levels[0].begin(), m_levels[0].end(), m_comparator);
555 for (
size_t i = 0;i < m_levels[0].size(); ++i) {
559 m_levels[0][i].rmin = i + 1;
560 m_levels[0][i].rmax = i + 1;
// Shrinks `vec` to a size derived from the permitted additional error.
//   vec              - summary to shrink in place.
//   additional_error - error budget; the target size is
//                      ceil(2 / additional_error) + 1, and never below 2.
// The actual down-sampling is done by compress_to_size().
567 void compress(std::vector<element>& vec,
double additional_error) {
568 double b = 1.0 / additional_error;
569 size_t num_values = std::ceil(2.0 * b) + 1;
570 if (num_values < 2) num_values = 2;
571 compress_to_size(vec, num_values);
// Down-samples `vec` in place to `selection_size` elements.
//   vec            - summary to shrink.
//   selection_size - number of elements to keep; nothing happens when it is
//                    at least vec.size().
// NOTE(review): a selection_size of 0 with a non-empty vec would make
// `selection_size - 1` wrap around. compress() always passes at least 2.
577 void compress_to_size(std::vector<element>& vec,
size_t selection_size) {
578 if (selection_size >= vec.size())
return;
// Fractional stride between the source positions that are sampled.
579 double step_size = (double)vec.size() / selection_size;
580 for (
size_t i = 0;i < selection_size - 1; ++i) {
// Source index for output slot i, clamped to the last valid index. The
// statement that uses `targ` (line 583) is not visible in this listing;
// presumably it copies vec[targ] into vec[i].
581 size_t targ = step_size * i;
582 if (targ >= vec.size()) targ = vec.size() - 1;
// The final slot always receives the last element of the original vector.
585 vec[selection_size - 1] = vec[vec.size() - 1];
586 vec.resize(selection_size);
// Merges two summaries into one, recomputing each element's rmin / rmax.
//   left, right - input summaries, taken by non-const reference; when both
//                 are non-empty their values are moved out of them.
// Returns the merged vector. If either input is empty, a copy of the other
// is returned.
// The declarations of leftidx / rightidx / outidx and the statements that
// advance them are not visible in this listing.
593 std::vector<element> merge(std::vector<element>& left,
594 std::vector<element>& right) {
596 if (left.size() == 0)
return right;
597 if (right.size() == 0)
return left;
// Pre-sized output; every slot is overwritten by the loops below.
600 std::vector<element> ret(left.size() + right.size());
// Main loop: runs while both inputs still have unconsumed elements.
608 while(leftidx < left.size() && rightidx < right.size()) {
// Take from `left` when its value orders first or compares equal (so T
// must provide operator== in addition to the comparator).
609 if (m_comparator(left[leftidx], right[rightidx]) ||
610 (left[leftidx].val == right[rightidx].val)) {
// New rmin: own rmin plus the rmin of the last consumed `right` element
// (0 if none has been consumed). New rmax: own rmax plus the current
// `right` element's rmax minus 1 (0 if that rmax is 0).
612 ret[outidx] = element(
613 std::move(left[leftidx].val),
614 left[leftidx].rmin + (rightidx > 0 ? right[rightidx - 1].rmin : 0),
615 left[leftidx].rmax + (right[rightidx].rmax > 0 ? right[rightidx].rmax - 1 : 0));
// Mirror case: take from `right`, using `left` as the other side.
619 ret[outidx] = element(
620 std::move(right[rightidx].val),
621 right[rightidx].rmin + (leftidx > 0 ? left[leftidx - 1].rmin : 0),
622 right[rightidx].rmax + (left[leftidx].rmax > 0 ? left[leftidx].rmax - 1 : 0));
// Remaining `left` elements: offset both bounds by the bounds of the
// element at right[rightidx - 1]. `right` is non-empty here because of the
// early return above.
628 while(leftidx < left.size()) {
629 ret[outidx] = element(
632 left[leftidx].rmin + right[rightidx - 1].rmin,
633 left[leftidx].rmax + right[rightidx - 1].rmax);
// Remaining `right` elements: offset both bounds by the bounds of the
// element at left[leftidx - 1].
638 while(rightidx < right.size()) {
639 ret[outidx] = element(
642 right[rightidx].rmin + left[leftidx - 1].rmin,
643 right[rightidx].rmax + left[leftidx - 1].rmax);
// Pushes the summary `sc` into the level structure.
//   sc             - summary to insert; it is moved from or overwritten.
//   starting_level - first level to try (defaults to 1).
// Some lines of the loop body are not visible in this listing, so the exact
// exits from the loop cannot be confirmed from here.
661 void compact(std::vector<element>& sc,
size_t starting_level = 1) {
662 for (
size_t i = starting_level;i < m_levels.size(); ++i) {
// An empty level takes ownership of `sc`.
663 if (m_levels[i].
size() == 0) {
664 m_levels[i] = std::move(sc);
// Occupied level: merge it into `sc` and compress the result with an
// additional error of 1 / m_b.
667 sc = merge(sc, m_levels[i]);
668 compress(sc, 1.0 / m_b);
// Append `sc` as a new top level.
// NOTE(review): push_back(sc) copies the vector.
672 m_levels.push_back(sc);
// Merges levels [start, end) into one summary by divide and conquer.
// The visited levels are moved from, either directly or inside merge().
// NOTE(review): an empty range (end == start) matches neither base case in
// the visible code, and midpoint would equal start, so the recursion would
// not terminate. Confirm that callers always pass at least one level
// (line 682 is not visible in this listing).
679 std::vector<element> recursive_merge_of_all_levels(
size_t start,
size_t end) {
// One level: hand it over by move.
680 if (end - start == 1)
return std::move(m_levels[start]);
// Two levels: merge them directly.
681 else if (end - start == 2)
return merge(m_levels[start], m_levels[start + 1]);
// Otherwise split the range in half, merge each half, then merge the halves.
683 size_t midpoint = start + (end - start) / 2;
684 std::vector<element> left = recursive_merge_of_all_levels(start, midpoint);
685 std::vector<element> right = recursive_merge_of_all_levels(midpoint, end);
686 return merge(left, right);
// Strict ordering of elements by the midpoint of their rank interval
// (rcenter()). Returns true when left's center is smaller than right's.
695 static bool rank_center_comparator(
const element& left,
696 const element& right) {
697 float center_left = left.rcenter();
698 float center_right = right.rcenter();
699 return center_left < center_right;
// Finds, by binary search, the element of m_query whose rank center is
// closest to `rank`, and returns a const iterator to it.
// std::lower_bound requires m_query to be ordered by rank_center_comparator;
// presumably the query-building code establishes that -- TODO confirm.
704 typename std::vector<element>::const_iterator fast_query_iterator(
size_t rank)
const {
// The condition guarding this return is not visible in this listing.
708 return m_query.begin();
// Ranks at or past the number of inserted elements map to the last element.
// NOTE(review): m_query.size() - 1 wraps if m_query is empty; no guard for
// that is visible in this function.
710 if (rank >= m_elements_inserted) {
711 return m_query.begin() + (m_query.size() - 1);
// Probe element with rmin == rmax == rank, so its rcenter() equals rank.
713 element search_elem(T(), rank, rank);
// First element whose rank center is not less than the probe's.
715 auto iter = std::lower_bound(m_query.begin(), m_query.end(),
716 search_elem, rank_center_comparator);
// Every center is below rank: use the last element.
717 if (iter == m_query.end()) {
719 return m_query.begin() + (m_query.size() - 1);
720 }
else if (iter == m_query.begin()) {
// Otherwise compare the hit with its left neighbour and return whichever
// center is nearer to rank; on a tie the hit (right side) is returned.
// The body of the begin() branch above is not visible in this listing.
725 auto leftiter = iter - 1;
726 float left_distance = std::fabs(leftiter->rcenter() - rank);
727 float right_distance = std::fabs(iter->rcenter() - rank);
728 return left_distance < right_distance ? leftiter : iter;
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
T query_quantile(double quantile) const
void combine(const quantile_sketch &other)
Inheriting from this type will force the serializer to treat the derived type as a POD type...
quantile_sketch(size_t desired_n, double epsilon=0.005, const Comparator &comparator=Comparator())
T fast_query_quantile(double quantile) const
T fast_query(size_t rank) const
T query(size_t rank) const
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
void init(size_t desired_n, double epsilon, const Comparator &comparator=Comparator())