6 #ifndef TURI_SFRAME_QUERY_MANAGER_LAMBDA_TRANSFORM_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_LAMBDA_TRANSFORM_HPP 8 #include <core/data/flexible_type/flexible_type.hpp> 9 #include <core/storage/query_engine/operators/operator.hpp> 10 #include <core/storage/query_engine/execution/query_context.hpp> 11 #include <core/storage/query_engine/operators/operator_properties.hpp> 12 #include <core/system/lambda/pylambda_function.hpp> 13 #include <core/system/exceptions/error_types.hpp> 16 namespace query_eval {
// Invokes the DECL_CORO_STATE macro (defined outside this excerpt) for the
// name `execute`. Presumably this declares the coroutine state that
// CORO_RUNNING(execute) inspects in coro_running() -- TODO confirm against
// the macro definition.
32 DECL_CORO_STATE(execute);
33 planner_node_type type()
const {
return planner_node_type::LAMBDA_TRANSFORM_NODE; }
/**
 * Returns the fixed textual name of this operator.
 *
 * \return The string "lambda_transform".
 */
static std::string name() {
  return "lambda_transform";
}
/**
 * Builds the operator from a shared pylambda_function handle, an output
 * type, and an optional list of column names (empty by default).
 *
 * The arguments are stored as-is in m_lambda, m_output_type and
 * m_column_names; the constructor body is empty.
 *
 * NOTE(review): `output_type` is used in the initializer list, but its
 * parameter declaration (listing line 46) is not visible in this excerpt.
 */
45 inline operator_impl(std::shared_ptr<lambda::pylambda_function> lambda,
47 const std::vector<std::string>& column_names = {})
48 : m_lambda(lambda), m_output_type(output_type),
49 m_column_names(column_names) { }
/**
 * Returns a new operator_impl copy-constructed from *this, handed back as a
 * std::shared_ptr<query_operator>.
 *
 * NOTE(review): the closing brace of this method is not visible in this
 * excerpt.
 */
51 inline std::shared_ptr<query_operator>
clone()
const {
52 return std::make_shared<operator_impl>(*this);
/**
 * Returns the value of CORO_RUNNING(execute).
 *
 * Presumably true while the `execute` coroutine is in progress -- the macro
 * is defined outside this excerpt, TODO confirm.
 */
55 inline bool coro_running()
const {
56 return CORO_RUNNING(execute);
// NOTE(review): fragment of a larger function body; its header and several
// lines (including what separates the two eval calls) are not visible here.
//
// Size the output buffer from the input row count. The leading 1 is
// presumably the column count -- TODO confirm against the resize signature.
67 output->resize(1, rows->num_rows());
// Receives one lambda result per evaluated row.
68 std::vector<flexible_type> out;
// With no column names, the lambda is evaluated on the rows alone ...
71 if (m_column_names.empty()) {
73 m_lambda->eval(*rows, out);
// ... otherwise (presumably the else branch) the column names are passed too.
76 m_lambda->eval(m_column_names, *rows, out);
// Store each result in slot [i][0] of the output after converting it with
// convert_value_to_output_type using m_output_type.
79 for (
size_t i = 0;i < out.size(); ++i) {
80 (*output)[i][0] = convert_value_to_output_type(out[i], m_output_type);
/**
 * Factory returning a std::shared_ptr<planner_node> for a lambda transform.
 *
 * A pylambda_function is constructed from `lambda_str` and configured with
 * `skip_undefined` and `random_seed`. The visible initializer lists record
 * "output_type", "lambda_str", "skip_undefined", "random_seed" and
 * "column_names" as keyed values, and the function handle itself under
 * "lambda_fn" wrapped in `any`.
 *
 * \param source          Input planner node (its use is not visible in this
 *                        excerpt).
 * \param lambda_str      String passed to the pylambda_function constructor.
 * \param column_names    Column names, copied into a flex_list
 *                        (default: empty).
 * \param skip_undefined  Forwarded to set_skip_undefined (default: false).
 * \param random_seed     Forwarded to set_random_seed (default: size_t(-1)).
 *
 * NOTE(review): the `output_type` parameter declaration and the call that
 * consumes the two initializer lists are not visible in this excerpt.
 * NOTE(review): `(int)(random_seed)` narrows a size_t to int, so the stored
 * "random_seed" value can differ from the seed given to the lambda for
 * seeds that do not fit in an int.
 */
89 static std::shared_ptr<planner_node> make_planner_node(
90 std::shared_ptr<planner_node> source,
91 const std::string& lambda_str,
93 const std::vector<std::string> column_names = {},
94 bool skip_undefined =
false,
95 size_t random_seed = size_t(-1)) {
// flex_list copy of the names so they can be stored under "column_names".
97 flex_list column_names_list(column_names.begin(), column_names.end());
98 auto lambda_function = std::make_shared<lambda::pylambda_function>(lambda_str);
99 lambda_function->set_skip_undefined(skip_undefined);
100 lambda_function->set_random_seed(random_seed);
// Keyed parameter values; the enum, bool and seed are all cast to int.
102 {{
"output_type", (int)(output_type)},
103 {
"lambda_str", lambda_str},
104 {
"skip_undefined", (int)(skip_undefined)},
105 {
"random_seed", (int)(random_seed)},
106 {
"column_names", column_names_list}},
// The live lambda handle is carried separately, wrapped in `any`.
107 {{
"lambda_fn",
any(lambda_function)}},
/**
 * Builds an operator_impl from a LAMBDA_TRANSFORM_NODE planner node.
 *
 * Asserts that the node has the expected operator type, exactly one input,
 * and the entries "output_type", "lambda_str", "column_names",
 * "skip_undefined" and "random_seed" in operator_parameters, plus
 * "lambda_fn" in any_operator_parameters. The operator is then constructed
 * from the stored lambda handle, the output type, and the column names
 * converted from a flex_list to std::vector<std::string>.
 *
 * \param pnode  Planner node to convert.
 * \return       The new operator as a std::shared_ptr<query_operator>.
 *
 * NOTE(review): the declarations of `output_type` and `column_names_list`
 * are not visible in this excerpt.
 */
111 static std::shared_ptr<query_operator> from_planner_node(
112 std::shared_ptr<planner_node> pnode) {
// Check the node type, input count and required parameters up front.
113 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::LAMBDA_TRANSFORM_NODE);
114 ASSERT_EQ(pnode->inputs.size(), 1);
115 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
116 ASSERT_TRUE(pnode->operator_parameters.count(
"lambda_str"));
117 ASSERT_TRUE(pnode->operator_parameters.count(
"column_names"));
118 ASSERT_TRUE(pnode->operator_parameters.count(
"skip_undefined"));
119 ASSERT_TRUE(pnode->operator_parameters.count(
"random_seed"));
120 ASSERT_TRUE(pnode->any_operator_parameters.count(
"lambda_fn"));
// Read the stored column names back as a flex_list ...
125 (pnode->operator_parameters[
"column_names"]).get<flex_list>();
// ... and convert them element-wise to std::string.
126 std::vector<std::string> column_names(column_names_list.begin(), column_names_list.end());
// Recover the shared lambda handle stored under "lambda_fn".
128 auto fn = pnode->any_operator_parameters[
"lambda_fn"]
129 .as<std::shared_ptr<lambda::pylambda_function>>();
130 return std::make_shared<operator_impl>(fn, output_type, column_names);
/**
 * Returns the output column type(s) of a LAMBDA_TRANSFORM_NODE.
 *
 * Asserts the node type and the presence of the "output_type" parameter,
 * then returns a one-element vector holding that parameter converted to int
 * and then to flex_type_enum.
 *
 * \param pnode  Planner node to inspect.
 *
 * NOTE(review): the closing brace of this function is not visible in this
 * excerpt.
 */
133 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
134 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::LAMBDA_TRANSFORM_NODE);
135 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
136 return {(
flex_type_enum)(
int)(pnode->operator_parameters[
"output_type"])};
/**
 * Length inference for a LAMBDA_TRANSFORM_NODE; asserts the node type first.
 *
 * \param pnode  Planner node to inspect.
 *
 * NOTE(review): the statement producing the returned int64_t is not visible
 * in this excerpt, so the returned value is not documented here.
 */
139 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
140 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::LAMBDA_TRANSFORM_NODE);
/**
 * Builds a textual description of the node in a std::ostringstream.
 *
 * When the node's "column_names" list is non-empty, the names are written
 * separated by "," and followed by ")".
 *
 * \param pnode  Planner node to describe. The pnode_tagger argument is
 *               unnamed and therefore unused.
 *
 * NOTE(review): several lines (the text written before the names, the
 * declaration of `column_names_list`, and the return) are not visible in
 * this excerpt.
 */
144 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
145 std::ostringstream out;
149 (pnode->operator_parameters[
"column_names"]).get<flex_list>();
151 if(!column_names_list.empty()) {
// Every name except the last gets a trailing comma. size() - 1 cannot wrap
// around here because the list was just checked to be non-empty.
154 for(
size_t i = 0; i < column_names_list.size() - 1; ++i) {
155 out << column_names_list[i] <<
",";
// The last name closes the parenthesised list.
158 out << column_names_list.back() <<
")";
// Shared handle to the lambda; set by the constructor and used through
// m_lambda->eval(...).
165 std::shared_ptr<lambda::pylambda_function> m_lambda;
// Column names passed as the first argument to m_lambda->eval when the list
// is non-empty; empty by default.
167 std::vector<std::string> m_column_names;
// Error text for a value that cannot be converted; the value is rendered
// with std::string(val).
// NOTE(review): the enclosing function header and the middle of this
// expression are not visible in this excerpt -- presumably this belongs to
// convert_value_to_output_type, TODO confirm.
191 std::string message =
"Cannot convert " + std::string(val) +
194 " If this is not the correct type try specifying it during creation.";
207 #endif // TURI_SFRAME_QUERY_MANAGER_LAMBDA_TRANSFORM_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
flexible_type & soft_assign(const flexible_type &other)
size_t attribute_bitfield
A bitfield of the attribute enum.
const char * flex_type_enum_to_name(flex_type_enum en)
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
bool flex_type_is_convertible(flex_type_enum from, flex_type_enum to)
flex_type_enum get_type() const
#define ASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flexible_type > flex_list