6 #ifndef TURI_UNITY_SITERABLE_ALGORITHMS_HPP 7 #define TURI_UNITY_SITERABLE_ALGORITHMS_HPP 11 #include <type_traits> 13 #include <core/parallel/lambda_omp.hpp> 14 #include <core/random/random.hpp> 15 #include <core/storage/sframe_data/siterable.hpp> 16 #include <core/storage/sframe_data/swriter_base.hpp> 17 #include <core/storage/sframe_data/sarray_reader.hpp> 18 #include <core/storage/sframe_data/is_sarray_like.hpp> 61 template <
// transform (fragment): template head, tail of the parameter list and part of
// the body. The signature line and the code that applies `transformfn` are not
// present in this listing.
// Both S and T must satisfy sframe_impl::is_sarray_like; otherwise the
// enable_if defaults below remove this template from overload resolution.
typename S,
typename T,
typename TransformFn,
62 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type,
63 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
65 TransformFn transformfn,
66 std::set<size_t> constraint_segments = std::set<size_t>()) {
// The reader is requested with the output's segment count, presumably so that
// input and output segment ids line up one-to-one (confirm against get_reader).
70 auto input_reader = input.get_reader(output.num_segments());
// Ids of the segments that will be processed.
72 std::vector<size_t> segments;
// No constraint given: one slot per output segment. The loop body that fills
// the slots is not visible; presumably segments[i] = i.
74 if (constraint_segments.empty()) {
75 segments.resize(output.num_segments());
76 for (
size_t i = 0; i < segments.size(); ++i) {
// Otherwise the constraint set is inserted at the end of `segments` (the first
// line of this call is not visible).
81 constraint_segments.end(),
82 std::inserter(segments, segments.end()));
// Per-task work: `idx` indexes into `segments`. Segment ids beyond what the
// reader provides are skipped.
88 size_t segid = segments[idx];
89 if (segid >= input_reader->num_segments())
return;
// Input range and output iterator for the same segment id.
90 auto input_begin = input_reader->begin(segid);
91 auto input_end = input_reader->end(segid);
92 auto output_iter = output.get_output_iterator(segid);
// copy_if (fragment): copies the elements of each processed input segment that
// satisfy `filterfn` into the output segment with the same id. The signature
// line and the parallel driver are not present in this listing.
// Both S and T must satisfy sframe_impl::is_sarray_like (enable_if guards).
134 template <
typename S,
typename T,
typename FilterFn,
135 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type,
136 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
139 std::set<size_t> constraint_segments = std::set<size_t>(),
// size_t(-1) is the sentinel for "no seed supplied".
140 size_t random_seed=
size_t(-1)) {
// Reader is requested with the output's segment count.
144 auto input_reader = input.get_reader(output.num_segments());
146 std::vector<size_t> segments;
// No constraint given: one slot per reader segment. The loop body is not
// visible; presumably segments[i] = i.
148 if (constraint_segments.empty()) {
149 segments.resize(input_reader->num_segments());
150 for (
size_t i = 0; i < segments.size(); ++i) {
// Otherwise the constraint set is inserted at the end of `segments` (the first
// line of this call is not visible).
155 constraint_segments.end(),
156 std::inserter(segments, segments.end()));
// Only acts when a seed was supplied. The guarded statement is not visible;
// presumably it seeds the random generator (TODO confirm).
162 if (random_seed !=
size_t(-1)){
// Per-task work: skip segment ids the reader does not have.
165 size_t segid = segments[idx];
166 if (segid >= input_reader->num_segments())
return;
167 auto input_begin = input_reader->begin(segid);
168 auto input_end = input_reader->end(segid);
169 auto output_iter = output.get_output_iterator(segid);
// Filtered copy of this segment; element order within the segment is kept.
170 std::copy_if(input_begin, input_end, output_iter, filterfn);
// copy_transform_if (fragment): for each element of a processed input segment
// that passes `filterfn`, writes `transformfn(element)` to the output segment
// with the same id. The signature line, the iterator increments and the
// parallel driver are not present in this listing.
// Both S and T must satisfy sframe_impl::is_sarray_like (enable_if guards).
208 template <
typename S,
typename T,
typename FilterFn,
typename TransformFn,
209 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type,
210 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
213 TransformFn transformfn,
214 std::set<size_t> constraint_segments = std::set<size_t>(),
// size_t(-1) is the sentinel for "no seed supplied".
215 size_t random_seed=
size_t(-1)) {
// Reader is requested with the output's segment count.
219 auto input_reader = input.get_reader(output.num_segments());
221 std::vector<size_t> segments;
// No constraint given: one slot per reader segment. The loop body is not
// visible; presumably segments[i] = i.
223 if (constraint_segments.empty()) {
224 segments.resize(input_reader->num_segments());
225 for (
size_t i = 0; i < segments.size(); ++i) {
// Otherwise the constraint set is inserted at the end of `segments` (the first
// line of this call is not visible).
230 constraint_segments.end(),
231 std::inserter(segments, segments.end()));
// Only acts when a seed was supplied. The guarded statement is not visible;
// presumably it seeds the random generator (TODO confirm).
237 if (random_seed !=
size_t(-1)){
// Per-task work: skip segment ids the reader does not have.
240 size_t segid = segments[idx];
241 if (segid >= input_reader->num_segments())
return;
242 auto input_begin = input_reader->begin(segid);
243 auto input_end = input_reader->end(segid);
244 auto output_iter = output.get_output_iterator(segid);
// The filter is evaluated on the untransformed element; only accepted
// elements are transformed and written.
246 while (input_begin != input_end) {
247 if (filterfn(*input_begin)) {
248 *output_iter = transformfn(*input_begin);
// split (fragment): routes the elements of `input` into `output1` or `output2`.
// The `filterfn` parameter line and the loop body that picks the destination
// are not present in this listing.
// Both outputs share the type T; S and T must satisfy
// sframe_impl::is_sarray_like (enable_if guards).
290 template <
typename S,
typename T,
typename FilterFn,
291 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type,
292 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
293 void split(S&& input, T&& output1, T&& output2,
// The default seed is the current time, so it differs between calls unless the
// caller passes one explicitly.
295 size_t random_seed=std::time(NULL)) {
// Force output1 to have as many segments as output2; throw if that fails.
300 if (output1.set_num_segments(output2.num_segments()) ==
false) {
301 log_and_throw(
"Expects outputs to have the same number of segments");
// Reader is requested with the (now shared) output segment count.
304 auto input_reader = input.get_reader(output1.num_segments());
// NOTE(review): `random_seed` is a size_t compared against the int literal -1;
// the literal converts to the maximum size_t value, so this behaves like
// `!= size_t(-1)` but may trigger a signed/unsigned comparison warning. The
// guarded statement is not visible.
308 if (random_seed != -1) {
// Per-segment work: here `idx` is used directly as the segment id for the
// reader and for both outputs.
311 auto input_begin = input_reader->begin(idx);
312 auto input_end = input_reader->end(idx);
313 auto output_iter1 = output1.get_output_iterator(idx);
314 auto output_iter2 = output2.get_output_iterator(idx);
315 auto iter = input_begin;
316 while(iter != input_end) {
337 namespace sframe_impl {
// do_copy, input-iterator overload (fragment): writes [begin, end) to the
// writer's segments one after another, giving each segment an even share of
// the elements that are still unwritten. The iterator increments and closing
// braces are not present in this listing.
// The unnamed std::input_iterator_tag parameter only selects this overload.
339 template <
typename Iterator,
typename SWriter>
340 void do_copy(Iterator begin, Iterator end, SWriter&& writer,
341 std::input_iterator_tag) {
// NOTE(review): `distance` is called unqualified. For a genuinely single-pass
// iterator, measuring the range before copying would consume it; confirm that
// callers only pass multi-pass iterators.
343 size_t length = distance(begin, end);
344 size_t items_written = 0;
346 for (
size_t i = 0; i < writer.num_segments(); ++i) {
347 size_t remaining_length = length - items_written;
// Divide what is left by the number of segments still to fill. For the last
// segment the divisor is 1, so it receives everything that remains.
350 size_t items_to_output = remaining_length / (writer.num_segments() - i);
351 auto outputiter = writer.get_output_iterator(i);
352 for (
size_t j = 0;j < items_to_output; ++j) {
353 *outputiter = *begin;
357 items_written += items_to_output;
// do_copy, random-access overload (fragment): cuts [begin, end) into
// `num_segments` pieces by offset and copies each piece into the writer
// segment with the same index. The call that receives the lambda below and the
// iterator increments are not present in this listing.
363 template <
typename Iterator,
typename SWriter>
364 void do_copy(Iterator begin, Iterator end, SWriter&& writer,
365 std::random_access_iterator_tag tag) {
366 size_t num_segments = writer.num_segments();
368 size_t length = std::distance(begin, end);
// Fractional piece size. Piece boundaries are computed in floating point and
// converted to an iterator offset when added to `begin`.
370 double split_size = (double)length / num_segments;
// Per-segment work; all locals above are captured by reference.
373 [&](
size_t segment) {
375 Iterator segment_begin = begin + split_size * segment;
377 Iterator segment_end = begin + split_size * (segment + 1);
// The last piece is pinned to `end` so the final boundary does not depend on
// floating-point rounding.
378 if (segment == num_segments - 1) segment_end = end;
379 auto outputiter = writer.get_output_iterator(segment);
380 while(segment_begin != segment_end) {
381 *outputiter = *segment_begin;
// copy (iterator range -> writer, fragment): forwards to sframe_impl::do_copy,
// choosing the overload by the iterator's category tag. Original line 417 and
// the closing brace are not present in this listing.
// SWriter must satisfy sframe_impl::is_sarray_like (enable_if guard).
414 template <
typename Iterator,
typename SWriter,
415 typename =
typename std::enable_if<sframe_impl::is_sarray_like<SWriter>::value>::type>
416 void copy(Iterator begin, Iterator end, SWriter&& writer) {
// Tag dispatch: a default-constructed iterator_category object is passed as
// the last argument; the writer is perfectly forwarded.
418 sframe_impl::do_copy(begin, end, std::forward<SWriter>(writer),
419 typename std::iterator_traits<Iterator>::iterator_category());
// copy (array -> output iterator, fragment): walks the reader's segments in
// index order and writes elements through `output`, stopping once `limit`
// elements have been handled. The declaration and increment of `ctr`, and the
// iterator increments, are not present in this listing.
// S must satisfy sframe_impl::is_sarray_like (enable_if guard).
439 template <
typename S,
typename Iterator,
440 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type>
// The default limit is the maximum size_t value, i.e. effectively no limit.
441 void copy(S&& array, Iterator output,
size_t limit=(
size_t)(-1)) {
// Reader is obtained with no arguments here.
446 auto reader = array.get_reader();
448 for (
size_t i = 0;i < reader->num_segments(); ++i) {
449 auto begin = reader->begin(i);
450 auto end = reader->end(i);
451 while(ctr < limit && begin != end) {
// The dereferenced element is moved, not copied, into the output.
452 (*output) = std::move(*begin);
// Once the limit is reached, the remaining segments are skipped as well.
457 if (ctr >= limit)
break;
// reduce (fragment): folds each of `dop` input segments with `f` and returns
// one partial result per segment. Combining the partial results is left to the
// caller. The definition of `dop`, the parallel driver and the iterator
// increments are not present in this listing.
// S must satisfy sframe_impl::is_sarray_like (enable_if guard).
489 template <
typename ResultType,
typename S,
typename FunctionType,
490 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type>
491 std::vector<ResultType>
reduce(S&& input, FunctionType f,
492 ResultType init = ResultType()) {
495 std::vector<ResultType> ret;
// Every slot starts as `init`. `dop` is presumably the degree of parallelism
// (TODO confirm where it is set).
497 ret.resize(dop, init);
498 auto input_reader = input.get_reader(dop);
// Per-segment work: the accumulator is local to the segment and starts at
// `init`.
502 auto input_begin = input_reader->begin(idx);
503 auto input_end = input_reader->end(idx);
504 ResultType reduce_result = init;
505 while(input_begin != input_end) {
// `f` receives the element and the accumulator (presumably by reference, since
// the accumulator is stored afterwards). Returning false stops this segment.
506 if (f(*input_begin, reduce_result) ==
false)
break;
// Store this segment's partial result.
509 ret[idx] = reduce_result;
// binary_transform (fragment): walks two inputs in lockstep, segment by
// segment, and writes `transformfn(a, b)` to the matching output segment. The
// signature line, the iterator increments and the parallel driver are not
// present in this listing.
// S1, S2 and T must all satisfy sframe_impl::is_sarray_like (enable_if guards).
539 template <
typename S1,
typename S2,
typename T,
typename TransformFn,
540 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S1>::value>::type,
541 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S2>::value>::type,
542 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
544 TransformFn transformfn) {
// Both readers are requested with the output's segment count.
550 auto input1_reader = input1.get_reader(output.num_segments());
551 auto input2_reader = input2.get_reader(output.num_segments());
// Per-segment work: `idx` is used as the segment id for both readers and the
// output.
555 auto input1_begin = input1_reader->begin(idx);
556 auto input1_end = input1_reader->end(idx);
557 auto input2_begin = input2_reader->begin(idx);
558 auto input2_end = input2_reader->end(idx);
559 auto output_iter = output.get_output_iterator(idx);
// NOTE(review): the visible loop condition checks only input1. input2 is
// dereferenced without a visible bounds check, so this appears to assume both
// inputs have equal segment lengths; confirm whether a check exists outside
// this listing.
560 while(input1_begin != input1_end) {
561 (*output_iter) = transformfn(*input1_begin, *input2_begin);
// copy_range (fragment): copies every `step`-th row of `input`, starting at
// row `start` and stopping before row `end`, into `output`, with the output
// rows partitioned across the output's segments. The signature line, several
// conditions and the writer increments are not present in this listing.
// S and T must satisfy sframe_impl::is_sarray_like (enable_if guards).
596 template <
typename S,
typename T,
597 typename =
typename std::enable_if<sframe_impl::is_sarray_like<S>::value>::type,
598 typename =
typename std::enable_if<sframe_impl::is_sarray_like<T>::value>::type>
607 auto reader = input.get_reader();
// Clamp `end` to the number of rows available.
608 end = std::min(end, reader->size());
// The condition guarding this throw is not visible.
611 log_and_throw(
"End must be at least start");
615 size_t element_range = end - start;
// Ceiling of element_range / step when element_range >= 1.
// NOTE(review): if element_range can be 0 here, `element_range - 1` wraps
// around (size_t); check whether that case is handled on a line not shown.
617 size_t num_out_elems = 1 + (element_range - 1) / step;
// Per-segment work: output rows [start_idx, end_idx) belong to segment `idx`.
621 auto writer = output.get_output_iterator(idx);
622 size_t start_idx = idx * num_out_elems / output.num_segments();
623 size_t end_idx = (idx + 1) * num_out_elems / output.num_segments();
// Scratch buffer that read_rows fills.
625 std::vector<typename std::decay<S>::type::value_type> buffer;
// Block-read path: the row offset is added to `start` without multiplying by
// `step`, so this is presumably the step == 1 case (TODO confirm). Each block
// is clamped so it does not run past this segment's share.
629 for (
size_t i = start_idx;
632 size_t block_read_range_start = start + i;
633 size_t block_read_range_end =
635 block_read_range_end = std::min(block_read_range_end, start + end_idx);
636 reader->read_rows(block_read_range_start,
637 block_read_range_end,
639 for (
auto& row: buffer) {
// Strided path: read exactly one row per output element at start + i * step.
646 for (
size_t i = start_idx; i < end_idx; ++i) {
647 reader->read_rows(start + i * step,
648 start + i * step + 1,
// Stop early if the read returned nothing.
650 if (buffer.size() == 0)
break;
651 (*writer) = buffer[0];
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
void copy(S &&array, Iterator output, size_t limit=(size_t)(-1))
void seed()
Seed the generator using the default seed.
std::vector< ResultType > reduce(S &&input, FunctionType f, ResultType init=ResultType())
void copy_range(S &&input, T &&output, size_t start, size_t step, size_t end)
static size_t cpu_count()
const size_t DEFAULT_SARRAY_READER_BUFFER_SIZE
void copy_transform_if(S &&input, T &&output, FilterFn filterfn, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >(), size_t random_seed=size_t(-1))
#define ASSERT_TRUE(cond)
void copy(Iterator begin, Iterator end, SWriter &&writer)
void split(S &&input, T &&output1, T &&output2, FilterFn filterfn, size_t random_seed=std::time(NULL))
void binary_transform(S1 &&input1, S2 &&input2, T &&output, TransformFn transformfn)
void transform(S &&input, T &&output, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >())
void copy_if(S &&input, T &&output, FilterFn filterfn, std::set< size_t > constraint_segments=std::set< size_t >(), size_t random_seed=size_t(-1))