// Turi Create 4.0 — sarray_file_format_v2.hpp (documentation-extraction header line)
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
6 #ifndef TURI_SFRAME_SARRAY_FILE_FORMAT_V2_HPP
7 #define TURI_SFRAME_SARRAY_FILE_FORMAT_V2_HPP
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <tuple>
#include <typeinfo>
#include <utility>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
#include <core/logging/logger.hpp>
#include <core/parallel/mutex.hpp>
#include <core/random/random.hpp>
#include <core/util/dense_bitset.hpp>
#include <core/storage/fileio/general_fstream.hpp>
#include <core/storage/fileio/temp_files.hpp>
#include <core/storage/serialization/serialization_includes.hpp>
#include <core/storage/sframe_data/sarray_file_format_interface.hpp>
#include <core/storage/sframe_data/sarray_index_file.hpp>
#include <core/storage/sframe_data/sframe_constants.hpp>
#include <core/storage/sframe_data/sarray_v2_block_manager.hpp>
#include <core/storage/sframe_data/sarray_v2_block_writer.hpp>
#include <core/storage/sframe_data/sarray_v2_encoded_block.hpp>
#include <core/storage/sframe_data/sarray_v2_type_encoding.hpp>
#include <core/system/cppipc/server/cancel_ops.hpp>
28 namespace turi {
29 
30 
31 
/**
 * \internal
 * \ingroup sframe_physical
 * \addtogroup sframe_internal SFrame Internal
 * \{
 */

/**
 * This class implements the version 2 file format.
 * See the sarray_v2_block_manager for format details.
 */
43 template <typename T>
44 class sarray_format_reader_v2: public sarray_format_reader<T> {
45  public:
46  /// Default Constructor
48  m_manager(v2_block_impl::block_manager::get_instance()) {
49  }
50 
51  /**
52  * Destructor. Also closes the sarray if open.
53  */
55  close();
56  }
57 
58  /// deleted copy constructor
59  sarray_format_reader_v2(const sarray_format_reader_v2& other) = delete;
60 
61  /// deleted assignment
63 
64 
65  /**
66  * Open has to be called before any of the other functions are called.
67  * Throws a string exception if it is unable to open the index file, or if
68  * there is a format error in the sarray index file.
69  *
70  * Will throw an exception if a file set is already open.
71  */
73  close();
74  m_index_info = index;
75  m_block_list.clear();
76  m_start_row.clear();
77  m_segment_list.clear();
78  m_num_rows = 0;
79  size_t row_count = 0;
80 
81  for (size_t i = 0;i < index.segment_files.size(); ++i) {
82  auto columnaddr = m_manager.open_column(index.segment_files[i]);
83  m_segment_list.push_back(columnaddr);
84  size_t nblocks = m_manager.num_blocks_in_column(columnaddr);
85 
86  size_t segment_id, column_id;
87  std::tie(segment_id, column_id) = columnaddr;
88 
89  const std::vector<std::vector<v2_block_impl::block_info>>& segment_blocks = m_manager.get_all_block_info(segment_id);
90 
91  for (size_t j = 0; j < nblocks; ++j) {
92  block_address blockaddr{std::get<0>(columnaddr), std::get<1>(columnaddr), j};
93  m_start_row.push_back(row_count);
94  row_count += segment_blocks[column_id][j].num_elem;
95  m_block_list.push_back(blockaddr);
96  }
97  }
98  for (auto& ssize: m_index_info.segment_sizes) m_num_rows += ssize;
99  m_cache.clear();
100  m_cache.resize(m_block_list.size());
101  m_used_cache_entries.resize(m_block_list.size());
102  m_used_cache_entries.clear();
103  // it is convenient for m_start_row to have one more entry which is
104  // the total # elements in the file
105  m_start_row.push_back(m_num_rows);
106  ASSERT_EQ(m_num_rows, row_count);
107  }
108 
109  /**
110  * Opens a single sidx file
111  */
112  void open(std::string sidx_file) {
113  open(read_index_file(sidx_file));
114  }
115  /**
116  * Closes an sarray file set. No-op if the array is already closed.
117  */
118  void close() {
119  // close all columns
120  for (auto column: m_segment_list) {
121  m_manager.close_column(column);
122  }
123  m_segment_list.clear();
124  m_cache.clear();
125  }
126 
127  /**
128  * Return the number of segments in the sarray
129  */
130  size_t num_segments() const {
131  return m_index_info.nsegments;
132  }
133 
134  /**
135  * Returns the number of elements in a given segment.
136  */
137  size_t segment_size(size_t segmentid) const {
138  DASSERT_LT(segmentid, m_index_info.nsegments);
139  return m_index_info.segment_sizes[segmentid];
140  }
141 
142  /**
143  * Gets the contents of the index file information read from the index file
144  */
146  return m_index_info;
147  }
148 
149  /**
150  * Returns the index_file of the array (the argument in \ref open)
151  */
152  std::string get_index_file() const {
153  return m_index_info.index_file;
154  }
155 
156  size_t read_rows(size_t row_start,
157  size_t row_end,
158  sframe_rows& out_obj);
159 
160  /**
161  * Reads a collection of rows, storing the result in out_obj.
162  * This function is independent of the open_segment/read_segment/close_segment
163  * functions, and can be called anytime. This function is also fully
164  * concurrent.
165  * \param row_start First row to read
166  * \param row_end one past the last row to read (i.e. EXCLUSIVE). row_end can
167  * be beyond the end of the array, in which case,
168  * fewer rows will be read.
169  * \param out_obj The output array
170  * \returns Actual number of rows read. Return (size_t)(-1) on failure.
171  *
172  * \note This function is currently only optimized for "mostly" sequential
173  * reads. i.e. we are expecting read_rows(a, b), to be soon followed by
174  * read_rows(b,c), etc.
175  */
176  size_t read_rows(size_t row_start,
177  size_t row_end,
178  std::vector<T>& out_obj) {
179  if (row_end > m_num_rows) row_end = m_num_rows;
180  if (row_start >= row_end) {
181  out_obj.clear();
182  return 0;
183  }
184  out_obj.resize(row_end - row_start);
185  fetch_rows_from_cache(row_start, row_end, out_obj);
186 
187  if(cppipc::must_cancel()) {
188  throw(std::string("Cancelled by user."));
189  }
190  return out_obj.size();
191  }
192 
193  private:
194  typedef v2_block_impl::block_address block_address;
195  typedef v2_block_impl::column_address column_address;
197 
198  /// A reference to the manager
199  v2_block_impl::block_manager& m_manager;
200 
201  /// The index information of this array
202  index_file_information m_index_info;
203  /// NUmber of rows of this array
204  size_t m_num_rows;
205  std::vector<block_address> m_block_list;
206  std::vector<size_t> m_start_row;
207  std::vector<column_address> m_segment_list;
208 
209  /**
210  * this describes one cache block.
211  *
212  * Each cache_entry is a decoded block. m_start_row provides the information
213  * as to which row this block contains. buffer_start_row is the first
214  * row in the buffer which is usable.
215  *
216  * The caching algorithm works as such:
217  * - fetch the cache entry from file for a given block if it doesn't exist.
218  * If we exceed the maximum cache limit, we evict something random
219  * - If buffer_start_row matches the first requested row, this is a
220  * sequential access and we use "moves" to move the read data into the
221  * user's buffer. We will then have to update the buffer_start_row since
222  * some data is now missing from the buffer. If the cache becomes empty
223  * we evict the entry.)
224  * - If buffer_start_row does not match the first requested row, it is
225  * a random access and we use copies.
226  *
227  * The random eviction process works as such:
228  * - m_used_cache_entries is a bitfield which lists the buffers in use
229  * - m_cache_size is an atomic counter which counts the number of buffers
230  * - When an eviction happens, we pick a random block number and search
231  * for the next block number which contains a cache entry, and try to evict
232  * that.
233  */
234  struct cache_entry {
235  cache_entry() = default;
236  cache_entry(const cache_entry& other) = default;
237 
238  cache_entry(cache_entry&& other) {
239  buffer_start_row = std::move(other.buffer_start_row);
240  is_encoded = std::move(other.is_encoded);
241  buffer = std::move(other.buffer);
242  encoded_buffer = std::move(other.encoded_buffer);
243  encoded_buffer_reader = std::move(other.encoded_buffer_reader);
244  }
245 
246  cache_entry& operator=(const cache_entry& other) = default;
247 
248  cache_entry& operator=(cache_entry&& other) {
249  buffer_start_row = std::move(other.buffer_start_row);
250  is_encoded = std::move(other.is_encoded);
251  buffer = std::move(other.buffer);
252  encoded_buffer = std::move(other.encoded_buffer);
253  encoded_buffer_reader = std::move(other.encoded_buffer_reader);
254  }
256  /// First accessible row in buffer. Either encoded or decoded.
257  size_t buffer_start_row = 0;
258  // whether this cache entry is held encoded or decoded
259  bool is_encoded = false;
260  bool has_data = false;
261  // if it is held decoded
262  std::shared_ptr<std::vector<T> > buffer;
263  // if it is held encoded
264  v2_block_impl::encoded_block encoded_buffer;
265  v2_block_impl::encoded_block_range encoded_buffer_reader;
266  };
267 
268  mutex m_lock;
269  /**
270  * This lists the cache entriese that have values in them
271  */
272  dense_bitset m_used_cache_entries;
273  /// The number of cached blocks. If this gets big we need to evict something
274  atomic<size_t> m_cache_size;
275  /**
276  * There is one cache object for each block
277  */
278  std::vector<cache_entry> m_cache;
279  static buffer_pool<std::vector<T> > m_buffer_pool;
280 
281  /**
282  * Extracts as many from fetch_start to fetch_end from the cache
283  * inserting into out_obj. out_obj must be resized to fetch_end - fetch_start.
284  *
285  * See \ref cache_entry for details on the caching process.
286  */
287  void fetch_rows_from_cache(size_t fetch_start,
288  size_t fetch_end,
289  std::vector<T>& out_obj);
290 
291  void ensure_cache_decoded(cache_entry& cache, size_t block_number);
292 
293  /**
294  * Releases a cache entry.
295  * Releases the buffer back to the pool and update the bitfield and
296  * cache_size counters.
297  */
298  void release_cache(size_t block_number) {
299  // if there is something to release
300  if (m_cache[block_number].has_data) {
301 // std::cerr << "Releasing cache : " << block_number << std::endl;
302  m_buffer_pool.release_buffer(std::move(m_cache[block_number].buffer));
303  m_cache[block_number].buffer.reset();
304  m_cache[block_number].encoded_buffer.release();
305  m_cache[block_number].encoded_buffer_reader.release();
306  m_cache[block_number].has_data = false;
307  m_used_cache_entries.clear_bit(block_number);
308  m_cache_size.dec();
309  }
310  }
311 
312  /**
313  * Picks a random number and evicts the next block after the number
314  * (looping around).
315  */
316  void try_evict_something_from_cache() {
317  size_t b = turi::random::fast_uniform<size_t>(0, m_cache.size() - 1);
318  /*
319  * if the current bit is not 1, try to find the next one bit
320  * if there is no bit after that, loop around, reset and 0 and try the bit
321  * after that
322  */
323  if (!m_used_cache_entries.get(b) &&
324  !m_used_cache_entries.next_bit(b)) {
325  // loop around to 0
326  b = 0;
327  if (!m_used_cache_entries.get(b)) m_used_cache_entries.next_bit(b);
328  }
329  std::unique_lock<turi::simple_spinlock> cache_lock_guard(m_cache[b].lock, std::defer_lock);
330  if (cache_lock_guard.try_lock()) {
331  release_cache(b);
332  }
333  }
334 
335  void fetch_cache_from_file(size_t block_number, cache_entry& ret);
336 
337  size_t block_offset_containing_row(size_t row) {
338  auto pos = std::lower_bound(m_start_row.begin(), m_start_row.end(), row);
339  size_t blocknum = std::distance(m_start_row.begin(), pos);
340  // common case
341  if (blocknum < m_block_list.size()) {
342  // the block containing this could be either at blocknum, or blocknum - 1
343  if (m_start_row[blocknum] == row) {
344  return blocknum;
345  } else {
346  return blocknum - 1;
347  }
348  } else {
349  // the last block
350  return m_block_list.size() - 1;
351  }
352  }
353 };
354 
355 template <typename T>
357 
358 // specialization for fetch_cache_from_file when T is a flexible_type
359 // since this permits an encoded representation
360 template <>
361 inline void
363 fetch_cache_from_file(size_t block_number, cache_entry& ret) {
364 // std::cerr << "Fetching from file: " << block_number << std::endl;
365  // don't use the buffer. hold as encoded always when reading from a
366  // flexible_type file
367  if (ret.buffer) {
368  m_buffer_pool.release_buffer(std::move(ret.buffer));
369  ret.buffer.reset();
370  }
371  block_address block_addr = m_block_list[block_number];
373  auto buffer = m_manager.read_block(block_addr, &info);
374  if (buffer == nullptr) {
375  log_and_throw("Unexpected block read failure. Bad file?");
376  }
377  ret.buffer_start_row = m_start_row[block_number];
378  ret.encoded_buffer.init(*info, buffer);
379  ret.encoded_buffer_reader = ret.encoded_buffer.get_range();
380  ret.is_encoded = true;
381  ret.has_data = true;
382  if (m_used_cache_entries.get(block_number) == false) m_cache_size.inc();
383  m_used_cache_entries.set_bit(block_number);
384  // evict something random
385  // we will only loop at most this number of times
386  size_t num_to_evict = m_cache_size.value -
388  while(num_to_evict > 0 &&
389  m_cache_size.value > SFRAME_MAX_BLOCKS_IN_CACHE) {
390  try_evict_something_from_cache();
391  --num_to_evict;
392  }
393 }
394 
395 template <typename T>
396 inline void
398 fetch_cache_from_file(size_t block_number, cache_entry& ret) {
399 // std::cerr << "Fetching from file: " << block_number << std::endl;
400  if (!ret.buffer) ret.buffer = m_buffer_pool.get_new_buffer();
401  block_address block_addr = m_block_list[block_number];
402  if (!m_manager.read_block(block_addr, *ret.buffer, NULL)) {
403  log_and_throw("Unexpected block read failure. Bad file?");
404  }
405  ret.buffer_start_row = m_start_row[block_number];
406  ret.is_encoded = false;
407  ret.has_data = true;
408  if (m_used_cache_entries.get(block_number) == false) m_cache_size.inc();
409  m_used_cache_entries.set_bit(block_number);
410  // evict something random
411  // we will only loop at most this number of times
412  size_t num_to_evict = m_cache_size.value - SFRAME_MAX_BLOCKS_IN_CACHE;
413  while(num_to_evict > 0 &&
414  m_cache_size.value > SFRAME_MAX_BLOCKS_IN_CACHE) {
415  try_evict_something_from_cache();
416  --num_to_evict;
417  }
418 }
419 
420 
421 template <>
423 ensure_cache_decoded(cache_entry& cache, size_t block_number) {
424  if (cache.is_encoded) {
425  cache.buffer = m_buffer_pool.get_new_buffer();
426  auto data = cache.encoded_buffer.get_block_data();
427  v2_block_impl::typed_decode(cache.encoded_buffer.get_block_info(),
428  data->data(),
429  data->size(),
430  *cache.buffer);
431  // clear the encoded buffer information
432  cache.encoded_buffer.release();
433  cache.encoded_buffer_reader.release();
434  // reset the start row
435  cache.is_encoded = false;
436  cache.buffer_start_row = m_start_row[block_number];
437  }
438 }
439 
440 
441 template <typename T>
443 ensure_cache_decoded(cache_entry& cache, size_t block_number) {
444  ASSERT_MSG(false, "Attempting to type decode a non-flexible_type column");
445 }
446 
447 
448 // specialization for fetch_rows_from_cache when T is a flexible_type
449 // to permit pulling from the encoded representation
450 template <>
452 fetch_rows_from_cache(size_t fetch_start,
453  size_t fetch_end,
454  std::vector<flexible_type>& out_obj) {
455  // find block address containing fetch_start and block containing fetch_end
456  size_t start_offset = block_offset_containing_row(fetch_start);
457  size_t end_offset = block_offset_containing_row(fetch_end - 1) + 1;
458  size_t output_idx = 0;
459  for (size_t i = start_offset; i < end_offset; ++i) {
460  size_t first_row_to_fetch_in_this_block = std::max(fetch_start, m_start_row[i]);
461  size_t last_row_to_fetch_in_this_block = std::min(fetch_end, m_start_row[i+1]);
462  auto& cache = m_cache[i];
463  std::unique_lock<turi::simple_spinlock> cache_lock_guard(cache.lock);
464  if (!cache.has_data) {
465  fetch_cache_from_file(i, cache);
466  }
467  if (cache.buffer_start_row < first_row_to_fetch_in_this_block && cache.is_encoded) {
468  // fast forward
469  size_t diff = first_row_to_fetch_in_this_block - cache.buffer_start_row;
470  cache.encoded_buffer_reader.skip(diff);
471  cache.buffer_start_row = first_row_to_fetch_in_this_block;
472  }
473 
474  if (cache.buffer_start_row == first_row_to_fetch_in_this_block) {
475  // this is a sequential read
476  // we do moves, and we can handle encoded reads
477  if (cache.is_encoded) {
478  size_t num_elem = last_row_to_fetch_in_this_block - first_row_to_fetch_in_this_block;
479  cache.encoded_buffer_reader.decode_to(&(out_obj[output_idx]), num_elem);
480  output_idx += num_elem;
481  cache.buffer_start_row = last_row_to_fetch_in_this_block;
482  } else {
483  size_t input_offset = m_start_row[i];
484  for (size_t j = first_row_to_fetch_in_this_block;
485  j < last_row_to_fetch_in_this_block;
486  ++j) {
487  out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
488  }
489  }
490  if (last_row_to_fetch_in_this_block == m_start_row[i + 1]) {
491  // we have exhausted this cache
492  release_cache(i);
493  }
494  } else {
495  // non sequential read
496  // we copy without updating the start_row
497  ensure_cache_decoded(cache, i);
498  size_t input_offset = m_start_row[i];
499  for (size_t j = first_row_to_fetch_in_this_block;
500  j < last_row_to_fetch_in_this_block;
501  ++j) {
502  out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
503  }
504  }
505  }
506 }
507 
508 template <typename T>
510 fetch_rows_from_cache(size_t fetch_start,
511  size_t fetch_end,
512  std::vector<T>& out_obj) {
513  // find block address containing fetch_start and block containing fetch_end
514 // std::cerr << "Fetching from cache: " << fetch_start << " " << fetch_end << std::endl;
515  size_t start_offset = block_offset_containing_row(fetch_start);
516  size_t end_offset = block_offset_containing_row(fetch_end - 1) + 1;
517  size_t output_idx = 0;
518  for (size_t i = start_offset; i < end_offset; ++i) {
519  size_t first_row_to_fetch_in_this_block = std::max(fetch_start, m_start_row[i]);
520  size_t last_row_to_fetch_in_this_block = std::min(fetch_end, m_start_row[i+1]);
521  auto& cache = m_cache[i];
522  std::unique_lock<turi::simple_spinlock> cache_lock_guard(cache.lock);
523  if (!cache.buffer ||
524  cache.buffer_start_row > first_row_to_fetch_in_this_block) {
525  // we need to reload the cache
526  fetch_cache_from_file(i, cache);
527  }
528  if (cache.buffer_start_row == first_row_to_fetch_in_this_block) {
529  // this is a sequential read
530  // encoded reads are impossible
531  size_t input_offset = m_start_row[i];
532  for (size_t j = first_row_to_fetch_in_this_block;
533  j < last_row_to_fetch_in_this_block;
534  ++j) {
535  out_obj[output_idx++] = std::move((*cache.buffer)[j - input_offset]);
536  }
537  cache.buffer_start_row = last_row_to_fetch_in_this_block;
538  if (last_row_to_fetch_in_this_block == m_start_row[i + 1]) {
539  // we have exhausted this cache
540  release_cache(i);
541  }
542  } else {
543  // non sequential read
544  // we copy without updating the start_row
545  size_t input_offset = m_start_row[i];
546  for (size_t j = first_row_to_fetch_in_this_block;
547  j < last_row_to_fetch_in_this_block;
548  ++j) {
549  out_obj[output_idx++] = (*cache.buffer)[j - input_offset];
550  }
551  }
552  }
553 }
554 
555 template <>
557 read_rows(size_t row_start,
558  size_t row_end,
559  sframe_rows& out_obj) {
560  return sarray_format_reader<flexible_type>::read_rows(row_start, row_end, out_obj);
561 }
562 
563 
564 template <typename T>
565 inline size_t sarray_format_reader_v2<T>::
566 read_rows(size_t row_start,
567  size_t row_end,
568  sframe_rows& out_obj) {
569  ASSERT_MSG(false, "Attempting to type decode a non-flexible_type column");
570  return 0;
571 }
572 
573 
/**
 * The array group writer which emits array v2 file formats.
 */
577 template <typename T>
579  public:
580  /**
581  * Open has to be called before any of the other functions are called.
582  * No files are actually opened at this point.
583  */
584  void open(std::string index_file,
585  size_t segments_to_create,
586  size_t columns_to_create) {
587  if (columns_to_create == 0) {
588  segments_to_create = 0;
589  }
590  m_array_open = true;
591  m_writer.init(index_file, segments_to_create, columns_to_create);
592  m_nsegments = segments_to_create;
593  m_column_buffers.resize(columns_to_create);
594  for (size_t i = 0; i < columns_to_create; ++i) {
595  m_column_buffers[i].segment_data.resize(segments_to_create);
596  }
597  for (size_t i = 0; i < m_nsegments; ++i) {
598  open_segment(i);
599  }
600  }
601 
602  /**
603  * Sets write options. See \ref v2_block_impl::block_writer::set_options
604  */
605  void set_options(const std::string& option, int64_t value) {
606  m_writer.set_options(option, value);
607  }
608 
609  /**
610  * Gets a modifiable reference to the index file information which will
611  * be written to the index file.
612  */
614  return m_writer.get_index_info();
615  }
616 
617  /**
618  * Writes a row to the array group
619  */
620  void write_segment(size_t segmentid,
621  const std::vector<T>& v) {
622  DASSERT_LT(segmentid, m_nsegments);
623  DASSERT_LE(v.size(), m_column_buffers.size());
624  DASSERT_EQ(m_array_open, true);
625  for (size_t i = 0;i < v.size(); ++i) {
626  write_segment(i, segmentid, v[i]);
627  }
628  }
629 
630  /**
631  * Writes a row to the array group
632  */
633  void write_segment(size_t segmentid,
634  std::vector<T>&& v) {
635  DASSERT_LT(segmentid, m_nsegments);
636  DASSERT_LE(v.size(), m_column_buffers.size());
637  DASSERT_EQ(m_array_open, true);
638  for (size_t i = 0;i < v.size(); ++i) {
639  write_segment(i, segmentid, std::forward<T>(v[i]));
640  }
641  }
642 
643  /**
644  * Writes a row to the array group
645  */
646  void write_segment(size_t columnid,
647  size_t segmentid,
648  const T& t) {
649  DASSERT_LT(segmentid, m_nsegments);
650  DASSERT_LT(columnid, m_column_buffers.size());
651  DASSERT_EQ(m_array_open, true);
652  m_column_buffers[columnid].segment_data[segmentid].push_back(t);
653  if (m_column_buffers[columnid].segment_data[segmentid].size() >=
654  m_column_buffers[columnid].elements_before_flush) {
655  flush_block(columnid, segmentid);
656  }
657  }
658 
659  /**
660  * Writes a collection of rows to a column
661  */
662  void write_column(size_t columnid,
663  size_t segmentid,
664  const std::vector<T>& t) {
665  DASSERT_LT(segmentid, m_nsegments);
666  DASSERT_LT(columnid, m_column_buffers.size());
667  DASSERT_EQ(m_array_open, true);
668 
669  auto& el_before_flush = m_column_buffers[columnid].elements_before_flush;
670  auto& buffer = m_column_buffers[columnid].segment_data[segmentid];
671  for(const auto& elem: t) {
672  buffer.push_back(elem);
673  if (buffer.size() >= el_before_flush) {
674  flush_block(columnid, segmentid);
675  }
676  }
677  }
678 
679  /**
680  * Writes a collection of rows to a column
681  */
682  void write_column(size_t columnid,
683  size_t segmentid,
684  std::vector<T>&& t) {
685  DASSERT_LT(segmentid, m_nsegments);
686  DASSERT_LT(columnid, m_column_buffers.size());
687  DASSERT_EQ(m_array_open, true);
688 
689  auto el_before_flush = m_column_buffers[columnid].elements_before_flush;
690  auto& buffer = m_column_buffers[columnid].segment_data[segmentid];
691  for(const auto& elem: t) {
692  buffer.push_back(std::move(elem));
693  if (buffer.size() >= el_before_flush) {
694  flush_block(columnid, segmentid);
695  el_before_flush = m_column_buffers[columnid].elements_before_flush;
696  }
697  }
698  }
699 
700  /**
701  * Writes a row to the array group
702  */
703  void write_segment(size_t columnid,
704  size_t segmentid,
705  T&& t) {
706  DASSERT_LT(segmentid, m_nsegments);
707  DASSERT_LT(columnid, m_column_buffers.size());
708  DASSERT_EQ(m_array_open, true);
709  m_column_buffers[columnid].segment_data[segmentid].push_back(std::forward<T>(t));
710  if (m_column_buffers[columnid].segment_data[segmentid].size() >=
711  m_column_buffers[columnid].elements_before_flush) {
712  flush_block(columnid, segmentid);
713  }
714  }
715 
716  void write_segment(size_t segmentid, const sframe_rows& rows);
717 
718  /** Closes all writes
719  */
720  void close() {
721  ASSERT_EQ(m_array_open, true);
722  // flush all data
723  m_array_open = false;
724  for (size_t i = 0;i < m_nsegments; ++i) {
725  for (size_t j = 0;j < m_column_buffers.size(); ++j) {
726  flush_block(j, i);
727  }
728  m_writer.close_segment(i);
729  }
730  /*
731  * for (size_t i = 0;i < m_column_buffers.size(); ++i) {
732  * logstream(LOG_INFO) << "Writing column " << i
733  * << " with total utilization of "
734  * << m_column_buffers[i].total_bytes_written << std::endl;
735  * }
736  */
737  }
738 
739  /**
740  * Flushes all writes for a particular segment
741  */
742  void flush_segment(size_t segmentid) {
743  for (size_t j = 0;j < m_column_buffers.size(); ++j) {
744  flush_block(j, segmentid);
745  m_column_buffers[j].segment_data[segmentid].shrink_to_fit();
746  }
747  }
748 
749  /**
750  * Flushes the index_file_information to disk
751  */
753  m_writer.write_index_file();
754  }
755 
756  /**
757  * Returns the number of segments
758  */
759  size_t num_segments() const {
760  ASSERT_EQ(m_array_open, true);
761  return m_nsegments;
762  }
763 
764  /**
765  * Returns the number of columns
766  */
767  size_t num_columns() const {
768  ASSERT_EQ(m_array_open, true);
769  return m_column_buffers.size();
770  }
771 
772  private:
773  /// whether the array is open
774  bool m_array_open = false;
775  /// The number of segments
776  size_t m_nsegments;
777  /// The writer
779 
780  struct column_buffer {
781  // Stores in memory, the last block that has not been written for each
782  // segment. When the block has been written, the archive is cleared.
783  simple_spinlock lock;
784  std::vector<std::vector<T> > segment_data;
785  size_t elements_before_flush = SARRAY_WRITER_INITAL_ELEMENTS_PER_BLOCK;
786  size_t total_bytes_written = 0;
787  size_t total_elements_written = 0;
788  };
789 
790  std::vector<column_buffer> m_column_buffers;
791  /**
792  * Makes a particular segment writable with \ref write_segment
793  * Should throw an exception if the segment is already open, or if
794  * the segment ID does not exist. Each segment should only be open once.
795  */
796  void open_segment(size_t segmentid) {
797  Dlog_func_entry();
798  ASSERT_MSG(m_array_open, "sarray not open");
799 
800  std::string index_file = m_writer.get_index_info().group_index_file;
801  std::string filename;
802  // put it in the same location as the index file
803  // generate a prefix for the file. if segmentid is 1, this generates 0001
804  // if segmentid is 2 this generates 0002, etc.
805  std::stringstream strm;
806  strm << index_file.substr(0, index_file.length() - 5) << ".";
807  strm.fill('0'); strm.width(4);
808  strm << segmentid;
809  filename = strm.str();
810  logstream(LOG_DEBUG) << "Open segment " << segmentid
811  << " for write on " << filename << std::endl;
812  m_writer.open_segment(segmentid, filename);
813  }
814 
815  /**
816  * Flushes the current contents of a segment of a column
817  */
818  void flush_block(size_t columnid, size_t segmentid);
819 };
820 
/// \}
822 template <>
824  size_t segmentid) {
825  // flexible_type specialization. writes typed blocks.
826  // if there is no data to write, skip
827  auto& colbuf = m_column_buffers[columnid];
828  if (colbuf.segment_data[segmentid].empty()) return;
829  size_t write_size = colbuf.segment_data[segmentid].size();
830  size_t ret = m_writer.write_typed_block(segmentid,
831  columnid,
832  colbuf.segment_data[segmentid],
834  colbuf.segment_data[segmentid].clear();
835  // update the column buffer counters and estimates the number of elements
836  // before the next flush.
837  std::lock_guard<simple_spinlock> guard(colbuf.lock);
838  colbuf.total_bytes_written += ret;
839  colbuf.total_elements_written += write_size;
840  colbuf.elements_before_flush = (float)(SFRAME_DEFAULT_BLOCK_SIZE) / (
841  (float)(colbuf.total_bytes_written+1) / (float)(colbuf.total_elements_written+1));
842  colbuf.elements_before_flush = std::max(colbuf.elements_before_flush,
844  colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
845  SFRAME_WRITER_MAX_BUFFERED_CELLS / (m_nsegments * m_column_buffers.size()));
846  colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
848 }
849 
850 template <typename T>
851 inline void sarray_group_format_writer_v2<T>::flush_block(size_t columnid,
852  size_t segmentid){
853  // regular type specialization. writes bytes
854  // if there is no data to write, skip
855  auto& colbuf = m_column_buffers[columnid];
856  if (colbuf.segment_data[segmentid].empty()) return;
857  size_t write_size = colbuf.segment_data[segmentid].size();
858  size_t ret = m_writer.write_block(segmentid,
859  columnid,
860  colbuf.segment_data[segmentid],
862  colbuf.segment_data[segmentid].clear();
863 
864  // update the column buffer counters and estimates the number of elements
865  // before the next flush.
866  std::lock_guard<simple_spinlock> guard(colbuf.lock);
867  colbuf.total_bytes_written += ret;
868  colbuf.total_elements_written += write_size;
869  colbuf.elements_before_flush = (float)(SFRAME_DEFAULT_BLOCK_SIZE) / (
870  (float)(colbuf.total_bytes_written+1) / (float)(colbuf.total_elements_written+1));
871  colbuf.elements_before_flush = std::max(colbuf.elements_before_flush,
873  colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
874  SFRAME_WRITER_MAX_BUFFERED_CELLS / (m_nsegments * m_column_buffers.size()));
875  colbuf.elements_before_flush = std::min(colbuf.elements_before_flush,
877 
878 }
879 
880 template <>
882  const sframe_rows& rows) {
883  DASSERT_LT(segmentid, m_nsegments);
884  DASSERT_EQ(m_array_open, true);
885 
886  DASSERT_EQ(rows.num_columns(), m_column_buffers.size());
887  const auto& cols = rows.cget_columns();
888  // reserve space in all buffers
889  for (size_t i = 0;i < m_column_buffers.size(); ++i) {
890  auto& buffer = m_column_buffers[i].segment_data[segmentid];
891  std::copy(cols[i]->begin(), cols[i]->end(), std::back_inserter(buffer));
892  if (m_column_buffers[i].segment_data[segmentid].size() >=
893  m_column_buffers[i].elements_before_flush) {
894  flush_block(i, segmentid);
895  }
896  }
897 }
898 
899 template <typename T>
901  const sframe_rows& rows) {
902  ASSERT_MSG(false, "Cannot write to general SArray with sframe_rows");
903 }
904 
905 } // namespace turi
906 #endif
/* Documentation-extraction residue (Doxygen tooltip text), preserved but
 * commented out so it does not follow the closing #endif as stray tokens:
std::string index_file
Input file name.
void write_segment(size_t columnid, size_t segmentid, T &&t)
#define logstream(lvl)
Definition: logger.hpp:276
sarray_format_reader_v2()
Default Constructor.
size_t SFRAME_DEFAULT_BLOCK_SIZE
sarray_format_reader_v2 & operator=(const sarray_format_reader_v2 &other)=delete
deleted assignment
void release_buffer(std::shared_ptr< T > &&buffer)
Definition: buffer_pool.hpp:67
std::tuple< size_t, size_t, size_t > block_address
std::tuple< size_t, size_t > column_address
size_t segment_size(size_t segmentid) const
size_t num_columns() const
Returns the number of columns.
Definition: sframe_rows.hpp:90
group_index_file_information & get_index_info()
group_index_file_information & get_index_info()
void open(std::string index_file, size_t segments_to_create, size_t columns_to_create)
const std::vector< ptr_to_decoded_column_type > & cget_columns() const
size_t SFRAME_MAX_BLOCKS_IN_CACHE
size_t SFRAME_WRITER_MAX_BUFFERED_CELLS
#define LOG_DEBUG
Definition: logger.hpp:102
void write_column(size_t columnid, size_t segmentid, const std::vector< T > &t)
std::vector< std::string > segment_files
The file name of each segment.
bool get(size_t b) const
Returns the value of the bit b.
void write_segment(size_t segmentid, std::vector< T > &&v)
column_address open_column(std::string column_file)
void open(std::string sidx_file)
const size_t SARRAY_WRITER_MIN_ELEMENTS_PER_BLOCK
void open(index_file_information index)
const std::vector< std::vector< block_info > > & get_all_block_info(size_t segment_id)
void resize(size_t n)
void copy(const std::string src, const std::string dest)
size_t nsegments
The number of segments in the array.
void write_segment(size_t columnid, size_t segmentid, const T &t)
std::string group_index_file
Input file name.
void clear()
Sets all bits to 0.
size_t num_blocks_in_column(column_address addr)
size_t SFRAME_WRITER_MAX_BUFFERED_CELLS_PER_BLOCK
const size_t SARRAY_WRITER_INITAL_ELEMENTS_PER_BLOCK
void write_segment(size_t segmentid, const std::vector< T > &v)
const index_file_information & get_index_info() const
bool next_bit(size_t &b) const
index_file_information read_index_file(std::string index_file)
size_t read_rows(size_t row_start, size_t row_end, std::vector< T > &out_obj)
void write_column(size_t columnid, size_t segmentid, std::vector< T > &&t)
void open_segment(size_t segment_id, std::string filename)
std::vector< size_t > segment_sizes
The length of each segment (number of entries).
bool clear_bit(size_t b)
Atomically set the bit at b to false returning the old value.
void close_column(column_address addr)
void set_options(const std::string &option, int64_t value)
bool typed_decode(const block_info &info, char *start, size_t len, std::vector< flexible_type > &ret)
*/