Turi Create  4.0
sframe_reader_buffer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_READER_BUFFER
7 #define TURI_SFRAME_READER_BUFFER
8 #include <memory>
9 #include <vector>
10 #include <core/data/flexible_type/flexible_type.hpp>
11 #include <core/storage/sframe_data/sframe.hpp>
12 #include <core/storage/sframe_data/sframe_constants.hpp>
13 namespace turi {
14 class sframe;
15 
16 
17 /**
18  * \ingroup sframe_physical
19  * \addtogroup sframe_main Main SFrame Objects
20  * \{
21  */
22 
23 /**
24  * A buffered reader reading from a range of rows of an sframe.
25  *
26  * \code
27  * std::shared_ptr<sframe::reader_type> myreader = ...;
28  *
29  * // Reader for the first thousand rows
30  * sframe_reader_buffer reader(myreader, 0, 1000);
31  *
32  * while(reader.has_next()) {
33  *   const sframe_rows::row& val = reader.next();
34  *   ... do something with val ...
35  * }
36  *
37  * // Reader for the entire sframe
38  * reader = sframe_reader_buffer(myreader, 0, (size_t)(-1));
39  * ...
40  * \endcode
41  *
42  * Internally, the reader maintains an sframe_rows object as its buffer; when
43  * reading reaches the end of the buffer, it refills the buffer by reading from the sframe reader.
44  */
46  public:
48 
49  sframe_reader_buffer() = default;
50 
51  /// Construct from sframe reader with begin and end row.
53  std::shared_ptr<typename sframe::reader_type> reader,
54  size_t row_start, size_t row_end,
55  size_t buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
56  init(reader, row_start, row_end, buffer_size);
57  }
58 
/**
 * (Re)initialize this buffer to read rows [row_start, row_end) from `reader`.
 *
 * Resets the buffer position and the iteration position, and clears any
 * previously buffered rows.
 *
 * \param reader Shared pointer to the underlying reader; a copy is stored in
 *        m_reader. Taken by non-const lvalue reference, so a temporary
 *        cannot be passed directly.
 * \param row_start First row to read. Also remembered as
 *        m_original_row_start.
 * \param row_end One past the last row to read. Clamped to reader->size().
 * \param internal_buffer_size Upper bound on the number of rows requested by
 *        a single refill().
 *
 * NOTE(review): row_start is not checked against the clamped row_end; if it is
 * larger, the unsigned subtractions in size() and refill() would wrap —
 * confirm callers guarantee row_start <= row_end.
 * NOTE(review): reader is dereferenced without a null check — presumably
 * callers never pass an empty pointer; verify.
 * NOTE(review): std::min is used but <algorithm> is not among the includes
 * visible here — presumably pulled in transitively; confirm.
 */
59  void init(std::shared_ptr<typename sframe::reader_type>& reader,
60  size_t row_start, size_t row_end,
61  size_t internal_buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
62  m_reader = reader;
63  m_buffer_pos = 0;
64  m_iter = row_start;
65  m_original_row_start = row_start;
66  m_row_start = row_start;
 // Clamp the requested end to the number of rows the reader reports.
67  m_row_end = std::min(row_end, m_reader->size());
68  m_buffer_size = internal_buffer_size;
 // Drop any rows left over from a previous initialization.
69  m_buffer.clear();
70  }
71 
72  /// Return the next element in the reader.
73  const sframe_rows::row& next();
74 
75  /// Returns the current element
76  const sframe_rows::row& current();
77 
78  /// Return true if the reader has more elements.
79  bool has_next();
80 
81  /// Return the buffer.
 ///
 /// The result is a mutable reference to the internal m_buffer member, not a
 /// copy. NOTE(review): next() indexes into this same object, so modifying it
 /// mid-iteration presumably affects subsequent reads — confirm intended use.
82  inline sframe_rows& get_buffer() {return m_buffer;}
83 
84  /// Return the number of elements between row_start and row_end.
 ///
 /// row_end is the value stored by init(), i.e. already clamped to the
 /// reader's size. If row_start lies at or beyond that clamped end the range
 /// is empty and 0 is returned; the guard prevents the unsigned subtraction
 /// from wrapping around to a huge value.
85  inline size_t size() {return m_row_end > m_original_row_start ? m_row_end - m_original_row_start : 0;}
86 
87  /** Resets the buffer to the initial starting conditions. Reading
88  * from the buffer again will start from row_start.
89  */
90  void clear();
91 
92  private:
93  /// Refill the chunk buffer from the sframe reader.
94  void refill();
95 
97 
98  /// Buffer the prefetched elements.
99  sframe_rows m_buffer;
100 
101  /// Current value
102  sframe_rows::row m_current;
103 
104  /// The underlying reader as a data source.
105  std::shared_ptr<reader_type> m_reader;
106 
107  /// Current position of the buffer reader.
108  size_t m_buffer_pos = 0;
109  /// The initial starting point. clear() will reset row_start to here.
110  size_t m_original_row_start = 0;
111  /// Start row of the remaining chunk.
112  size_t m_row_start = 0;
113  /// End row of the chunk.
114  size_t m_row_end = 0;
115  /// The size of the buffer vector
116  size_t m_buffer_size = 0;
117  /// The current iterator location
118  size_t m_iter = 0;
119 };
120 
121 /// \}
122 //
123 /// Return the next element in the chunk.
125  if (m_buffer_pos == m_buffer.num_rows()) {
126  refill();
127  m_buffer_pos = 0;
128  }
129  DASSERT_LT(m_buffer_pos, m_buffer.num_rows());
130  ++m_iter;
131  m_current.copy_reference(m_buffer[m_buffer_pos++]);
132  return m_current;
133 }
134 
136  return m_current;
137 }
138 
139 /// Return true if the chunk has remaining elements.
141  return m_iter < m_row_end;
142 }
143 
144 /// Refill the chunk buffer from the sframe reader.
///
/// Requests rows [m_row_start, m_row_start + k) from m_reader into m_buffer,
/// where k = min(m_row_end - m_row_start, m_buffer_size), then advances
/// m_row_start by k. m_buffer_pos is not touched here; next() resets it to 0
/// after calling refill().
///
/// NOTE(review): assumes read_rows() replaces the previous contents of
/// m_buffer and delivers exactly the requested number of rows; any return
/// value is ignored — TODO confirm against the reader's contract.
145 inline void sframe_reader_buffer::refill() {
 // Never request rows past m_row_end, nor more than m_buffer_size at once.
146  size_t size_of_refill = std::min<size_t>(m_row_end - m_row_start, m_buffer_size);
147  m_reader->read_rows(m_row_start, m_row_start + size_of_refill, m_buffer);
 // The next refill continues where this one stopped.
148  m_row_start += size_of_refill;
149 }
150 
151 
153  m_buffer.clear();
154  m_row_start = m_original_row_start;
155  m_iter = m_original_row_start;
156  m_buffer_pos = 0;
157 }
158 }
159 
160 #endif
size_t num_rows() const
Returns the number of rows.
Definition: sframe_rows.hpp:95
sframe_rows & get_buffer()
Return the buffer.
const sframe_rows::row & current()
Returns the current element.
const size_t DEFAULT_SARRAY_READER_BUFFER_SIZE
const sframe_rows::row & next()
Return the next element in the reader.
sframe_reader_buffer(std::shared_ptr< typename sframe::reader_type > reader, size_t row_start, size_t row_end, size_t buffer_size=DEFAULT_SARRAY_READER_BUFFER_SIZE)
Construct from sframe reader with begin and end row.
size_t size()
Return the number of elements between row_start and row_end.
bool has_next()
Return true if the reader has more elements.
void copy_reference(const row &other)