6 #ifndef TURI_SFRAME_QUERY_MANAGER_APPEND_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_APPEND_HPP 9 #include <core/logging/assertions.hpp> 10 #include <core/data/flexible_type/flexible_type.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/execution/query_context.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 18 namespace query_eval {
// State tied to the `execute` coroutine; CORO_RUNNING(execute) is queried in
// coro_running(). NOTE(review): exact expansion of this macro is not visible here.
31 DECL_CORO_STATE(execute);
// Output buffer: execute() resizes it and writes rows into it by index.
35 std::shared_ptr<sframe_rows> out;
// Most recent block obtained from context.get_next(input); the read loop in
// execute() continues while this is not nullptr.
38 std::shared_ptr<const sframe_rows> input_rows;
/// Returns the operator's name: the literal string "append".
43 static std::string name() {
return "append"; }
/// Default constructor with an empty body; takes no arguments.
45 inline operator_impl() { };
/// Returns a new, default-constructed operator_impl held by a shared_ptr.
/// No arguments are forwarded to the new instance, so nothing from `this`
/// is passed to it.
55 inline std::shared_ptr<query_operator>
clone()
const {
56 return std::make_shared<operator_impl>();
/// Returns the value of CORO_RUNNING(execute).
/// NOTE(review): presumably true while the execute coroutine is suspended
/// mid-run -- confirm against the coroutine macro definitions.
59 inline bool coro_running()
const {
60 return CORO_RUNNING(execute);
// Visit the two inputs in order: input 0 first, then input 1.
66 for (input = 0; input < 2; ++input) {
// Fetch the first block of the current input.
68 input_rows = context.
get_next(input);
// Taken only when this input yielded a block and no output buffer is held yet.
69 if (input_rows !=
nullptr && out ==
nullptr) {
// Size the output buffer with the input's column count and context.block_size().
// NOTE(review): the statement that obtains `out` is not visible in this listing.
72 out->resize(input_rows->num_columns(), context.
block_size());
// Keep consuming blocks from this input until get_next() returns nullptr.
76 while(input_rows !=
nullptr) {
77 row_iter_begin = input_rows->cbegin();
78 row_iter_end = input_rows->cend();
// Walk every row of the current block.
79 while(row_iter_begin != row_iter_end) {
// Copy the current input row into output slot `outidx`.
80 (*out)[outidx] = *row_iter_begin;
// NOTE(review): the statements between the copy above and this resize are not
// visible here; presumably they advance the iterator/index and hand off a full
// buffer before it is re-sized for reuse -- confirm against the full file.
87 out->resize(input_rows->num_columns(), context.
block_size());
// Pull the next block from the same input.
92 input_rows = context.
get_next(input);
// Resize the buffer to `outidx` in its second dimension, keeping the column count.
// NOTE(review): presumably trims the unfilled tail of the last block -- confirm.
96 out->resize(out->num_columns(), outidx);
109 std::shared_ptr<planner_node> right) {
// An empty string -> flexible_type map is passed at this position.
111 std::map<std::string, flexible_type>(),
// An empty string -> any map is passed at this position.
112 std::map<std::string, any>(),
/// Creates the operator for a planner node.
///
/// Asserts that `pnode` has exactly two inputs and that its operator_type is
/// planner_node_type::APPEND_NODE, then returns a default-constructed
/// operator_impl. In the visible lines, `pnode` is read only by the assertions.
116 static std::shared_ptr<query_operator> from_planner_node(
117 std::shared_ptr<planner_node> pnode) {
118 ASSERT_EQ(pnode->inputs.size(), 2);
// The enum values are cast to int for the equality assertion.
119 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::APPEND_NODE);
120 return std::make_shared<operator_impl>();
/// Validates the column types of an APPEND_NODE's inputs.
///
/// The visible checks assert that the node is an APPEND_NODE, that `types` is
/// non-empty, and that every entry after the first matches entry 0 both in
/// size and element by element.
/// NOTE(review): the statements that fill `types` and the return statement are
/// not visible in this listing.
123 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
124 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::APPEND_NODE);
// One type vector per entry; presumably one per input, filled in the loop below.
125 std::vector<std::vector<flex_type_enum>> types;
126 for (
auto input: pnode->inputs) {
129 ASSERT_MSG(types.size() != 0,
"Append with no input");
// Compare each later entry against entry 0.
132 for (
size_t i = 1; i < types.size(); ++i) {
// Same number of columns...
133 ASSERT_EQ(types[i].size(), types[0].size());
134 for (
size_t j = 0; j < types[i].size(); ++j) {
// ...and the same type in every column position (enums compared as int).
135 ASSERT_EQ((
int)types[i][j], (
int)types[0][j]);
/// Accumulates the lengths of an APPEND_NODE's inputs into `ret_length`.
///
/// Returns -1 as soon as any input's length is -1.
/// NOTE(review): the declaration of `input_length` and the final return are not
/// visible in this listing; -1 presumably means "length unknown" -- confirm.
141 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
142 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::APPEND_NODE);
143 int64_t ret_length = 0;
144 for (
auto input: pnode->inputs) {
// A single -1 input length makes the whole result -1.
146 if (input_length == -1)
return -1;
147 ret_length += input_length;
/// Builds a printable description of the node in the form "Append(<a>,<b>)",
/// where <a> and <b> are the results of get_tag on inputs[0] and inputs[1].
/// Asserts that the node has exactly two inputs.
152 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
153 ASSERT_EQ(pnode->inputs.size(), 2);
154 return std::string(
"Append(") + get_tag(pnode->inputs[0]) +
"," + get_tag(pnode->inputs[1]) +
")";
166 #endif // TURI_SFRAME_QUERY_MANAGER_APPEND_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
size_t block_size() const
std::shared_ptr< query_operator > clone() const
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
void execute(query_context &context)
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
static std::shared_ptr< planner_node > make_planner_node(std::shared_ptr< planner_node > left, std::shared_ptr< planner_node > right)