Turi Create  4.0
sframe_utils.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_SFRAME_UTILS_H_
7 #define TURI_UNITY_SFRAME_UTILS_H_
8 
9 #include <functional>
10 #include <string>
11 #include <core/storage/sframe_data/sarray.hpp>
12 #include <core/data/flexible_type/flexible_type_base_types.hpp>
13 #include <Eigen/Core>
14 #include <core/storage/sframe_data/sframe.hpp>
15 #include <core/parallel/pthread_tools.hpp>
16 
17 namespace turi {
18 
19 class ml_data;
20 class unity_graph;
21 
22 /**
23  * \ingroup toolkit_util
24  * Filters the rows of an sframe into the first (false) or second
25  * (true) sframe by a switch function.
26  */
27  std::pair<sframe,sframe> split_sframe_on_index(const sframe& src, std::function<bool(size_t)> switch_function);
28 
29  /**
30  * \ingroup toolkit_util
31  * Create an SArray of vector type, where each element is a row of the
32  * provided matrix.
33  */
34  std::shared_ptr<sarray<flexible_type>> matrix_to_sarray(const Eigen::MatrixXd& m);
35 
36 /**
37  * \ingroup toolkit_util
38  * Generate an SFrame from a transform function that takes an index
39  * and fills a vector of flexible type. The signature of the
40  * transform function should be:
41  *
42  * gen_fill_func(size_t row_index, std::vector<flexible_type>& out_values);
43  *
44  * Access there is done in parallel.
45  *
46  s*/
47 template <typename GenFunction>
48 sframe sframe_from_ranged_generator(const std::vector<std::string>& column_names,
49  const std::vector<flex_type_enum>& column_types,
50  size_t num_rows,
51  GenFunction&& generator_function) {
52 
53  sframe out;
54 
55  out.open_for_write(column_names, column_types, "", thread::cpu_count());
56 
57  // Really inefficient due to the transpose.
58  in_parallel([&](size_t thread_index, size_t num_threads) {
59  std::vector<flexible_type> out_values(column_names.size());
60 
61  size_t start_idx = (num_rows * thread_index) / num_threads;
62  size_t end_idx = (num_rows * (thread_index + 1)) / num_threads;
63 
64  auto it_out = out.get_output_iterator(thread_index);
65 
66  for(size_t i = start_idx; i < end_idx; ++i, ++it_out) {
67  generator_function(i, out_values);
68  DASSERT_EQ(column_names.size(), out_values.size());
69  *it_out = out_values;
70  }
71  });
72 
73  out.close();
74 
75  return out;
76 }
77 
78 
79 
80 }
81 
#endif /* TURI_UNITY_SFRAME_UTILS_H_ */
iterator get_output_iterator(size_t segmentid)
static size_t cpu_count()
std::shared_ptr< sarray< flexible_type > > matrix_to_sarray(const Eigen::MatrixXd &m)
std::pair< sframe, sframe > split_sframe_on_index(const sframe &src, std::function< bool(size_t)> switch_function)
void open_for_write(const std::vector< std::string > &column_names, const std::vector< flex_type_enum > &column_types, const std::string &frame_sidx_file="", size_t nsegments=SFRAME_DEFAULT_NUM_SEGMENTS, bool fail_on_column_names=true)
Definition: sframe.hpp:265
sframe sframe_from_ranged_generator(const std::vector< std::string > &column_names, const std::vector< flex_type_enum > &column_types, size_t num_rows, GenFunction &&generator_function)
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
Definition: lambda_omp.hpp:35