Turi Create  4.0
sarray_reader.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_SFRAME_SARRAY_READER_HPP
7 #define TURI_UNITY_SFRAME_SARRAY_READER_HPP
8 #include <set>
9 #include <iterator>
10 #include <type_traits>
11 #include <functional>
12 #include <core/logging/assertions.hpp>
13 #include <core/storage/serialization/iarchive.hpp>
14 #include <core/parallel/pthread_tools.hpp>
15 #include <core/storage/sframe_data/siterable.hpp>
16 #include <core/data/flexible_type/flexible_type.hpp>
17 #include <core/storage/sframe_data/sarray_file_format_v2.hpp>
18 #include <core/storage/sframe_data/sframe_constants.hpp>
19 #include <core/storage/fileio/file_ownership_handle.hpp>
20 #include <core/storage/sframe_data/sarray_reader_buffer.hpp>
21 namespace turi {
22 
/// Forward declaration of the on-disk array type that sarray_reader reads;
/// only pointers/references to it are needed in this header's declarations.
template <typename T> class sarray;
25 
26 
27 
28 /**
29  * \ingroup sframe_physical
30  * \addtogroup sframe_main Main SFrame Objects
31  * \{
32  */
33 
34 /**
35  * Implements a simple input iterator over an sarray.
36  *
37  * The sarray_iterator provides a simple input iterator (like forward iterator,
38  * but one pass. i.e. increment of one, invalidates all other copies.) over a
39  * segment of an sarray.
40  */
41 template <typename T>
43  public:
44  // standard iterator stuff
45  typedef T value_type;
46  typedef int difference_type;
47  typedef T* pointer;
48  typedef T& reference;
49  typedef std::input_iterator_tag iterator_category;
50 
51  /**
52  * Default constructor. Makes an iterator which points nowhere.
53  * Use of this iterator will produce undefined results.
54  */
56  :reader(NULL), segmentid(0),
57  current_element(T()), current_element_counter(0) {}
58 
59  /// Default copy constructor
60  sarray_iterator(const sarray_iterator& other) = default;
61 
62  /// Default assignment operator
63  sarray_iterator& operator=(const sarray_iterator& other) = default;
64 
65  /**
66  * Constructs an iterator from an input archive.
67  * Creates a start iterator to the segment.
68  *
69  * \param reader The file format reader to use
70  * \param segmentid The segment to read. Must be a valid segment
71  * \param is_start_iterator If true, creates a start iterator to the segment.
72  * If false, creates an end iterator to the segment.
73  */
75  size_t segmentid,
76  bool is_start_iterator)
77  :reader(reader), segmentid(segmentid),
78  current_element(T()), current_element_counter(0) {
79  num_elements = reader->size();
80  if (is_start_iterator) {
81  if (num_elements > 0) {
82  current_element = reader->next();
83  }
84  current_element_counter = 0;
85  } else {
86  // one past the end
87  current_element_counter = num_elements;
88  }
89  }
90 
91  /**
92  * Advances the iterator to the next element
93  */
95  if (reader->has_next()) {
96  current_element = reader->next();
97  ++current_element_counter;
98  } else {
99  current_element_counter = num_elements;
100  reader->clear();
101  }
102  return *this;
103  }
104 
105  /**
106  * Equivalent to other operator++ for input iterators
107  */
108  void operator++(int) {
109  /// forward to the other operator++
110  ++(*this);
111  }
112 
113  /**
114  * Returns true if the iterators are identical (points to the same
115  * element in the same sarray).
116  */
117  bool operator==(const sarray_iterator& other) const {
118  return (reader == other.reader) &&
119  (segmentid == other.segmentid) &&
120  (current_element_counter == other.current_element_counter);
121  }
122 
123  /**
124  * Returns true if the iterators are different (points to different
125  * elements, or in different sarrays)
126  */
127  bool operator!=(const sarray_iterator& other) const {
128  return !((*this) == other);
129  }
130 
131  /**
132  * Returns the current element. Value will be invalid if the iterator
133  * is past the end of the sarray (points to end)
134  */
135  const value_type& operator*() const {
136  return current_element;
137  }
138 
139  /**
140  * \overload
141  */
142  value_type& operator*() {
143  return current_element;
144  }
145 
146  /**
147  * Returns the distance between two iterators. Both iterators must be
148  * from the same segment of the same sarray, otherwise result is undefined.
149  */
150  int operator-(const sarray_iterator& other) const {
151  return (int)(current_element_counter) - (int)(other.current_element_counter);
152  }
153 
154  private:
155  /// The reader we are reading from
156  sarray_reader_buffer<T>* reader;
157  /// The segment we are reading from
158  size_t segmentid;
159  /// The last element read (returned by the deref operator)
160  T current_element;
161  /// Number of elements successfully read so far.
162  size_t current_element_counter;
163  /// Total number of elements
164  size_t num_elements;
165 };
166 
167 
168 
169 /**
170  * The SArray reader provides a reading interface to
171  * an immutable, on disk, sequence of objects T.
172  *
173  * The SArray is an immutable sequence of objects of type T, and is internally
174  * represented as a collection of files. The sequence is cut up into a
175  * collection of segments (not necessarily of equal length), where each segment
176  * covers a disjoint subset of the sequence. Each segment can then be read
177  * in parallel.
178  *
179  * To read from an sarray<T> use \ref sarray::get_reader():
180  * \code
181  * auto reader = array.get_reader();
182  * \endcode
183  * reader will be of type sarray_reader<T>
184  *
185  * reader can then provide input iterators from segments via the begin()
186  * and end() functions.
187  */
188 template <typename T>
189 class sarray_reader: public siterable<sarray_iterator<T> > {
190  public:
191  /// The iterator type which \ref begin and \ref end returns
193 
194  /// The value type the sarray stores
195  typedef typename iterator::value_type value_type;
196 
197  /**
198  * Default constructor. Does nothing. Use init()
199  */
200  sarray_reader() = default;
201 
202  /// Deleted Copy constructor
203  sarray_reader(const sarray_reader& other) = delete;
204 
205  /// Assignment operator
206  sarray_reader& operator=(const sarray_reader& other) = delete;
207 
208 
209 
210 
211  ~sarray_reader() {
212  if (reader) delete reader;
213  }
214 
215 
216  /**
217  * Attempts to construct an sarray_iterator which reads from
218  * an existing sarray.
219  * If the index file cannot be opened, an exception is thrown.
220  *
221  * \param array The array to read
222  * \param num_segments If num_segments == (size_t)(-1), the
223  * original file segmentation is used. Otherwise,
224  * the array is cut into num_segments number of
225  * logical segments which distribute the rows uniformly.
226  */
227  void init(const sarray<T>& array, size_t num_segments = (size_t)(-1)) {
228  ASSERT_MSG(!reader, "Reader already inited");
229  open_format_reader(array);
230  // row start and row end of each segment
231  std::vector<std::pair<size_t, size_t> > segment_row_start_end;
232  if (num_segments == (size_t)(-1)) {
233  // we use the original sarray file layout
234  auto index_info = array.get_index_info();
235  // the cumulative sum across segment lengths
236  size_t current_row_index = 0;
237  // figure out the start and end position for each segment
238  for (size_t i = 0;i < index_info.nsegments; ++i) {
239  size_t row_start = current_row_index;
240  size_t row_end = row_start + index_info.segment_sizes[i];
241  segment_row_start_end.push_back({row_start, row_end});
242  current_row_index = row_end;
243  }
244  } else {
245  // we equally divide the data across the segments
246  // compute the segment lengths
247  ASSERT_GT(num_segments, 0);
248  size_t totallength = size();
249  // figure out the start and end position for each segment
250  for (size_t i = 0;i < num_segments; ++i) {
251  size_t row_start = ((i * totallength) / num_segments);
252  size_t row_end = (((i + 1) * totallength) / num_segments);
253  segment_row_start_end.push_back({row_start, row_end});
254  }
255  }
256  create_segment_read_buffers(segment_row_start_end);
257  files_managed = array.files_managed;
258  }
259 
260  /**
261  * Attempts to construct an sarray_iterator which reads from
262  * an existing sarray and uses a segmentation defined by an argument.
263  * If the index file cannot be opened, an exception is thrown.
264  * If the sum of the lengths of all the segments do not add up to the
265  * length of the sarray, an exception is thrown
266  *
267  * \param array The array to read
268  * \param segment_lengths An array describing the lengths of each segment.
269  * This must sum up to the length of the array.
270  */
271  void init(const sarray<T>& array, const std::vector<size_t>& segment_lengths) {
272  ASSERT_MSG(!reader, "Reader already inited");
273  open_format_reader(array);
274  // check that lengths add up
275  size_t sum = 0;
276  for(size_t s: segment_lengths) sum += s;
277  ASSERT_EQ(sum, size());
278 
279  // row start and row end of each segment
280  std::vector<std::pair<size_t, size_t> > segment_row_start_end;
281 
282  // the cumulative sum across segment lengths
283  size_t current_row_index = 0;
284  // figure out the start and end position for each segment
285  for (size_t i = 0;i < segment_lengths.size(); ++i) {
286  size_t row_start = current_row_index;
287  size_t row_end = row_start + segment_lengths[i];
288  segment_row_start_end.push_back({row_start, row_end});
289  current_row_index = row_end;
290  }
291  create_segment_read_buffers(segment_row_start_end);
292  }
293 
294  /**
295  * Return the number of segments in the collection.
296  * Will throw an exception if the sarray is invalid (there is an error
297  * reading files)
298  */
299  size_t num_segments() const {
300  ASSERT_NE(reader, NULL);
301  return m_num_segments;
302  }
303 
304  /**
305  * Return the number of rows in the segment.
306  * Will throw an exception if the sarray is invalid (there is an error
307  * reading files)
308  */
309  size_t segment_length(size_t segment) const {
310  ASSERT_NE(reader, NULL);
311  return m_segment_lengths[segment];
312  }
313 
314  /**
315  * Return the file prefix of the sarray (paramter on construction)
316  */
317  std::string get_index_file() const {
318  ASSERT_NE(reader, NULL);
319  return reader->get_index_file();
320  }
321 
322 
323  /**
324  * Returns the collection of files storing the sarray.
325  * For instance: [file_prefix].sidx, [file_prefix].0001, etc.
326  */
327  std::vector<std::string> get_file_names() const {
328  ASSERT_NE(reader, NULL);
329  return reader->get_index_info().segment_files;
330  }
331 
332  /**
333  * Reads the value of a key associated with the sarray.
334  * Returns true on success, false on failure.
335  */
336  bool get_metadata(std::string key, std::string &val) const {
337  ASSERT_NE(reader, NULL);
338  if (reader == NULL) log_and_throw(std::string("Invalid sarray"));
339  bool ret = false;
340  std::tie(ret, val) = get_metadata(key);
341  return ret;
342  }
343 
344  /**
345  * Reads the value of a key associated with the sarray.
346  * Returns a pair of (true, value) on success, and (false, empty_string)
347  * on failure.
348  */
349  std::pair<bool, std::string> get_metadata(std::string key) const {
350  ASSERT_NE(reader, NULL);
351  const index_file_information& index_info = reader->get_index_info();
352  if (index_info.metadata.count(key)) {
353  return std::pair<bool, std::string>(true,
354  index_info.metadata.at(key));
355  } else {
356  return std::pair<bool, std::string>(false, "");
357  }
358  }
359 
360  /**
361  * Returns the number of elements in the SArray
362  */
363  size_t size() const {
364  ASSERT_NE(reader, NULL);
365  size_t total = 0;
366  const index_file_information& info = reader->get_index_info();
367  for (size_t i = 0;i < info.segment_sizes.size(); ++i) {
368  total += info.segment_sizes[i];
369  }
370  return total;
371  }
372 
373 
374  /**
375  * Return the begin iterator of the segment.
376  * The iterator (\ref sarray_iterator) is of the input iterator type and
377  * has value_type T. See \ref end() to get the end iterator of the segment.
378  *
379  * The iterator is invalid once the originating sarray is destroyed.
380  * Accessing the iterator after the sarray is destroyed is undefined behavior.
381  *
382  * \code
383  * // example to print segment 1 to screen
384  * auto iter = sarr.begin(1);
385  * auto enditer =sarr.end(1);
386  * while(iter != enditer) {
387  * std::cout << *iter << "\n";
388  * ++iter;
389  * }
390  * \endcode
391  *
392  * Will throw an exception if the sarray is invalid (there is an error
393  * reading files) Also segmentid must be a valid segment ID. Will throw an
394  * exception otherwise.
395  */
396  iterator begin(size_t segmentid) const {
397  std::lock_guard<mutex> lck(lock);
398  if (opened_segments.count(segmentid) == 0) {
399  opened_segments.insert(segmentid);
400  } else {
401  log_and_throw(std::string("Must reset sarray iterators!"));
402  }
403  if (reader == NULL) log_and_throw(std::string("Invalid sarray"));
404  if (segmentid >= num_segments()) log_and_throw(std::string("Invalid segment ID"));
405  return iterator(&(m_read_buffers[segmentid]), segmentid, true);
406  }
407 
408  /** Return the end iterator of the segment.
409  * The iterator (\ref sarray_iterator) is of the input iterator type and
410  * has value_type T. See \ref end() to get the end iterator of the segment.
411  *
412  * The iterator is invalid once the originating sarray is destroyed.
413  * Accessing the iterator after the sarray is destroyed is undefined behavior.
414  *
415  * \code
416  * // example to print segment 1 to screen
417  * auto iter = sarr.begin(1);
418  * auto enditer =sarr.end(1);
419  * while(iter != enditer) {
420  * std::cout << *iter << "\n";
421  * ++iter;
422  * }
423  * \endcode
424  *
425  * Will throw an exception if the sarray is invalid (there is an error
426  * reading files) Also segmentid must be a valid segment ID. Will throw an
427  * exception otherwise.
428  */
429  iterator end(size_t segmentid) const {
430  ASSERT_NE(reader, NULL);
431  ASSERT_LT(segmentid, num_segments());
432  return iterator(&(m_read_buffers[segmentid]), segmentid, false);
433  }
434 
435 
436  /**
437  * Reads a collection of rows, storing the result in out_obj.
438  * This function is independent of the open_segment/read_segment/close_segment
439  * functions, and can be called anytime. This function is also fully
440  * concurrent.
441  * \param row_start First row to read
442  * \param row_end one past the last row to read (i.e. EXCLUSIVE). row_end can
443  * be beyond the end of the array, in which case,
444  * fewer rows will be read.
445  * \param out_obj The output array
446  * \returns Actual number of rows read. Return (size_t)(-1) on failure.
447  *
448  * \note This function is not always efficient. Different file formats
449  * implementations will have different characteristics.
450  */
451  size_t read_rows(size_t row_start,
452  size_t row_end,
453  std::vector<T>& out_obj) {
454  DASSERT_NE(reader, NULL);
455  return reader->read_rows(row_start, row_end, out_obj);
456  }
457 
458  /**
459  * Reads a collection of rows, storing the result in out_obj.
460  * This function is independent of the open_segment/read_segment/close_segment
461  * functions, and can be called anytime. This function is also fully
462  * concurrent.
463  * \param row_start First row to read
464  * \param row_end one past the last row to read (i.e. EXCLUSIVE). row_end can
465  * be beyond the end of the array, in which case,
466  * fewer rows will be read.
467  * \param out_obj The output array
468  * \returns Actual number of rows read. Return (size_t)(-1) on failure.
469  *
470  * \note This function is not always efficient. Different file formats
471  * implementations will have different characteristics.
472  */
473  size_t read_rows(size_t row_start,
474  size_t row_end,
475  sframe_rows& out_obj);
476 
477 
478  /**
479  * Resets all the file handles. All existing iterators are invalidated.
480  */
482  std::lock_guard<mutex> lck(lock);
483  opened_segments.clear();
484  for (auto& buf: m_read_buffers) buf.clear();
485  }
486 
487 
488  /**
489  * Returns the type of the SArray (as set by
490  * \ref swriter<flexible_type>::set_type). If the type of the SArray was
491  * not set, this returns \ref flex_type_enum::UNDEFINED, in which case
492  * each row can be of arbitrary type.
493  *
494  * This function should only be used for sarray<flexible_type> and
495  * will fail fatally otherwise.
496  */
498  if (!std::is_same<T, flexible_type>::value) {
499  ASSERT_MSG(false,
500  "Use of get_type() in SArray which "
501  "does not contain flexible_types");
502  }
503  ASSERT_NE(reader, NULL);
504  const index_file_information& index_info = reader->get_index_info();
505  if (index_info.metadata.count("__type__")) {
507  }
508  return flex_type_enum(std::stoi(index_info.metadata.at("__type__")));
509  }
510 
511 
512  private:
513  mutable sarray_format_reader<T>* reader = NULL;
514  mutable mutex lock;
515  size_t m_num_segments = 0;
516  mutable std::set<size_t> opened_segments;
517  std::vector<size_t> m_segment_lengths;
518  // take a reference to all the handled files so that deletion of the sarray
519  // does not cause the reader to become invalidated.
520  std::vector<std::shared_ptr<fileio::file_ownership_handle>> files_managed;
521 
522  mutable std::vector<sarray_reader_buffer<T> > m_read_buffers;
523 
524  /**
525  * Construct the format reader object from the sarray.
526  * Only called by the init() functions.
527  */
528  void open_format_reader(const sarray<T>& array) {
529  // redirect to the appropriate file reader
530  // for now, we only have version 0
531  size_t file_version = array.get_index_info().version;
532 
533  switch(file_version) {
534  case 0:
535  log_and_throw("Format version 0 deprecated");
536  break;
537  case 1:
538  log_and_throw("Format version 1 deprecated");
539  break;
540  case 2:
541  reader = new sarray_format_reader_v2<T>();
542  reader->open(array.get_index_info());
543  break;
544  default:
545  reader = NULL;
546  log_and_throw("Invalid file format version");
547  break;
548  }
549  }
550 
551  /**
552  * Construct the format reader object from the sarray.
553  * Only called by the init() functions. Also sets up the m_segment_lengths
554  * and m_num_segments variables.
555  */
556  void create_segment_read_buffers(
557  const std::vector<std::pair<size_t, size_t> >& segment_row_start_end) {
558  // set up the read buffers
559  m_num_segments = segment_row_start_end.size();
560  m_segment_lengths.resize(m_num_segments);
561  m_read_buffers.resize(m_num_segments);
562 
563  for (size_t i = 0;i < m_segment_lengths.size(); ++i) {
564  m_segment_lengths[i] =
565  segment_row_start_end[i].second - segment_row_start_end[i].first;
566  m_read_buffers[i].init(this,
567  segment_row_start_end[i].first,
568  segment_row_start_end[i].second);
569  }
570  }
571 };
572 
573 template <typename T>
574 inline size_t sarray_reader<T>::read_rows(size_t row_start,
575  size_t row_end,
576  sframe_rows& out_obj) {
577  ASSERT_MSG(false, "read_rows() to sframe_rows not implemented for "
578  "non-flexible_type templatizations of sarray");
579  return 0;
580 }
581 
582 
583 template <>
584 inline size_t sarray_reader<flexible_type>::read_rows(size_t row_start,
585  size_t row_end,
586  sframe_rows& out_obj) {
587  DASSERT_NE(reader, NULL);
588  return reader->read_rows(row_start, row_end, out_obj);
589 }
590 
591 /// \}
592 
593 } // namespace turi
594 
595 namespace std {
596 
597 // specialization of std::distance
598 template <typename T>
599 inline int distance(const turi::sarray_iterator<T>& begin,
600  const turi::sarray_iterator<T>& end) {
601  return end - begin;
602 }
603 
604 } // namespace std
605 
606 #endif
iterator begin(size_t segmentid) const
sarray_iterator< T > iterator
The iterator type which begin and end returns.
void init(const sarray< T > &array, const std::vector< size_t > &segment_lengths)
int operator-(const sarray_iterator &other) const
STL namespace.
bool operator==(const sarray_iterator &other) const
std::pair< bool, std::string > get_metadata(std::string key) const
const value_type & operator*() const
value_type && next()
Return the next element in the reader.
sarray_iterator & operator++()
const index_file_information get_index_info() const
Definition: sarray.hpp:448
size_t read_rows(size_t row_start, size_t row_end, std::vector< T > &out_obj)
value_type & operator*()
sarray_iterator(sarray_reader_buffer< T > *reader, size_t segmentid, bool is_start_iterator)
std::string get_index_file() const
std::vector< std::string > get_file_names() const
flex_type_enum get_type() const
size_t segment_length(size_t segment) const
size_t num_segments() const
bool get_metadata(std::string key, std::string &val) const
size_t size()
Return the Number of elements between row_start and row_end.
size_t size() const
void init(const sarray< T > &array, size_t num_segments=(size_t)(-1))
iterator::value_type value_type
The value type the sarray stores.
std::vector< size_t > segment_sizes
The length of each segment (number of entries).
sarray_iterator & operator=(const sarray_iterator &other)=default
Default assignment operator.
std::map< std::string, std::string > metadata
Any additional metadata stored with the array.
iterator end(size_t segmentid) const
bool operator!=(const sarray_iterator &other) const