Turi Create  4.0
sarray_reader_buffer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SARRAY_READER_BUFFER
7 #define TURI_SARRAY_READER_BUFFER
#include <algorithm>
#include <memory>
#include <vector>
#include <core/storage/sframe_data/sframe_constants.hpp>
11 namespace turi {
// Forward declaration; this header only needs the nested sarray<T>::reader_type.
template <class T> class sarray;
14 
15 
16 
17 /**
18  * \ingroup sframe_physical
19  * \addtogroup sframe_main Main SFrame Objects
20  * \{
21  */
22 
23 /**
24  * A buffered reader reading from a range of an sarray<T>.
25  *
26  * \code
27  * sarray<flexible_type> mysarray = ...;
28  *
29  * // Reader for the first thousand lines
30  * sarray_reader_buffer<flexible_type> reader(mysarray, 0, 1000);
31  *
32  * while(reader.has_next()) {
33  * flexible_type val = reader.next();
34  * ... do some thing with val ...
35  * }
36  *
37  * // Reader for the entire sarray
38  * reader = sarray_reader_buffer<flexible_type>(mysarray, 0, (size_t)(-1));
39  * ...
40  * \endcode
41  *
42  * Internally, the reader maintains a vector as buffer, and when reading
43  * reaches the end of the buffer, refill the buffer by reading from sarray.
44  */
45 template<typename T>
46 class sarray_reader_buffer {
47  public:
48  typedef T value_type;
49 
50  sarray_reader_buffer() = default;
51 
52  /// Construct from sarray reader with begin and end row.
54  std::shared_ptr<typename sarray<T>::reader_type> reader,
55  size_t row_start, size_t row_end,
56  size_t buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
57  init(reader, row_start, row_end, buffer_size);
58  }
59 
60  /**
61  * Construction of a reader_buffer where the reader_buffer owns a
62  * reference to the underlying reader.
63  */
64  void init(std::shared_ptr<typename sarray<T>::reader_type>& reader,
65  size_t row_start, size_t row_end,
66  size_t internal_buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
67  m_reader = reader;
68  m_reader_weak = m_reader.get();
69  m_buffer_pos = 0;
70  m_iter = row_start;
71  m_original_row_start = row_start;
72  m_row_start = row_start;
73  m_row_end = std::min(row_end, m_reader_weak->size());
74  m_buffer_size = internal_buffer_size;
75  m_buffer.clear();
76  }
77 
78  /**
79  * Construction of a reader_buffer where the reader_buffer DOES NOT own
80  * a reference to the underlying reader.
81  * \note This is quite unsafe to use since its not even a weak_ptr
82  * but a raw pointer. The user has to be careful to ensure that the
83  * sarray_reader's lifespan exceeds that of the reader_buffer. This is
84  * provided only to support a circular case where the sarray_reader itself
85  * uses sarray_reader_buffer for buffering.
86  */
87  void init(typename sarray<T>::reader_type* reader_weak,
88  size_t row_start, size_t row_end,
89  size_t internal_buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
90  m_reader_weak = reader_weak;
91  m_buffer_pos = 0;
92  m_iter = row_start;
93  m_original_row_start = row_start;
94  m_row_start = row_start;
95  m_row_end = std::min(row_end, m_reader_weak->size());
96  m_buffer_size = internal_buffer_size;
97  m_buffer.clear();
98  }
99 
100  /// Return the next element in the reader.
101  value_type&& next();
102 
103  /// Return true if the reader has more element.
104  bool has_next();
105 
106  /// Return the buffer.
107  inline std::vector<value_type>& get_buffer() {return m_buffer;}
108 
109  /// Return the Number of elements between row_start and row_end.
110  inline size_t size() {return m_row_end - m_original_row_start;}
111 
112  /** Resets the buffer to the initial starting conditions. Reading
113  * from the buffer again will start from row_start.
114  */
115  void clear();
116 
117  private:
118  /// Refill the chunk buffer form the sarray reader.
119  void refill();
120 
121  typedef typename sarray<T>::reader_type reader_type;
122 
123  /// Buffer the prefetched elements.
124  std::vector<value_type> m_buffer;
125 
126  /// The underlying reader as a data source.
127  std::shared_ptr<reader_type> m_reader;
128  reader_type* m_reader_weak = nullptr;
129 
130  /// Current position of the buffer reader.
131  size_t m_buffer_pos = 0;
132  /// The initial starting point. clear() will reset row_start to here.
133  size_t m_original_row_start = 0;
134  /// Start row of the remaining chunk.
135  size_t m_row_start = 0;
136  /// End row of the chunk.
137  size_t m_row_end = 0;
138  /// The size of the buffer vector
139  size_t m_buffer_size = 0;
140  /// The current iterator location
141  size_t m_iter = 0;
142 };
143 
144 /// Return the next element in the chunk.
145 template<typename T>
147  if (m_buffer_pos == m_buffer.size()) {
148  refill();
149  m_buffer_pos = 0;
150  }
151  DASSERT_LT(m_buffer_pos, m_buffer.size());
152  ++m_iter;
153  return std::move(m_buffer[m_buffer_pos++]);
154 }
155 
156 /// \}
157 //
158 /// Return true if the chunk has remaining element.
159 template<typename T>
161  return m_iter < m_row_end;
162 }
163 
164 /// Refill the chunk buffer form the sarray reader.
165 template<typename T>
167  size_t size_of_refill = std::min<size_t>(m_row_end - m_row_start, m_buffer_size);
168  m_reader_weak->read_rows(m_row_start, m_row_start + size_of_refill, m_buffer);
169  m_row_start += size_of_refill;
170 }
171 
172 
173 template<typename T>
175  m_buffer.clear();
176  m_row_start = m_original_row_start;
177  m_iter = m_original_row_start;
178  m_buffer_pos = 0;
179 }
180 }
181 
182 #endif
void init(std::shared_ptr< typename sarray< T >::reader_type > &reader, size_t row_start, size_t row_end, size_t internal_buffer_size=DEFAULT_SARRAY_READER_BUFFER_SIZE)
std::vector< value_type > & get_buffer()
Return the buffer.
void init(typename sarray< T >::reader_type *reader_weak, size_t row_start, size_t row_end, size_t internal_buffer_size=DEFAULT_SARRAY_READER_BUFFER_SIZE)
value_type && next()
Return the next element in the reader.
bool has_next()
Return true if the reader has more element.
sarray_reader_buffer(std::shared_ptr< typename sarray< T >::reader_type > reader, size_t row_start, size_t row_end, size_t buffer_size=DEFAULT_SARRAY_READER_BUFFER_SIZE)
Construct from sarray reader with begin and end row.
const size_t DEFAULT_SARRAY_READER_BUFFER_SIZE
size_t read_rows(size_t row_start, size_t row_end, std::vector< T > &out_obj)
size_t size()
Return the Number of elements between row_start and row_end.
size_t size() const