Turi Create  4.0
space_saving.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SKETCHES_SPACE_SAVING_SKETCH_HPP
7 #define TURI_SKETCHES_SPACE_SAVING_SKETCH_HPP
8 
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <limits>
#include <map>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>

#include <core/generics/value_container_mapper.hpp>
14 
15 
16 namespace turi {
17 namespace sketches {
18 
19 /**
20  * \ingroup sketching
21  * This class implements the Space Saving Sketch as described in
22  * Ahmed Metwally † Divyakant Agrawal Amr El Abbadi. Efficient Computation of
23  * Frequent and Top-k Elements in Data Streams.
24  *
25  * It provides an efficient one pass scan of all the data and provides an
26  * estimate of all the frequently occurring elements, with guarantees that all
27  * elements with occurrences >= \epsilon N will be reported.
28  *
29  * \code
30  * space_saving ss;
31  * // repeatedly call
32  * ss.add(stuff);
33  * // will return an array containing all the elements tracked
34  * // not all elements may be truly frequent items
35  * ss.frequent_items()
36  * // will return an array containing all the elements tracked which are
37  * // guaranteed to have occurrences >= \epsilon N
38  * \endcode
39  *
40  */
41 template <typename T>
42 class space_saving {
43  public:
44  /**
45  * Constructs a save saving sketch using 1 / epsilon buckets.
46  * The resultant hyperloglog datastructure will 1 / epsilon memory, and
47  * guarantees that all elements with occurances >= \epsilon N will be reported.
48  */
49  space_saving(double epsilon = 0.0001)
50  {
51  initialize(epsilon);
52  init_data_structures();
53  }
54 
55  /**
56  * Initalizes a save saving sketch using 1 / epsilon buckets. The
57  * resultant hyperloglog datastructure will use O(1 / epsilon)
58  * memory, and guarantees that all elements with occurances >=
59  * \epsilon N will be reported.
60  */
61  void initialize(double epsilon = 0.0001) {
62  clear();
63 
64  // capacity = 1.0 / epsilon. add one to avoid rounding problems around the
65  // value of \epsilon N
66  m_max_capacity = size_t(std::ceil(1.0 / epsilon) + 1);
67  m_epsilon = epsilon;
68  }
69 
70  /** Clears everything out.
71  */
72  void clear() {
73  m_size = 0;
74  n_entries = 0;
75  base_level = 1;
76  base_level_candidate_count = 0;
77  cached_insertion_key = hashkey();
78  cached_insertion_value = nullptr;
79  global_map.clear();
80  }
81 
82  /**
83  * Adds an item with a specified count to the sketch.
84  */
85  void add(const T& t, size_t count = 1) {
86  add_impl(t, count, 0);
87  }
88 
89  /**
90  * Returns the number of elements inserted into the sketch.
91  */
92  size_t size() const {
93  return m_size;
94  }
95 
96  /**
97  * Returns all the elements tracked by the sketch as well as an
98  * estimated count. The estimated can be a large overestimate.
99  */
100  std::vector<std::pair<T, size_t> > frequent_items() const {
101  std::vector<std::pair<T, size_t> > ret;
102 
103  if(n_entries < entries.size()) {
104  // We have all the items exactly; just return them.
105  ret.resize(n_entries);
106  for(size_t i = 0; i < n_entries; ++i) {
107  const entry& e = entries[i];
108  ret[i] = {e.value(), e.count};
109  }
110  } else {
111 
112  size_t threshhold = std::max(size_t(1), size_t(m_epsilon * size()));
113 
114  for(size_t i = 0; i < n_entries; ++i) {
115  const entry& e = entries[i];
116 
117  if (e.count >= threshhold)
118  ret.push_back(std::make_pair(e.value(), e.count));
119  }
120  }
121 
122  return ret;
123  }
124 
125 
126  /**
127  * Returns all the elements tracked by the sketch as well as an
128  * estimated count. All elements returned are guaranteed to have
129  * occurance >= epsilon * m_size
130  */
131  std::vector<std::pair<T, size_t> > guaranteed_frequent_items() const {
132  std::vector<std::pair<T, size_t> > ret;
133 
134  if(n_entries < entries.size()) {
135  // We have all the items exactly; just return them.
136  ret.resize(n_entries);
137  for(size_t i = 0; i < n_entries; ++i) {
138  const entry& e = entries[i];
139  ret[i] = {e.value(), e.count};
140  }
141  } else {
142  size_t threshhold = std::max(size_t(1), size_t(m_epsilon * size()));
143 
144  for(size_t i = 0; i < n_entries; ++i) {
145  const entry& e = entries[i];
146 
147  if (e.count - e.error >= threshhold)
148  ret.push_back(std::make_pair(e.value(), e.count));
149  }
150  }
151 
152  return ret;
153  }
154 
155  /**
156  * Merges a second space saving sketch into the current sketch
157  */
158  template <typename U>
159  typename std::enable_if<std::is_convertible<U, T>::value, void>::type
160  /* void */ combine(const space_saving<U>& other) {
161  _combine(other);
162  }
163 
164  ~space_saving() { }
165 
166  private:
167 
168  template <class U> friend class space_saving;
169 
170  // Total count added thus far
171  size_t m_size = 0;
172 
173  // --- Other Tracking Internal Variables you don't have to care about ---
174  // number of unique values to track
175  size_t m_max_capacity = 0;
176  double m_epsilon = 1;
177 
178  struct entry;
179 
181  typedef typename global_map_type::hashkey hashkey;
183 
184  // The container structure for the entry
185  struct entry {
186  typedef typename global_map_type::hashkey_and_value hashkey_and_value;
187 
188  size_t count = 0;
189  size_t error = 0;
190 
191  entry(){}
192 
193  entry(hashkey_and_value&& _kv)
194  : kv(_kv)
195  {}
196 
197  hashkey_and_value kv;
198 
199  inline const hashkey_and_value& get_hashkey_and_value() const { return kv; }
200 
201  inline const T& value() const { return kv.value(); }
202  };
203 
204  /** The entries are arranged so that the base level is built and
205  * maintained as a sequential partition starting at 0, and the rest
206  * are maintained as a sequential partition starting at n-1 and
207  * going down. If the base_level is empty when it is needed, the
208  * other partition is scanned and things are rearranged so that the
209  * base level is now the lowest level present and the base level
210  * partition is correct.
211  *
212  * The number of unique elements is thus (base_level_end +
213  * (m_max_capacity - overflow_level_start) ).
214  */
215  size_t n_entries = 0;
216  std::vector<entry> entries;
217 
218  // These control the base levels
219  size_t base_level = 1;
220 
221  std::vector<entry*> base_level_candidates;
222  size_t base_level_candidate_count = 0;
223 
224  global_map_type global_map;
225 
226  // Need to eliminate the reference in some way.
227  hashkey cached_insertion_key;
228  entry* cached_insertion_value;
229 
230  /** A generic index buffer. Used in a couple of places.
231  */
232  std::vector<size_t> index_buffer;
233 
234 
235  /** Initialize the data structures.
236  */
237  void init_data_structures() {
238  entries.resize(m_max_capacity);
239  n_entries = 0;
240 
241  index_buffer.resize(m_max_capacity);
242 
243  base_level = 1;
244  base_level_candidates.resize(m_max_capacity);
245  base_level_candidate_count = 0;
246 
247  /** Turns out it handles the deletion and insertion dynamics
248  * better with just a little extra space.
249  */
250  global_map.reserve(5 * m_max_capacity / 4);
251 
252  cached_insertion_value = nullptr;
253  }
254 
255  /** Called when the base level is needed but it's empty.
256  */
257  void regenerate_base_level() GL_HOT_NOINLINE_FLATTEN {
258 
259  _debug_check_level_integrity();
260 
261  DASSERT_EQ(base_level_candidate_count, 0);
262 
263  ////////////////////////////////////////////////////////////
264  // State explicitly some of the assumptions.
265 
266  size_t min_value = std::numeric_limits<size_t>::max();
267 
268  for(size_t i = 0; i < m_max_capacity; ++i) {
269 
270  size_t count = entries[i].count;
271 
272  if(count < min_value) {
273  base_level_candidate_count = 0;
274  min_value = count;
275  base_level_candidates[base_level_candidate_count++] = &entries[i];
276  } else if(count == min_value) {
277  base_level_candidates[base_level_candidate_count++] = &entries[i];
278  }
279  }
280 
281  ////////////////////////////////////////////////////////////////////////////////
282  // Now need to pull all of them together
283 
284  base_level = min_value;
285 
286  _debug_check_level_integrity();
287  }
288 
289  /** Increment the value of one of the elements.
290  */
291  entry* increment_element(
292  entry* element, size_t count_incr, size_t error_incr) GL_HOT_INLINE_FLATTEN {
293 
294  _debug_check_level_integrity();
295 
296  element->count += count_incr;
297  element->error += error_incr;
298  return element;
299  }
300 
301  /** Insert a new element.
302  */
303  entry* insert_element(const hashkey& key, const T& t,
304  size_t count, size_t error) GL_HOT_NOINLINE_FLATTEN {
305 
306  DASSERT_LT(n_entries, m_max_capacity);
307 
308  size_t dest_index = n_entries;
309  ++n_entries;
310 
311  entries[dest_index] = entry(hashkey_and_value(key, t));
312 
313  entries[dest_index].count = count;
314  entries[dest_index].error = error;
315 
316  if(count == base_level)
317  base_level_candidates[base_level_candidate_count++] = &(entries[dest_index]);
318 
319  global_map.insert(entries[dest_index].get_hashkey_and_value(), &(entries[dest_index]));
320 
321  // Make sure we're okay here
322  _debug_check_level_integrity();
323 
324  return &(entries[dest_index]);
325  }
326 
327  /** Change and increment the count of a value.
328  */
329  entry* change_and_increment_value(
330  const hashkey& key, const T& t, size_t count, size_t error) GL_HOT_NOINLINE_FLATTEN {
331 
332  if(base_level_candidate_count > 0
333  && base_level_candidates.back()->count != base_level) {
334 
335  while(base_level_candidate_count > 0
336  && base_level_candidates[base_level_candidate_count - 1]->count != base_level) {
337  --base_level_candidate_count;
338  }
339  }
340 
341  if(UNLIKELY(base_level_candidate_count == 0)) {
342  regenerate_base_level();
343  }
344 
345  // We're started here.
346  DASSERT_GT(base_level_candidate_count, 0);
347 
348  size_t idx = base_level_candidate_count - 1;
349 
350  entry* e = base_level_candidates[idx];
351 
352  DASSERT_EQ(e->count, base_level);
353 
354  global_map.invalidate(e->get_hashkey_and_value(), e);
355 
356  e->error = e->count + error;
357  e->count += count;
358  e->kv = hashkey_and_value(key, t);
359 
360  global_map.insert(key, e);
361 
362  --base_level_candidate_count;
363 
364  _debug_check_level_integrity();
365 
366  return e;
367  }
368 
369  /** Add in an element.
370  */
371  void add_impl(const T& t, size_t count = 1, size_t error = 0) GL_HOT {
372  _debug_check_level_integrity();
373 
374  m_size += count;
375 
376  hashkey key(t);
377 
378  // One optimization -- if the key is the same as the previous one,
379  // avoid a hash table lookup. This will be seen a lot in sorted arrays, etc.
380  if(cached_insertion_value != nullptr
381  && cached_insertion_key == key
382  && (hashkey::key_is_exact()
383  || (cached_insertion_value->value() == t) ) ) {
384 
385  increment_element(cached_insertion_value, count, error);
386 
387  } else {
388 
389  cached_insertion_key = key;
390  entry *ee_ptr = global_map.find(key, t);
391 
392  if(ee_ptr != nullptr) {
393  increment_element(ee_ptr, count, error);
394  cached_insertion_value = ee_ptr;
395  } else {
396 
397  /* If we're full. */
398  if(n_entries == m_max_capacity) {
399  cached_insertion_value = change_and_increment_value(key, t, count, error);
400  } else {
401  cached_insertion_value = insert_element(key, t, count, error);
402  }
403  }
404  }
405 
406 #ifndef NDEBUG
407  if(cached_insertion_value != nullptr) {
408  const entry* e = global_map.find(cached_insertion_key,
409  cached_insertion_value->value());
410  DASSERT_TRUE(e == cached_insertion_value);
411  }
412 #endif
413  }
414 
415  /* For combining entries and casting between types. we need to cast
416  * and recreate the key if they are different.
417  */
418  template <typename OtherEntry>
419  inline entry* _find_eptr(const OtherEntry& e,
420  typename std::enable_if<std::is_same<OtherEntry, entry>::value >::type* = 0) {
421  return global_map.find(e.get_hashkey_and_value());
422  }
423 
424  template <typename OtherEntry>
425  inline entry* _find_eptr(const OtherEntry& e,
426  typename std::enable_if<!std::is_same<OtherEntry, entry>::value >::type* = 0) {
427  return global_map.find(hashkey(e.value()), T(e.value()));
428  }
429 
430  /* For combining entries and casting between types. we need to cast
431  * and recreate the key if they are different.
432  */
433  template <typename OtherEntry>
434  inline const entry& _cast_entry(const OtherEntry& e,
435  typename std::enable_if<std::is_same<OtherEntry, entry>::value >::type* = 0) {
436  return e;
437  }
438 
439  template <typename OtherEntry>
440  inline entry _cast_entry(const OtherEntry& e,
441  typename std::enable_if<!std::is_same<OtherEntry, entry>::value >::type* = 0) {
442  entry ret;
443  ret.count = e.count;
444  ret.error = e.error;
445  ret.kv = hashkey_and_value(T(e.value()));
446  return ret;
447  }
448 
449  /** Combine this heap with another one.
450  */
451  template <typename U>
452  GL_HOT_NOINLINE_FLATTEN
453  void _combine(const space_saving<U>& other) {
454  /**
455  * Pankaj K. Agarwal, Graham Cormode, Zengfeng Huang,
456  * Jeff M. Phillips, Zhewei Wei, and Ke Yi. Mergeable Summaries.
457  * 31st ACM Symposium on Principals of Database Systems (PODS). May 2012.
458  */
459 
460  _debug_check_level_integrity(true);
461  other._debug_check_level_integrity(true);
462 
463  if(other.m_size == 0)
464  return;
465 
466  // Start by getting the sizes for the partitions based on the
467  // current entries.
468  std::map<size_t, size_t> part_sizes;
469 
470  // First, go through and update the partition sizes from the other
471  // sketch, copying over the counts and errors where they deal with
472  // these.
473  for(size_t i = 0; i < other.n_entries; ++i) {
474 
475  entry* e_ptr = _find_eptr(other.entries[i]);
476 
477  if(e_ptr != nullptr) {
478  e_ptr->count += other.entries[i].count;
479  e_ptr->error += other.entries[i].error;
480  } else {
481  ++part_sizes[other.entries[i].count];
482  }
483  }
484 
485  for(size_t i = 0; i < n_entries; ++i) {
486  ++part_sizes[entries[i].count];
487  }
488 
489  // Okay, now we have all the partition sizes. Now just need to go
490  // backwards and count out the partitions.
491 
492  size_t current_position = m_max_capacity;
493 
494  for(auto it = part_sizes.rbegin(); it != part_sizes.rend(); ++it) {
495 
496  size_t level = it->first;
497  size_t part_size = it->second;
498 
499  if(part_size >= current_position) {
500  base_level = level;
501  break;
502  } else {
503  current_position -= part_size;
504  }
505  }
506 
507  if(base_level == 0) {
508  // None here.
509  base_level = part_sizes.begin()->first;
510  current_position += part_sizes.begin()->second;
511  }
512 
513  size_t num_base_entries_left = current_position;
514  size_t write_position = 0;
515 
516  // Now, go through and move everything over
517  std::vector<entry> alt_entries(m_max_capacity);
518 
519  auto write_entry = [&](const entry& e) {
520 
521  size_t lvl = e.count;
522 
523  if(lvl < base_level)
524  return;
525 
526  if(lvl == base_level) {
527  if(num_base_entries_left == 0)
528  return;
529  --num_base_entries_left;
530  }
531 
532  DASSERT_LT(write_position, m_max_capacity);
533  alt_entries[write_position] = e;
534  ++write_position;
535  };
536 
537  for(size_t i = 0; i < n_entries; ++i)
538  write_entry(_cast_entry(entries[i]));
539 
540  for(size_t i = 0; i < other.n_entries; ++i) {
541  if(_find_eptr(other.entries[i]) == nullptr)
542  write_entry(_cast_entry(other.entries[i]));
543  }
544 
545  // Go through and update the base level stuff
546 
547  n_entries = write_position;
548  m_size += other.m_size;
549 
550  entries = std::move(alt_entries);
551 
552  // Now, go through and fill up the hash map correctly
553  global_map.clear();
554 
555  for(size_t i = 0; i < n_entries; ++i) {
556  global_map.insert(entries[i].get_hashkey_and_value(), &(entries[i]));
557  }
558 
559  cached_insertion_value = nullptr;
560  base_level_candidate_count = 0;
561 
562  _debug_check_level_integrity(true);
563  other._debug_check_level_integrity(true);
564  }
565 
566  ////////////////////////////////////////////////////////////////////////////////
567  // Debug stuff
568 
569  void _debug_check_level_integrity(bool force_check = false) const GL_HOT_INLINE_FLATTEN {
570 #ifndef NDEBUG
571 #ifdef ENABLE_SKETCH_CONSISTENCY_CHECKS
572  force_check = true;
573 #endif
574 
575  DASSERT_LE(n_entries, m_size);
576 
577  if(!force_check)
578  return;
579 
580  std::set<const entry*> bl_set(base_level_candidates.begin(),
581  base_level_candidates.begin() + base_level_candidate_count);
582 
583  // Tests the invariant conditions on the entries vector and the level partitions
584 
585  for(size_t i = 0; i < n_entries; ++i) {
586  ASSERT_GE(entries[i].count, base_level);
587 
588  // Make sure all of them are tracked (even if they are not all
589  // valid).
590  if(entries[i].count == base_level && base_level_candidate_count != 0)
591  ASSERT_TRUE(bl_set.find(&entries[i]) != bl_set.end());
592 
593  const entry* e = global_map.find(entries[i].get_hashkey_and_value());
594 
595  ASSERT_TRUE(e != nullptr);
596  ASSERT_EQ(e, &(entries[i]));
597  }
598 
599  // okay, we're all good!
600 #endif
601  }
602 
603 };
604 
605 } // namespace sketches
606 } // namespace turi
607 #endif
std::vector< std::pair< T, size_t > > guaranteed_frequent_items() const
std::enable_if< std::is_convertible< U, T >::value, void >::type combine(const space_saving< U > &other)
ValueContainer * find(const hashkey_and_value &hv) GL_HOT_INLINE_FLATTEN
void initialize(double epsilon=0.0001)
#define GL_HOT_INLINE_FLATTEN
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
std::vector< std::pair< T, size_t > > frequent_items() const
void insert(const hashkey_and_value &hv, ValueContainer *v_ptr) GL_HOT_INLINE_FLATTEN
space_saving(double epsilon=0.0001)
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
void invalidate(const hashkey_and_value &hv, ValueContainer *v_ptr) GL_HOT_INLINE_FLATTEN
void add(const T &t, size_t count=1)