6 #ifndef TURI_SFRAME_QUERY_MANAGER_UNION_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_UNION_HPP 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/storage/query_engine/operators/operator.hpp> 11 #include <core/storage/query_engine/execution/query_context.hpp> 12 #include <core/storage/query_engine/operators/operator_properties.hpp> 16 namespace query_eval {
// Declares the coroutine state associated with execute(); the same tag is
// queried through CORO_RUNNING(execute) in coro_running().
31 DECL_CORO_STATE(execute);
// Holds the row blocks of the inputs, indexed by input number. It is resized
// to num_inputs inside execute(), and its entries are compared against
// nullptr there (presumably nullptr marks an exhausted input -- TODO confirm).
32 std::vector<std::shared_ptr<const sframe_rows> > input_v;
/// Returns the textual identifier of this operator: the literal "union".
static std::string name() {
  std::string operator_name("union");
  return operator_name;
}
45 inline operator_impl(
size_t _num_inputs = 2) : num_inputs(_num_inputs) {}
47 inline std::shared_ptr<query_operator>
clone()
const {
48 return std::make_shared<operator_impl>(*this);
51 inline bool coro_running()
const {
52 return CORO_RUNNING(execute);
57 input_v.resize(num_inputs);
61 bool all_null =
true, any_null =
false;
62 for(
size_t i = 0; i < num_inputs; ++i) {
64 if(input_v[i] ==
nullptr)
76 auto& out_columns = out->get_columns();
79 for(
size_t i = 0; i < num_inputs; ++i) {
80 std::copy(input_v[i]->get_columns().begin(), input_v[i]->get_columns().end(),
81 std::back_inserter(out_columns));
// Number of inputs this operator combines. Defaults to 0 here, but the
// constructor overwrites it with its _num_inputs argument; execute() uses it
// as the bound when iterating over input_v.
92 size_t num_inputs = 0;
95 static std::shared_ptr<planner_node> make_planner_node(
96 std::shared_ptr<planner_node> left,
97 std::shared_ptr<planner_node> right) {
99 std::map<std::string, flexible_type>(),
100 std::map<std::string, any>(),
104 static std::shared_ptr<planner_node> make_planner_node(
105 const std::vector<std::shared_ptr<planner_node> >& nodes) {
107 ASSERT_GE(nodes.size(), 1);
110 std::map<std::string, flexible_type>(),
111 std::map<std::string, any>(),
115 static std::shared_ptr<query_operator> from_planner_node(
116 std::shared_ptr<planner_node> pnode) {
117 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::UNION_NODE);
118 ASSERT_GE(pnode->inputs.size(), 1);
120 return std::make_shared<operator_impl>(pnode->inputs.size());
123 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
124 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::UNION_NODE);
125 ASSERT_GE(pnode->inputs.size(), 1);
129 for(
size_t i = 1; i < pnode->inputs.size(); ++i) {
131 types.insert(types.end(), input_types.begin(), input_types.end());
137 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
138 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::UNION_NODE);
142 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
143 std::ostringstream ss;
145 bool is_first =
true;
146 for(
const auto& pn : pnode->inputs) {
147 if(!is_first) ss <<
',';
163 #endif // TURI_SFRAME_QUERY_MANAGER_UNION_HPP
void execute(query_context &context)
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
void copy(const std::string src, const std::string dest)
#define ASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::shared_ptr< query_operator > clone() const