Turi Create  4.0
sarray_saving.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SARRAY_SAVING_HPP
7 #define TURI_SARRAY_SAVING_HPP
8 #include <core/storage/sframe_data/sarray.hpp>
9 #include <core/storage/sframe_data/sarray_v2_block_manager.hpp>
10 #include <core/storage/sframe_data/sframe_saving_impl.hpp>
11 namespace turi {
12 
13 template <typename T>
14 void sarray_save_blockwise(const sarray<T>& cur_column,
15  std::string index_file) {
16  // initialize reader and writer
17  auto& block_manager = v2_block_impl::block_manager::get_instance();
18  v2_block_impl::block_writer writer;
19 
20  // call it indexfile.0000
21  std::string base_name;
22  size_t last_dot = index_file.find_last_of(".");
23  if (last_dot != std::string::npos) {
24  base_name = index_file.substr(0, last_dot);
25  } else {
26  base_name = index_file;
27  }
28  auto index = base_name + ".sidx";
29  auto segment_file = base_name + ".0000";
30  // we are going to emit only 1 segment. We should be rather IO bound anyway
31  // and 1 column
32  writer.init(index, 1, 1);
33  writer.open_segment(0, segment_file);
34 
35 
36  sframe_saving_impl::column_blocks col;
37  // this is going to be a max heap with each entry referencing a column.
38  try {
39  col.column_index = cur_column.get_index_info();
40  if (col.column_index.segment_files.size() > 0) {
41  col.segment_address =
42  block_manager.open_column(col.column_index.segment_files[0]);
43  // the block address is basically a tuple beginning with
44  // the column address
45  col.num_blocks_in_current_segment =
46  block_manager.num_blocks_in_column(col.segment_address);
47  col.next_row = 0;
48  col.column_number = 0;
49  col.num_segments = col.column_index.segment_files.size();
50  if (col.current_block_number >= col.num_blocks_in_current_segment) {
51  advance_column_blocks_to_next_block(block_manager, col);
52  }
53  }
54 
55 
56  writer.get_index_info().columns[0].metadata = col.column_index.metadata;
57 
58  while(!col.eof) {
59  // read a block
60  v2_block_impl::block_info* infoptr = nullptr;
61  v2_block_impl::block_info info;
63  {std::get<0>(col.segment_address),
64  std::get<1>(col.segment_address),
65  col.current_block_number};
66  auto data = block_manager.read_block(block_address , &infoptr);
67  info = *infoptr;
68  // write to segment 0. We have only 1 segment
69  writer.write_block(0, col.column_number, data->data(), info);
70  // increment the block number
71  advance_column_blocks_to_next_block(block_manager, col);
72  // if there are still blocks. push it back
73  }
74 
75  // close writers.
76  writer.close_segment(0);
77  writer.write_index_file();
78  } catch (...) {
79  // cleanup. close any open columns
80  try {
81  block_manager.close_column(col.segment_address);
82  } catch (...) { }
83  throw;
84  }
85 }
86 }
87 #endif
std::tuple< size_t, size_t, size_t > block_address
static block_manager & get_instance()
Get singleton instance.
void advance_column_blocks_to_next_block(v2_block_impl::block_manager &block_manager, column_blocks &block)