6 #ifndef TURI_SFRAME_QUERY_MANAGER_TRANSFORM_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_TRANSFORM_HPP 8 #include <core/data/flexible_type/flexible_type.hpp> 9 #include <core/random/random.hpp> 10 #include <core/parallel/pthread_tools.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/execution/query_context.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 16 namespace query_eval {
/// Callable applied to each input row: takes one sframe_rows::row and returns
/// the flexible_type value computed for that row.
18 typedef std::function<flexible_type(const sframe_rows::row&)> transform_type;
33 DECL_CORO_STATE(execute);
/// Returns the fixed name string of this operator, "transform".
36 static std::string name() {
return "transform"; }
/// Constructs the operator, storing the transform function, the output type
/// and the random seed in m_transform_fn, m_output_type and m_random_seed.
/// random_seed defaults to size_t(-1).
/// NOTE(review): the declaration of the `output_type` parameter and the
/// constructor body are not present in this excerpt.
47 inline operator_impl(
const transform_type& f,
49 size_t random_seed=
size_t(-1))
50 : m_transform_fn(f), m_output_type(output_type), m_random_seed(random_seed)
/// Returns a copy of this operator: a new operator_impl copy-constructed from
/// *this and handed back as a shared_ptr<query_operator>.
53 inline std::shared_ptr<query_operator>
clone()
const {
54 return std::make_shared<operator_impl>(*this);
/// Returns the result of CORO_RUNNING(execute).
/// NOTE(review): presumably true while the `execute` coroutine is mid-run;
/// confirm against the CORO_RUNNING macro definition.
57 inline bool coro_running()
const {
58 return CORO_RUNNING(execute);
// Fragment of the per-row transform loop; the enclosing function header is not
// part of this excerpt (presumably the `execute` coroutine -- TODO confirm).
// A seed equal to size_t(-1), the constructor default, does not enter this
// branch; what the branch does with the seed is not visible here.
62 if (m_random_seed !=
size_t(-1)){
// Size the output buffer from the input batch's row count.
// NOTE(review): presumably 1 column by num_rows rows -- confirm argument order.
71 output->resize(1, rows->num_rows());
// Iterate over the input rows; output_iter addresses the row being written.
73 auto iter = rows->cbegin();
74 auto output_iter = output->begin();
75 while(iter != rows->cend()) {
76 auto outval = m_transform_fn((*iter));
// Type check on the produced value. The rest of this condition is not visible;
// the direct store below appears to be the branch taken when it holds.
78 outval.get_type() == m_output_type ||
80 (*output_iter)[0] = outval;
// Alternative store of `f`, whose construction is not visible in this excerpt
// (presumably outval converted to m_output_type -- TODO confirm).
84 (*output_iter)[0] = f;
/// Creates a planner node describing a transform applied to `source`.
/// Visible here: "output_type" (stored as int) and "random_seed" are placed in
/// a flexible-type parameter map, and the function is wrapped in `any` under
/// the "function" key -- the same keys from_planner_node reads back.
/// random_seed defaults to size_t(-1).
/// NOTE(review): the `fn` / `output_type` parameter declarations and the call
/// that receives these maps are not part of this excerpt.
96 static std::shared_ptr<planner_node> make_planner_node(
97 std::shared_ptr<planner_node> source,
100 size_t random_seed=
size_t(-1)) {
102 {{
"output_type", (int)(output_type)},
103 {
"random_seed", random_seed}},
104 {{
"function",
any(fn)}},
/// Rebuilds a transform operator from a planner node.
/// Requires a TRANSFORM_NODE with exactly one input and with the
/// "output_type" and "function" parameters present (enforced by the asserts).
/// NOTE(review): the declarations of `fn` and `output_type` are not part of
/// this excerpt.
108 static std::shared_ptr<query_operator> from_planner_node(
109 std::shared_ptr<planner_node> pnode) {
110 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TRANSFORM_NODE);
111 ASSERT_EQ(pnode->inputs.size(), 1);
112 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
113 ASSERT_TRUE(pnode->any_operator_parameters.count(
"function"));
117 fn = pnode->any_operator_parameters[
"function"].as<transform_type>();
// Read the seed back at full width: the constructor parameter and
// m_random_seed are size_t. Narrowing through int truncated large seeds, and
// with a 32-bit int any seed whose low 32 bits are all ones collapsed to -1,
// i.e. the size_t(-1) "no seed" default.
// NOTE(review): assumes flex_int is a signed integer at least as wide as
// size_t, so the size_t(-1) default still round-trips -- confirm.
118 size_t random_seed = (size_t)(
flex_int)(pnode->operator_parameters[
"random_seed"]);
119 return std::make_shared<operator_impl>(fn, output_type, random_seed);
/// Reports the operator's output column types: a single-element vector holding
/// the node's "output_type" parameter converted (via int) to flex_type_enum.
/// Asserts that pnode is a TRANSFORM_NODE carrying an "output_type" parameter.
122 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
123 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TRANSFORM_NODE);
124 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
125 return {(
flex_type_enum)(
int)(pnode->operator_parameters[
"output_type"])};
/// Infers the output length for a transform planner node.
/// Asserts that pnode is a TRANSFORM_NODE; the computation that follows the
/// assert is not part of this excerpt.
128 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
129 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TRANSFORM_NODE);
// Per-row function; invoked on each input row in the transform loop.
134 transform_type m_transform_fn;
// Random seed supplied at construction; size_t(-1) is the default value and is
// special-cased by the `m_random_seed != size_t(-1)` check.
136 size_t m_random_seed;
145 #endif // TURI_SFRAME_QUERY_MANAGER_TRANSFORM_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
void seed()
Seed the generator using the default seed.
flexible_type & soft_assign(const flexible_type &other)
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
static size_t thread_id()
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())