6 #ifndef TURI_UNITY_SFRAME_SARRAY_HPP 7 #define TURI_UNITY_SFRAME_SARRAY_HPP 10 #include <type_traits> 12 #include <core/logging/assertions.hpp> 13 #include <boost/algorithm/string/predicate.hpp> 14 #include <core/storage/sframe_data/output_iterator.hpp> 15 #include <core/storage/serialization/iarchive.hpp> 16 #include <core/storage/serialization/oarchive.hpp> 17 #include <core/parallel/pthread_tools.hpp> 18 #include <core/parallel/lambda_omp.hpp> 19 #include <core/storage/sframe_data/swriter_base.hpp> 20 #include <core/storage/sframe_data/sarray_file_format_v2.hpp> 21 #include <core/storage/sframe_data/sarray_index_file.hpp> 22 #include <core/storage/sframe_data/algorithm.hpp> 23 #include <core/storage/sframe_data/sframe_config.hpp> 24 #include <core/data/flexible_type/flexible_type.hpp> 25 #include <core/storage/fileio/fixed_size_cache_manager.hpp> 26 #include <core/storage/fileio/file_ownership_handle.hpp> 27 #include <core/storage/fileio/file_handle_pool.hpp> 28 #include <core/system/exceptions/error_types.hpp> 29 #include <core/storage/sframe_data/sframe_constants.hpp> 30 #include <core/storage/sframe_data/sarray_saving.hpp> 31 #include <core/storage/sframe_data/sframe_compact.hpp> 39 namespace swriter_impl {
45 std::function<void(const T&)>,
46 std::function<void(T&&)>,
47 std::function<void(const sframe_rows&)> >;
124 template <
typename T>
125 class sarray :
public swriter_base<swriter_impl::output_iterator<T> > {
144 (*this) = std::move(other);
154 if (other.inited && other.writing) {
155 throw(
"Cannot copy an array which is writing");
158 throw(
"Cannot copy over an array which is writing");
160 index_info = other.index_info;
161 index_file = other.index_file;
162 files_managed = other.files_managed;
163 inited = other.inited;
164 writing = other.writing;
174 index_info = std::move(other.index_info);
175 index_file = std::move(other.index_file);
176 writer = std::move(other.writer);
177 files_managed = std::move(other.files_managed);
178 inited = other.inited;
179 writing = other.writing;
182 other.index_file =
"";
184 other.files_managed.clear();
185 other.inited =
false;
186 other.writing =
false;
198 explicit sarray(std::string sidx_or_directory) {
199 open_for_read(sidx_or_directory);
211 ASSERT_GT(num_segments, (
size_t)0);
212 open_for_write(num_segments);
214 size_t size_per_segment = size / num_segments;
216 auto out = get_output_iterator(i);
217 size_t begin = i * size_per_segment;
218 size_t end = (i + 1) * size_per_segment;
219 if (i == num_segments -1) end = size;
220 for (
size_t iter = begin; iter < end; ++iter) {
233 ASSERT_MSG(!inited,
"Attempting to init an SArray " 234 "which has already been inited");
236 keep_array_file_ref();
240 if (index_info.version == 0) {
242 <<
"Operations may not work as expected, or will be slow." 243 <<
"Please re-save the SFrame/SArray to update it to " 244 <<
"the latest version which has substantial " 245 <<
"performance optimizations\n";
254 ASSERT_MSG(!inited,
"Attempting to init an SArray " 255 "which has already been inited");
256 index_file = sidx_file;
259 keep_array_file_ref();
263 if (index_info.version == 0) {
265 <<
"Operations may not work as expected, or will be slow." 266 <<
"Please re-save the SFrame/SArray to update it to " 267 <<
"the latest version which has substantial " 268 <<
"performance optimizations\n";
281 bool disable_padding =
false) {
282 ASSERT_MSG(!inited,
"Attempting to init an SArray " 283 "which has already been inited");
284 std::string sidx_file = fileio::fixed_size_cache_manager::get_instance().get_temp_cache_id(
".sidx");
285 index_file = sidx_file;
288 if (writer) writer->open(sidx_file, num_segments, 1);
289 if (disable_padding) writer->set_options(
"disable_padding", 1);
292 index_info = writer->get_index_info().columns[0];
309 ASSERT_MSG(!inited,
"Attempting to init an SArray " 310 "which has already been inited");
311 index_file = sidx_file;
314 if (writer) writer->
open(sidx_file, num_segments, 1);
317 index_info = writer->get_index_info().columns[0];
325 return (inited && !writing);
334 return (inited && writing);
349 ASSERT_MSG(inited,
"Invalid SArray");
350 ASSERT_MSG(writing,
"SArray not opened for writing");
351 ASSERT_NE(writer, NULL);
361 std::tie(ret, val) = get_metadata(key);
371 ASSERT_MSG(inited,
"Invalid SArray");
372 if (index_info.metadata.count(key)) {
373 return std::pair<bool, std::string>(
true, index_info.metadata.at(key));
375 return std::pair<bool, std::string>(
false,
"");
386 for (
size_t i = 0;i < index_info.segment_sizes.size(); ++i) {
387 ret += index_info.segment_sizes[i];
397 ASSERT_MSG(inited,
"Invalid SArray");
398 ASSERT_MSG(!writing,
"Cannot open an SArraying which is still writing.");
399 std::unique_ptr<reader_type>
sarray_reader(
new reader_type());
400 sarray_reader->init(*
this);
401 return sarray_reader;
407 std::unique_ptr<reader_type>
get_reader(
size_t num_segments)
const {
408 ASSERT_MSG(inited,
"Invalid SArray");
409 ASSERT_MSG(!writing,
"Cannot open an SArray which is still writing.");
410 std::unique_ptr<reader_type>
sarray_reader(
new reader_type());
411 sarray_reader->init(*
this, num_segments);
412 return sarray_reader;
419 std::unique_ptr<reader_type>
get_reader(
const std::vector<size_t>& segment_lengths)
const {
420 ASSERT_MSG(inited,
"Invalid SArray");
421 ASSERT_MSG(!writing,
"Cannot open an SArray which is still writing.");
422 std::unique_ptr<reader_type>
sarray_reader(
new reader_type());
423 sarray_reader->init(*
this, segment_lengths);
424 return sarray_reader;
432 ASSERT_MSG(inited,
"Invalid SArray");
433 return index_info.nsegments;
441 ASSERT_MSG(inited,
"Invalid SArray");
442 return index_info.segment_sizes[i];
// NOTE(review): this appears to be the body of append(const sarray& other);
// the signature, the declaration of `ret`, and the final return are not part
// of this listing — confirm against the full header.
//
// Neither operand may be open for writing.
460 ASSERT_EQ(writing,
false);
461 ASSERT_EQ(other.writing,
false);
// Appending an uninitialized array yields this array unchanged.
463 if (!other.inited)
return *
this;
// Appending to an uninitialized array yields `other`.
464 if (!inited)
return other;
// Both arrays must agree on index version and block size before their
// segment lists are combined.
467 ASSERT_EQ(index_info.version, other.index_info.version);
468 ASSERT_EQ(index_info.block_size, other.index_info.block_size);
// Start the result from this array's index information and file handles.
472 ret.index_info = index_info;
473 ret.files_managed = files_managed;
// Then add other's segment count and append its segment sizes, segment files
// and file handles after this array's entries. No segment data is touched in
// the visible lines; only index metadata and handles are copied.
475 ret.index_info.nsegments += other.index_info.nsegments;
476 std::copy(other.index_info.segment_sizes.begin(), other.index_info.segment_sizes.end(),
477 std::inserter(ret.index_info.segment_sizes, ret.index_info.segment_sizes.end()));
478 std::copy(other.index_info.segment_files.begin(), other.index_info.segment_files.end(),
479 std::inserter(ret.index_info.segment_files, ret.index_info.segment_files.end()));
480 std::copy(other.files_managed.begin(), other.files_managed.end(),
481 std::inserter(ret.files_managed, ret.files_managed.end()));
488 std::shared_ptr<sarray>
clone(
size_t nsegments = 0)
const {
489 if (nsegments == 0) {
490 nsegments = num_segments();
492 auto ret = std::make_shared<sarray>();
493 ret->open_for_write(nsegments);
494 ret->set_type(get_type());
495 auto reader = get_reader(nsegments);
497 auto iter = reader->begin(segment_id);
498 auto end = reader->end(segment_id);
499 auto out = ret->get_output_iterator(segment_id);
500 while (iter != end) {
515 std::string prefix = oarc.get_prefix();
516 save(prefix +
".sidx");
524 std::string prefix = iarc.get_prefix();
525 open_for_read(prefix +
".sidx");
553 ASSERT_MSG(inited,
"Invalid SArray");
554 ASSERT_MSG(writing,
"SArray not opened for writing");
555 if (numseg == 0)
return false;
556 if (numseg != writer->num_segments()) {
559 writer->
open(index_file, numseg, 1);
561 writer->get_index_info().columns[0].metadata = index_info.metadata;
562 index_info = writer->get_index_info().columns[0];
// Returns the output iterator (type `iterator`) for the segment identified by
// `segmentid`. Declaration only; no body is given at this point.
// NOTE(review): presumably defined by the out-of-class templates later in this
// header — confirm.
595 iterator get_output_iterator(
size_t segmentid);
607 writer->write_index_file();
608 index_info = writer->get_index_info().columns[0];
612 keep_array_file_ref();
620 ASSERT_MSG(inited,
"Invalid SArray");
621 ASSERT_MSG(writing,
"SArray not opened for writing");
622 ASSERT_NE(writer, NULL);
623 writer->get_index_info().columns[0].metadata[key] = val;
624 index_info = writer->get_index_info().columns[0];
644 if (!std::is_same<T, flexible_type>::value) {
646 "Use of get_type() in SArray which " 647 "does not contain flexible_types");
649 ASSERT_MSG(inited,
"Invalid SArray");
650 if (!index_info.metadata.count(
"__type__")) {
653 return flex_type_enum(std::stoi(index_info.metadata.at(
"__type__")));
665 if (!std::is_same<T, flexible_type>::value) {
667 "Use of set_type() in SArray which " 668 "does not contain flexible_types");
670 ASSERT_MSG(inited,
"Invalid SArray");
671 ASSERT_MSG(writing,
"SArray not opened for writing");
672 set_metadata(
"__type__",
673 std::to_string(static_cast<int>(type)));
681 void set_segment(
size_t segmentid,
const std::string& segment_file,
size_t segment_size) {
682 ASSERT_MSG(inited,
"Invalid SArray");
683 ASSERT_MSG(writing,
"SArray not opened for writing");
684 auto& index_info = writer->get_index_info().columns[0];
685 index_info.segment_files[segmentid] = segment_file;
686 index_info.segment_sizes[segmentid] = segment_size;
693 void save(std::string index_file)
const {
696 std::string expected_ext(
".sidx");
697 if (!boost::algorithm::ends_with(index_file, expected_ext)) {
698 log_and_throw(
"Index file must end with " + expected_ext);
700 sarray_save_blockwise(*
this, index_file);
703 void delete_files_on_destruction() {
704 for(
auto &file : files_managed) {
706 file->delete_on_destruction();
715 void keep_array_file_ref() {
716 std::vector<std::string> managed_files;
717 for (
const auto& file: index_info.segment_files) {
720 if (!index_info.index_file.empty()) {
723 if (!index_file.empty()) {
726 for(
auto& file : managed_files) {
727 std::shared_ptr<fileio::file_ownership_handle> file_handle;
730 files_managed.push_back(file_handle);
737 std::string index_file;
744 bool writing =
false;
745 std::vector<std::shared_ptr<fileio::file_ownership_handle>> files_managed;
755 template <
typename T>
757 ASSERT_MSG(inited,
"Invalid SArray");
758 ASSERT_MSG(writing,
"SArray not opened for writing");
759 ASSERT_NE(writer, NULL);
760 ASSERT_LT(segmentid, num_segments());
764 [=](
const T& val)->
void {
765 writer->write_segment(0, segmentid, val);
768 writer->write_segment(0, segmentid, std::forward<T>(val));
782 ASSERT_NE(writer, NULL);
783 ASSERT_LT(segmentid, num_segments());
788 [=](
const value_type& val)->
void {
789 if (val.get_type() == stored_type ||
792 writer->write_segment(0, segmentid, val);
796 writer->write_segment(0, segmentid, res);
798 std::string message =
"Cannot convert " + std::string(val) +
" to " +
flex_type_enum_to_name(stored_type);
803 [=](value_type&& val)->
void {
804 if (val.get_type() == stored_type ||
807 writer->write_segment(0, segmentid, std::forward<flexible_type>(val));
811 writer->write_segment(0, segmentid, std::move(res));
813 std::string message =
"Cannot convert " + std::string(val) +
" to " +
flex_type_enum_to_name(stored_type);
820 writer->write_segment(segmentid, sfr.type_check({stored_type}));
827 #include <core/storage/sframe_data/sarray_reader.hpp> 828 #include <core/storage/sframe_data/sframe_compact_impl.hpp> 841 } END_OUT_OF_PLACE_SAVE()
845 arc >> is_not_nullptr;
850 m = std::shared_ptr<turi::sarray<turi::flexible_type> >(
nullptr);
852 } END_OUT_OF_PLACE_LOAD()
#define ASSERT_FALSE(cond)
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
#define BEGIN_OUT_OF_PLACE_LOAD(arc, tname, tval)
Macro to make it easy to define out-of-place loads.
sarray(const flexible_type &value, size_t size, size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS, flex_type_enum type=flex_type_enum::UNDEFINED)
std::unique_ptr< reader_type > get_reader(size_t num_segments) const
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
std::shared_ptr< sarray > clone(size_t nsegments=0) const
std::unique_ptr< reader_type > get_reader(const std::vector< size_t > &segment_lengths) const
T value_type
The type contained in the sarray.
bool get_metadata(std::string key, std::string &val) const
void set_type(flex_type_enum type)
void set_segment(size_t segmentid, const std::string &segment_file, size_t segment_size)
flexible_type & soft_assign(const flexible_type &other)
bool is_opened_for_write() const
sarray(std::string sidx_or_directory)
sarray & operator=(sarray &&other)
sarray append(const sarray &other) const
void save(oarchive &oarc) const
std::string get_index_file() const
size_t num_segments() const
std::pair< std::string, size_t > parse_v2_segment_filename(std::string fname)
std::pair< bool, std::string > get_metadata(std::string key) const
const char * flex_type_enum_to_name(flex_type_enum en)
const index_file_information get_index_info() const
size_t SFRAME_DEFAULT_NUM_SEGMENTS
sarray_reader< T > reader_type
The reader type.
#define logprogress_stream
static file_handle_pool & get_instance()
void open_for_read(std::string sidx_file)
bool is_opened_for_read() const
void copy(const std::string src, const std::string dest)
void open_for_read(index_file_information info)
bool flex_type_is_convertible(flex_type_enum from, flex_type_enum to)
flex_type_enum get_type() const
bool set_num_segments(size_t numseg)
sarray & operator=(const sarray &other)
Assignment operator.
swriter_impl::output_iterator< T > iterator
The iterator type which get_output_iterator returns.
#define ASSERT_TRUE(cond)
void open_for_write(size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS, bool disable_padding=false)
sarray(const sarray &other)
Copy constructor.
void load(iarchive &iarc)
bool set_metadata(std::string key, std::string val)
size_t segment_length(size_t i) const
std::shared_ptr< file_ownership_handle > register_file(const std::string &file_name)
index_file_information read_index_file(std::string index_file)
void save(std::string index_file) const
sarray_group_format_writer< T > * get_writer()
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
iterator get_output_iterator(size_t segmentid)
flex_type_enum get_type() const
void sarray_compact(sarray< T > &column, size_t segment_threshold)
#define BEGIN_OUT_OF_PLACE_SAVE(arc, tname, tval)
Macro to make it easy to define out-of-place saves.
std::unique_ptr< reader_type > get_reader() const
size_t SFRAME_COMPACTION_THRESHOLD
void open_for_write(std::string sidx_file, size_t num_segments=SFRAME_DEFAULT_NUM_SEGMENTS)
sarray(sarray &&other)
Move constructor.