6 #ifndef TURI_SFRAME_SARRAY_FILE_FORMAT_V2_HPP 7 #define TURI_SFRAME_SARRAY_FILE_FORMAT_V2_HPP 12 #include <core/parallel/mutex.hpp> 13 #include <boost/algorithm/string/predicate.hpp> 15 #include <core/random/random.hpp> 16 #include <core/util/dense_bitset.hpp> 17 #include <core/storage/sframe_data/sarray_file_format_interface.hpp> 18 #include <core/storage/sframe_data/sarray_index_file.hpp> 19 #include <core/storage/fileio/general_fstream.hpp> 20 #include <core/storage/fileio/temp_files.hpp> 21 #include <core/storage/serialization/serialization_includes.hpp> 22 #include <core/storage/sframe_data/sframe_constants.hpp> 23 #include <core/storage/sframe_data/sarray_v2_block_manager.hpp> 24 #include <core/storage/sframe_data/sarray_v2_block_writer.hpp> 25 #include <core/storage/sframe_data/sarray_v2_encoded_block.hpp> 26 #include <core/storage/sframe_data/sarray_v2_type_encoding.hpp> 27 #include <core/system/cppipc/server/cancel_ops.hpp> 48 m_manager(v2_block_impl::block_manager::get_instance()) {
77 m_segment_list.clear();
83 m_segment_list.push_back(columnaddr);
86 size_t segment_id, column_id;
87 std::tie(segment_id, column_id) = columnaddr;
89 const std::vector<std::vector<v2_block_impl::block_info>>& segment_blocks = m_manager.
get_all_block_info(segment_id);
91 for (
size_t j = 0; j < nblocks; ++j) {
92 block_address blockaddr{std::get<0>(columnaddr), std::get<1>(columnaddr), j};
93 m_start_row.push_back(row_count);
94 row_count += segment_blocks[column_id][j].num_elem;
95 m_block_list.push_back(blockaddr);
98 for (
auto& ssize: m_index_info.
segment_sizes) m_num_rows += ssize;
100 m_cache.resize(m_block_list.size());
101 m_used_cache_entries.
resize(m_block_list.size());
102 m_used_cache_entries.
clear();
105 m_start_row.push_back(m_num_rows);
106 ASSERT_EQ(m_num_rows, row_count);
112 void open(std::string sidx_file) {
120 for (
auto column: m_segment_list) {
123 m_segment_list.clear();
138 DASSERT_LT(segmentid, m_index_info.
nsegments);
156 size_t read_rows(
size_t row_start,
178 std::vector<T>& out_obj) {
179 if (row_end > m_num_rows) row_end = m_num_rows;
180 if (row_start >= row_end) {
184 out_obj.resize(row_end - row_start);
185 fetch_rows_from_cache(row_start, row_end, out_obj);
187 if(cppipc::must_cancel()) {
188 throw(std::string(
"Cancelled by user."));
190 return out_obj.size();
205 std::vector<block_address> m_block_list;
206 std::vector<size_t> m_start_row;
207 std::vector<column_address> m_segment_list;
235 cache_entry() =
default;
236 cache_entry(
const cache_entry& other) =
default;
238 cache_entry(cache_entry&& other) {
239 buffer_start_row = std::move(other.buffer_start_row);
240 is_encoded = std::move(other.is_encoded);
241 buffer = std::move(other.buffer);
242 encoded_buffer = std::move(other.encoded_buffer);
243 encoded_buffer_reader = std::move(other.encoded_buffer_reader);
246 cache_entry&
operator=(
const cache_entry& other) =
default;
248 cache_entry&
operator=(cache_entry&& other) {
249 buffer_start_row = std::move(other.buffer_start_row);
250 is_encoded = std::move(other.is_encoded);
251 buffer = std::move(other.buffer);
252 encoded_buffer = std::move(other.encoded_buffer);
253 encoded_buffer_reader = std::move(other.encoded_buffer_reader);
257 size_t buffer_start_row = 0;
259 bool is_encoded =
false;
260 bool has_data =
false;
262 std::shared_ptr<std::vector<T> > buffer;
274 atomic<size_t> m_cache_size;
278 std::vector<cache_entry> m_cache;
287 void fetch_rows_from_cache(
size_t fetch_start,
289 std::vector<T>& out_obj);
291 void ensure_cache_decoded(cache_entry& cache,
size_t block_number);
298 void release_cache(
size_t block_number) {
300 if (m_cache[block_number].has_data) {
302 m_buffer_pool.
release_buffer(std::move(m_cache[block_number].buffer));
303 m_cache[block_number].buffer.reset();
304 m_cache[block_number].encoded_buffer.release();
305 m_cache[block_number].encoded_buffer_reader.release();
306 m_cache[block_number].has_data =
false;
307 m_used_cache_entries.
clear_bit(block_number);
316 void try_evict_something_from_cache() {
317 size_t b = turi::random::fast_uniform<size_t>(0, m_cache.size() - 1);
323 if (!m_used_cache_entries.
get(b) &&
324 !m_used_cache_entries.
next_bit(b)) {
327 if (!m_used_cache_entries.
get(b)) m_used_cache_entries.
next_bit(b);
329 std::unique_lock<turi::simple_spinlock> cache_lock_guard(m_cache[b].lock, std::defer_lock);
330 if (cache_lock_guard.try_lock()) {
335 void fetch_cache_from_file(
size_t block_number, cache_entry& ret);
337 size_t block_offset_containing_row(
size_t row) {
338 auto pos = std::lower_bound(m_start_row.begin(), m_start_row.end(), row);
339 size_t blocknum = std::distance(m_start_row.begin(), pos);
341 if (blocknum < m_block_list.size()) {
343 if (m_start_row[blocknum] == row) {
350 return m_block_list.size() - 1;
355 template <
typename T>
368 m_buffer_pool.release_buffer(std::move(ret.buffer));
371 block_address block_addr = m_block_list[block_number];
373 auto buffer = m_manager.read_block(block_addr, &info);
374 if (buffer ==
nullptr) {
375 log_and_throw(
"Unexpected block read failure. Bad file?");
377 ret.buffer_start_row = m_start_row[block_number];
378 ret.encoded_buffer.init(*info, buffer);
379 ret.encoded_buffer_reader = ret.encoded_buffer.get_range();
380 ret.is_encoded =
true;
382 if (m_used_cache_entries.get(block_number) ==
false) m_cache_size.inc();
383 m_used_cache_entries.set_bit(block_number);
386 size_t num_to_evict = m_cache_size.value -
388 while(num_to_evict > 0 &&
390 try_evict_something_from_cache();
395 template <
typename T>
400 if (!ret.buffer) ret.buffer = m_buffer_pool.get_new_buffer();
401 block_address block_addr = m_block_list[block_number];
402 if (!m_manager.read_block(block_addr, *ret.buffer, NULL)) {
403 log_and_throw(
"Unexpected block read failure. Bad file?");
405 ret.buffer_start_row = m_start_row[block_number];
406 ret.is_encoded =
false;
408 if (m_used_cache_entries.get(block_number) ==
false) m_cache_size.inc();
409 m_used_cache_entries.set_bit(block_number);
413 while(num_to_evict > 0 &&
415 try_evict_something_from_cache();
424 if (cache.is_encoded) {
425 cache.buffer = m_buffer_pool.get_new_buffer();
426 auto data = cache.encoded_buffer.get_block_data();
432 cache.encoded_buffer.release();
433 cache.encoded_buffer_reader.release();
435 cache.is_encoded =
false;
436 cache.buffer_start_row = m_start_row[block_number];
441 template <
typename T>
444 ASSERT_MSG(
false,
"Attempting to type decode a non-flexible_type column");
454 std::vector<flexible_type>& out_obj) {
456 size_t start_offset = block_offset_containing_row(fetch_start);
457 size_t end_offset = block_offset_containing_row(fetch_end - 1) + 1;
458 size_t output_idx = 0;
459 for (
size_t i = start_offset; i < end_offset; ++i) {
460 size_t first_row_to_fetch_in_this_block = std::max(fetch_start, m_start_row[i]);
461 size_t last_row_to_fetch_in_this_block = std::min(fetch_end, m_start_row[i+1]);
462 auto& cache = m_cache[i];
463 std::unique_lock<turi::simple_spinlock> cache_lock_guard(cache.lock);
464 if (!cache.has_data) {
465 fetch_cache_from_file(i, cache);
467 if (cache.buffer_start_row < first_row_to_fetch_in_this_block && cache.is_encoded) {
469 size_t diff = first_row_to_fetch_in_this_block - cache.buffer_start_row;
470 cache.encoded_buffer_reader.skip(diff);
471 cache.buffer_start_row = first_row_to_fetch_in_this_block;
474 if (cache.buffer_start_row == first_row_to_fetch_in_this_block) {
477 if (cache.is_encoded) {
478 size_t num_elem = last_row_to_fetch_in_this_block - first_row_to_fetch_in_this_block;
479 cache.encoded_buffer_reader.decode_to(&(out_obj[output_idx]), num_elem);
480 output_idx += num_elem;
481 cache.buffer_start_row = last_row_to_fetch_in_this_block;
483 size_t input_offset = m_start_row[i];
484 for (
size_t j = first_row_to_fetch_in_this_block;
485 j < last_row_to_fetch_in_this_block;
487 out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
490 if (last_row_to_fetch_in_this_block == m_start_row[i + 1]) {
497 ensure_cache_decoded(cache, i);
498 size_t input_offset = m_start_row[i];
499 for (
size_t j = first_row_to_fetch_in_this_block;
500 j < last_row_to_fetch_in_this_block;
502 out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
508 template <
typename T>
512 std::vector<T>& out_obj) {
515 size_t start_offset = block_offset_containing_row(fetch_start);
516 size_t end_offset = block_offset_containing_row(fetch_end - 1) + 1;
517 size_t output_idx = 0;
518 for (
size_t i = start_offset; i < end_offset; ++i) {
519 size_t first_row_to_fetch_in_this_block = std::max(fetch_start, m_start_row[i]);
520 size_t last_row_to_fetch_in_this_block = std::min(fetch_end, m_start_row[i+1]);
521 auto& cache = m_cache[i];
522 std::unique_lock<turi::simple_spinlock> cache_lock_guard(cache.lock);
524 cache.buffer_start_row > first_row_to_fetch_in_this_block) {
526 fetch_cache_from_file(i, cache);
528 if (cache.buffer_start_row == first_row_to_fetch_in_this_block) {
531 size_t input_offset = m_start_row[i];
532 for (
size_t j = first_row_to_fetch_in_this_block;
533 j < last_row_to_fetch_in_this_block;
535 out_obj[output_idx++] = std::move((*cache.buffer)[j - input_offset]);
537 cache.buffer_start_row = last_row_to_fetch_in_this_block;
538 if (last_row_to_fetch_in_this_block == m_start_row[i + 1]) {
545 size_t input_offset = m_start_row[i];
546 for (
size_t j = first_row_to_fetch_in_this_block;
547 j < last_row_to_fetch_in_this_block;
549 out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
560 return sarray_format_reader<flexible_type>::read_rows(row_start, row_end, out_obj);
564 template <
typename T>
569 ASSERT_MSG(
false,
"Attempting to type decode a non-flexible_type column");
577 template <
typename T>
584 void open(std::string index_file,
585 size_t segments_to_create,
586 size_t columns_to_create) {
587 if (columns_to_create == 0) {
588 segments_to_create = 0;
591 m_writer.init(index_file, segments_to_create, columns_to_create);
592 m_nsegments = segments_to_create;
593 m_column_buffers.resize(columns_to_create);
594 for (
size_t i = 0; i < columns_to_create; ++i) {
595 m_column_buffers[i].segment_data.resize(segments_to_create);
597 for (
size_t i = 0; i < m_nsegments; ++i) {
606 m_writer.set_options(option, value);
614 return m_writer.get_index_info();
621 const std::vector<T>& v) {
622 DASSERT_LT(segmentid, m_nsegments);
623 DASSERT_LE(v.size(), m_column_buffers.size());
624 DASSERT_EQ(m_array_open,
true);
625 for (
size_t i = 0;i < v.size(); ++i) {
626 write_segment(i, segmentid, v[i]);
634 std::vector<T>&& v) {
635 DASSERT_LT(segmentid, m_nsegments);
636 DASSERT_LE(v.size(), m_column_buffers.size());
637 DASSERT_EQ(m_array_open,
true);
638 for (
size_t i = 0;i < v.size(); ++i) {
639 write_segment(i, segmentid, std::forward<T>(v[i]));
649 DASSERT_LT(segmentid, m_nsegments);
650 DASSERT_LT(columnid, m_column_buffers.size());
651 DASSERT_EQ(m_array_open,
true);
652 m_column_buffers[columnid].segment_data[segmentid].push_back(t);
653 if (m_column_buffers[columnid].segment_data[segmentid].size() >=
654 m_column_buffers[columnid].elements_before_flush) {
655 flush_block(columnid, segmentid);
664 const std::vector<T>& t) {
665 DASSERT_LT(segmentid, m_nsegments);
666 DASSERT_LT(columnid, m_column_buffers.size());
667 DASSERT_EQ(m_array_open,
true);
669 auto& el_before_flush = m_column_buffers[columnid].elements_before_flush;
670 auto& buffer = m_column_buffers[columnid].segment_data[segmentid];
671 for(
const auto& elem: t) {
672 buffer.push_back(elem);
673 if (buffer.size() >= el_before_flush) {
674 flush_block(columnid, segmentid);
684 std::vector<T>&& t) {
685 DASSERT_LT(segmentid, m_nsegments);
686 DASSERT_LT(columnid, m_column_buffers.size());
687 DASSERT_EQ(m_array_open,
true);
689 auto el_before_flush = m_column_buffers[columnid].elements_before_flush;
690 auto& buffer = m_column_buffers[columnid].segment_data[segmentid];
691 for(
const auto& elem: t) {
692 buffer.push_back(std::move(elem));
693 if (buffer.size() >= el_before_flush) {
694 flush_block(columnid, segmentid);
695 el_before_flush = m_column_buffers[columnid].elements_before_flush;
706 DASSERT_LT(segmentid, m_nsegments);
707 DASSERT_LT(columnid, m_column_buffers.size());
708 DASSERT_EQ(m_array_open,
true);
709 m_column_buffers[columnid].segment_data[segmentid].push_back(std::forward<T>(t));
710 if (m_column_buffers[columnid].segment_data[segmentid].size() >=
711 m_column_buffers[columnid].elements_before_flush) {
712 flush_block(columnid, segmentid);
716 void write_segment(
size_t segmentid,
const sframe_rows& rows);
721 ASSERT_EQ(m_array_open,
true);
723 m_array_open =
false;
724 for (
size_t i = 0;i < m_nsegments; ++i) {
725 for (
size_t j = 0;j < m_column_buffers.size(); ++j) {
728 m_writer.close_segment(i);
743 for (
size_t j = 0;j < m_column_buffers.size(); ++j) {
744 flush_block(j, segmentid);
745 m_column_buffers[j].segment_data[segmentid].shrink_to_fit();
753 m_writer.write_index_file();
760 ASSERT_EQ(m_array_open,
true);
768 ASSERT_EQ(m_array_open,
true);
769 return m_column_buffers.size();
774 bool m_array_open =
false;
780 struct column_buffer {
784 std::vector<std::vector<T> > segment_data;
786 size_t total_bytes_written = 0;
787 size_t total_elements_written = 0;
790 std::vector<column_buffer> m_column_buffers;
796 void open_segment(
size_t segmentid) {
798 ASSERT_MSG(m_array_open,
"sarray not open");
801 std::string filename;
805 std::stringstream strm;
806 strm << index_file.substr(0, index_file.length() - 5) <<
".";
807 strm.fill(
'0'); strm.width(4);
809 filename = strm.str();
811 <<
" for write on " << filename << std::endl;
818 void flush_block(
size_t columnid,
size_t segmentid);
827 auto& colbuf = m_column_buffers[columnid];
828 if (colbuf.segment_data[segmentid].empty())
return;
829 size_t write_size = colbuf.segment_data[segmentid].size();
830 size_t ret = m_writer.write_typed_block(segmentid,
832 colbuf.segment_data[segmentid],
834 colbuf.segment_data[segmentid].clear();
837 std::lock_guard<simple_spinlock> guard(colbuf.lock);
838 colbuf.total_bytes_written += ret;
839 colbuf.total_elements_written += write_size;
841 (float)(colbuf.total_bytes_written+1) / (float)(colbuf.total_elements_written+1));
842 colbuf.elements_before_flush = std::max(colbuf.elements_before_flush,
844 colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
846 colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
850 template <
typename T>
855 auto& colbuf = m_column_buffers[columnid];
856 if (colbuf.segment_data[segmentid].empty())
return;
857 size_t write_size = colbuf.segment_data[segmentid].size();
858 size_t ret = m_writer.write_block(segmentid,
860 colbuf.segment_data[segmentid],
862 colbuf.segment_data[segmentid].clear();
866 std::lock_guard<simple_spinlock> guard(colbuf.lock);
867 colbuf.total_bytes_written += ret;
868 colbuf.total_elements_written += write_size;
870 (float)(colbuf.total_bytes_written+1) / (float)(colbuf.total_elements_written+1));
871 colbuf.elements_before_flush = std::max(colbuf.elements_before_flush,
873 colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
875 colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
883 DASSERT_LT(segmentid, m_nsegments);
884 DASSERT_EQ(m_array_open,
true);
886 DASSERT_EQ(rows.
num_columns(), m_column_buffers.size());
889 for (
size_t i = 0;i < m_column_buffers.size(); ++i) {
890 auto& buffer = m_column_buffers[i].segment_data[segmentid];
891 std::copy(cols[i]->begin(), cols[i]->end(), std::back_inserter(buffer));
892 if (m_column_buffers[i].segment_data[segmentid].size() >=
893 m_column_buffers[i].elements_before_flush) {
894 flush_block(i, segmentid);
899 template <
typename T>
902 ASSERT_MSG(
false,
"Cannot write to general SArray with sframe_rows");
size_t SFRAME_DEFAULT_BLOCK_SIZE
void release_buffer(std::shared_ptr< T > &&buffer)
std::tuple< size_t, size_t, size_t > block_address
std::tuple< size_t, size_t > column_address
size_t num_columns() const
Returns the number of columns.
group_index_file_information & get_index_info()
const std::vector< ptr_to_decoded_column_type > & cget_columns() const
size_t SFRAME_MAX_BLOCKS_IN_CACHE
size_t SFRAME_WRITER_MAX_BUFFERED_CELLS
bool get(size_t b) const
Returns the value of the bit b.
column_address open_column(std::string column_file)
const size_t SARRAY_WRITER_MIN_ELEMENTS_PER_BLOCK
const std::vector< std::vector< block_info > > & get_all_block_info(size_t segment_id)
void copy(const std::string src, const std::string dest)
void clear()
Sets all bits to 0.
size_t num_blocks_in_column(column_address addr)
size_t SFRAME_WRITER_MAX_BUFFERED_CELLS_PER_BLOCK
const size_t SARRAY_WRITER_INITAL_ELEMENTS_PER_BLOCK
bool next_bit(size_t &b) const
index_file_information read_index_file(std::string index_file)
void open_segment(size_t segment_id, std::string filename)
bool clear_bit(size_t b)
Atomically set the bit at b to false returning the old value.
void close_column(column_address addr)
bool typed_decode(const block_info &info, char *start, size_t len, std::vector< flexible_type > &ret)