6 #ifndef TURI_SFRAME_ITERATORS_H_ 7 #define TURI_SFRAME_ITERATORS_H_ 10 #include <core/storage/sframe_data/sframe.hpp> 11 #include <core/logging/assertions.hpp> 15 class parallel_sframe_iterator;
// Optional row-range parameters, both taken by const reference.
// _row_start defaults to 0. _row_end defaults to -1, which converts to the
// largest size_t value; presumably a sentinel meaning "through the last
// row" -- TODO confirm against the constructor body, not visible here.
44 const size_t& _row_start=0,
45 const size_t& _row_end =-1)
// Same optional row-range parameters, here closing a declaration (note the
// trailing ';'). The leading parameter(s) are not visible in this excerpt.
// As above, the -1 default converts to the largest size_t value.
68 const size_t& _row_start=0,
69 const size_t& _row_end =-1);
// Size value kept for the global block. How it is computed is not visible
// in this excerpt.
89 size_t global_block_size;
// Shared handles to flexible_type sarray readers.
// NOTE(review): presumably one reader per column -- confirm.
92 std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > sources;
// Column offsets. NOTE(review): presumably the same layout as the
// iterator's column_offsets (one entry per sframe plus a final end
// entry) -- confirm against the code that fills it.
93 std::vector<size_t> column_offsets;
// Member-initializer list: the position, the range bounds, the buffered
// block bounds and the block-size limit all start at zero.
133 : current_idx(0), start_idx(0), end_idx(0)
134 , block_start_idx(0), block_end_idx(0), max_block_size(0)
// Trailing parameters of a declaration whose beginning is not visible in
// this excerpt. Unlike the public constructor signatures, these carry no
// default values.
173 size_t thread_idx,
size_t num_threads);
// Precondition checks: current_idx must lie in [start_idx, end_idx).
179 DASSERT_GE(current_idx, start_idx);
180 DASSERT_LT(current_idx, end_idx);
// NOTE(review): the statements between the assertions and this check are
// not visible in this excerpt; presumably current_idx is advanced there.
// If the position has reached the end of the buffered block but not the
// end of the whole range, refresh the buffers via load_current_block().
184 if(current_idx != end_idx && current_idx == block_end_idx)
185 load_current_block();
// Invariant checks: current_idx must lie in [start_idx, end_idx]. The
// upper bound is inclusive here because sitting exactly on end_idx is the
// state being tested for.
195 DASSERT_GE(current_idx, start_idx);
196 DASSERT_LE(current_idx, end_idx);
// True exactly when the position equals the end of the range.
197 return current_idx == end_idx;
// Rewind the position to the start of the range.
204 current_idx = start_idx;
// Collapse the buffered block to the empty interval [start_idx, start_idx)
// before reloading.
205 block_start_idx = start_idx;
206 block_end_idx = start_idx;
// Refill the buffers. NOTE(review): presumably load_current_block() reads
// from block_end_idx onward -- its body is not visible here; confirm.
208 load_current_block();
/**
 * Copies the current row into x, one value per column buffer.
 *
 * x is resized to buffers.size(); element i receives
 * buffers[i][current_idx - block_start_idx].
 *
 * \param x  Output vector; resized and overwritten.
 *
 * Precondition (checked with DASSERT_*): current_idx lies inside the
 * buffered block, i.e. block_start_idx <= current_idx < block_end_idx.
 */
227 void fill(std::vector<flexible_type>& x)
const {
228 DASSERT_GE(current_idx, block_start_idx);
229 DASSERT_LT(current_idx, block_end_idx);
231 x.resize(buffers.size());
// Offset of the current row within the buffered block.
233 size_t idx = current_idx - block_start_idx;
234 for(
size_t i = 0; i < buffers.size(); ++i)
235 x[i] = buffers[i][idx];
/**
 * Copies the current row of one sframe's columns into x.
 *
 * The columns of sframe sframe_idx occupy the buffer range
 * [column_offsets[sframe_idx], column_offsets[sframe_idx + 1]); x is
 * resized to the width of that range and filled from it.
 *
 * \param sframe_idx  Index into column_offsets; must be less than
 *                    column_offsets.size() - 1 (checked with DASSERT_LT).
 * \param x           Output vector; resized and overwritten.
 *
 * Precondition (checked with DASSERT_*): current_idx lies inside the
 * buffered block, i.e. block_start_idx <= current_idx < block_end_idx.
 */
245 void fill(
size_t sframe_idx, std::vector<flexible_type>& x)
const {
246 DASSERT_LT(sframe_idx, column_offsets.size() - 1);
247 DASSERT_GE(current_idx, block_start_idx);
248 DASSERT_LT(current_idx, block_end_idx);
250 size_t start_col_idx = column_offsets[sframe_idx];
251 size_t end_col_idx = column_offsets[sframe_idx + 1];
253 x.resize(end_col_idx - start_col_idx);
// Offset of the current row within the buffered block.
255 size_t idx = current_idx - block_start_idx;
// NOTE(review): the declaration of the output index i is not visible in
// this excerpt; presumably it starts at 0 so that x is filled from its
// first element -- confirm against the full file.
257 for(
size_t col_idx = start_col_idx; col_idx < end_col_idx; ++col_idx, ++i)
258 x[i] = buffers[col_idx][idx];
// Body fragment; presumably value(sframe_idx, column_idx) -- the signature
// line is not visible in this excerpt.
// sframe_idx must address a valid [offset, next offset) pair.
271 DASSERT_LT(sframe_idx, column_offsets.size() - 1);
// Despite its name, row_idx is used as the first index into buffers: the
// sframe's starting offset plus the column position within that sframe.
273 size_t row_idx = column_offsets[sframe_idx] + column_idx;
// The resulting index must stay inside this sframe's range.
275 DASSERT_LT(row_idx, column_offsets[sframe_idx + 1]);
// The current row must lie inside the buffered block.
277 DASSERT_GE(current_idx, block_start_idx);
278 DASSERT_LT(current_idx, block_end_idx);
// Return the buffered element for that column at the current row.
280 return buffers[row_idx][current_idx - block_start_idx];
// Body fragment; presumably value(idx) -- the signature line is not
// visible in this excerpt.
// idx indexes buffers directly and must be in range.
294 DASSERT_LT(idx, buffers.size());
// The current row must lie inside the buffered block.
296 DASSERT_GE(current_idx, block_start_idx);
297 DASSERT_LT(current_idx, block_end_idx);
// Return the buffered element of buffers[idx] at the current row.
299 return buffers[idx][current_idx - block_start_idx];
// Body fragment; presumably move_value(sframe_idx, column_idx) -- the
// signature line is not visible in this excerpt.
// sframe_idx must address a valid [offset, next offset) pair.
313 DASSERT_LT(sframe_idx, column_offsets.size() - 1);
// Despite its name, row_idx is used as the first index into buffers: the
// sframe's starting offset plus the column position within that sframe.
315 size_t row_idx = column_offsets[sframe_idx] + column_idx;
// The resulting index must stay inside this sframe's range.
317 DASSERT_LT(row_idx, column_offsets[sframe_idx + 1]);
// The current row must lie inside the buffered block.
319 DASSERT_GE(current_idx, block_start_idx);
320 DASSERT_LT(current_idx, block_end_idx);
// Same element as the value() accessor, but cast to an rvalue with
// std::move so that a caller may move from the buffer slot.
322 return std::move(buffers[row_idx][current_idx - block_start_idx]);
// Body fragment; presumably move_value(idx) -- the signature line is not
// visible in this excerpt.
// idx indexes buffers directly and must be in range.
337 DASSERT_LT(idx, buffers.size());
// The current row must lie inside the buffered block.
339 DASSERT_GE(current_idx, block_start_idx);
340 DASSERT_LT(current_idx, block_end_idx);
// Element of buffers[idx] at the current row, cast to an rvalue with
// std::move so that a caller may move from the buffer slot.
342 return std::move(buffers[idx][current_idx - block_start_idx]);
// Buffer refill routine; declared here, defined elsewhere. The visible
// call sites invoke it when the position reaches block_end_idx and after
// the block bounds are reset to start_idx.
351 void load_current_block();
// Bounds of the currently buffered block. The accessors assert
// block_start_idx <= current_idx < block_end_idx.
357 size_t block_start_idx;
358 size_t block_end_idx;
// Block-size limit; initialised to 0 by the default constructor.
// NOTE(review): presumably the maximum number of rows buffered per block
// -- confirm.
359 size_t max_block_size;
// buffers[c][r] holds the value of column c at row (block_start_idx + r).
361 std::vector< std::vector<flexible_type> > buffers;
// Shared handles to flexible_type sarray readers.
// NOTE(review): presumably one reader per entry of buffers -- confirm.
362 std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > sources;
// column_offsets[s] is the index in buffers of sframe s's first column and
// column_offsets[s + 1] is one past its last; the accessors require
// s < column_offsets.size() - 1.
363 std::vector<size_t> column_offsets;
const flexible_type & value(size_t idx) const
void set_global_block(size_t _row_start=0, size_t _row_end=-1)
parallel_sframe_iterator()
const parallel_sframe_iterator & operator++()
flexible_type && move_value(size_t idx)
parallel_sframe_iterator(const std::vector< sframe > &data, size_t thread_idx=0, size_t num_threads=1)
const flexible_type & value(size_t sframe_idx, size_t column_idx) const
flexible_type && move_value(size_t sframe_idx, size_t column_idx)
parallel_sframe_iterator(sframe data, size_t thread_idx=0, size_t num_threads=1)
parallel_sframe_iterator_initializer(sframe data, const size_t &_row_start=0, const size_t &_row_end=-1)
void fill(size_t sframe_idx, std::vector< flexible_type > &x) const
void fill(std::vector< flexible_type > &x) const