Turi Create  4.0
sarray_v2_block_manager.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_SARRAY_V2_BLOCK_MANAGER_HPP
7 #define TURI_SFRAME_SARRAY_V2_BLOCK_MANAGER_HPP
8 #include <stdint.h>
9 #include <vector>
10 #include <fstream>
11 #include <tuple>
12 #include <core/parallel/pthread_tools.hpp>
13 #include <core/parallel/atomic.hpp>
14 #include <core/storage/fileio/general_fstream.hpp>
15 #include <core/storage/sframe_data/sarray_index_file.hpp>
16 #include <core/data/flexible_type/flexible_type.hpp>
17 #include <core/util/buffer_pool.hpp>
18 #include <core/storage/sframe_data/sarray_v2_block_types.hpp>
19 
20 // forward declaration for LZ4. required here annoyingly since I have a template
21 // function here which calls it
22 extern "C" {
23  int LZ4_decompress_safe (const char* source, char* dest, int inputSize, int maxOutputSize);
24 }
25 namespace turi {
26 
27 
28 /**
29  * \internal
30  * \ingroup sframe_physical
31  * \addtogroup sframe_internal SFrame Internal
32  * \{
33  */
34 
35 /**
36  * SFrame v2 Format Implementation Detail
37  */
38 namespace v2_block_impl {
39 
40 /**
41  * Provides block reading capability in v2 segment files.
42  *
43  * This class manages block reading of an SArray/SArray group , and provides
44  * functions to query the blocks (such as how many blocks there are in the
45  * segment, and how many rows there are in a block, etc).
46  *
47  * Array Group
48  * -----------
49  * An array group is a collection of segment files which contain and represent
50  * a collection of arrays (columns).
51  *
52  * Essentially an Array Group comprises of the following:
53  * - group.sidx
54  * the group index file. A JSON serialized contents of
55  * group_index_file_information. Describes a collection of arrays.
56  * - group.0000, group.0001, group.0002
57  * Each file is one segment of the array group. (multiple segments in an
58  * array group really exist only for parallel writing (and appending)
59  * capabilities. On reading, the segment layout is inconsequential, and a
60  * logical partitioning across threads is used.)
61  *
62  * Each segment file internally then has the following layout
63  * (1) Consecutive Block contents, each block 4K aligned.
64  * (2) A direct serialization of a vector<vector<block_info> > (blocks[column_id][block_id])
65  * (3) 8 bytes containing the file offset at which (2) begins
66  *
67  * For instance, if there are 2 segments with 3 columns each of 20 rows,
68  * we may get the following layout:
69  *
70  * group.0001:
71  * - column 0, block 0, 3 rows
72  * - column 1, block 0, 3 rows
73  * - column 2, block 0, 4 rows
74  * - column 0, block 1, 4 rows
75  * - column 0, block 2, 3 rows
76  * - column 1, block 1, 7 rows
77  * - column 2, block 1, 6 rows
78  *
79  * group.0002:
80  * - column 1, block 0, 5 rows
81  * - column 0, block 0, 10 rows
82  * - column 1, block 1, 5 rows
83  * - column 2, block 0, 10 rows
84  *
85  * Observe the following:
86  * 1) Each segment contains the same number
87  * of rows from each column. (technically the format does not require
88  * this, but the writer will always produce this result)
89  * 2) Blocks can be of different sizes. (the block_manager and block_writer
90  * do not have a block size constraint. The \ref sarray_group_format_writer_v2
91  * tries to keep to a block size of SFRAME_DEFAULT_BLOCK_SIZE
92  * after compression, but this is done by performing block size estimation
93  * (#bytes written / #rows written). But the format itself does not care.)
94  * 3) Blocks can be laid out in arbitrary order across columns. (Striping of
95  * columns is unnecessary)
96  * 4) Within each segment, the blocks for a given column are consecutive.
97  *
98  * File Addressing
99  * ---------------
100  * Since an array group (and hence a segment) can contain multiple columns,
101  * we need a uniform way of addressing a particular column inside an array
102  * group, or inside a segment. Thus the following convention is used:
103  *
104  * Given an array group of 3 columns comprising of the files:
105  * - group.sidx
106  * - group.0000, group.0001, group.0002, group.0003
107  *
108  * Column 0 in the array group can be addressed by opening the index file
109  * "group.sidx:0". Similarly, column 2 can be addressed using "group.sidx:2"
110  *
111  * Column 2 of the array group thus has the segment files:
112  * - group.0000:2, group.0001:2, group.0002:2, group.0003:2
113  *
114  * By convention if "group.sidx" is opened as a single array, it refers to
115  * column 0.
116  *
117  * Block Manager
118  * -------------
119  * The block manager is a singleton reader object that provides read access
120  * to columns. The usage convention is:
121  * - block_manager& manager = block_manager::get_instance()
122  * - column_address = manager.open_column("group.0000:2") // opens column 2 in segment
123  * - .. do stuff ..
124  * - manager.close_column(column_address)
125  * We will expand on .. do stuff .. below.
126  *
127  * The reason for having a singleton block manager is to provide better control
128  * over file handle utilization. Specifically, the block manager maintains a
129  * pool of file handles and will recycle file handles (close them until they
130  * are next needed, then reopen and seek) so as to avoid file handle usage
131  * exceeding a certain limit (as defined in DEFAULT_FILE_HANDLE_POOL_SIZE)
132  * Furthermore, the block manager can combine accesses of multiple columns in the
133  * same array group into a single file handle. Future performance improvements
134  * involving better IO scheduling can also be performed here.
135  *
136  * When a column is opened by \ref open_column(), a \ref column_address is
137  * returned. This is a pair of integers of {segment_file_id, and column_id}.
138  * column_id is the column within the segment. For instance, opening "group.0000:2"
139  * will have column_id = 2. The segment_file_id is an internal ID assigned by
140  * the block manager to track all accesses to the file group.0000. All open calls
141  * to group.0000 will return the same segment_file_id, and a reference counter
142  * is used internally to figure out when the file handle and block metadata
143  * can be released. \ref close_column() thus must be called for every call
144  * to \ref open_column().
145  *
146  * Once the column is opened, \ref num_blocks_in_column() can be used to obtain
147  * the number of blocks in the segment file belonging to the column.
148  * \ref read_block() or \ref read_typed_block() can then be used to read the
149  * blocks. These functions take a \ref block_address, which is a triple of
150  * {segment_file_id, column_id, block_id}. The first 2 fields can be copied
151  * from the column_address, the block_id is a sequential counter from 0 to
152  * \ref num_blocks_in_column() - 1.
153  *
154  */
156  public:
157 
158  /// Get singleton instance
159  static block_manager& get_instance();
160 
161  /// default constructor.
162  block_manager();
163 
164  /**
165  * Opens a file of the form segment_file:column_number and returns the
166  * column address: {segment_file_id, column_id}.
167  *
168  * calling num_blocks_in_column() will return the number of blocks within
169  * this column, after which blocks can be read by providing
170  * {segment_file_id, column_id, block_id} to read_block()
171  *
172  * close_column() must be called for each call to open_column()
173  */
174  column_address open_column(std::string column_file);
175 
176  /**
177  * Releases the column opened with \ref open_column()
178  */
179  void close_column(column_address addr);
180 
181  /**
182  * Returns the number of blocks in this column of this segment.
183  */
185 
186  /** Returns the number of rows in a block
187  * Returns (size_t)(-1) on failure.
188  */
190 
191  /**
192  * Returns all the blockinfo in a segment
193  */
194  const std::vector<std::vector<block_info> >& get_all_block_info(size_t segment_id);
195 
196  /**
197  * Reads a block as bytes given a block address ({segment_file_id, column_id,
198  * block_id} tuple).
199  *
200  * If ret_info is not NULL, a pointer to the block information will be stored
201  * into *ret_info. This is a pointer into internal datastructures of the
202  * block manager and should not be modified or freed.
203  *
204  * Return an empty pointer on failure.
205  *
206  * Safe for concurrent operation.
207  */
208  std::shared_ptr<std::vector<char> >
209  read_block(block_address addr, block_info** ret_info = NULL);
210 
211 
212  /**
213  * Reads a block given a block address ({segment_file_id, column_id,
214  * block_id} tuple), into a typed array. The block must have been stored as
215  * a typed block. Returns true on success, false on failure.
216  *
217  * Safe for concurrent operation.
218  */
220  std::vector<flexible_type>& ret,
221  block_info** ret_info = NULL);
222 
223  /**
224  * Reads a few blocks starting from a given block address ({segment_file_id,
225  * column_id, block_id} tuple), into a typed array. The blocks must have been
226  * stored as typed blocks. Returns true on success, false on failure.
227  *
228  * May return less than nblocks if addr goes past the last block.
229  *
230  * Safe for concurrent operation.
231  */
233  size_t nblocks,
234  std::vector<std::vector<flexible_type> >& ret,
235  std::vector<block_info>* ret_info = NULL);
236 
237  /**
238  * Reads a block given a block address ({segment_file_id, column_id,
239  * block_id} tuple) and deserializes it into an array.
240  * Returns true on success, false on failure.
241  *
242  * May return less than nblocks if addr goes past the last block.
243  *
244  * Safe for concurrent operation.
245  */
246  template <typename T>
248  std::vector<T>& ret,
249  block_info** ret_info = NULL) {
250  bool success = false;
251  auto buffer = read_block(addr, ret_info);
252  if (buffer) {
253  turi::iarchive iarc(buffer->data(), buffer->size());
254  iarc >> ret;
255  success = true;
256  }
257  m_buffer_pool.release_buffer(std::move(buffer));
258  return success;
259  }
260 
261  private:
262 
263  mutable turi::mutex m_global_lock;
264  mutable turi::mutex m_file_handles_lock;
265  /**
266  * Describes one segment file of an array group and the file handle
267  * pointing into it.
268  */
269  struct segment {
270  turi::mutex lock; // per-segment mutex; NOTE(review): which fields it guards is not visible here — confirm in the implementation
271 
272  std::string segment_file; // presumably the segment file name without the ":column" suffix — TODO confirm
273 
274  size_t file_size = 0; // presumably the size of the segment file in bytes — TODO confirm
275 
276  size_t io_parallelism_id = 0; // NOTE(review): meaning not visible in this header — confirm
277  /**
278  * File handle to this segment. Held as a weak_ptr, so the handle may have
279  * been released; see get_segment_file_handle() for reacquiring it.
280  std::weak_ptr<general_ifstream> segment_file_handle;
281 
282  bool inited = false; // presumably set once init_segment() has loaded `blocks` — TODO confirm
283 
284  /** For each column in the segment, the collection of blocks.
285  * Once inited, this array is never modified and is safe for concurrent
286  * reads.
287  * blocks[column_id][block_id]
288  */
289  std::vector<std::vector<block_info> > blocks;
290 
291  turi::atomic<size_t> reference_count; // presumably the open_column()/close_column() reference counter described in the class documentation — TODO confirm
292  };
293 
294  /// All the internal segments
295  size_t segment_id_counter = 0;
296  std::map<size_t, std::shared_ptr<segment> > m_segments;
297  std::map<std::string, size_t> m_file_to_segments;
298 
299  /**
300  * file handle pool management. We implement a simple LIFO pool.
301  */
302  std::deque<std::weak_ptr<general_ifstream> > m_file_handle_pool;
303 
304  /// Pool of buffers used for decompression, returns, etc.
305  buffer_pool<std::vector<char> > m_buffer_pool;
306 
307 /**************************************************************************/
308 /* */
309 /* Private Functions */
310 /* */
311 /**************************************************************************/
312 
313  /// Returns a new file handle from the file handle pool
314  std::shared_ptr<general_ifstream> get_new_file_handle(std::string file);
315 
316  /**
317  * Returns an opened handle to a segment file in an array group.
318  * Handle may be pointing anywhere within the file. This will reuse an
319  * existing handle if the handle has not yet been collected, and will
320  * create a new handle if none exist.
321  * Locks are not acquired and it is up to the caller to ensure locking.
322  */
323  std::shared_ptr<general_ifstream>
324  get_segment_file_handle(std::shared_ptr<segment>& group);
325 
326  /**
327  * reads a block from an input stream.
328  * Decompresses the block if it was compressed.
329  * Returns false on failure.
330  */
331  bool read_block_from_stream(general_ifstream& fin, std::vector<char>& ret,
332  block_info& info);
333 
334  std::shared_ptr<segment> get_segment(size_t segmentid);
335 
336  void init_segment(std::shared_ptr<segment>& seg);
337 };
338 
339 } // namespace v2_block_impl
340 
341 /// \}
342 } // namespace turi
343 #endif
void release_buffer(std::shared_ptr< T > &&buffer)
Definition: buffer_pool.hpp:67
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::tuple< size_t, size_t, size_t > block_address
std::tuple< size_t, size_t > column_address
bool read_typed_block(block_address addr, std::vector< flexible_type > &ret, block_info **ret_info=NULL)
sframe group(sframe sframe_in, std::string key_column)
static block_manager & get_instance()
Get singleton instance.
column_address open_column(std::string column_file)
const std::vector< std::vector< block_info > > & get_all_block_info(size_t segment_id)
size_t num_blocks_in_column(column_address addr)
bool read_block(block_address addr, std::vector< T > &ret, block_info **ret_info=NULL)
std::shared_ptr< std::vector< char > > read_block(block_address addr, block_info **ret_info=NULL)
block_manager()
default constructor.
bool read_typed_blocks(block_address addr, size_t nblocks, std::vector< std::vector< flexible_type > > &ret, std::vector< block_info > *ret_info=NULL)
const block_info & get_block_info(block_address addr)
void close_column(column_address addr)