Turi Create  4.0
sarray_v2_block_writer.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_SARRAY_V2_BLOCK_WRITER_HPP
7 #define TURI_SFRAME_SARRAY_V2_BLOCK_WRITER_HPP
8 #include <stdint.h>
9 #include <vector>
10 #include <fstream>
11 #include <tuple>
12 #include <core/parallel/pthread_tools.hpp>
13 #include <core/parallel/atomic.hpp>
14 #include <core/storage/fileio/general_fstream.hpp>
15 #include <core/storage/sframe_data/sarray_index_file.hpp>
16 #include <core/data/flexible_type/flexible_type.hpp>
17 #include <core/util/buffer_pool.hpp>
18 #include <core/storage/sframe_data/sarray_v2_block_types.hpp>
19 
20 namespace turi {
21 
22 
23 /**
24  * \internal
25  * \ingroup sframe_physical
26  * \addtogroup sframe_internal SFrame Internal
27  * \{
28  */
29 
30 /**
31  * SFrame v2 Format Implementation Detail
32  */
33 namespace v2_block_impl {
34 
35 /**
36  * Provides the file writing implementation for the v2 block format.
37  * See the sarray_v2_block_manager for details on the format.
38  *
39  * Basic usage is:
40  * \code
41  * block_writer writer;
42  * writer.init("index", #segments, #columns);
43  * for i = 0 to #segments:
44  * writer.open_segment(i, filename)
45  *
46  * // now you perform repeat calls to the following functions
47  * // to write blocks to the segments/columns
48  * writer.write_block(...)
49  * writer.write_typed_block(...)
50  *
51  * // close all writes
52  * for i = 0 to #segments:
53  * writer.close_segment(i)
54  *
55  * // output the array group index file
56  * writer.write_index_file()
57  * \endcode
58  */
59 class block_writer {
60  public:
61 
62  /**
63  * Opens a block writer with a target index file, the number of segments
64  * to write, and the number of columns to write.
65  */
66  void init(std::string group_index_file,
67  size_t num_segments,
68  size_t num_columns);
69 
70  /**
71  * Opens a segment, using a given file name.
72  */
73  void open_segment(size_t segment_id,
74  std::string filename);
75 
76  /**
77  * Sets write options. The only option available now is
78  * "disable_padding". If set to non-zero, disables 4K padding of blocks.
79  */
80  void set_options(const std::string& option, int64_t value);
81 
82  /**
83  * Writes a block of data into a segment.
84  *
85  * \param segmentid The segment to write to
86  * \param data Pointer to the data to write
87  * \param block_info Metadata about the block.
88  *
89  * The only fields in block_info which *must* be filled is block_size and
90  * num_elem.
91  * Returns the actual number of bytes written.
92  */
93  size_t write_block(size_t segment_id,
94  size_t column_id,
95  char* data,
96  block_info block);
97 
98  /**
99  * Writes a block of data into a segment.
100  *
101  * \param segment_id The segment to write to
102  * \param data Reference to the data to write
103  * \param block_info Metadata about the block.
104  *
105  * No fields of block_info are required at the moment.
106  * Returns the actual number of bytes written.
107  */
108  size_t write_typed_block(size_t segment_id,
109  size_t column_id,
110  const std::vector<flexible_type>& data,
111  block_info block);
112 
113 
114  /**
115  * Writes a block of arbitrary contents. Direct serialization is used.
116  */
117  template <typename T>
118  size_t write_block(size_t segment_id,
119  size_t column_id,
120  const std::vector<T>& data,
121  block_info block) {
122  auto serialization_buffer = m_buffer_pool.get_new_buffer();
123  oarchive oarc(*serialization_buffer);
124  oarc << data;
125  block.block_size = oarc.off;
126  block.num_elem = data.size();
127  size_t ret = write_block(segment_id, column_id, serialization_buffer->data(), block);
128  m_buffer_pool.release_buffer(std::move(serialization_buffer));
129  return ret;
130  }
131  /**
132  * Closes the segment file
133  */
134  void close_segment(size_t segment_id);
135 
136  /**
137  * Gets a modifiable reference to the index information for the data written.
138  * If the segment is not closed yet, partial results will be provided.
139  */
141 
142  /**
143  * Writes the index file
144  */
145  void write_index_file();
146  private:
147 
148  /// Pool of buffers used for compression, etc.
149  buffer_pool<std::vector<char> > m_buffer_pool;
150  /// The output files for each open segment
151  std::vector<std::shared_ptr<general_ofstream> > m_output_files;
152  /// Locks on the output segments
153  std::vector<turi::mutex> m_output_file_locks;
154  /// Number of bytes written to each output segments
155  std::vector<size_t> m_output_bytes_written;
156 
157  group_index_file_information m_index_info;
158 
159  /**
160  * A vector of all the block information is stuck in the footer of the file
161  * for each segment, for each column, the collection of blocks.
162  * Once inited, this array is never modified and is safe for concurrent
163  * reads.
164  * blocks[segment_id][column_id][block_id]
165  */
166  std::vector<std::vector<std::vector<block_info> > > m_blocks;
167 
168  /// For each segment, for each column the number of rows written so far
169  std::vector<std::vector<size_t> > m_column_row_counter;
170 
171  /// Writes the file footer
172  void emit_footer(size_t segment_id);
173 
174  /// Disables 4K padding if enabled
175  bool m_disable_padding = false;
176 };
177 
178 } // namespace v2_block_impl
179 
180 /// \}
181 } // namespace turi
182 #endif
void release_buffer(std::shared_ptr< T > &&buffer)
Definition: buffer_pool.hpp:67
group_index_file_information & get_index_info()
size_t write_typed_block(size_t segment_id, size_t column_id, const std::vector< flexible_type > &data, block_info block)
void set_options(const std::string &option, int64_t value)
size_t write_block(size_t segment_id, size_t column_id, const std::vector< T > &data, block_info block)
uint64_t block_size
The length of the block in bytes on disk.
std::shared_ptr< T > get_new_buffer()
Definition: buffer_pool.hpp:40
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
size_t write_block(size_t segment_id, size_t column_id, char *data, block_info block)
void init(std::string group_index_file, size_t num_segments, size_t num_columns)
void open_segment(size_t segment_id, std::string filename)
void close_segment(size_t segment_id)