Turi Create  4.0
sarray_iterators.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #include <core/storage/sframe_data/sarray.hpp>
7 #include <iostream>
8 
9 namespace turi {
10 
11 
12 /**
13  * \ingroup sframe_physical
14  * \addtogroup sframe_main Main SFrame Objects
15  * \{
16  */
17 
18 /**
19  *
20  * Class for fast, sequential iteration through an SArray. Avoids copies
21  * and buffering, and the size of each chunk of data is given by a single
22  * block.
23  *
24  * Example:
25  *
26  * auto it = make_sarray_block_iterator(data);
27  *
28  * in_parallel([&](size_t thread_idx, size_t num_threads) {
29  *
30  * std::vector<T> v;
31  *
32  * while(true) {
33  *
34  * size_t row_start;
35  *
36  * if(it.read_next(&row_start, &v)) {
37  * break;
38  * }
39  *
40  * //...
41  * // v contains elements row_start + 0, row_start + 1, ..., row_start + (v.size()-1)
42  * //...
43  * }
44  * });
45  *
46  */
47 template <typename DataType>
49  public:
50 
51  sarray_block_iterator(const std::shared_ptr<sarray<DataType> >& _data)
52  : data(_data)
54  , num_segments(data->get_index_info().segment_files.size())
55  {
56  initialize();
57  }
58 
60  }
61 
62  /** True if we're done.
63  */
64  inline bool done() const {
65  return _is_done;
66  }
67 
68  /** Reads the next block of data, writing the row number of the
69  * starting block to row_number_start, and all the data within that
70  * block to the vector pointed to by read_data. After this call,
71  * read_data contains elements row_number_start + 0,
72  * row_number_start + 1, ..., row_number_start + (read_data.size()-1)
73  *
74  * Returns true if all data has been read. In this case,
75  * row_number_start is set to the number of elements in the sarray.
76  * Otherwise, returns false.
77  *
78  */
79  inline bool read_next(size_t *row_number_start, std::vector<DataType>* read_data) {
80 
81  // The information of what we're currently reading. Keep the
82  // segment alive here in read_segment so it doesn't get closed by
83  // another thread before this one gets done actually reading it.
84  v2_block_impl::block_address read_block_address;
85  std::shared_ptr<segment> read_segment;
86 
87  size_t n_elem = 0;
88 
89  // Check once here for a CTRL-C interupt as the following
90  // interface does not naturally do that.
91  if(cppipc::must_cancel()) {
92  log_and_throw("Canceled by user.");
93  }
94 
95  {
96  std::lock_guard<simple_spinlock> lg(block_read_lock);
97 
98  if(_is_done) {
99  *row_number_start = data->size();
100  return true;
101  }
102 
103  // Pull this into what we are currently
104  read_segment = segment_of_next_block;
105  DASSERT_TRUE(read_segment != nullptr);
106 
107  DASSERT_LT(block_number_of_next_block, read_segment->num_blocks());
108 
109  // First, set the current block address and block rows,
110  // then advance the counter to the next level.
111  read_block_address
112  = v2_block_impl::block_address{std::get<0>(read_segment->address()),
113  std::get<1>(read_segment->address()),
114  block_number_of_next_block};
115 
116  n_elem = block_manager.get_block_info(read_block_address).num_elem;
117  *row_number_start = row_start_idx_of_next_block;
118  row_start_idx_of_next_block += n_elem;
119 
120  // Now, advance to the next block or segment.
121  _is_done = load_next_block(true);
122  } // Release lock.
123 
124  // Now, read the current block
125  block_manager.read_block(read_block_address, *read_data);
126  DASSERT_EQ(read_data->size(), n_elem);
127 
128  // Check once once for a CTRL-C interupt.
129  if(cppipc::must_cancel()) {
130  log_and_throw("Canceled by user.");
131  }
132 
133  // We are done.
134  return false;
135  }
136 
 private:

  // The original sarray.  Held by shared_ptr; read_next() and
  // initialize() query its size and index information.
  std::shared_ptr<sarray<DataType> > data;

  // Global block manager.  Used to look up block metadata and to read
  // block contents.
  v2_block_impl::block_manager& block_manager;

  // Number of segments.  Set once in the constructor from the number of
  // segment files listed in the sarray's index information.
  const size_t num_segments = 0;

  // True once there is no further block to hand out.  Set by
  // initialize() for an empty sarray and updated by read_next() while
  // holding block_read_lock; done() reads it without the lock.
  bool _is_done = false;

  // Held by read_next() while it claims a block and advances the
  // position fields below.
  simple_spinlock block_read_lock;

  // Position of the next block to be handed out: the index of its
  // segment, its block number within that segment, and the row index
  // of its first element within the whole sarray.
  size_t segment_index_of_next_block = 0;
  size_t block_number_of_next_block = 0;
  size_t row_start_idx_of_next_block = 0;
155 
156  // The segment information. Kept as a shared_ptr to a struct so
157  // that things don't get closed when out of line.
158  struct segment {
159  public:
160  // Constructor -- opens the segment.
161  segment(const std::shared_ptr<sarray<DataType> >& data, size_t segment_index)
163  {
164 
165  const auto& column_index = data->get_index_info();
166  DASSERT_LT(segment_index, column_index.segment_files.size());
167 
168  const auto& segment_file = column_index.segment_files[segment_index];
169 
170  // Open the next segment.
171  segment_address = block_manager.open_column(segment_file);
172  _num_blocks = block_manager.num_blocks_in_column(segment_address);
173  }
174 
175  // Destructor -- important -- closes the segment.
176  ~segment() {
178  }
179 
180  // Address of the segment.
181  const v2_block_impl::column_address& address() const {
182  return segment_address;
183  }
184 
185  size_t num_blocks() const {
186  return _num_blocks;
187  }
188 
189  private:
190  index_file_information column_index;
191  v2_block_impl::column_address segment_address;
192  v2_block_impl::block_manager& block_manager;
193 
194  size_t _num_blocks = 0;
195  };
196 
  // The opened segment that contains the next block to be handed out.
  // read_next() copies this pointer before reading so the segment stays
  // open even if the iterator has since moved on to another segment.
  std::shared_ptr<segment> segment_of_next_block;
198 
199  void initialize() {
200 
201  if(data->size() == 0) {
202  _is_done = true;
203  return;
204  }
205 
206  DASSERT_NE(num_segments, 0);
207  segment_index_of_next_block = 0;
208  block_number_of_next_block = 0;
209  row_start_idx_of_next_block = 0;
210 
211  segment_of_next_block.reset(new segment(data, 0));
212 
213  // With no advancement, this just makes sure the first segment has
214  // one or more blocks.
215  load_next_block(false);
216  }
217 
218  bool load_next_block(bool advance_from_current_position) {
219 
220  if(advance_from_current_position) {
221  ++block_number_of_next_block;
222  }
223 
224  while(block_number_of_next_block == segment_of_next_block->num_blocks()) {
225  ++segment_index_of_next_block;
226 
227  // If we are done,
228  if(segment_index_of_next_block == num_segments) {
229  return true;
230  } else {
231  segment_of_next_block.reset(new segment(data, segment_index_of_next_block));
232  block_number_of_next_block = 0;
233  }
234  }
235 
236  return false;
237  }
238 };
239 
240 /** Creates a sarray block iterator; convenience function using
241  * automatic template matching.
242  *
243  */
244 template <typename T>
246  return sarray_block_iterator<T>(data);
247 }
248 
249 /// \}
250 }
std::tuple< size_t, size_t, size_t > block_address
std::tuple< size_t, size_t > column_address
static block_manager & get_instance()
Get singleton instance.
column_address open_column(std::string column_file)
bool read_next(size_t *row_number_start, std::vector< DataType > *read_data)
size_t num_blocks_in_column(column_address addr)
std::shared_ptr< std::vector< char > > read_block(block_address addr, block_info **ret_info=NULL)
sarray_block_iterator< T > make_sarray_block_iterator(const std::shared_ptr< sarray< T > > &data)
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
const block_info & get_block_info(block_address addr)
void close_column(column_address addr)