Turi Create  4.0
column_indexer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_DML_DATA_COLUMN_INDEXER_H_
7 #define TURI_DML_DATA_COLUMN_INDEXER_H_
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/util/hash_value.hpp>
11 #include <core/logging/assertions.hpp>
12 #include <core/util/bitops.hpp>
13 #include <ml/ml_data/ml_data_column_modes.hpp>
14 #include <core/storage/serialization/serialization_includes.hpp>
15 #include <core/generics/hopscotch_map.hpp>
16 #include <core/parallel/pthread_tools.hpp>
17 
18 namespace turi {
19 
20 namespace ml_data_internal {
21 
22 /**
23  * \ingroup mldata
24  * \internal
25  * \addtogroup mldata ML Data Internal
26  * \{
27  */
28 
29 
/** Use a two-level hash table to store the index mappings. The
 * first level is constant size and unlocked, determined by an n-bit
 * hash. Each leaf in the first level contains a hash table and a lock,
 * which significantly reduces lock contention.
 * _column_indexer_first_level_lookup_size_n_bits gives the number
 * of bits used for this first lookup.
 */
38 
39 /**
40  * column_metadata contains "meta data" concerning indexing of a single column
41  * of an SFrame. A collection of meta_data column objects is "all" the
42  * metadata required in the ml_data container.
43  */
45 
46  public:
47 
48  column_indexer() {}
49 
50  /**
51  * Default constructor; does nothing;
52  */
53  column_indexer(std::string column_name,
54  ml_column_mode mode,
55  flex_type_enum original_column_type);
56 
57  /**
58  * Copy constructor: Don't want to risk making copies of this.
59  */
60  column_indexer(const column_indexer&) = delete;
61 
62 
/** Prepares the index mapping for use.  Certain internal state used for
 * parallel access must be set up before map_value_to_index works, so
 * call this before looping over map_value_to_index, and call finalize()
 * once that loop is done.
 */
void initialize();
69 
70  /** Returns the index associated with the "feature" value.
71  *
72  * \note Only used if is_categorical is true.
73  *
74  * If the value in the feature column was already seen, then the index
75  * already associated with that value is returned. If not, a new unique
76  * index is added and associated with this feature value.
77  *
78  * This method is completely threadsafe and is meant to be called by
79  * multiple threads in contention.
80  *
81  * \param[in] feature The value in the feature column to map to the index.
82  * \return An index (possibly new) associated with the given value.
83  */
84  size_t map_value_to_index(size_t thread_idx, const flexible_type& feature) GL_HOT {
86  mode == ml_column_mode::CATEGORICAL
87  || mode == ml_column_mode::CATEGORICAL_VECTOR
88  || mode == ml_column_mode::DICTIONARY);
89 
90  DASSERT_FALSE(values_by_index_threadlocal_accumulator.empty());
91  DASSERT_LT(thread_idx, values_by_index_threadlocal_accumulator.size());
92 
93  // Check value
94  if( ! (feature.get_type() == flex_type_enum::STRING
95  || feature.get_type() == flex_type_enum::INTEGER
96  || feature.get_type() == flex_type_enum::UNDEFINED) ) {
97 
98  auto throw_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
99  log_and_throw(std::string("Value encountered in column '")
100  + column_name + "' is of type '"
101  + flex_type_enum_to_name(feature.get_type()) +
102  "' cannot be mapped to a categorical value." +
103  " Categorical values must be integer, strings, or None.");
104  };
105  throw_error();
106  }
107 
108  hash_value wt(feature);
109 
110  // Lock the first level
111  size_t first_index = wt.n_bit_index(_column_indexer_first_level_lookup_size_n_bits);
112  DASSERT_LT(first_index, index_by_values_lookup.size());
113  auto& lock_ht_pair = index_by_values_lookup[first_index];
114 
115  std::lock_guard<simple_spinlock> lg(lock_ht_pair.first);
116  auto it = lock_ht_pair.second.find(wt);
117 
118  size_t index;
119 
120  if(it == lock_ht_pair.second.end()) {
121  index = (++_column_size) - 1;
122  values_by_index_threadlocal_accumulator[thread_idx].push_back({index, feature});
123  lock_ht_pair.second[wt] = index;
124  } else {
125  index = it->second;
126  }
127 
128  return index;
129  }
130 
131  /** Returns the index associated with the "feature" value.
132  *
133  * \note Only used if is_categorical is true.
134  *
135  * If the value in the feature column was already seen, then the
136  * index already associated with that value is returned. If not,
137  * size_t(-1) is returned.
138  *
139  * \param[in] feature The value in the feature column to map to the index.
140  * \return An index associated with the given value. If the index is not
141  * present. We return size_t(-1).
142  */
143  size_t immutable_map_value_to_index(const flexible_type& feature) const {
144 
145  DASSERT_TRUE(
146  mode == ml_column_mode::CATEGORICAL
147  || mode == ml_column_mode::CATEGORICAL_VECTOR
148  || mode == ml_column_mode::DICTIONARY);
149 
150  // Check value
151  if( ! (feature.get_type() == flex_type_enum::STRING
152  || feature.get_type() == flex_type_enum::INTEGER
153  || feature.get_type() == flex_type_enum::UNDEFINED) ) {
154 
155  auto throw_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
156  log_and_throw(std::string("Value encountered in column '")
157  + column_name + "' is of type '"
158  + flex_type_enum_to_name(feature.get_type()) +
159  "' cannot be mapped to a categorical value." +
160  " Categorical values must be integer, strings, or None.");
161  };
162  throw_error();
163  }
164 
165  hash_value wt(feature);
166 
167  // Lock the first level
168  size_t first_index = wt.n_bit_index(_column_indexer_first_level_lookup_size_n_bits);
169  DASSERT_LT(first_index, index_by_values_lookup.size());
170  auto& lock_ht_pair = index_by_values_lookup[first_index];
171 
172  auto it = lock_ht_pair.second.find(wt);
173 
174  if(it == lock_ht_pair.second.end()) {
175 
176  // Value not found.
177  return (size_t)(-1);
178 
179  } else {
180 
181  // Value found. Returning the index.
182  return it->second;
183  }
184  }
185 
186  /** Some of the ml_data tests currently depend on the order of
187  * insertion into the index, which is now done in parallel and
188  * thus not deterministic. This function allows the user to
189  * remove that randomness by inserting all indices in a specified
190  * order.
191  *
192  * NOTE: This function is not thread safe; only call it from one
193  * thread.
194  */
195  void insert_values_into_index(const std::vector<flexible_type>& features);
196 
/** Completes the indexing pass; call this once all calls to
 * map_value_to_index are completed.
 */
void finalize();
200 
201  /** Returns the feature "value" associated an index.
202  *
203  * \note Only used if is_categorical is true.
204  *
205  * \param[\in] idx Index associated with the feature value.
206  * \return The "value" in the original data associated with the given id.
207  */
208  const flexible_type& map_index_to_value(size_t idx) const {
209 
210  DASSERT_TRUE(mode == ml_column_mode::CATEGORICAL
211  || mode == ml_column_mode::CATEGORICAL_VECTOR
212  || mode == ml_column_mode::DICTIONARY);
213 
214  DASSERT_MSG(idx != size_t(-1),
215  "Index not tracked in metadata table!");
216 
217  DASSERT_MSG(idx < values_by_index_lookup.size(),
218  "Index not in metadata table; using correct metadata?");
219 
220  return values_by_index_lookup[idx];
221  }
222 
223  /** Calculates the type of the values held in the index. This may
224  * be different from original_column_type -- if the
225  * original_column_type is a DICT or LIST, this will return a set
226  * of the actual types present. If the values are
227  * inconsistent, then an error is raised.
228  *
229  * This method is useful when a metadata built with a dictionary is
230  * also used to map simple categorical variables.
231  */
232  std::set<flex_type_enum> extract_key_types() const;
233 
234  /** Returns the size of the column.
235  *
236  * Numeric : 1
237  * Categorical : # Unique categories
238  * Vector : Size of the vector.
239  *
240  * \return Column size.
241  */
242  inline size_t indexed_column_size() const {
243  return _column_size;
244  }
245 
246  /** Returns the current version used for the serialization.
247  */
248  size_t get_version() const;
249 
250  /**
251  * Serialize the object (save).
252  */
253  void save_impl(turi::oarchive& oarc) const;
254 
255  /**
256  * Load the object.
257  */
258  void load_version(turi::iarchive& iarc, size_t version);
259 
260  /** Purges and returns all the values; The result is an indexer that
261  * contains no values, but metadata like name, mode, and type are
262  * preserved.
263  */
264  std::vector<flexible_type> reset_and_return_values();
265 
266  /** Sets the indices and creates all the index maps.
267  */
268  void set_indices(std::vector<flexible_type>&& values);
269 
270  /** Checks that the indices are equal across machines.
271  */
273 
274  void debug_check_is_equal(std::shared_ptr<column_indexer> other) const;
275 
276  const std::string& name() const { return column_name; }
277 
278  const ml_column_mode& column_mode() const { return mode;}
279 
280  const flex_type_enum& column_type() const { return original_column_type;}
281  private:
282 
283  /** The name of the column.
284  */
285  std::string column_name;
286 
287  /** The mode of the column;
288  */
289  ml_column_mode mode;
290 
291  /** Original column type
292  */
293  flex_type_enum original_column_type;
294 
295  std::vector<std::pair<simple_spinlock, hopscotch_map<hash_value, size_t> > >
296  index_by_values_lookup;
297 
298  std::vector<std::vector<std::pair<size_t, flexible_type> > >
299  values_by_index_threadlocal_accumulator;
300 
301  std::vector<flexible_type> values_by_index_lookup;
302 
303  // If we have numeric
304  atomic<size_t> _column_size = 0;
305 
306  mutex index_modification_lock;
307 };
308 
309 /// \}
310 }}
311 
312 BEGIN_OUT_OF_PLACE_SAVE(arc, std::shared_ptr<ml_data_internal::column_indexer>, m) {
313  if(m == nullptr) {
314  arc << false;
315  } else {
316  arc << true;
317 
318  // Save the version number
319  size_t version = m->get_version();
320  arc << version;
321 
322  m->save_impl(arc);
323  }
324 
325 } END_OUT_OF_PLACE_SAVE()
326 
327 
328 BEGIN_OUT_OF_PLACE_LOAD(arc, std::shared_ptr<ml_data_internal::column_indexer>, m) {
329  bool is_not_nullptr;
330  arc >> is_not_nullptr;
331  if(is_not_nullptr) {
333 
334  size_t version;
335  arc >> version;
336  m->load_version(arc, version);
337 
338  } else {
339  m = std::shared_ptr<ml_data_internal::column_indexer>(nullptr);
340  }
341 } END_OUT_OF_PLACE_LOAD()
342 
343 #endif
#define BEGIN_OUT_OF_PLACE_LOAD(arc, tname, tval)
Macro to make it easy to define out-of-place loads.
Definition: iarchive.hpp:314
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
size_t n_bit_index(size_t n_bits) const
Returns the top number of bits in the hash.
Definition: hash_value.hpp:55
size_t immutable_map_value_to_index(const flexible_type &feature) const
static constexpr int _column_indexer_first_level_lookup_size_n_bits
const char * flex_type_enum_to_name(flex_type_enum en)
size_t map_value_to_index(size_t thread_idx, const flexible_type &feature) GL_HOT
void save_impl(turi::oarchive &oarc) const
std::set< flex_type_enum > extract_key_types() const
flex_type_enum get_type() const
#define DASSERT_FALSE(cond)
Definition: assertions.hpp:365
std::vector< flexible_type > reset_and_return_values()
void set_indices(std::vector< flexible_type > &&values)
void load_version(turi::iarchive &iarc, size_t version)
std::set< T > values(const std::map< Key, T > &map)
Definition: stl_util.hpp:386
const flexible_type & map_index_to_value(size_t idx) const
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
#define BEGIN_OUT_OF_PLACE_SAVE(arc, tname, tval)
Macro to make it easy to define out-of-place saves.
Definition: oarchive.hpp:346
void insert_values_into_index(const std::vector< flexible_type > &features)