// Dependencies of the unity_sketch class, followed by a forward declaration of
// the streaming quantile sketch template held via shared_ptr in the numeric
// sketch state.
// NOTE(review): this forward declaration gives Comparator no default, yet the
// numeric sketch state names streaming_quantile_sketch<double> with a single
// argument; the default must come from the full definition pulled in by the
// include list, and both must live in the same namespace (the enclosing
// namespace is not visible in this listing) — confirm.
9 #include <unordered_set> 10 #include <core/data/flexible_type/flexible_type.hpp> 11 #include <model_server/lib/api/unity_sketch_interface.hpp> 12 #include <core/storage/sframe_data/sarray.hpp> 13 #include <ml/sketches/hyperloglog.hpp> 14 #include <ml/sketches/countsketch.hpp> 15 #include <ml/sketches/quantile_sketch.hpp> 16 #include <ml/sketches/streaming_quantile_sketch.hpp> 17 #include <core/parallel/atomic.hpp> 24 template <
typename T,
typename Comparator>
25 class streaming_quantile_sketch;
// Forward declaration of the (non-streaming) quantile sketch template; the
// full definition is presumably supplied by <ml/sketches/quantile_sketch.hpp>.
// Template parameter names in a forward declaration are not part of the
// interface, so they need not match the definition's.
template <class ValueT, class CompareT>
class quantile_sketch;
// Forward declaration; presumably the sketches::space_saving_flextype that
// discrete_sketch_struct holds via shared_ptr (the enclosing namespace is not
// visible in this listing — confirm).
30 class space_saving_flextype;
// Class-wide constant declared here and defined out of line (definition not in
// this listing). NOTE(review): by its name, presumably the interval between
// commits of thread-local results into the global sketch — confirm units.
139 static const double SKETCH_COMMIT_INTERVAL;
// Populates this sketch from the contents of `uarray`.
// background: defaults to false. NOTE(review): presumably, when true, the scan
//             runs asynchronously (see m_background_future) — confirm.
// keys:       defaults to empty. NOTE(review): presumably selects which dict
//             keys / vector indices get their own element sub-sketches
//             (compare element_sub_sketch(keys)) — confirm.
151 void construct_from_sarray(std::shared_ptr<unity_sarray_base> uarray,
bool background =
false,
const std::vector<flexible_type>&
keys = {});
// Count of elements processed so far, per the method name (definition not shown).
168 size_t num_elements_processed();
// Estimated value at the requested quantile.
// NOTE(review): `quantile` is presumably a fraction in [0, 1] — confirm.
179 double get_quantile(
double quantile);
// Frequent items as (value, count) pairs.
// NOTE(review): counts are presumably estimates from the space-saving
// sketch rather than exact frequencies — confirm.
196 std::vector<std::pair<flexible_type, size_t> > frequent_items();
// Returns a map from flexible_type key to a nested sketch.
// NOTE(review): presumably one entry per requested key, where keys are dict
// keys or vector indices whose values were sketched separately — confirm.
210 std::map<flexible_type, std::shared_ptr<unity_sketch_base>> element_sub_sketch(
const std::vector<flexible_type>&
keys);
// Nested-sketch accessors; each returns a shared pointer to a unity_sketch_base.
// NOTE(review): presumably backed by m_element_len_sketch, m_element_sketch,
// m_dict_key_sketch and m_dict_value_sketch respectively — confirm.
216 std::shared_ptr<unity_sketch_base> element_length_summary();
223 std::shared_ptr<unity_sketch_base> element_summary();
229 std::shared_ptr<unity_sketch_base> dict_key_summary();
235 std::shared_ptr<unity_sketch_base> dict_value_summary();
243 if (!m_is_numeric) log_and_throw(
"Mean value not available for a non-numeric column");
244 commit_global_if_out_of_date();
245 std::unique_lock<turi::mutex> global_lock(lock);
246 return m_numeric_sketch.mean;
255 if (!m_is_numeric) log_and_throw(
"Max value not available for a non-numeric column");
256 commit_global_if_out_of_date();
257 std::unique_lock<turi::mutex> global_lock(lock);
258 return m_numeric_sketch.max;
267 if (!m_is_numeric) log_and_throw(
"Min value not available for a non-numeric column");
268 commit_global_if_out_of_date();
269 std::unique_lock<turi::mutex> global_lock(lock);
270 return m_numeric_sketch.min;
279 if (!m_is_numeric) log_and_throw(
"Epsilon value not available for a non-numeric column");
280 commit_global_if_out_of_date();
281 std::unique_lock<turi::mutex> global_lock(lock);
282 return m_numeric_sketch.max;
291 if (!m_is_numeric) log_and_throw(
"Sum value not available for a non-numeric column");
292 commit_global_if_out_of_date();
293 std::unique_lock<turi::mutex> global_lock(lock);
294 return m_numeric_sketch.sum;
302 if (!m_is_numeric) log_and_throw(
"Sum value not available for a non-numeric column");
303 commit_global_if_out_of_date();
304 std::unique_lock<turi::mutex> global_lock(lock);
305 if (m_numeric_sketch.num_items == 0)
return 0.0;
306 return m_numeric_sketch.m2/m_numeric_sketch.num_items;
320 commit_global_if_out_of_date();
321 std::unique_lock<turi::mutex> global_lock(lock);
322 return m_undefined_count;
// Running numeric summary for a column. This listing shows the quantile sketch,
// min, max and num_items; other fields (mean, sum, m2, epsilon) are used
// elsewhere in this class, but their declarations are not visible here.
334 struct numeric_sketch_struct {
// Streaming quantile sketch over the accumulated values, held via shared_ptr.
335 std::shared_ptr<sketches::streaming_quantile_sketch<double>> quantiles;
// Starts at the largest finite double; presumably lowered by accumulate() so
// the first value seen replaces it — confirm.
336 double min = std::numeric_limits<double>::max();
// Starts at the most negative finite double; presumably raised by accumulate().
337 double max = std::numeric_limits<double>::lowest();
// Number of values accumulated; used as the divisor of m2 for the variance.
340 size_t num_items = 0;
// Merges another partial summary into this one
// (presumably a thread-local one — confirm).
346 void combine(
const numeric_sketch_struct& other);
// Adds one value, already converted to double.
348 void accumulate(
double dval);
// Discrete (per-value) summary state; each sketch is held via shared_ptr.
// An accumulate() member is called on this struct elsewhere in the class,
// but its declaration is not in this listing.
353 struct discrete_sketch_struct {
// countsketch over flexible_type values.
354 std::shared_ptr<sketches::countsketch<flexible_type> > count;
// Frequent-items sketch (space_saving_flextype).
355 std::shared_ptr<sketches::space_saving_flextype> frequent;
// hyperloglog sketch; presumably a distinct-value estimate — confirm.
356 std::shared_ptr<sketches::hyperloglog> unique;
// Merges another discrete summary into this one.
362 void combine(
const discrete_sketch_struct& other);
368 bool m_is_child_sketch =
false;
370 bool m_sketch_ready =
false;
373 bool m_is_numeric =
false;
374 bool m_is_list =
false;
380 discrete_sketch_struct m_discrete_sketch;
383 numeric_sketch_struct m_numeric_sketch;
384 size_t m_undefined_count = 0;
385 size_t m_num_elements_processed = 0;
387 turi::atomic<size_t> m_rows_processed_by_threads;
// Per-thread accumulation state. Each entry of m_thrlocal, indexed by the
// thread slot `thr`, holds one thread's partial discrete/numeric sketches and
// counters. The closing of this struct is not in this listing.
390 struct thr_local_data {
391 discrete_sketch_struct discrete_sketch;
392 numeric_sketch_struct numeric_sketch;
393 size_t undefined_count = 0;
394 size_t num_elements_processed = 0;
397 std::vector<thr_local_data> m_thrlocal;
398 std::vector<turi::mutex> m_thrlocks;
400 atomic<bool> m_cancel =
false;
401 std::future<void> m_background_future;
405 std::shared_ptr<unity_sketch> m_element_len_sketch;
406 std::shared_ptr<unity_sketch> m_element_sketch;
409 std::shared_ptr<unity_sketch> m_dict_key_sketch;
410 std::shared_ptr<unity_sketch> m_dict_value_sketch;
413 std::map<flexible_type, std::shared_ptr<unity_sketch>> m_element_sub_sketch;
// Per its name, clears the global sketches and statistics (definition not shown).
419 void reset_global_sketches_and_statistics();
// Called by the statistic accessors before they take `lock` and read global
// state. NOTE(review): presumably merges thread-local partial results into the
// global sketch when they are stale — confirm.
420 void commit_global_if_out_of_date();
// Builds a sketch whose flags, type information and global (combined)
// statistics are copied from `src`. m_sketch_ready comes from the
// `sketch_ready` argument instead of from src.
// NOTE(review): in the lines shown, src's fields are read without taking
// src->lock, src is dereferenced without a null check, and neither m_thrlocal
// nor the nested sketches are copied. The rest of the body is not in this
// listing — confirm.
423 unity_sketch(std::shared_ptr<unity_sketch> src,
bool sketch_ready) {
424 m_sketch_ready = sketch_ready;
425 m_is_numeric = src->m_is_numeric;
426 m_stored_type = src->m_stored_type;
427 m_is_list = src->m_is_list;
428 m_dict_value_sketch_type = src->m_dict_value_sketch_type;
429 m_size = src->m_size;
430 m_numeric_sketch = src->m_numeric_sketch;
431 m_discrete_sketch = src->m_discrete_sketch;
432 m_undefined_count = src->m_undefined_count;
433 m_num_elements_processed = src->m_num_elements_processed;
434 m_rows_processed_by_threads = src->m_rows_processed_by_threads;
435 m_is_child_sketch = src->m_is_child_sketch;
// NOTE(review): the declaration this parameter belongs to begins (and ends) on
// lines not present in this listing. Only its `keys` parameter, which defaults
// to an empty set, is visible here.
442 const std::unordered_set<flexible_type>& keys = std::unordered_set<flexible_type>(),
// Merges per-thread state into the global sketch, given the per-thread mutexes.
// NOTE(review): presumably called with m_thrlocks — confirm.
445 inline void combine_global(std::vector<turi::mutex>& thr_locks);
// Per-value accumulation helpers. `thr` selects the thread-local slot.
// NOTE(review): `keys` presumably limits which dict keys / vector indices get
// sub-sketches — confirm.
446 inline void accumulate_dict_value(
const flexible_type& dict_val,
size_t thr,
const std::unordered_set<flexible_type>& keys);
447 inline void accumulate_vector_value(
const flexible_type& vect_val,
size_t thr,
const std::unordered_set<flexible_type>& keys);
448 inline void accumulate_list_value(
const flexible_type& rec_val,
size_t thr);
// `keys` defaults to an empty set.
449 inline void accumulate_one_value(
size_t thr,
const flexible_type& val,
const std::unordered_set<flexible_type>& keys = std::unordered_set<flexible_type>());
450 inline void accumulate_discrete_sketch(
size_t thr,
const flexible_type& val) {
451 m_thrlocal[thr].discrete_sketch.accumulate(val);
454 inline void accumulate_numeric_sketch(
size_t thr,
const flexible_type& val) {
455 m_thrlocal[thr].numeric_sketch.accumulate((
double)val);
458 inline void empty_sketch() {
459 m_numeric_sketch.mean = 0;
460 m_numeric_sketch.min = NAN;
461 m_numeric_sketch.max = NAN;
462 m_numeric_sketch.sum = 0;
463 m_numeric_sketch.m2 = 0;
464 m_numeric_sketch.num_items = 0;
465 m_numeric_sketch.epsilon = NAN;
468 inline void increase_nested_element_count(
unity_sketch& nested_sketch,
size_t thr,
size_t count) {
469 nested_sketch.m_thrlocal[thr].num_elements_processed += count;
470 nested_sketch.m_rows_processed_by_threads.inc(count);
// NOTE(review): the three lines below do not look like part of this header's
// code. They are a bare `keys` function signature with no terminator, a plain
// English sentence, and a DASSERT_TRUE macro with an empty replacement list
// (if compiled, it would make DASSERT_TRUE expand to nothing). They may be
// residue from a documentation extraction — confirm against the original file.
std::set< Key > keys(const std::map< Key, T > &map)
A simple class that can be used for benchmarking/timing up to microsecond resolution.
#define DASSERT_TRUE(cond)