Turi Create  4.0
sframe_reader.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_LIB_SFRAME_READER_HPP
7 #define TURI_UNITY_LIB_SFRAME_READER_HPP
8 #include <iostream>
9 #include <algorithm>
10 #include <memory>
11 #include <vector>
12 #include <core/logging/logger.hpp>
13 #include <core/data/flexible_type/flexible_type.hpp>
14 #include <core/storage/sframe_data/sarray_reader.hpp>
15 #include <core/storage/sframe_data/sframe_index_file.hpp>
16 #include <core/storage/sframe_data/sframe_constants.hpp>
17 
18 
19 namespace turi {
20 // forward declaration of the csv_line_tokenizer to avoid a
21 // circular dependency
22 struct csv_line_tokenizer;
23 class sframe;
24 
25 
26 /**
27  * \ingroup sframe_physical
28  * \addtogroup sframe_main Main SFrame Objects
29  * \{
30  */
31 
32 /**
33  * An input iterator over an SFrame.
34  *
35  * The sframe_iterator provides a simple input iterator (like forward iterator,
36  * but one pass. i.e. increment of one, invalidates all other copies.) over a
37  * segment of an sframe. It essentially exposes a column of vectors, where each
38  * vector is a row in a table.
39  *
40  * Since this class wraps several sarray_iterators, it inherits their guarantees,
41  * and is thus an input iterator.
42  */
44  public:
45  // Standard iterator stuff
46  typedef std::vector<flexible_type> value_type;
47  typedef int difference_type;
48  typedef value_type* pointer;
49  typedef value_type& reference;
50  typedef std::input_iterator_tag iterator_category;
51 
52  sframe_iterator() {}
53 
54  /**
55  * Constructs an iterator from the underlying data structure of an SFrame
56  *
57  * \param data The "SFrame" to iterate over
58  * \param segmentid The segment to read. Must be a valid segment.
59  * \param is_begin_iterator If true, constructs an iterator pointing to
60  * the first row of the given segment
61  * If false, constructs an iterator pointing to
62  * one row past the end of the given segment
63  */
65  const std::vector<std::shared_ptr<sarray_reader<flexible_type> > > &data,
66  size_t segmentid,
67  bool is_begin_iterator) : _data(&data),
68  _segmentid(segmentid) {
69 
70  // Create an SArray iterator for each column of the SFrame.
71  cur_iter.resize(_data->size());
72  cur_element.resize(_data->size());
73  for(size_t i = 0; i < _data->size(); ++i) {
74  if(is_begin_iterator) {
75  cur_iter[i] = _data->at(i)->begin(segmentid);
76  } else {
77  cur_iter[i] = _data->at(i)->end(segmentid);
78  }
79  }
80 
81  // Variables that make equality easier to check
82  segment_limit = _data->at(0)->segment_length(segmentid);
83 
84  if(is_begin_iterator) {
85  cur_segment_pos = 0;
86  } else {
87  cur_segment_pos = segment_limit;
88  }
89  }
90 
91  /**
92  * Advances the iterator to the next row of the segment
93  */
95  for(auto& i : cur_iter) {
96  ++i;
97  }
98 
99  ++cur_segment_pos;
100 
101  // Never go past the limit (one past the end of the segment)
102  if(cur_segment_pos > segment_limit) {
103  cur_segment_pos = segment_limit;
104  }
105 
106  return *this;
107  }
108 
109 
110  /*
111  * This is the post-fix increment. Returns the previous value of the
112  * iterator.
113  */
115  sframe_iterator orig = *this;
116  ++(*this);
117  return orig;
118  }
119 
120  // Default assignment operator
121  sframe_iterator& operator=(const sframe_iterator& other) = default;
122 
123  // Default copy constructor
124  sframe_iterator(const sframe_iterator& other) = default;
125 
126  /**
127  * Returns true if iterators are identical (points to the same SFrame,
128  * in the same segment, at the same position)
129  */
130  bool operator==(const sframe_iterator& other) const {
131  return _data == other._data &&
132  _segmentid == other._segmentid &&
133  cur_segment_pos == other.cur_segment_pos;
134  }
135 
136  /**
137  * Returns true if iterators are different (different SFrame, different
138  * segment, or different position)
139  */
140  bool operator!=(const sframe_iterator& other) const {
141  return _data != other._data ||
142  _segmentid != other._segmentid ||
143  cur_segment_pos != other.cur_segment_pos;
144  }
145 
146  /**
147  * Returns the current element. Value will be invalid if the iterator
148  * is past the end of the sarray (points to end)
149  */
150  const value_type& operator*() const {
151  for (size_t i = 0; i < _data->size(); ++i) {
152  cur_element[i] = *(cur_iter[i]);
153  }
154  return cur_element;
155  }
156 
157  /**
158  * Returns a pointer to the current element. Value will be invalid if
159  * iterator == end.
160  */
161  const value_type *operator->() const {
162  this->operator*();
163  return &cur_element;
164  }
165 
166  /**
167  * Returns the distance between two iterators. Both iterators must be
168  * from the same segment of the same sframe, otherwise result is undefined.
169  */
170  int operator-(const sframe_iterator& other) const {
171  return (int)(cur_segment_pos) - (int)(other.cur_segment_pos);
172  }
173  private:
174  const std::vector<std::shared_ptr<sarray_reader<flexible_type> > > *_data;
175  std::vector<sarray_iterator<flexible_type> > cur_iter;
176  // Relies on all the segments being the same length
177  size_t _segmentid = (size_t)(-1);
178  size_t cur_segment_pos = 0;
179  size_t segment_limit = 0;
180  mutable value_type cur_element;
181 };
182 
183 /**
184  * The sframe_reader provides a reading interface to an \ref sframe :
185  * an immutable on-disk set of columns, each with
186  * their own type. These types are represented as a flexible_type.
187  *
188  * The SFrame is represented as an ordered set of SArrays, each with an
189  * enforceable name and type. Each SArray in an SFrame must have the same
190  * number of segments as all other SArrays in the SFrame, which each must
191  * have the same number of elements as all other segments. A segment of an
192  * SFrame is a disjoint subset of rows with an entry from each column.
193  * Segments can be read in parallel.
194  *
195  * To read from an sframe use \ref sframe::get_reader():
196  * \code
197  * auto reader = frame.get_reader();
198  * \endcode
199  * reader will be of type sframe_reader
200  *
201  * reader can then provide input iterators from segments via the begin()
202  * and end() functions.
203  */
204 class sframe_reader : public siterable<sframe_iterator> {
205  public:
206  /// The iterator type which \ref begin and \ref end returns
208 
209  /// The value type the sframe stores
210  typedef sframe_iterator::value_type value_type;
211 
212  /**
213  * Constructs an empty sframe.
214  */
215  sframe_reader() = default;
216 
217  /// Deleted Copy constructor
218  sframe_reader(const sframe_reader& other) = delete;
219 
220  /// Deleted Assignment operator
221  sframe_reader& operator=(const sframe_reader& other) = delete;
222 
223  /**
224  * Attempts to construct an sframe_iterator which reads from an existing sframe.
225  * If the index file cannot be opened, an exception is thrown.
226  *
227  * \param array The array to read
228  * \param num_segments If num_segments == (size_t)(-1), the
229  * segmentation of the first column is used. Otherwise,
230  * the array is cut into num_segments number of
231  * logical segments which distribute the rows uniformly.
232  */
233  void init(const sframe& array, size_t num_segments = (size_t)(-1));
234 
235  /**
236  * Attempts to construct an sframe_iterator which reads from
237  * an existing sframe and uses a segmentation defined by an argument.
238  * If the index file cannot be opened, an exception is thrown.
239  * If the sum of the lengths of all the segments do not add up to the
240  * length of the sframe , an exception is thrown
241  *
242  * \param array The frame to read
243  * \param segment_lengths An array describing the lengths of each segment.
244  * This must sum up to the length of the array.
245  */
246  void init(const sframe& array, const std::vector<size_t>& segment_lengths);
247 
248  /// Return the begin iterator of the segment.
249  iterator begin (size_t segmentid) const;
250 
251  /// Return the end iterator of the segment.
252  iterator end (size_t segmentid) const;
253 
254  /**
255  * Reads a collection of rows, storing the result in out_obj.
256  * This function is independent of the begin/end iterator
257  * functions, and can be called anytime. This function is also fully
258  * concurrent.
259  * \param row_start First row to read
260  * \param row_end one past the last row to read (i.e. EXCLUSIVE). row_end can
261  * be beyond the end of the array, in which case,
262  * fewer rows will be read.
263  * \param out_obj The output array
264  * \returns Actual number of rows read. Return (size_t)(-1) on failure.
265  *
266  * \note This function is not always efficient. Different file formats
267  * implementations will have different characteristics.
268  */
269  size_t read_rows(size_t row_start,
270  size_t row_end,
271  std::vector<std::vector<flexible_type> >& out_obj);
272 
273 
274  /**
275  * Reads a collection of rows, storing the result in out_obj.
276  * This function is independent of the begin/end iterator
277  * functions, and can be called anytime. This function is also fully
278  * concurrent.
279  * \param row_start First row to read
280  * \param row_end one past the last row to read (i.e. EXCLUSIVE). row_end can
281  * be beyond the end of the array, in which case,
282  * fewer rows will be read.
283  * \param out_obj The output array
284  * \returns Actual number of rows read. Return (size_t)(-1) on failure.
285  *
286  * \note This function is not always efficient. Different file formats
287  * implementations will have different characteristics.
288  */
289  size_t read_rows(size_t row_start,
290  size_t row_end,
291  sframe_rows& out_obj);
292 
293 
294  /**
295  * Resets all the file handles. All existing iterators are invalidated.
296  */
297  void reset_iterators();
298 
299  /// Returns the number of columns in the SFrame. Does not throw.
300  inline size_t num_columns() const {
301  return index_info.ncolumns;
302  }
303 
304  /// Returns the length of each sarray.
305  inline size_t num_rows() const {
306  return index_info.nrows;
307  }
308 
309  /// Returns the length of each sarray.
310  inline size_t size() const {
311  return index_info.nrows;
312  }
313 
314  /**
315  * Returns the name of the given column. Throws an exception if the
316  * column id is out of range.
317  */
318  inline std::string column_name(size_t i) const {
319  ASSERT_LT(i, index_info.ncolumns);
320 
321  return index_info.column_names[i];
322  }
323 
324  /**
325  * Returns the type of the given column. Throws an exception if the
326  * column id is out of range.
327  */
328  inline flex_type_enum column_type(size_t i) const {
329  ASSERT_LT(i, index_info.ncolumns);
330 
331  return column_data[i]->get_type();
332  }
333 
334  /// Returns the number of segments in the SFrame. Does not throw.
335  inline size_t num_segments() const {
336  return m_num_segments;
337  }
338 
339  /**
340  * Returns the length of the given segment. Throws an exception if the
341  * segment id is out of range.
342  */
343  inline size_t segment_length(size_t segment) const {
344  ASSERT_LT(segment, num_segments());
345  if (index_info.ncolumns == 0) return 0;
346  return column_data[0]->segment_length(segment);
347  }
348 
349 
350  /**
351  * Returns true if the sframe contains the given column.
352  */
353  inline bool contains_column(const std::string& column_name) const {
354  auto iter = std::find(index_info.column_names.begin(),
355  index_info.column_names.end(),
356  column_name);
357  return iter != index_info.column_names.end();
358  }
359  /**
360  * Returns the column index of column_name.
361  *
362  * Throws a std::string if column_name does not exist.
363  */
364  inline size_t column_index(const std::string& column_name) const {
365  auto iter = std::find(index_info.column_names.begin(),
366  index_info.column_names.end(),
367  column_name);
368  if (iter != index_info.column_names.end()) {
369  return (iter) - index_info.column_names.begin();
370  } else {
371  throw (std::string("Column name " + column_name + " does not exist."));
372  }
373  }
374 
375  private:
376 
377  // Internal data storage
378  bool inited = false;
380  std::vector<std::shared_ptr<sarray_reader<flexible_type> > > column_data;
382  size_t m_num_segments = 0;
383 };
384 
385 /// \}
386 } // end of namespace turi
387 
388 
389 
390 namespace std {
391 
392 // overload (not a template specialization) of std::distance for sframe_iterator
393 inline int distance(const turi::sframe_iterator& begin,
394  const turi::sframe_iterator& end) {
395  return end - begin;
396 }
397 
398 } // namespace std
399 
400 #endif
flex_type_enum column_type(size_t i) const
const value_type * operator->() const
size_t size() const
Returns the length of each sarray.
sframe_iterator(const std::vector< std::shared_ptr< sarray_reader< flexible_type > > > &data, size_t segmentid, bool is_begin_iterator)
std::string column_name(size_t i) const
STL namespace.
size_t segment_length(size_t segment) const
bool contains_column(const std::string &column_name) const
const value_type & operator*() const
int operator-(const sframe_iterator &other) const
bool operator==(const sframe_iterator &other) const
size_t num_rows() const
Returns the length of each sarray.
sframe_iterator & operator++()
sframe_iterator::value_type value_type
The value type the sframe stores.
size_t column_index(const std::string &column_name) const
sframe_iterator iterator
The iterator type which begin and end returns.
size_t num_segments() const
Returns the number of segments in the SFrame. Does not throw.
bool operator!=(const sframe_iterator &other) const
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.