6 #include <core/storage/sframe_data/sarray.hpp> 47 template <
typename DataType>
54 , num_segments(data->get_index_info().segment_files.size())
// Reads the block the iterator is currently positioned on into *read_data and
// reports, through *row_number_start, the row index at which that block
// starts.
//
// NOTE(review): this listing is sparse -- the leading integers are line
// numbers of the original file and several lines (branch conditions, closing
// braces, the return statement) are missing, so the return value and the
// exact branch structure cannot be confirmed from here.
79 inline bool read_next(
size_t *row_number_start, std::vector<DataType>* read_data) {
// Local handle to the segment this call reads from; assigned from
// segment_of_next_block further down.
85 std::shared_ptr<segment> read_segment;
// Cancellation check on entry: calls log_and_throw with the message below
// when cppipc::must_cancel() reports true.
91 if(cppipc::must_cancel()) {
92 log_and_throw(
"Canceled by user.");
// Acquire block_read_lock through a std::lock_guard. The extent of the guarded
// scope is not visible in this excerpt.
96 std::lock_guard<simple_spinlock> lg(block_read_lock);
// Reports the total size of the array as the start row. The condition guarding
// this assignment is not visible -- presumably the "no more blocks" case;
// TODO confirm.
99 *row_number_start = data->size();
// Take a copy of the shared segment pointer before the cursor is advanced.
104 read_segment = segment_of_next_block;
// Debug check: the pending block number must be inside the segment.
107 DASSERT_LT(block_number_of_next_block, read_segment->num_blocks());
// Tail of a brace initializer (its first lines are not visible; presumably the
// one building read_block_address used below): element 1 of the segment's
// address tuple followed by the pending block number.
113 std::get<1>(read_segment->address()),
114 block_number_of_next_block};
// Number of elements stored in the block about to be read.
116 n_elem = block_manager.
get_block_info(read_block_address).num_elem;
// Report where this block starts, then move the row cursor past it.
117 *row_number_start = row_start_idx_of_next_block;
118 row_start_idx_of_next_block += n_elem;
// Advance the block/segment cursor; the result is stored in _is_done.
121 _is_done = load_next_block(
true);
// Read the block at read_block_address into the caller's vector.
125 block_manager.
read_block(read_block_address, *read_data);
// Debug check: the read produced exactly the element count from the block info.
126 DASSERT_EQ(read_data->size(), n_elem);
// Second cancellation check, after the read.
129 if(cppipc::must_cancel()) {
130 log_and_throw(
"Canceled by user.");
// Iterator state.
//
// The array being iterated; read_next() and the segment constructor query it
// through size() and get_index_info().
140 std::shared_ptr<sarray<DataType> > data;
// Segment count. The in-class default is 0; a constructor initializer visible
// earlier in the listing sets it to
// data->get_index_info().segment_files.size().
146 const size_t num_segments = 0;
// Receives the result of load_next_block(true) in read_next().
// Presumably true once no blocks remain -- TODO confirm.
148 bool _is_done =
false;
// Cursor describing the block the next read_next() call will read:
// the segment index, the block number within that segment, and the row index
// at which that block starts.
152 size_t segment_index_of_next_block = 0;
153 size_t block_number_of_next_block = 0;
154 size_t row_start_idx_of_next_block = 0;
// Fragments of the `segment` helper type (its class header and several body
// lines are not visible in this excerpt).
//
// Constructor: opens the segment file at position `segment_index` of `data`'s
// index and keeps the resulting address.
161 segment(
const std::shared_ptr<
sarray<DataType> >& data,
size_t segment_index)
// Index information of the array, which lists the segment files.
165 const auto& column_index = data->get_index_info();
// Debug check: the requested segment must exist.
166 DASSERT_LT(segment_index, column_index.segment_files.size());
168 const auto& segment_file = column_index.segment_files[segment_index];
// Open the segment file through the block manager and store its address.
171 segment_address = block_manager.
open_column(segment_file);
// Return statement of an accessor whose header is not visible here --
// presumably address(), which read_next() calls; TODO confirm.
182 return segment_address;
// Block count of this segment (the body is not visible in this excerpt).
185 size_t num_blocks()
const {
// Backing field, defaulting to 0; where it is filled in is not visible here.
194 size_t _num_blocks = 0;
// Segment that holds the block the next read will use.
197 std::shared_ptr<segment> segment_of_next_block;
// Body of a start-up routine whose header is not visible in this excerpt.
// Empty array: the body of this branch is not visible -- presumably it marks
// the iterator as done and returns; TODO confirm.
201 if(data->size() == 0) {
// Debug check: a non-empty array is expected to have at least one segment.
206 DASSERT_NE(num_segments, 0);
// Reset the cursor to the very first block of the first segment.
207 segment_index_of_next_block = 0;
208 block_number_of_next_block = 0;
209 row_start_idx_of_next_block = 0;
// Open segment 0.
211 segment_of_next_block.reset(
new segment(data, 0));
// Passing false means the block number is not incremented first; the call
// still steps over segments whose block count equals the current block number.
215 load_next_block(
false);
// Moves the block/segment cursor to the next block, stepping into following
// segments as needed.
//
// advance_from_current_position: when true, the block number is incremented
// before the segment-boundary check; when false, only the boundary check runs.
//
// Returns a bool that read_next() stores in _is_done. The return statements
// themselves are not visible in this excerpt.
218 bool load_next_block(
bool advance_from_current_position) {
220 if(advance_from_current_position) {
221 ++block_number_of_next_block;
// While the block number sits one past the last block of the current segment,
// move on to the next segment. This is a loop, so consecutive segments that
// trigger the same condition are skipped too.
224 while(block_number_of_next_block == segment_of_next_block->num_blocks()) {
225 ++segment_index_of_next_block;
// Ran past the last segment. The branch body is not visible -- presumably it
// returns to signal that the iteration is finished; TODO confirm.
228 if(segment_index_of_next_block == num_segments) {
// Open the new segment and restart at its first block.
231 segment_of_next_block.reset(
new segment(data, segment_index_of_next_block));
232 block_number_of_next_block = 0;
244 template <
typename T>
std::tuple< size_t, size_t, size_t > block_address
std::tuple< size_t, size_t > column_address
static block_manager & get_instance()
Get singleton instance.
column_address open_column(std::string column_file)
bool read_next(size_t *row_number_start, std::vector< DataType > *read_data)
size_t num_blocks_in_column(column_address addr)
std::shared_ptr< std::vector< char > > read_block(block_address addr, block_info **ret_info=NULL)
sarray_block_iterator< T > make_sarray_block_iterator(const std::shared_ptr< sarray< T > > &data)
#define DASSERT_TRUE(cond)
const block_info & get_block_info(block_address addr)
void close_column(column_address addr)