6 #ifndef TURI_SFRAME_QUERY_MANAGER_LOGICAL_FILTER_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_LOGICAL_FILTER_HPP 8 #include <core/data/flexible_type/flexible_type.hpp> 9 #include <core/storage/query_engine/operators/operator.hpp> 10 #include <core/storage/query_engine/execution/query_context.hpp> 11 #include <core/storage/query_engine/operators/operator_properties.hpp> 15 namespace query_eval {
31 DECL_CORO_STATE(execute);
32 std::shared_ptr<const sframe_rows> rows_left, rows_right;
34 std::shared_ptr<sframe_rows> output_buffer;
35 size_t cur_output_index = 0;
38 bool has_data =
false;
42 static std::string name() {
return "logical_filter"; }
44 inline operator_impl() { };
53 inline std::shared_ptr<query_operator>
clone()
const {
54 return std::make_shared<operator_impl>(*this);
58 bool is_all_zero(
const std::shared_ptr<const sframe_rows>& col) {
60 for (
auto& row: *col) {
61 if (!(row[0].is_zero()))
return false;
66 inline bool coro_running()
const {
67 return CORO_RUNNING(execute);
74 if (rows_left ==
nullptr && rows_right ==
nullptr)
return;
75 ASSERT_TRUE(rows_left !=
nullptr && rows_right !=
nullptr);
80 ncols = rows_left->num_columns();
82 output_buffer->resize(ncols, nrows);
86 ASSERT_TRUE(rows_left !=
nullptr && rows_right !=
nullptr);
87 ASSERT_EQ(rows_left->num_rows(), rows_right->num_rows());
89 left_iter = rows_left->cbegin();
90 right_iter = rows_right->cbegin();
91 while(left_iter != rows_left->cend()) {
92 if (!(*right_iter)[0].is_zero()) {
93 (*output_buffer)[cur_output_index] = (*left_iter);
95 if (cur_output_index == nrows) {
96 context.
emit(output_buffer);
99 output_buffer->resize(ncols, nrows);
100 cur_output_index = 0;
111 if (rows_right !=
nullptr && is_all_zero(rows_right)) {
118 }
while(has_data ==
false);
119 if(rows_left ==
nullptr && rows_right ==
nullptr)
break;
122 if (cur_output_index > 0) {
123 output_buffer->resize(ncols, cur_output_index);
124 context.
emit(output_buffer);
130 static std::shared_ptr<planner_node> make_planner_node(
131 std::shared_ptr<planner_node> left,
132 std::shared_ptr<planner_node> right) {
134 std::map<std::string, flexible_type>(),
135 std::map<std::string, any>(),
139 static std::shared_ptr<query_operator> from_planner_node(
140 std::shared_ptr<planner_node> pnode) {
141 ASSERT_EQ((
int)pnode->operator_type,
142 (
int)planner_node_type::LOGICAL_FILTER_NODE);
143 ASSERT_EQ(pnode->inputs.size(), 2);
144 return std::make_shared<operator_impl>();
147 static std::vector<flex_type_enum> infer_type(
148 std::shared_ptr<planner_node> pnode) {
149 ASSERT_EQ((
int)pnode->operator_type,
150 (
int)planner_node_type::LOGICAL_FILTER_NODE);
151 ASSERT_EQ(pnode->inputs.size(), 2);
155 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
159 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
160 ASSERT_EQ(pnode->inputs.size(), 2);
161 return std::string(
"Filter(") + get_tag(pnode->inputs[0]) +
"[" + get_tag(pnode->inputs[1]) +
"])";
172 #endif // TURI_SFRAME_QUERY_MANAGER_LOGICAL_FILTER_HPP
void execute(query_context &context)
int num_inputs
Number of inputs expected to the operator.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
size_t block_size() const
std::shared_ptr< query_operator > clone() const
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
void skip_next(size_t input_number)