Turi Create  4.0
sarray.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_SFRAME_SARRAY_HPP
7 #define TURI_UNITY_SFRAME_SARRAY_HPP
8 #include <set>
9 #include <iterator>
10 #include <type_traits>
11 #include <core/logging/logger.hpp>
12 #include <core/logging/assertions.hpp>
13 #include <boost/algorithm/string/predicate.hpp>
14 #include <core/storage/sframe_data/output_iterator.hpp>
15 #include <core/storage/serialization/iarchive.hpp>
16 #include <core/storage/serialization/oarchive.hpp>
17 #include <core/parallel/pthread_tools.hpp>
18 #include <core/parallel/lambda_omp.hpp>
19 #include <core/storage/sframe_data/swriter_base.hpp>
20 #include <core/storage/sframe_data/sarray_file_format_v2.hpp>
21 #include <core/storage/sframe_data/sarray_index_file.hpp>
22 #include <core/storage/sframe_data/algorithm.hpp>
23 #include <core/storage/sframe_data/sframe_config.hpp>
24 #include <core/data/flexible_type/flexible_type.hpp>
25 #include <core/storage/fileio/fixed_size_cache_manager.hpp>
26 #include <core/storage/fileio/file_ownership_handle.hpp>
27 #include <core/storage/fileio/file_handle_pool.hpp>
28 #include <core/system/exceptions/error_types.hpp>
29 #include <core/storage/sframe_data/sframe_constants.hpp>
30 #include <core/storage/sframe_data/sarray_saving.hpp>
31 #include <core/storage/sframe_data/sframe_compact.hpp>
32 
33 
34 namespace turi {
35 
36 template <typename T>
37 class sarray_reader;
38 
39 namespace swriter_impl {
40  // We define the iterator type here so as not to polute the outer namesapce
41  //
42 template <typename T>
43 using output_iterator = turi::sframe_function_output_iterator<
44  T,
45  std::function<void(const T&)>,
46  std::function<void(T&&)>,
47  std::function<void(const sframe_rows&)> >;
48 } // namespace swriter_impl
49 
50 /**
51  * \ingroup sframe_physical
52  * \addtogroup sframe_main Main SFrame Objects
53  * \{
54  */
55 
56 /**
57  * The SArray represents an immutable, on disk, sequence of objects T.
58  *
59  * The SArray is an immutable sequence of objects of type T, and is internally
60  * represented as a collection of files. The sequence is cut up into a
61  * collection of segments (not necessarily of equal length), where each segment
62  * covers a disjoint subset of the sequence. Each segment can then be read
63  * in parallel. SArray is referenced on disk by a single ".sidx" file, which
64  * then has a list of file names, one file for each segment.
65  *
66  * The SArray is \b write-once, \b read-many. The SArray can be opened for
67  * writing \b once, after which it is read-only.
68  *
69  * To open an existing sarray on disk for reading:
70  * \code
71  * sarray<int> array;
72  * array.open_for_read("test.sidx");
73  * \endcode
74  * Note that the type of the array on disk is NOT checked.
75  * (though, we probably should)
76  *
77  * To open an sarray for writing:
78  * \code
79  * sarray<int> array;
80  * array.open_for_write(); // create an sarray backed with temporary files
81  * //temporary files will be deleted when array goes out of scope
82  * \endcode
83  *
84  * \code
85  * sarray<int> array;
86  * array.open_for_write("test.sidx"); // create an sarray backed by real files
87  * \endcode
88  *
89  * When the array is opened for writing, it can be written into using
90  * \ref get_output_iterator() , to get an output iterator into each segment.
91  * \code
92  * // Gets the output iterator for segment 3 (segment ids start at 0)
93  * auto iter = get_output_iterator(3);
94  * // writes the value "5" into the segment
95  * (*iter) = 5; ++iter;
96  *
97  * // when done,
98  * close(); // closes the write.
99  * \endcode
100  * The get_output_iterator() function can be called concurrently, but
101  * each individual output iterator is not concurrent.
102  * After close() is called, the sarray becomes a read-only array, and is
103  * equivalent to having called array.open_for_read(...)
104  *
105  * To read from the sarray, \ref get_reader() is used.
106  * \code
107  * auto reader = array.get_reader();
108  * \endcode
109  * Each reader provides read access to the SArray. Multiple readers can be
110  * obtained, as each has its own distinct file handles which are closed as
111  * the reader goes out of scope.
112  * See the documentation for \ref sarray_reader for details.
113  *
114  * The sarray<flexible_type> has additional capabilities.
115  * - It has the functions set_type() and get_type() to set the run-time type
116  * of the stored values.
117  * - Writes to the sarray<flexible_type> are type-checked against the type set by
118  * set_type(): each value \b must match it, be UNDEFINED, or be convertible to
119  * it (convertible values are converted). If no type was set, anything is accepted.
120  *
121  * \note The only guaranteed concurrent safe function is get_output_iterator.
122  * All other mutating functions are not guaranteed to be safe.
123  */
124 template <typename T>
125 class sarray : public swriter_base<swriter_impl::output_iterator<T> > {
126  public:
127  /// The reader type
129 
130  /// The iterator type which \ref get_output_iterator returns
132 
133  /// The type contained in the sarray
134  typedef T value_type;
135 
136  /**
137  * default constructor; does nothing; use \ref open_for_read or
138  * \ref open_for_write after construction to read/create an sarray.
139  */
140  sarray() = default;
141 
142  /// Move constructor
143  sarray(sarray&& other):sarray() {
144  (*this) = std::move(other);
145  }
146 
147  /// Copy constructor
148  sarray(const sarray& other) {
149  (*this) = other;
150  }
151 
152  /// Assignment operator
153  sarray& operator=(const sarray& other) {
154  if (other.inited && other.writing) {
155  throw("Cannot copy an array which is writing");
156  }
157  if (writing) {
158  throw("Cannot copy over an array which is writing");
159  }
160  index_info = other.index_info;
161  index_file = other.index_file;
162  files_managed = other.files_managed;
163  inited = other.inited;
164  writing = other.writing;
165  return (*this);
166  }
167 
168 
169  /** Move assignment.
170  * Moves other into this. Other will be cleared as if it is a newly
171  * constructed sarray object.
172  */
173  sarray& operator=(sarray&& other) {
174  index_info = std::move(other.index_info);
175  index_file = std::move(other.index_file);
176  writer = std::move(other.writer);
177  files_managed = std::move(other.files_managed);
178  inited = other.inited;
179  writing = other.writing;
180 
181  other.index_info = index_file_information();
182  other.index_file = "";
183  other.writer = NULL;
184  other.files_managed.clear();
185  other.inited = false;
186  other.writing = false;
187  return *this;
188  }
189 
190  ~sarray() {
191  // the RAII deleter in files_managed will take care of the deletion
192  }
193 
194  /**
195  * Attempts to construct an sarray which reads from an sfrom the given file index file.
196  * If the index cannot be opened, an exception is thrown.
197  */
198  explicit sarray(std::string sidx_or_directory) {
199  open_for_read(sidx_or_directory);
200  }
201 
202  /**
203  * Create an sarray of given value and size.
204  */
205  sarray(const flexible_type& value, size_t size,
206  size_t num_segments = SFRAME_DEFAULT_NUM_SEGMENTS,
208  if (type == flex_type_enum::UNDEFINED) {
209  type = value.get_type();
210  }
211  ASSERT_GT(num_segments, (size_t)0);
212  open_for_write(num_segments);
213  set_type(type);
214  size_t size_per_segment = size / num_segments;
215  parallel_for(0, num_segments, [&](size_t i) {
216  auto out = get_output_iterator(i);
217  size_t begin = i * size_per_segment;
218  size_t end = (i + 1) * size_per_segment;
219  if (i == num_segments -1) end = size;
220  for (size_t iter = begin; iter < end; ++iter) {
221  *out = value;
222  ++out;
223  }
224  });
225  close();
226  }
227 
228  /**
229  * Initializes the SArray with an index info.
230  * If the SArray is already inited, this will throw an exception
231  */
233  ASSERT_MSG(!inited, "Attempting to init an SArray "
234  "which has already been inited");
235  index_info = info;
236  keep_array_file_ref();
237 
238  inited = true;
239  writing = false;
240  if (index_info.version == 0) {
241  logprogress_stream << "Version 0 file format has been deprecated. "
242  << "Operations may not work as expected, or will be slow."
243  << "Please re-save the SFrame/SArray to update it to "
244  << "the latest version which has substantial "
245  << "performance optimizations\n";
246  }
247  }
248 
249  /**
250  * Initializes the SArray with an index file.
251  * If the SArray is already inited, this will throw an exception
252  */
253  void open_for_read(std::string sidx_file) {
254  ASSERT_MSG(!inited, "Attempting to init an SArray "
255  "which has already been inited");
256  index_file = sidx_file;
257 
258  index_info = read_index_file(index_file);
259  keep_array_file_ref();
260 
261  inited = true;
262  writing = false;
263  if (index_info.version == 0) {
264  logprogress_stream << "Version 0 file format has been deprecated. "
265  << "Operations may not work as expected, or will be slow."
266  << "Please re-save the SFrame/SArray to update it to "
267  << "the latest version which has substantial "
268  << "performance optimizations\n";
269 
270  }
271  }
272 
273 
274  /**
275  * Opens the Array for writing with an arbitrary temporary file.
   * The ".sidx" path comes from
   * fixed_size_cache_manager::get_temp_cache_id(".sidx").
276  * The array must not already be inited.
277  *
278  * \param num_segments The number of segments in the array
   * \param disable_padding If true, the writer option "disable_padding" is
   *                        set to 1 right after the writer is opened.
279  */
280  void open_for_write(size_t num_segments = SFRAME_DEFAULT_NUM_SEGMENTS,
281  bool disable_padding = false) {
282  ASSERT_MSG(!inited, "Attempting to init an SArray "
283  "which has already been inited");
284  std::string sidx_file = fileio::fixed_size_cache_manager::get_instance().get_temp_cache_id(".sidx");
285  index_file = sidx_file;
  // NOTE(review): original line 286, which allocates `writer`, is missing from
  // this listing; as shown, nothing assigns `writer` before it is used.
287  ASSERT_TRUE(writer != NULL);
  // NOTE(review): the `if (writer)` guard below is inconsistent: the following
  // lines dereference `writer` unconditionally.
  // The writer is opened with one column (third argument).
288  if (writer) writer->open(sidx_file, num_segments, 1);
289  if (disable_padding) writer->set_options("disable_padding", 1);
290  inited = true;
291  writing = true;
  // Keep a copy of the writer's (single) column index information.
292  index_info = writer->get_index_info().columns[0];
293  }
294 
295 
296  /**
297  * Opens the Array for writing with a location on disk.
298  * The array must not already be inited.
299  *
300  * \param sidx_file Path of the index file to create. Required by this
301  * overload (use open_for_write(num_segments) for
302  * a temporary file). The segment files are
303  * written to the same location as sidx_file.
304  * Must end in ".sidx".
305  * \param num_segments The number of segments in the array
306  */
307  void open_for_write(std::string sidx_file,
308  size_t num_segments = SFRAME_DEFAULT_NUM_SEGMENTS) {
309  ASSERT_MSG(!inited, "Attempting to init an SArray "
310  "which has already been inited");
311  index_file = sidx_file;
  // NOTE(review): original line 312, which allocates `writer`, is missing from
  // this listing; as shown, nothing assigns `writer` before it is used.
313  // writer->open will test if the sidx file ends with .sidx
  // NOTE(review): the `if (writer)` guard is inconsistent: line 317 below
  // dereferences `writer` unconditionally.
314  if (writer) writer->open(sidx_file, num_segments, 1);
315  inited = true;
316  writing = true;
  // Keep a copy of the writer's (single) column index information.
317  index_info = writer->get_index_info().columns[0];
318  }
319 
320  /**
321  * Returns true if the Array is opened for reading.
322  * i.e. get_reader() will succeed
323  */
324  bool is_opened_for_read() const {
325  return (inited && !writing);
326  }
327 
328 
329  /**
330  * Returns true if the Array is opened for writing.
331  * i.e. get_output_iterator() will succeed
332  */
333  bool is_opened_for_write() const {
334  return (inited && writing);
335  }
336 
337  /**
338  * Return the location of the index file of the sarray
339  */
340  std::string get_index_file() const {
341  ASSERT_TRUE(inited);
342  return index_file;
343  }
344 
345  /**
346  * Return the underlying writer of the sarray
347  */
349  ASSERT_MSG(inited, "Invalid SArray");
350  ASSERT_MSG(writing, "SArray not opened for writing");
351  ASSERT_NE(writer, NULL);
352  return writer;
353  }
354 
355  /**
356  * Reads the value of a key associated with the sarray.
357  * Returns true on success, false on failure.
358  */
359  bool get_metadata(std::string key, std::string &val) const {
360  bool ret;
361  std::tie(ret, val) = get_metadata(key);
362  return ret;
363  }
364 
365  /**
366  * Reads the value of a key associated with the sarray.
367  * Returns a pair of (true, value) on success, and (false, empty_string)
368  * on failure.
369  */
370  std::pair<bool, std::string> get_metadata(std::string key) const {
371  ASSERT_MSG(inited, "Invalid SArray");
372  if (index_info.metadata.count(key)) {
373  return std::pair<bool, std::string>(true, index_info.metadata.at(key));
374  } else {
375  return std::pair<bool, std::string>(false, "");
376  }
377  }
378 
379  /**
380  * Returns the number of elements in the SArray
381  */
382  size_t size() const {
383  if(!inited)
384  return 0;
385  size_t ret = 0;
386  for (size_t i = 0;i < index_info.segment_sizes.size(); ++i) {
387  ret += index_info.segment_sizes[i];
388  }
389  return ret;
390  }
391 
392  /**
393  * Gets an sarray reader object using the segmentation produced by the
394  * actual file segments on disk.
395  */
396  std::unique_ptr<reader_type> get_reader() const {
397  ASSERT_MSG(inited, "Invalid SArray");
398  ASSERT_MSG(!writing, "Cannot open an SArraying which is still writing.");
399  std::unique_ptr<reader_type> sarray_reader(new reader_type());
400  sarray_reader->init(*this);
401  return sarray_reader;
402  }
403 
404  /**
405  * Gets an sarray reader object with num_segments number of logical segments.
406  */
407  std::unique_ptr<reader_type> get_reader(size_t num_segments) const {
408  ASSERT_MSG(inited, "Invalid SArray");
409  ASSERT_MSG(!writing, "Cannot open an SArray which is still writing.");
410  std::unique_ptr<reader_type> sarray_reader(new reader_type());
411  sarray_reader->init(*this, num_segments);
412  return sarray_reader;
413  }
414 
415  /**
416  * Gets an sarray reader object with a custom segment layout. segment_lengths
417  * must sum up to the same length as the original array.
418  */
419  std::unique_ptr<reader_type> get_reader(const std::vector<size_t>& segment_lengths) const {
420  ASSERT_MSG(inited, "Invalid SArray");
421  ASSERT_MSG(!writing, "Cannot open an SArray which is still writing.");
422  std::unique_ptr<reader_type> sarray_reader(new reader_type());
423  sarray_reader->init(*this, segment_lengths);
424  return sarray_reader;
425  }
426 
427 
428  /**
429  * Return the number of segments in the array.
430  */
431  size_t num_segments() const {
432  ASSERT_MSG(inited, "Invalid SArray");
433  return index_info.nsegments;
434  }
435 
436 
437  /**
438  * Return the length of segment i in the array.
439  */
440  size_t segment_length(size_t i) const {
441  ASSERT_MSG(inited, "Invalid SArray");
442  return index_info.segment_sizes[i];
443  }
444 
445  /**
446  * Returns all the index information of the array.
447  */
449  return index_info;
450  }
451 
452  /**
453  * Appends another SArray of the same type with the current SArray,
454  * returning a new sarray.
455  * without destroying the other array. Both SArrays can be empty, but
456  * cannot be opened for writing.
457  */
458  sarray append(const sarray& other) const {
459  // both cannot be writing
460  ASSERT_EQ(writing, false);
461  ASSERT_EQ(other.writing, false);
462  // if one is inited, return the other
463  if (!other.inited) return *this;
464  if (!inited) return other;
465 
466  // cannot combine across format version
467  ASSERT_EQ(index_info.version, other.index_info.version);
468  ASSERT_EQ(index_info.block_size, other.index_info.block_size);
469 
470  sarray ret;
471  ret.inited = true;
472  ret.index_info = index_info;
473  ret.files_managed = files_managed;
474 
475  ret.index_info.nsegments += other.index_info.nsegments;
476  std::copy(other.index_info.segment_sizes.begin(), other.index_info.segment_sizes.end(),
477  std::inserter(ret.index_info.segment_sizes, ret.index_info.segment_sizes.end()));
478  std::copy(other.index_info.segment_files.begin(), other.index_info.segment_files.end(),
479  std::inserter(ret.index_info.segment_files, ret.index_info.segment_files.end()));
480  std::copy(other.files_managed.begin(), other.files_managed.end(),
481  std::inserter(ret.files_managed, ret.files_managed.end()));
482  ret.try_compact();
483  return ret;
484  }
485  /**
486  * Return a new sarray that contains a copy of the data in the current array.
487  */
488  std::shared_ptr<sarray> clone(size_t nsegments = 0) const {
489  if (nsegments == 0) {
490  nsegments = num_segments();
491  }
492  auto ret = std::make_shared<sarray>();
493  ret->open_for_write(nsegments);
494  ret->set_type(get_type());
495  auto reader = get_reader(nsegments);
496  parallel_for(0, nsegments, [&](size_t segment_id) {
497  auto iter = reader->begin(segment_id);
498  auto end = reader->end(segment_id);
499  auto out = ret->get_output_iterator(segment_id);
500  while (iter != end) {
501  *out = *iter;
502  ++out;
503  ++iter;
504  }
505  });
506  ret->close();
507  return ret;
508  }
509 
510  /**
511  * Sarray serializer. iarc must be associated with a directory.
512  * Saves into a prefix inside the directory.
513  */
514  void save(oarchive& oarc) const {
515  std::string prefix = oarc.get_prefix();
516  save(prefix + ".sidx");
517  }
518 
519  /**
520  * SArray deserializer. iarc must be associated with a directory.
521  * Loads from the next prefix inside the directory.
522  */
523  void load(iarchive& iarc) {
524  std::string prefix = iarc.get_prefix();
525  open_for_read(prefix + ".sidx");
526  }
527 
528  /**
529  * Attempts to compact if the number of segments in the SArray
530  * exceeds SFRAME_COMPACTION_THRESHOLD.
   * A threshold of 0 disables compaction.
531  */
532  void try_compact() {
533  if (SFRAME_COMPACTION_THRESHOLD > 0 &&
534  index_info.segment_files.size() > SFRAME_COMPACTION_THRESHOLD) {
  // NOTE(review): original line 535 (the compaction call) is missing from this
  // listing; presumably sarray_compact(*this, SFRAME_COMPACTION_THRESHOLD),
  // whose declaration appears in the documentation index. Confirm.
536  }
537  }
538 
539 /**************************************************************************/
540 /* */
541 /* Writing Functions */
542 /* */
543 /**************************************************************************/
544 // These functions are only valid when the array is opened for writing
545 
546  /**
547  * Sets the number of segments in the output.
548  * Array must be first opened for writing.
549  * If any writes have occurred prior to this, those writes will be lost.
550  * Returns true if the writer was re-created with numseg segments, and false
   * if numseg is 0 or already equals the current number of segments (nothing
   * is changed in either case).
551  */
552  bool set_num_segments(size_t numseg) {
553  ASSERT_MSG(inited, "Invalid SArray");
554  ASSERT_MSG(writing, "SArray not opened for writing");
555  if (numseg == 0) return false;
556  if (numseg != writer->num_segments()) {
  // The old writer and everything written through it are discarded.
557  delete writer;
  // NOTE(review): original line 558, which allocates the replacement `writer`,
  // is missing from this listing. Also confirm whether the "disable_padding"
  // option set by open_for_write() should be re-applied to the new writer.
559  writer->open(index_file, numseg, 1);
560  // restore metadata
561  writer->get_index_info().columns[0].metadata = index_info.metadata;
562  index_info = writer->get_index_info().columns[0];
563  return true;
564  }
565  return false;
566  }
567 
568  /**
569  * Return an output iterator which can be used to write data to the segment.
570  * Array must be first opened for writing.
571  * The iterator (\ref iterator) is of the output iterator type and
572  * has value_type T.
573  *
574  * The iterator is invalid once the segment is closed (See \ref close).
575  * Accessing the iterator after the writer is destroyed is undefined
576  * behavior.
577  *
578  * \code
579  * // example to write a little array to segment 1
580  * // say sw is of type sarray<int>
581  * auto iter = sw.get_output_iterator(1);
582  * std::vector<int> vals{1,2,3}
583  * auto(int i, vals) {
584  * *iter = i;
585  * ++iter;
586  * }
587  * \endcode
588  *
589  * Will throw an exception if the array is invalid (there is an error
590  * opening/writing files) Also segmentid must be a valid segment ID. Will
591  * throw an exception otherwise.
592  *
593  * When T is a flexible_type, the output iterator performs type checking.
594  */
595  iterator get_output_iterator(size_t segmentid);
596 
597  /**
598  * Closes the array.
599  * Array must be first opened for writing.
600  * close() also implicitly closes all segments.
601  * After the writer is closed, no segments can be written.
602  * Only once the array is closed, the SArray becomes readable with the
603  * get_reader() function.
604  */
605  void close() {
606  writer->close();
607  writer->write_index_file();
608  index_info = writer->get_index_info().columns[0];
609  delete writer;
610  writing = false;
611 
612  keep_array_file_ref();
613  }
614 
615  /**
616  * Adds meta data to the array.
617  * Array must be first opened for writing.
618  */
619  bool set_metadata(std::string key, std::string val) {
620  ASSERT_MSG(inited, "Invalid SArray");
621  ASSERT_MSG(writing, "SArray not opened for writing");
622  ASSERT_NE(writer, NULL);
623  writer->get_index_info().columns[0].metadata[key] = val;
624  index_info = writer->get_index_info().columns[0];
625  return true;
626  }
627 
628 /**************************************************************************/
629 /* */
630 /* SArray<flexible_type> specific functions */
631 /* */
632 /**************************************************************************/
633 
634  /**
635  * Returns the type of the SArray (as set by
636  * \ref swriter<flexible_type>::set_type). If the type of the SArray was
637  * not set, this returns \ref flex_type_enum::UNDEFINED, in which case
638  * each row can be of arbitrary type.
639  *
640  * This function should only be used for sarray<flexible_type> and
641  * will fail fatally otherwise.
642  */
644  if (!std::is_same<T, flexible_type>::value) {
645  ASSERT_MSG(false,
646  "Use of get_type() in SArray which "
647  "does not contain flexible_types");
648  }
649  ASSERT_MSG(inited, "Invalid SArray");
650  if (!index_info.metadata.count("__type__")) {
652  }
653  return flex_type_enum(std::stoi(index_info.metadata.at("__type__")));
654  }
655 
656 
657  /**
658  * Sets the internal type of the flexible_type when written.
659  * All writes will cast to this type.
660  *
661  * This function should only be used for sarray<flexible_type> and
662  * will fail fatally otherwise.
663  */
665  if (!std::is_same<T, flexible_type>::value) {
666  ASSERT_MSG(false,
667  "Use of set_type() in SArray which "
668  "does not contain flexible_types");
669  }
670  ASSERT_MSG(inited, "Invalid SArray");
671  ASSERT_MSG(writing, "SArray not opened for writing");
672  set_metadata("__type__",
673  std::to_string(static_cast<int>(type)));
674  }
675 
676  /**
677  * Set the writer index_info for a given segment.
678  * This function can be called, when the actual segment writing is done by other
679  * logics.
680  */
681  void set_segment(size_t segmentid, const std::string& segment_file, size_t segment_size) {
682  ASSERT_MSG(inited, "Invalid SArray");
683  ASSERT_MSG(writing, "SArray not opened for writing");
684  auto& index_info = writer->get_index_info().columns[0];
685  index_info.segment_files[segmentid] = segment_file;
686  index_info.segment_sizes[segmentid] = segment_size;
687  }
688 
689  /**
690  * Saves a copy of the current sarray into a different location.
691  * Does not modify the current sarray.
692  */
693  void save(std::string index_file) const {
694  ASSERT_TRUE(inited);
695  ASSERT_FALSE(writing);
696  std::string expected_ext(".sidx");
697  if (!boost::algorithm::ends_with(index_file, expected_ext)) {
698  log_and_throw("Index file must end with " + expected_ext);
699  }
700  sarray_save_blockwise(*this, index_file);
701  }
702 
703  void delete_files_on_destruction() {
704  for(auto &file : files_managed) {
705  logstream(LOG_INFO) << "Will delete data file: " << file->m_file << std::endl;
706  file->delete_on_destruction();
707  }
708  }
709 
710  private:
711  // put all the files into the files_managed to manage their lifespan.
712  // For non temp file, we also use a global handle pool to manage the lifespan
713  // so that in normal case they are not going to be removed when the file is
714  // in use
  // The files covered are every segment file, the index file named in
  // index_info, and this array's own index_file (when non-empty).
  // Handles are appended; files_managed is not cleared first.
715  void keep_array_file_ref() {
716  std::vector<std::string> managed_files;
  // parse_v2_segment_filename(...).first is presumably the on-disk file name
  // without any in-file suffix; confirm against its definition.
717  for (const auto& file: index_info.segment_files) {
718  managed_files.push_back(parse_v2_segment_filename(file).first);
719  }
720  if (!index_info.index_file.empty()) {
721  managed_files.push_back(parse_v2_segment_filename(index_info.index_file).first);
722  }
723  if (!index_file.empty()) {
724  managed_files.push_back(parse_v2_segment_filename(index_file).first);
725  }
726  for(auto& file : managed_files) {
727  std::shared_ptr<fileio::file_ownership_handle> file_handle;
  // NOTE(review): original line 728 is missing from this listing; presumably
  // it assigns file_handle from file_handle_pool::get_instance().register_file(file).
  // As shown, a null handle would be stored.
729 
730  files_managed.push_back(file_handle);
731  }
732  }
733 
734  // read when the sarray is opened for reading
735  // synchronized against the writer class when used for writing
736  index_file_information index_info;
737  std::string index_file;
738  sarray_group_format_writer<T>* writer = NULL;
739  mutex lock;
740  // this flag only matters on construction
741  // it tells me whether I should insert the created segment files into the
742  // "files_managed" RAII structure
743  bool inited = false;
744  bool writing = false;
745  std::vector<std::shared_ptr<fileio::file_ownership_handle>> files_managed;
746 
747  friend class sarray_reader<T>;
748 };
749 
750 /// \}
751 //
752 /*
753  * When T is a regular type, the output iterator just writes directly
754  */
755 template <typename T>
756 typename sarray<T>::iterator inline sarray<T>::get_output_iterator(size_t segmentid) {
757  ASSERT_MSG(inited, "Invalid SArray");
758  ASSERT_MSG(writing, "SArray not opened for writing");
759  ASSERT_NE(writer, NULL);
760  ASSERT_LT(segmentid, num_segments());
761  // return an output iterator which when written to,
762  // will write to the segment
763  return iterator(
764  [=](const T& val)->void {
765  writer->write_segment(0, segmentid, val);
766  },
767  [=](T&& val)->void {
768  writer->write_segment(0, segmentid, std::forward<T>(val));
769  },
770  [=](const sframe_rows&)->void {
771  ASSERT_TRUE(false);
772  } );
773 }
774 
775 
776 /*
777  * When T is a flexible type, the output iterator performs type checking
 * against the array's stored type (get_type(), read once when the iterator
 * is created and captured by value):
 *  - a value is written unchanged if its type equals the stored type, if it
 *    is UNDEFINED, or if the stored type is UNDEFINED;
 *  - a value of another type that flex_type_is_convertible() accepts is
 *    converted with soft_assign() into a value of the stored type;
 *  - any other value is logged at LOG_ERROR and rejected with bad_cast.
778  */
779 template <>
// NOTE(review): original lines 780-781 (the function signature and,
// presumably, the inited/writing assertions of the generic version) are
// missing from this listing.
782  ASSERT_NE(writer, NULL);
783  ASSERT_LT(segmentid, num_segments());
784  flex_type_enum stored_type = get_type();
785  // return an output iterator which when written to,
786  // will write to the segment
787  return iterator(
// Callback for const-reference writes.
788  [=](const value_type& val)->void {
789  if (val.get_type() == stored_type ||
790  val.get_type() == flex_type_enum::UNDEFINED ||
791  stored_type == flex_type_enum::UNDEFINED) {
792  writer->write_segment(0, segmentid, val);
793  } else if (flex_type_is_convertible(val.get_type(), stored_type)) {
794  flexible_type res(stored_type);
795  res.soft_assign(val);
796  writer->write_segment(0, segmentid, res);
797  } else {
798  std::string message = "Cannot convert " + std::string(val) + " to " + flex_type_enum_to_name(stored_type);
799  logstream(LOG_ERROR) << message << std::endl;
800  throw(bad_cast(message));
801  }
802  },
// Callback for rvalue writes; same checks as above, but the value is moved
// into the writer. NOTE(review): the check logic is duplicated and could
// share a helper.
803  [=](value_type&& val)->void {
804  if (val.get_type() == stored_type ||
805  val.get_type() == flex_type_enum::UNDEFINED ||
806  stored_type == flex_type_enum::UNDEFINED) {
807  writer->write_segment(0, segmentid, std::forward<flexible_type>(val));
808  } else if (flex_type_is_convertible(val.get_type(), stored_type)) {
809  flexible_type res(stored_type);
810  res.soft_assign(val);
811  writer->write_segment(0, segmentid, std::move(res));
812  } else {
813  std::string message = "Cannot convert " + std::string(val) + " to " + flex_type_enum_to_name(stored_type);
814  logstream(LOG_ERROR) << message << std::endl;
815  throw(bad_cast(message));
816  }
817  },
// Callback for sframe_rows batches: the batch must have exactly one column.
// It is passed through sfr.type_check({stored_type}) before being written
// (presumably validating/converting against stored_type; confirm).
818  [=](const sframe_rows& sfr)->void {
819  ASSERT_TRUE(sfr.num_columns() == 1);
820  writer->write_segment(segmentid, sfr.type_check({stored_type}));
821  });
822 }
823 
824 
825 } // namespace turi
826 
827 #include <core/storage/sframe_data/sarray_reader.hpp>
828 #include <core/storage/sframe_data/sframe_compact_impl.hpp>
829 
830 ////////////////////////////////////////////////////////////////////////////////
831 // Implement serialization for
832 // std::shared_ptr<sarray<flexible_type> >
// Encoding: a bool flag (false for a null pointer); when true, it is followed
// by the sarray itself (presumably via sarray::save(oarchive&); confirm).
833 
// NOTE(review): original line 834 (the BEGIN_OUT_OF_PLACE_SAVE(...) opening of
// this out-of-place save) is missing from this listing.
835  if(m == nullptr) {
836  arc << false;
837  } else {
838  arc << true;
839  arc << (*m);
840  }
841 } END_OUT_OF_PLACE_SAVE()
842 
// NOTE(review): original line 843 (the BEGIN_OUT_OF_PLACE_LOAD(...) opening of
// this out-of-place load) is missing from this listing.
// Reads the bool flag written by the matching save; on true, the sarray
// payload follows, and on false, m is reset to null.
844  bool is_not_nullptr;
845  arc >> is_not_nullptr;
846  if(is_not_nullptr) {
  // NOTE(review): original line 847 is missing from this listing; presumably
  // it allocates a fresh sarray into m. As shown, *m would dereference
  // whatever m previously held.
848  arc >> (*m);
849  } else {
850  m = std::shared_ptr<turi::sarray<turi::flexible_type> >(nullptr);
851  }
852 } END_OUT_OF_PLACE_LOAD()
853 
854 
855 #endif
#define ASSERT_FALSE(cond)
Definition: assertions.hpp:310
#define logstream(lvl)
Definition: logger.hpp:276
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
Definition: lambda_omp.hpp:93
#define BEGIN_OUT_OF_PLACE_LOAD(arc, tname, tval)
Macro to make it easy to define out-of-place loads.
Definition: iarchive.hpp:314
sarray(const flexible_type &value, size_t size, size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS, flex_type_enum type=flex_type_enum::UNDEFINED)
Definition: sarray.hpp:205
std::unique_ptr< reader_type > get_reader(size_t num_segments) const
Definition: sarray.hpp:407
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::shared_ptr< sarray > clone(size_t nsegments=0) const
Definition: sarray.hpp:488
size_t size() const
Definition: sarray.hpp:382
std::unique_ptr< reader_type > get_reader(const std::vector< size_t > &segment_lengths) const
Definition: sarray.hpp:419
T value_type
The type contained in the sarray.
Definition: sarray.hpp:134
void open(std::string index_file, size_t segments_to_create, size_t columns_to_create)
bool get_metadata(std::string key, std::string &val) const
Definition: sarray.hpp:359
void set_type(flex_type_enum type)
Definition: sarray.hpp:664
#define LOG_INFO
Definition: logger.hpp:101
void set_segment(size_t segmentid, const std::string &segment_file, size_t segment_size)
Definition: sarray.hpp:681
flexible_type & soft_assign(const flexible_type &other)
bool is_opened_for_write() const
Definition: sarray.hpp:333
sarray(std::string sidx_or_directory)
Definition: sarray.hpp:198
sarray & operator=(sarray &&other)
Definition: sarray.hpp:173
sarray append(const sarray &other) const
Definition: sarray.hpp:458
void save(oarchive &oarc) const
Definition: sarray.hpp:514
std::string get_index_file() const
Definition: sarray.hpp:340
size_t num_segments() const
Definition: sarray.hpp:431
std::pair< std::string, size_t > parse_v2_segment_filename(std::string fname)
std::pair< bool, std::string > get_metadata(std::string key) const
Definition: sarray.hpp:370
const char * flex_type_enum_to_name(flex_type_enum en)
const index_file_information get_index_info() const
Definition: sarray.hpp:448
size_t SFRAME_DEFAULT_NUM_SEGMENTS
sarray_reader< T > reader_type
The reader type.
Definition: sarray.hpp:128
#define logprogress_stream
Definition: logger.hpp:325
static file_handle_pool & get_instance()
void open_for_read(std::string sidx_file)
Definition: sarray.hpp:253
bool is_opened_for_read() const
Definition: sarray.hpp:324
void copy(const std::string src, const std::string dest)
#define LOG_ERROR
Definition: logger.hpp:97
void open_for_read(index_file_information info)
Definition: sarray.hpp:232
bool flex_type_is_convertible(flex_type_enum from, flex_type_enum to)
flex_type_enum get_type() const
bool set_num_segments(size_t numseg)
Definition: sarray.hpp:552
sarray & operator=(const sarray &other)
Assignment operator.
Definition: sarray.hpp:153
swriter_impl::output_iterator< T > iterator
The iterator type which get_output_iterator returns.
Definition: sarray.hpp:131
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
void open_for_write(size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS, bool disable_padding=false)
Definition: sarray.hpp:280
sarray(const sarray &other)
Copy constructor.
Definition: sarray.hpp:148
void load(iarchive &iarc)
Definition: sarray.hpp:523
bool set_metadata(std::string key, std::string val)
Definition: sarray.hpp:619
size_t segment_length(size_t i) const
Definition: sarray.hpp:440
std::shared_ptr< file_ownership_handle > register_file(const std::string &file_name)
void close()
Definition: sarray.hpp:605
index_file_information read_index_file(std::string index_file)
void save(std::string index_file) const
Definition: sarray.hpp:693
sarray_group_format_writer< T > * get_writer()
Definition: sarray.hpp:348
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
iterator get_output_iterator(size_t segmentid)
Definition: sarray.hpp:756
void try_compact()
Definition: sarray.hpp:532
flex_type_enum get_type() const
Definition: sarray.hpp:643
void sarray_compact(sarray< T > &column, size_t segment_threshold)
#define BEGIN_OUT_OF_PLACE_SAVE(arc, tname, tval)
Macro to make it easy to define out-of-place saves.
Definition: oarchive.hpp:346
std::unique_ptr< reader_type > get_reader() const
Definition: sarray.hpp:396
size_t SFRAME_COMPACTION_THRESHOLD
void open_for_write(std::string sidx_file, size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS)
Definition: sarray.hpp:307
sarray(sarray &&other)
Move constructor.
Definition: sarray.hpp:143