6 #ifndef TURI_SFRAME_COMPACT_IMPL_HPP 7 #define TURI_SFRAME_COMPACT_IMPL_HPP 10 #include <core/storage/sframe_data/sarray.hpp> 16 inline size_t get_num_blocks_in_segment_file(
const std::string& s) {
18 auto columnaddr = manager.open_column(s);
19 return manager.num_blocks_in_column(columnaddr);
23 inline std::shared_ptr<sarray<T>>
24 compact_rows(sarray<T>& arr,
size_t row_start,
size_t row_end) {
26 auto ret = std::make_shared<sarray<T>>();
27 ret->open_for_write(1);
28 auto output = ret->get_output_iterator(0);
31 auto reader = arr.get_reader();
33 while(row_start < row_end) {
35 bool read_ok = reader->read_rows(row_start, read_end, rows);
54 auto updated_index = index;
56 updated_index.segment_files.clear();
58 size_t row_counter = 0;
59 bool compaction_performed =
false;
64 std::vector<std::shared_ptr<sarray<T>>> new_sarrays;
66 for (
size_t i = 0;i < index.segment_files.size(); ++i) {
67 size_t nblocks = get_num_blocks_in_segment_file(index.segment_files[i]);
70 size_t runlength_in_segments = 1;
71 size_t runlength_in_rows = index.segment_sizes[i];
72 for (
size_t j = i + 1; j < index.segment_files.size(); ++j) {
73 size_t nblocks = get_num_blocks_in_segment_file(index.segment_files[j]);
75 runlength_in_rows += index.segment_sizes[j];
76 ++runlength_in_segments;
82 if (runlength_in_segments > 1) {
84 << runlength_in_segments <<
" blocks, " 85 << runlength_in_rows <<
" rows" << std::endl;
88 auto new_sarray = compact_rows(column, row_counter, row_counter + runlength_in_rows);
90 auto new_sarray_index = new_sarray->get_index_info();
91 ASSERT_EQ(new_sarray_index.segment_files.size(), 1);
92 ASSERT_EQ(new_sarray_index.segment_sizes[0], runlength_in_rows);
93 row_counter += runlength_in_rows;
94 updated_index.segment_sizes.push_back(new_sarray_index.segment_sizes[0]);
95 updated_index.segment_files.push_back(new_sarray_index.segment_files[0]);
99 compaction_performed =
true;
100 new_sarrays.push_back(new_sarray);
103 i += runlength_in_segments - 1;
107 row_counter += index.segment_sizes[i];
108 updated_index.segment_sizes.push_back(index.segment_sizes[i]);
109 updated_index.segment_files.push_back(index.segment_files[i]);
112 if (compaction_performed) {
114 updated_index.nsegments = updated_index.segment_files.
size();
116 ASSERT_EQ(final_array.
size(), column.
size());
117 column = final_array;
119 return compaction_performed;
123 template <
typename T>
128 <<
"Slow compaction triggered because fast compact did not achieve target"
std::shared_ptr< sarray > clone(size_t nsegments=0) const
static block_manager & get_instance()
Get singleton instance.
const index_file_information get_index_info() const
static size_t cpu_count()
const size_t DEFAULT_SARRAY_READER_BUFFER_SIZE
void open_for_read(index_file_information info)
#define ASSERT_TRUE(cond)
bool sarray_fast_compact(sarray< T > &column)
size_t FAST_COMPACT_BLOCKS_IN_SMALL_SEGMENT
void sarray_compact(sarray< T > &column, size_t segment_threshold)