6 #ifndef TURI_SFRAME_QUERY_MANAGER_GENERALIZED_TRANSFORM_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_GENERALIZED_TRANSFORM_HPP 8 #include <core/data/flexible_type/flexible_type.hpp> 9 #include <core/random/random.hpp> 10 #include <core/parallel/pthread_tools.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/execution/query_context.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 16 namespace query_eval {
18 typedef std::function<void (
const sframe_rows::row&,
19 sframe_rows::row&)> generalized_transform_type;
// Project macro invocation for the `execute` routine; presumably declares the
// coroutine bookkeeping that CORO_RUNNING(execute) later inspects -- confirm
// against the macro's definition.
34 DECL_CORO_STATE(execute);
36 planner_node_type type()
const {
return planner_node_type::GENERALIZED_TRANSFORM_NODE; }
/// Returns the operator's name string, "generalized_transform".
static std::string name() {
  const std::string operator_name("generalized_transform");
  return operator_name;
}
// Constructor: copies the transform callback `f`, the declared output column
// types and the random seed into m_transform_fn, m_output_types and
// m_random_seed respectively.
// NOTE(review): the declaration of the `random_seed` parameter and the
// constructor body are not visible in this listing.
49 inline operator_impl(
const generalized_transform_type& f,
50 const std::vector<flex_type_enum>& output_types,
52 : m_transform_fn(f), m_output_types(output_types), m_random_seed(random_seed)
// Returns a new operator_impl copy-constructed from *this, held in a
// std::shared_ptr and exposed through the query_operator pointer type.
55 inline std::shared_ptr<query_operator>
clone()
const {
56 return std::make_shared<operator_impl>(*this);
// Returns the value of CORO_RUNNING(execute); presumably true while the
// `execute` coroutine is in progress -- confirm against the macro definition.
59 inline bool coro_running()
const {
60 return CORO_RUNNING(execute);
// Fragment of the execution routine; its header and several statements are not
// visible in this listing.
// A seed of -1 is treated as the "no seed" sentinel: the guarded code only
// runs for any other value.
64 if (m_random_seed != -1){
// Resize the output buffer using the number of declared output types and the
// number of input rows (presumably columns x rows -- confirm argument order).
73 output->resize(m_output_types.size(), rows->num_rows());
// Iterate over the input rows (const) alongside an iterator into the output.
// NOTE(review): the iterator increments are not visible here.
75 auto iter = rows->cbegin();
76 auto output_iter = output->begin();
77 while(iter != rows->cend()) {
// Invoke the user callback with the current input row and the output row.
78 m_transform_fn((*iter), (*output_iter));
// Pass the declared output types to the output buffer's type check
// (presumably validates/coerces the values the callback wrote).
82 output->type_check_inplace(m_output_types);
// Builds the planner node describing a generalized transform.
// The visible code encodes each output type as a flex_int in `type_list`,
// stores that list under "output_types" and the seed under "random_seed" in the
// flexible_type parameter map, and stores the callback wrapped in `any` under
// "function" in the second parameter map.
// NOTE(review): the `random_seed` parameter declaration, the creation of
// `type_list`, and the way `source` is attached as an input are not visible in
// this listing.
90 static std::shared_ptr<planner_node> make_planner_node(
91 std::shared_ptr<planner_node> source,
92 generalized_transform_type fn,
93 const std::vector<flex_type_enum>& output_types,
// Convert every output type enum to its integer representation.
97 for (
size_t i = 0; i < output_types.size(); ++i) {
98 type_list[i] =
flex_int(output_types[i]);
102 {{
"output_types", type_list},
103 {
"random_seed", random_seed}},
104 {{
"function",
any(fn)}},
// Reconstructs an operator instance from a planner node.
// Asserts that the node is a GENERALIZED_TRANSFORM_NODE with exactly one input
// and that the "output_types" and "function" parameters are present, then reads
// the callback and the random seed back out and constructs the operator.
// NOTE(review): the code that fills `output_types` from `list_output_types`
// is not visible in this listing.
108 static std::shared_ptr<query_operator> from_planner_node(
109 std::shared_ptr<planner_node> pnode) {
// Sanity checks on the node kind, its input count and required parameters.
110 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_TRANSFORM_NODE);
111 ASSERT_EQ(pnode->inputs.size(), 1);
112 ASSERT_TRUE(pnode->operator_parameters.count(
"output_types"));
113 ASSERT_TRUE(pnode->any_operator_parameters.count(
"function"));
114 generalized_transform_type fn;
// The output types are stored on the node as a flex_list.
116 flex_list list_output_types = pnode->operator_parameters[
"output_types"];
117 std::vector<flex_type_enum> output_types;
// Recover the callback from its type-erased `any` storage.
120 fn = pnode->any_operator_parameters[
"function"].as<generalized_transform_type>();
// The seed is read as a flex_int and then narrowed to int.
122 int random_seed = (int)(
flex_int)(pnode->operator_parameters[
"random_seed"]);
123 return std::make_shared<operator_impl>(fn, output_types, random_seed);
// Returns the output column types of the node as a vector of flex_type_enum.
// Asserts that the node is a GENERALIZED_TRANSFORM_NODE carrying an
// "output_types" parameter, then reads that parameter as a flex_list.
// NOTE(review): the conversion from `outtypes` into `ret` and the return
// statement are not visible in this listing.
126 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
127 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_TRANSFORM_NODE);
128 ASSERT_TRUE(pnode->operator_parameters.count(
"output_types"));
130 flex_list outtypes = pnode->operator_parameters[
"output_types"];
131 std::vector<flex_type_enum> ret;
// Length query for the node, returned as int64_t.
// Asserts that the node is a GENERALIZED_TRANSFORM_NODE.
// NOTE(review): the statement computing the returned length is not visible in
// this listing; presumably it defers to the length of the single input --
// confirm.
136 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
137 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_TRANSFORM_NODE);
141 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
142 size_t n_outs = infer_length(pnode);
145 case 1:
return "Tr->[C0]";
146 case 2:
return "Tr->[C0,C1]";
147 case 3:
return "Tr->[C0,C1,C2]";
149 std::ostringstream out;
150 out <<
"Tr->[C0,...,C" << (n_outs - 1) <<
"]";
// Callback invoked with an input row and the output row to fill.
157 generalized_transform_type m_transform_fn;
// Declared output types; used to size the output buffer and for its type check.
158 std::vector<flex_type_enum> m_output_types;
168 #endif // TURI_SFRAME_QUERY_MANAGER_GENERALIZED_TRANSFORM_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
void seed()
Seed the generator using the default seed.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
static size_t thread_id()
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flexible_type > flex_list