Turi Create  4.0
sframe_compact_impl.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_COMPACT_IMPL_HPP
7 #define TURI_SFRAME_COMPACT_IMPL_HPP
8 #include <vector>
9 #include <memory>
10 #include <core/storage/sframe_data/sarray.hpp>
11 namespace turi {
12 namespace {
13 /**
14  * Returns the number of blocks in a segment file
15  */
16 inline size_t get_num_blocks_in_segment_file(const std::string& s) {
18  auto columnaddr = manager.open_column(s);
19  return manager.num_blocks_in_column(columnaddr);
20 }
21 
22 template <typename T>
23 inline std::shared_ptr<sarray<T>>
24 compact_rows(sarray<T>& arr, size_t row_start, size_t row_end) {
25  // returned sarray
26  auto ret = std::make_shared<sarray<T>>();
27  ret->open_for_write(1);
28  auto output = ret->get_output_iterator(0);
29 
30  // read input array
31  auto reader = arr.get_reader();
32  sframe_rows rows;
33  while(row_start < row_end) {
34  size_t read_end = std::min(row_start + DEFAULT_SARRAY_READER_BUFFER_SIZE, row_end);
35  bool read_ok = reader->read_rows(row_start, read_end, rows);
36  ASSERT_TRUE(read_ok);
37 
38  // write output array
39  (*output) = rows;
40  row_start = read_end;
41  }
42  ret->close();
43  return ret;
44 }
45 
46 } // anonymous namespace
47 
48 template <typename T>
50 
51  auto index = column.get_index_info();
52 
53  // this is the resultant index
54  auto updated_index = index;
55  updated_index.segment_sizes.clear();
56  updated_index.segment_files.clear();
57 
58  size_t row_counter = 0;
59  bool compaction_performed = false;
60 
61  // we keep a buffer of the new sarrays constructed after compaction until we
62  // actually piece them together. To avoid them going out of scope and
63  // and clearing the data.
64  std::vector<std::shared_ptr<sarray<T>>> new_sarrays;
65 
66  for (size_t i = 0;i < index.segment_files.size(); ++i) {
67  size_t nblocks = get_num_blocks_in_segment_file(index.segment_files[i]);
69  // find a run of block size 1
70  size_t runlength_in_segments = 1;
71  size_t runlength_in_rows = index.segment_sizes[i];
72  for (size_t j = i + 1; j < index.segment_files.size(); ++j) {
73  size_t nblocks = get_num_blocks_in_segment_file(index.segment_files[j]);
74  if (nblocks <= 1) {
75  runlength_in_rows += index.segment_sizes[j];
76  ++runlength_in_segments;
77  } else {
78  break;
79  }
80  }
81  // we only compact if there is more than one segment of block size 1
82  if (runlength_in_segments > 1) {
83  logstream(LOG_INFO) << "Compacting range of "
84  << runlength_in_segments << " blocks, "
85  << runlength_in_rows << " rows" << std::endl;
86 
87  // we compact the segment range
88  auto new_sarray = compact_rows(column, row_counter, row_counter + runlength_in_rows);
89  // put it into the updated index
90  auto new_sarray_index = new_sarray->get_index_info();
91  ASSERT_EQ(new_sarray_index.segment_files.size(), 1);
92  ASSERT_EQ(new_sarray_index.segment_sizes[0], runlength_in_rows);
93  row_counter += runlength_in_rows;
94  updated_index.segment_sizes.push_back(new_sarray_index.segment_sizes[0]);
95  updated_index.segment_files.push_back(new_sarray_index.segment_files[0]);
96 
97  //remember the new sarray so that it doesn't go out of scope
98  //until we actually construct the result.
99  compaction_performed = true;
100  new_sarrays.push_back(new_sarray);
101  // increment the index by the right amount so that we end up
102  // on the segment after the run
103  i += runlength_in_segments - 1;
104  continue;
105  }
106  }
107  row_counter += index.segment_sizes[i];
108  updated_index.segment_sizes.push_back(index.segment_sizes[i]);
109  updated_index.segment_files.push_back(index.segment_files[i]);
110  }
111 
112  if (compaction_performed) {
113  sarray<T> final_array;
114  updated_index.nsegments = updated_index.segment_files.size();
115  final_array.open_for_read(updated_index);
116  ASSERT_EQ(final_array.size(), column.size());
117  column = final_array;
118  }
119  return compaction_performed;
120 }
121 
122 
123 template <typename T>
124 void sarray_compact(sarray<T>& column, size_t segment_threshold) {
125  sarray_fast_compact(column);
126  if (column.get_index_info().segment_files.size() > segment_threshold) {
128  << "Slow compaction triggered because fast compact did not achieve target"
129  << std::endl;
130  column = *(column.clone(std::min(segment_threshold, thread::cpu_count())));
131  }
132 }
133 
134 } // turi
135 #endif
#define logstream(lvl)
Definition: logger.hpp:276
std::shared_ptr< sarray > clone(size_t nsegments=0) const
Definition: sarray.hpp:488
size_t size() const
Definition: sarray.hpp:382
static block_manager & get_instance()
Get singleton instance.
#define LOG_INFO
Definition: logger.hpp:101
std::vector< std::string > segment_files
The file name of each segment.
const index_file_information get_index_info() const
Definition: sarray.hpp:448
static size_t cpu_count()
const size_t DEFAULT_SARRAY_READER_BUFFER_SIZE
void open_for_read(index_file_information info)
Definition: sarray.hpp:232
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
bool sarray_fast_compact(sarray< T > &column)
size_t FAST_COMPACT_BLOCKS_IN_SMALL_SEGMENT
void sarray_compact(sarray< T > &column, size_t segment_threshold)
std::vector< size_t > segment_sizes
The length of each segment (number of entries).