6 #ifndef TURI_UNITY_SFRAME_SARRAY_READER_HPP 7 #define TURI_UNITY_SFRAME_SARRAY_READER_HPP 10 #include <type_traits> 12 #include <core/logging/assertions.hpp> 13 #include <core/storage/serialization/iarchive.hpp> 14 #include <core/parallel/pthread_tools.hpp> 15 #include <core/storage/sframe_data/siterable.hpp> 16 #include <core/data/flexible_type/flexible_type.hpp> 17 #include <core/storage/sframe_data/sarray_file_format_v2.hpp> 18 #include <core/storage/sframe_data/sframe_constants.hpp> 19 #include <core/storage/fileio/file_ownership_handle.hpp> 20 #include <core/storage/sframe_data/sarray_reader_buffer.hpp> 46 typedef int difference_type;
49 typedef std::input_iterator_tag iterator_category;
56 :reader(NULL), segmentid(0),
57 current_element(T()), current_element_counter(0) {}
76 bool is_start_iterator)
77 :reader(reader), segmentid(segmentid),
78 current_element(T()), current_element_counter(0) {
79 num_elements = reader->
size();
80 if (is_start_iterator) {
81 if (num_elements > 0) {
82 current_element = reader->
next();
84 current_element_counter = 0;
87 current_element_counter = num_elements;
95 if (reader->has_next()) {
96 current_element = reader->next();
97 ++current_element_counter;
99 current_element_counter = num_elements;
118 return (reader == other.reader) &&
119 (segmentid == other.segmentid) &&
120 (current_element_counter == other.current_element_counter);
128 return !((*this) == other);
136 return current_element;
143 return current_element;
151 return (
int)(current_element_counter) - (
int)(other.current_element_counter);
162 size_t current_element_counter;
188 template <
typename T>
212 if (reader)
delete reader;
228 ASSERT_MSG(!reader,
"Reader already inited");
229 open_format_reader(array);
231 std::vector<std::pair<size_t, size_t> > segment_row_start_end;
232 if (num_segments == (
size_t)(-1)) {
236 size_t current_row_index = 0;
238 for (
size_t i = 0;i < index_info.nsegments; ++i) {
239 size_t row_start = current_row_index;
240 size_t row_end = row_start + index_info.segment_sizes[i];
241 segment_row_start_end.push_back({row_start, row_end});
242 current_row_index = row_end;
247 ASSERT_GT(num_segments, 0);
248 size_t totallength = size();
250 for (
size_t i = 0;i < num_segments; ++i) {
251 size_t row_start = ((i * totallength) / num_segments);
252 size_t row_end = (((i + 1) * totallength) / num_segments);
253 segment_row_start_end.push_back({row_start, row_end});
256 create_segment_read_buffers(segment_row_start_end);
257 files_managed = array.files_managed;
271 void init(
const sarray<T>& array,
const std::vector<size_t>& segment_lengths) {
272 ASSERT_MSG(!reader,
"Reader already inited");
273 open_format_reader(array);
276 for(
size_t s: segment_lengths) sum += s;
277 ASSERT_EQ(sum, size());
280 std::vector<std::pair<size_t, size_t> > segment_row_start_end;
283 size_t current_row_index = 0;
285 for (
size_t i = 0;i < segment_lengths.size(); ++i) {
286 size_t row_start = current_row_index;
287 size_t row_end = row_start + segment_lengths[i];
288 segment_row_start_end.push_back({row_start, row_end});
289 current_row_index = row_end;
291 create_segment_read_buffers(segment_row_start_end);
300 ASSERT_NE(reader, NULL);
301 return m_num_segments;
310 ASSERT_NE(reader, NULL);
311 return m_segment_lengths[segment];
318 ASSERT_NE(reader, NULL);
319 return reader->get_index_file();
328 ASSERT_NE(reader, NULL);
329 return reader->get_index_info().segment_files;
337 ASSERT_NE(reader, NULL);
338 if (reader == NULL) log_and_throw(std::string(
"Invalid sarray"));
340 std::tie(ret, val) = get_metadata(key);
350 ASSERT_NE(reader, NULL);
352 if (index_info.
metadata.count(key)) {
353 return std::pair<bool, std::string>(
true,
356 return std::pair<bool, std::string>(
false,
"");
364 ASSERT_NE(reader, NULL);
396 iterator
begin(
size_t segmentid)
const {
397 std::lock_guard<mutex> lck(lock);
398 if (opened_segments.count(segmentid) == 0) {
399 opened_segments.insert(segmentid);
401 log_and_throw(std::string(
"Must reset sarray iterators!"));
403 if (reader == NULL) log_and_throw(std::string(
"Invalid sarray"));
404 if (segmentid >= num_segments()) log_and_throw(std::string(
"Invalid segment ID"));
405 return iterator(&(m_read_buffers[segmentid]), segmentid,
true);
429 iterator
end(
size_t segmentid)
const {
430 ASSERT_NE(reader, NULL);
431 ASSERT_LT(segmentid, num_segments());
432 return iterator(&(m_read_buffers[segmentid]), segmentid,
false);
453 std::vector<T>& out_obj) {
454 DASSERT_NE(reader, NULL);
455 return reader->read_rows(row_start, row_end, out_obj);
473 size_t read_rows(
size_t row_start,
482 std::lock_guard<mutex> lck(lock);
483 opened_segments.clear();
484 for (
auto& buf: m_read_buffers) buf.clear();
498 if (!std::is_same<T, flexible_type>::value) {
500 "Use of get_type() in SArray which " 501 "does not contain flexible_types");
503 ASSERT_NE(reader, NULL);
505 if (index_info.
metadata.count(
"__type__")) {
513 mutable sarray_format_reader<T>* reader = NULL;
515 size_t m_num_segments = 0;
516 mutable std::set<size_t> opened_segments;
517 std::vector<size_t> m_segment_lengths;
520 std::vector<std::shared_ptr<fileio::file_ownership_handle>> files_managed;
522 mutable std::vector<sarray_reader_buffer<T> > m_read_buffers;
528 void open_format_reader(
const sarray<T>& array) {
533 switch(file_version) {
535 log_and_throw(
"Format version 0 deprecated");
538 log_and_throw(
"Format version 1 deprecated");
546 log_and_throw(
"Invalid file format version");
556 void create_segment_read_buffers(
557 const std::vector<std::pair<size_t, size_t> >& segment_row_start_end) {
559 m_num_segments = segment_row_start_end.size();
560 m_segment_lengths.resize(m_num_segments);
561 m_read_buffers.resize(m_num_segments);
563 for (
size_t i = 0;i < m_segment_lengths.size(); ++i) {
564 m_segment_lengths[i] =
565 segment_row_start_end[i].second - segment_row_start_end[i].first;
566 m_read_buffers[i].init(
this,
567 segment_row_start_end[i].first,
568 segment_row_start_end[i].second);
573 template <
typename T>
577 ASSERT_MSG(
false,
"read_rows() to sframe_rows not implemented for " 578 "non-flexible_type templatizations of sarray");
587 DASSERT_NE(reader, NULL);
588 return reader->read_rows(row_start, row_end, out_obj);
598 template <
typename T>
iterator begin(size_t segmentid) const
sarray_iterator< T > iterator
The iterator type which begin and end return.
void init(const sarray< T > &array, const std::vector< size_t > &segment_lengths)
int operator-(const sarray_iterator &other) const
bool operator==(const sarray_iterator &other) const
std::pair< bool, std::string > get_metadata(std::string key) const
const value_type & operator*() const
value_type && next()
Return the next element in the reader.
sarray_iterator & operator++()
const index_file_information get_index_info() const
size_t read_rows(size_t row_start, size_t row_end, std::vector< T > &out_obj)
sarray_iterator(sarray_reader_buffer< T > *reader, size_t segmentid, bool is_start_iterator)
std::string get_index_file() const
std::vector< std::string > get_file_names() const
flex_type_enum get_type() const
size_t segment_length(size_t segment) const
size_t num_segments() const
bool get_metadata(std::string key, std::string &val) const
size_t size()
Return the number of elements between row_start and row_end.
void init(const sarray< T > &array, size_t num_segments=(size_t)(-1))
iterator::value_type value_type
The value type the sarray stores.
sarray_iterator & operator=(const sarray_iterator &other)=default
Default assignment operator.
iterator end(size_t segmentid) const
bool operator!=(const sarray_iterator &other) const