Turi Create  4.0
topk_indexer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_TOPK_COLUMN_INDEXER_H_
7 #define TURI_TOPK_COLUMN_INDEXER_H_
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/util/hash_value.hpp>
11 #include <core/logging/assertions.hpp>
12 #include <core/util/bitops.hpp>
13 #include <core/storage/serialization/serialization_includes.hpp>
14 #include <core/generics/hopscotch_map.hpp>
15 #include <core/parallel/pthread_tools.hpp>
16 #include <core/export.hpp>
17 
18 namespace turi {
19 
20 /**
21  *
22  * Parallel top-k indexer for categorical variables (uses one-hot-encoding)
23  *
24  * Note: This implementation is intended to be general and will be moved to some
25  * place more general later.
26  *
27  * Construction
28  * -------------
29  *
30  * // Construct the indexer with the arguments.
31  * auto indexer = topk_indexer(10, 1, "column_name_for_error_messages");
32  * indexer.initialize();
33  *
34  * // Insert flexible types into the indexer
35  * for (const flexible_type& v: sa.range_iterator() {
36  * indexer.insert_or_update(v);
37  * }
38  *
39  * // Finalize mapping (drops elements by frequency/threshold)
40  * indexer.finalize();
41  *
42  * Lookups
43  * --------
44  * size_t index = indexer.lookup(v); // Returns (size_t) -1 if not present.
45  *
46  * size_t counts = indexer.lookup_counts(v); // Returns 0 if not present.
47  *
48  * flexible_type v = indexer.inverse_lookup(1) // Fails if index doesn't exist.
49  *
50  * Parallel construction
51  * -----------------------
52  *
53  * // Initialize
54  * indexer.initialize();
55  *
56  * // Perform the indexing.
57  * in_parallel([&](size_t thread_idx, size_t num_threads) {
58  *
59  * size_t start_idx = src_size * thread_idx / num_threads;
60  * size_t end_idx = src_size * (thread_idx + 1) / num_threads;
61  *
62  * for (const flexible_type& v: sa.range_iterator(start_idx, end_idx) {
63  * indexer.insert_or_update(v, thread_id);
64  * }
65  *
66  * // Finalize
67  * indexer.finalize();
68  *
69  *
70  */
71 class EXPORT topk_indexer {
72 
73  public:
74 
75  /**
76  * Default constructor
77  *
78  * \param[in] topk Topk to retain (by counts)
79  * \param[in] threshold Min count threshold to retain.
80  * \param[in] column_name Column name for display.
81  *
82  */
83  topk_indexer(const size_t& _topk = (size_t)-1,
84  const size_t& _threshold = 1,
85  const size_t& _max_threshold = (size_t) -1,
86  const std::string _column_name = "") : topk(_topk),
87  threshold(_threshold), max_threshold(_max_threshold),column_name(_column_name) {
88  }
89 
90 
91  /**
92  * Copy constructor: Don't want to risk making copies of this.
93  */
94  topk_indexer(const topk_indexer&) = delete;
95 
96 
97  /**
98  * Initialize the index mapping and setup. Should be called before
99  * starting the map.
100  */
101  void initialize();
102 
103  /**
104  * Insert
105  *
106  * \param[in] value Flexible type.
107  * \param[in] thread_idx Thread id (For parallel insertion).
108  * \param[in] count Amount to increment for this value.
109  *
110  */
111  void insert_or_update(const flexible_type& value, size_t thread_idx = 0,
112  size_t count = 1) GL_HOT;
113 
114  /**
115  * Returns the index associated with the value.
116  *
117  * \param[in] value Search for the value.
118  * \returns The index. (Returns size_t(-1) if not present).
119  */
120  size_t lookup(const flexible_type& value) const;
121 
122  /**
123  * Returns the counts associated with the value.
124  *
125  * \param[in] value Search for the value.
126  * \returns Counts (Returns 0 if not present).
127  */
128  size_t lookup_counts(const flexible_type& value) const;
129 
130 
131  /**
132  * Finalize by dropping indices that dont meet
133  * - Count requirement i.e count >= threshold.
134  * - Topk requirement.
135  */
136  void finalize();
137 
138  /**
139  * Returns the "value" associated with the index.
140  *
141  * \param[\in] idx Index associated with the feature value.
142  * \return The "value" in the original data associated with the given id.
143  */
144  flexible_type inverse_lookup(size_t idx) const;
145 
146  /**
147  * Returns the values (ordered by indices)
148  */
149  std::vector<flexible_type> get_values() const {
150  return values;
151  }
152 
153 
154  /** Returns the number of categorical variables.
155  *
156  * \return Column size.
157  */
158  inline size_t size() const {
159  return index_lookup.size();
160  }
161 
162  /**
163  * Returns the current version used for the serialization.
164  */
165  size_t get_version() const;
166 
167  /**
168  * Serialize the object (save).
169  */
170  void save_impl(turi::oarchive& oarc) const;
171 
172  /**
173  * Load the object.
174  */
175  void load_version(turi::iarchive& iarc, size_t version);
176 
177  private:
178 
179  // Private members.
180  size_t topk = (size_t) (-1);
181  size_t threshold = 0;
182  size_t max_threshold = (size_t) (-1);
183  std::string column_name = "";
184 
185  // List of Map(hash : (value, count)) per thread.
186  std::vector<hopscotch_map<hash_value, std::pair<flexible_type, size_t>>>
187  threadlocal_accumulator;
188 
189  // Index -> value/cound
190  std::vector<flexible_type> values;
191  std::vector<size_t> counts;
192 
193  // Map(value : index)
195 
196  // Private helper functions.
197  // ------------------------------------------------------------------------
198 
199  /**
200  * Retain only top_k values (by counts). Only call after finalize.
201  */
202  void retain_only_top_k_values();
203 
204  /**
205  * Retain values with counts >= threshold. Only call after finalize.
206  */
207  void retain_min_count_values();
208 
209  /**
210  * Retain values with count <= max_threshold. Only call after finalize.
211  */
212  void delete_min_count_values();
213  /**
214  * Delete everything associated with an index in the lookup.
215  */
216  void mark_for_deletion(size_t index);
217  /**
218  * Delete everything associated with an index in the lookup.
219  */
220  void delete_all_marked();
221 
222  /**
223  * Validate feature types.
224  */
225  void valdidate_types(const flexible_type& value) const;
226 };
227 } // namespace turi
228 
229 
230 // Implement serialization for std::shared_ptr
231 BEGIN_OUT_OF_PLACE_SAVE(arc, std::shared_ptr<topk_indexer>, m) {
232  if(m == nullptr) {
233  arc << false;
234  } else {
235  arc << true;
236 
237  // Save the version number
238  size_t version = m->get_version();
239  arc << version;
240  // Save the object.
241  m->save_impl(arc);
242  }
243 } END_OUT_OF_PLACE_SAVE()
244 
245 
246 // Implement deserialization
247 BEGIN_OUT_OF_PLACE_LOAD(arc, std::shared_ptr<topk_indexer>, m) {
248  bool is_not_nullptr;
249  arc >> is_not_nullptr;
250  if(is_not_nullptr) {
251 
252  // Load version
253  size_t version;
254  arc >> version;
255 
256  // Load object.
257  m.reset(new topk_indexer(0,1,0,""));
258  m->load_version(arc, version);
259 
260  } else {
261  m = std::shared_ptr<topk_indexer>(nullptr);
262  }
263 } END_OUT_OF_PLACE_LOAD()
264 #endif
#define BEGIN_OUT_OF_PLACE_LOAD(arc, tname, tval)
Macro to make it easy to define out-of-place loads.
Definition: iarchive.hpp:314
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
topk_indexer(const size_t &_topk=(size_t) -1, const size_t &_threshold=1, const size_t &_max_threshold=(size_t) -1, const std::string _column_name="")
size_t size() const
std::set< T > values(const std::map< Key, T > &map)
Definition: stl_util.hpp:386
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
std::vector< flexible_type > get_values() const
#define BEGIN_OUT_OF_PLACE_SAVE(arc, tname, tval)
Macro to make it easy to define out-of-place saves.
Definition: oarchive.hpp:346