6 #ifndef TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/storage/query_engine/operators/operator.hpp> 11 #include <core/storage/query_engine/execution/query_context.hpp> 12 #include <core/storage/query_engine/operators/operator_properties.hpp> 16 namespace query_eval {
31 DECL_CORO_STATE(execute);
33 std::vector<std::shared_ptr<const sframe_rows> > input_v;
38 std::vector<turi::flexible_type> > > > input_columns;
40 planner_node_type type()
const {
return planner_node_type::GENERALIZED_UNION_PROJECT_NODE; }
/// Short human-readable name of this operator.
///
/// @return The string "union-project".
static std::string name() {
  static const char kName[] = "union-project";
  return std::string(kName);
}
51 inline operator_impl(
size_t _num_inputs) : num_inputs(_num_inputs) {}
53 inline std::shared_ptr<query_operator>
clone()
const {
54 return std::make_shared<operator_impl>(*this);
57 inline bool coro_running()
const {
58 return CORO_RUNNING(execute);
62 input_v.resize(num_inputs);
63 input_columns.resize(num_inputs);
67 bool all_null =
true, any_null =
false;
68 for(
size_t i = 0; i < num_inputs; ++i) {
70 if(input_v[i] ==
nullptr)
81 for(
size_t i = 0; i < num_inputs; ++i) {
82 input_columns[i] = std::move(input_v[i]->get_columns());
86 auto& out_columns = out->get_columns();
89 for(
const std::pair<size_t, size_t>& p : index_map) {
90 out_columns.push_back(input_columns[p.first][p.second]);
101 size_t num_inputs = 0;
104 std::vector<std::pair<size_t, size_t> > index_map;
108 static std::shared_ptr<planner_node> make_planner_node(
109 const std::vector<std::shared_ptr<planner_node> >& inputs,
110 const std::vector<std::pair<size_t, size_t> >& index_mappings) {
112 ASSERT_GE(inputs.size(), 1);
114 flex_dict _index_map(index_mappings.begin(), index_mappings.end());
117 planner_node_type::GENERALIZED_UNION_PROJECT_NODE,
118 { {
"index_map", _index_map} },
119 std::map<std::string, any>(),
123 static std::shared_ptr<query_operator> from_planner_node(
124 std::shared_ptr<planner_node> pnode) {
125 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
126 ASSERT_GE(pnode->inputs.size(), 1);
128 auto ret = std::make_shared<operator_impl>(pnode->inputs.size());
130 const flex_dict& _index_map = pnode->operator_parameters.at(
"index_map").get<
flex_dict>();
131 ret->index_map.assign(_index_map.begin(), _index_map.end());
136 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
137 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
138 ASSERT_GE(pnode->inputs.size(), 1);
140 std::vector<std::vector<flex_type_enum> > raw_types(pnode->inputs.size());
142 for(
size_t i = 0; i < pnode->inputs.size(); ++i) {
146 const flex_dict& _index_map = pnode->operator_parameters.at(
"index_map").get<
flex_dict>();
147 std::vector<flex_type_enum> out_types(_index_map.size());
149 for(
size_t i = 0; i < _index_map.size(); ++i) {
150 out_types[i] = raw_types[_index_map[i].first.get<
flex_int>()][_index_map[i].second.get<
flex_int>()];
156 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
157 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
161 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
162 std::ostringstream out;
165 const flex_dict& _index_map = pnode->operator_parameters.at(
"index_map").get<
flex_dict>();
167 if(!_index_map.empty()) {
169 std::vector<std::pair<size_t, std::vector<size_t> > >
170 groups{ {size_t(_index_map[0].first), { size_t(_index_map[0].second)} } };
172 for(
size_t i = 1; i < _index_map.size(); ++i) {
173 if(_index_map[i].first == groups.back().first
174 && _index_map[i].second == groups.back().second.back() + 1) {
175 groups.back().second.push_back(
size_t(_index_map[i].second));
177 groups.push_back({_index_map[i].first, {_index_map[i].second} });
181 bool is_first =
true;
182 for(
size_t i = 0; i < groups.size(); ++i) {
183 const auto& grp = groups[i];
186 if(groups[i - 1].first != groups[i].first)
194 if(i == 0 || grp.first != groups[i-1].first)
195 out << get_tag(pnode->inputs[grp.first]) <<
':';
197 switch(grp.second.size()) {
199 out << grp.second[0];
202 out << grp.second[0] <<
',' << grp.second[1];
205 out << grp.second[0] <<
',' << grp.second[1] <<
',' << grp.second[2];
208 out << grp.second.front() <<
",...," << grp.second.back();
226 #endif // TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
std::shared_ptr< query_operator > clone() const
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
void execute(query_context &context)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::vector< std::pair< flexible_type, flexible_type > > flex_dict