6 #ifndef TURI_SFRAME_QUERY_MANAGER_TERNARY_OPERATOR_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_TERNARY_OPERATOR_HPP 10 #include <core/logging/assertions.hpp> 11 #include <core/data/flexible_type/flexible_type.hpp> 12 #include <core/storage/query_engine/operators/operator.hpp> 13 #include <core/storage/query_engine/execution/query_context.hpp> 14 #include <core/storage/query_engine/operators/operator_properties.hpp> 18 namespace query_eval {
36 DECL_CORO_STATE(execute);
40 static std::string name() {
return "ternary"; }
49 inline operator_impl() { }
51 inline std::shared_ptr<query_operator>
clone()
const {
52 return std::make_shared<operator_impl>(*this);
55 inline bool coro_running()
const {
56 return CORO_RUNNING(execute);
59 constexpr
size_t CONDITION_INPUT = 0;
60 constexpr
size_t ISTRUE_INPUT = 1;
61 constexpr
size_t ISFALSE_INPUT = 2;
65 auto condition = context.
get_next(CONDITION_INPUT);
67 if (condition ==
nullptr)
break;
68 ASSERT_EQ(condition->num_columns(), 1);
69 const auto& condition_column = *(condition->cget_columns()[0]);
71 size_t num_non_zero = std::accumulate(condition_column.begin(),
72 condition_column.end(),
81 if (num_non_zero == 0 || num_non_zero == condition_column.size()) {
84 if (num_non_zero == 0) {
85 skip_number = ISTRUE_INPUT;
86 input_number = ISFALSE_INPUT;
88 skip_number = ISFALSE_INPUT;
89 input_number = ISTRUE_INPUT;
95 auto& out_columns = output_buffer->get_columns();
97 auto input = context.
get_next(input_number);
98 ASSERT_EQ(input->num_rows(), condition_column.size());
99 ASSERT_EQ(input->num_columns(), 1);
102 out_columns.push_back(input->cget_columns()[0]);
105 auto isfalse = context.
get_next(ISFALSE_INPUT);
106 auto istrue = context.
get_next(ISTRUE_INPUT);
109 ASSERT_EQ(isfalse->num_rows(), condition_column.size());
110 ASSERT_EQ(istrue->num_rows(), condition_column.size());
111 ASSERT_EQ(isfalse->num_columns(), 1);
112 ASSERT_EQ(istrue->num_columns(), 1);
114 output_buffer->resize(1, condition_column.size());
116 auto istrue_iter = istrue->cbegin();
117 auto isfalse_iter = isfalse->cbegin();
118 auto out_iter = output_buffer->begin();
119 for (
auto& cval : condition_column) {
120 if (cval.is_zero()) {
121 (*out_iter)[0] = (*isfalse_iter)[0];
123 (*out_iter)[0] = (*istrue_iter)[0];
130 context.
emit(output_buffer);
137 static std::shared_ptr<planner_node> make_planner_node(
138 std::shared_ptr<planner_node> condition,
139 std::shared_ptr<planner_node> istrue,
140 std::shared_ptr<planner_node> isfalse) {
143 std::map<std::string, flexible_type>(),
144 std::map<std::string, any>(),
145 {condition, istrue, isfalse});
148 static std::shared_ptr<query_operator> from_planner_node(
149 std::shared_ptr<planner_node> pnode) {
151 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TERNARY_OPERATOR);
152 ASSERT_EQ(pnode->inputs.size(), 3);
154 return std::make_shared<operator_impl>();
157 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
158 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TERNARY_OPERATOR);
162 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
163 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::TERNARY_OPERATOR);
176 #endif // TURI_SFRAME_QUERY_MANAGER_TERNARY_OPERATOR_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
std::shared_ptr< query_operator > clone() const
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
void execute(query_context &context)
void skip_next(size_t input_number)