Turi Create  4.0
sframe_iterators.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_ITERATORS_H_
7 #define TURI_SFRAME_ITERATORS_H_
8 
9 #include <vector>
10 #include <core/storage/sframe_data/sframe.hpp>
11 #include <core/logging/assertions.hpp>
12 
13 namespace turi {
14 
15 class parallel_sframe_iterator;
16 
17 
18 /**
19  * \ingroup sframe_physical
20  * \addtogroup sframe_main Main SFrame Objects
21  * \{
22  */
23 
24 /**
25  * Utility to provide parallel iteration over an SFrame
26  */
28  public:
29 
30  /** Create an sframe iterator initializer initialized with a single
31  * sframe. The sframe is divided into num_threads blocks of
32  * approximately equal size. This iterator claims the thread_idx
33  * block.
34  *
35  * \param[in] data_sources Collection of SFrames
36  * \param[in] row_start First row to read
37  * \param[in] row_end One past last row to read (i.e. EXCLUSIVE).
38  * Row_end can be beyond the end of the array, in which
39  * case, fewer rows will be read.
40  * Default -1 reads all rows.
41  *
42  */
44  const size_t& _row_start=0,
45  const size_t& _row_end =-1)
46  : parallel_sframe_iterator_initializer(std::vector<sframe>{data}, _row_start, _row_end)
47  {}
48 
49 
50  /** Initialize the sframe iterator with a vector of sframes. Each
51  * sframe is divided into num_threads blocks of approximately equal
52  * size. This iterator claims the thread_idx block.
53  *
54  * With multiple sframes, elements in the current row can be
55  * accessed by it.value(sframe_index, column_index), where
56  * sframe_index refers to the index in data_sources, and
57  * column_index refers to the index of the column within that
58  * sframe.
59  *
60  * \param[in] data_sources Collection of SFrames
61  * \param[in] row_start First row to read
62  * \param[in] row_end One past last row to read (i.e. EXCLUSIVE).
63  * Row_end can be beyond the end of the array, in which
64  * case, fewer rows will be read.
65  * Default -1 reads all rows.
66  */
67  explicit parallel_sframe_iterator_initializer(const std::vector<sframe>& data_sources,
68  const size_t& _row_start=0,
69  const size_t& _row_end =-1);
70 
71  /**
72  * Set the global block to read. This allows us to create the initializer
73  * only once and change the row_start and row_end multiple times.
74  *
75  * \param[in] row_start First row to read
76  * \param[in] row_end One past last row to read (i.e. EXCLUSIVE).
77  * Row_end can be beyond the end of the array, in which
78  * case, fewer rows will be read.
79  * Default -1 reads all rows.
80  */
// Re-targets the initializer at rows [_row_start, _row_end) without rebuilding
// it; the default _row_end (maximum size_t) selects every remaining row.
void set_global_block(size_t _row_start = 0,
                      size_t _row_end = static_cast<size_t>(-1));
82 
83  private:
84 
85  friend class parallel_sframe_iterator;
86  size_t row_start = 0; /**< Row start for global block.*/
87  size_t row_end = -1; /**< Row end of the global block.*/
88 
89  size_t global_block_size; /**< Global block size being read.*/
90  size_t sf_size; /**< SFrame size. */
91 
92  std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > sources;
93  std::vector<size_t> column_offsets;
94 
95 };
96 
97 /**
98  * A simple convenience iterator for doing parallel iteration over
99  * the rows of one or more sframes. It is designed for easy
100  * integration with the in_parallel function.
101  *
102  * This iterator class provides two features:
103  *
104  * 1. The ability to easily and efficiently iterate over multiple
105  * sections of an sframe, divided evenly by thread.
106  *
107  * 2. The ability to easily iterate over multiple sframes of the same
108  * length simultaneously.
109  *
110  *
111  * To use this iterator:
112  *
113  * parallel_sframe_iterator_initializer it_init(data);
114  *
115  * in_parallel([&](size_t thread_idx, size_t num_threads) {
116  * for(parallel_sframe_iterator it(it_init, thread_idx, num_threads); !it.done(); ++it) {
117  * size_t row_idx = it.row_index();
118  * double value_0 = it.value(0);
119  * double value_1 = it.value(1);
120  * ...
121  * }
122  * });
123  *
124  */
126 
127  public:
128 
129  /**
130  * Default empty constructor.
131  */
133  : current_idx(0), start_idx(0), end_idx(0)
134  , block_start_idx(0), block_end_idx(0), max_block_size(0)
135  {}
136 
137 
138  /**
139  * Initialize the sframe iterator with a single sframe. The sframe
140  * is divided into num_threads blocks of approximately equal size.
141  * This iterator claims the thread_idx block.
142  *
143  * \param[in] data SFrame
144  * \param[in] thread_idx Thread id (Ranges from 0-num_threads)
145  * \param[in] num_threads Number of threads in the turi::thread_pool
146  *
147  */
148  explicit parallel_sframe_iterator(sframe data, size_t thread_idx = 0, size_t num_threads = 1)
149  : parallel_sframe_iterator(parallel_sframe_iterator_initializer(data), thread_idx, num_threads)
150  {}
151 
152  /** Initialize the sframe iterator with a vector of sframes. Each
153  * sframe is divided into num_threads blocks of approximately equal
154  * size. This iterator claims the thread_idx block.
155  *
156  * \param[in] data Collection of SFrames
158  * \param[in] num_threads Number of threads in the turi::thread_pool
159  *
160  */
161  explicit parallel_sframe_iterator(const std::vector<sframe>& data, size_t thread_idx = 0, size_t num_threads = 1)
162  : parallel_sframe_iterator(parallel_sframe_iterator_initializer(data), thread_idx, num_threads)
163  {}
164 
165  /**
166  * Initialize the parallel SFrame iterator.
167  *
168  * \param[in] data Parallel sframe initializer
169  * \param[in] thread_idx Thread id (Ranges from 0-num_threads)
170  * \param[in] num_threads Number of threads in the turi::thread_pool
171  */
173  size_t thread_idx, size_t num_threads);
174 
175  /**
176  * Increments the parallel SFrame iterator to the next row.
177  */
179  DASSERT_GE(current_idx, start_idx);
180  DASSERT_LT(current_idx, end_idx);
181 
182  ++current_idx;
183 
184  if(current_idx != end_idx && current_idx == block_end_idx)
185  load_current_block();
186 
187  return *this;
188  }
189 
190  /**
191  * Check if the iterator is done (applies to the global block)
192  * \returns True if the iterator is done.
193  */
194  bool done() const {
195  DASSERT_GE(current_idx, start_idx);
196  DASSERT_LE(current_idx, end_idx);
197  return current_idx == end_idx;
198  }
199 
200  /**
201  * Resets the iterator to the state it had upon initialization.
202  */
203  void reset() {
204  current_idx = start_idx;
205  block_start_idx = start_idx;
206  block_end_idx = start_idx;
207 
208  load_current_block();
209  }
210 
211  /**
212  * Returns the current row index that the iterator is at.
213  */
214  size_t row_index() const {
215  return current_idx;
216  }
217 
218  /**
219  * Fills a vector x with the current row of data. If there are
220  * multiple sframes provided initially, then values from all columns
221  * are concatenated into a single vector of length
222  * data_sources[0].num_columns() + data_sources[1].num_columns() +
223  * ...
224  *
225  * \param[in,out] x Fills the std::vector with the contents of the SFrame
226  */
227  void fill(std::vector<flexible_type>& x) const {
228  DASSERT_GE(current_idx, block_start_idx);
229  DASSERT_LT(current_idx, block_end_idx);
230 
231  x.resize(buffers.size());
232 
233  size_t idx = current_idx - block_start_idx;
234  for(size_t i = 0; i < buffers.size(); ++i)
235  x[i] = buffers[i][idx];
236  }
237 
238  /**
239  * Fills a vector x with the current row of data from
240  * data_sources[idx].
241  *
242  * \param[in] sframe_idx SFrame idx to use.
243  * \param[in,out] x Fills the std::vector with the contents of the SFrame
244  */
245  void fill(size_t sframe_idx, std::vector<flexible_type>& x) const {
246  DASSERT_LT(sframe_idx, column_offsets.size() - 1);
247  DASSERT_GE(current_idx, block_start_idx);
248  DASSERT_LT(current_idx, block_end_idx);
249 
250  size_t start_col_idx = column_offsets[sframe_idx];
251  size_t end_col_idx = column_offsets[sframe_idx + 1];
252 
253  x.resize(end_col_idx - start_col_idx);
254 
255  size_t idx = current_idx - block_start_idx;
256  size_t i = 0;
257  for(size_t col_idx = start_col_idx; col_idx < end_col_idx; ++col_idx, ++i)
258  x[i] = buffers[col_idx][idx];
259  }
260 
261  /**
262  * Returns the current value in sframe data_sources[sframe_idx],
263  * column column_idx.
264  *
265  * \param[in] sframe_idx SFrame idx.
266  * \param[in] column_idx Column idx.
267  *
268  * \returns Value corresponding to SFrame[sframe_idx][column_idx]
269  */
270  const flexible_type& value(size_t sframe_idx, size_t column_idx) const {
271  DASSERT_LT(sframe_idx, column_offsets.size() - 1);
272 
273  size_t row_idx = column_offsets[sframe_idx] + column_idx;
274 
275  DASSERT_LT(row_idx, column_offsets[sframe_idx + 1]);
276 
277  DASSERT_GE(current_idx, block_start_idx);
278  DASSERT_LT(current_idx, block_end_idx);
279 
280  return buffers[row_idx][current_idx - block_start_idx];
281  }
282 
283  /**
284  * Returns the current value in column_idx of the first sframe 0.
285  * If multiple sframes are provided at initialization time, then
286  * this indexes the values as if all the columns were concatenated
287  * (in similar fashion to fill(x); )
288  *
289  * \param[in] idx Column index into the concatenated columns.
290  *
291  * \returns Value corresponding to SFrame[0][column_idx]
292  */
293  const flexible_type& value(size_t idx) const {
294  DASSERT_LT(idx, buffers.size());
295 
296  DASSERT_GE(current_idx, block_start_idx);
297  DASSERT_LT(current_idx, block_end_idx);
298 
299  return buffers[idx][current_idx - block_start_idx];
300  }
301 
302  /**\overload
303  *
304  * Exactly like value(...), except it returns a move reference to
305  * the current value, invalidating the present one.
306  *
307  * \param[in] sframe_idx SFrame idx.
308  * \param[in] column_idx Column idx.
309  *
310  * \returns Moved value corresponding to SFrame[sframe_idx][column_idx]
311  */
312  flexible_type&& move_value(size_t sframe_idx, size_t column_idx) {
313  DASSERT_LT(sframe_idx, column_offsets.size() - 1);
314 
315  size_t row_idx = column_offsets[sframe_idx] + column_idx;
316 
317  DASSERT_LT(row_idx, column_offsets[sframe_idx + 1]);
318 
319  DASSERT_GE(current_idx, block_start_idx);
320  DASSERT_LT(current_idx, block_end_idx);
321 
322  return std::move(buffers[row_idx][current_idx - block_start_idx]);
323  }
324 
325  /** \overload
326  * Returns a move reference to the current value in column_idx of
327  * the first sframe 0, invalidating that reference. If multiple
328  * sframes are provided at initialization time, then this indexes
329  * the values as if all the columns were concatenated (in similar
330  * fashion to fill(x); )
331  *
332  * \param[in] idx Column index into the concatenated columns.
333  *
334  * \returns Value corresponding to SFrame[0][column_idx]
335  */
336  flexible_type&& move_value(size_t idx) {
337  DASSERT_LT(idx, buffers.size());
338 
339  DASSERT_GE(current_idx, block_start_idx);
340  DASSERT_LT(current_idx, block_end_idx);
341 
342  return std::move(buffers[idx][current_idx - block_start_idx]);
343  }
344 
345 
346  private:
347 
348  /**
349  * Loads the current block.
350  */
351  void load_current_block();
352 
353  size_t current_idx; /**< Current id of the iterator.*/
354  size_t start_idx; /**< Row start for global block.*/
355  size_t end_idx; /**< Row end for global block.*/
356 
357  size_t block_start_idx; /**< Row start for current block.*/
358  size_t block_end_idx; /**< Row start for current block.*/
359  size_t max_block_size; /**< Max block size.*/
360 
361  std::vector< std::vector<flexible_type> > buffers;
362  std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > sources;
363  std::vector<size_t> column_offsets;
364 };
365 
366 /// \}
367 }
368 
369 #endif /* TURI_SFRAME_ITERATORS_H_ */
const flexible_type & value(size_t idx) const
void set_global_block(size_t _row_start=0, size_t _row_end=-1)
STL namespace.
const parallel_sframe_iterator & operator++()
flexible_type && move_value(size_t idx)
parallel_sframe_iterator(const std::vector< sframe > &data, size_t thread_idx=0, size_t num_threads=1)
const flexible_type & value(size_t sframe_idx, size_t column_idx) const
flexible_type && move_value(size_t sframe_idx, size_t column_idx)
parallel_sframe_iterator(sframe data, size_t thread_idx=0, size_t num_threads=1)
parallel_sframe_iterator_initializer(sframe data, const size_t &_row_start=0, const size_t &_row_end=-1)
void fill(size_t sframe_idx, std::vector< flexible_type > &x) const
void fill(std::vector< flexible_type > &x) const