Turi Create  4.0
generalized_union_project.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/storage/query_engine/operators/operator.hpp>
11 #include <core/storage/query_engine/execution/query_context.hpp>
12 #include <core/storage/query_engine/operators/operator_properties.hpp>
13 #include <core/util/coro.hpp>
14 
15 namespace turi {
16 namespace query_eval {
17 
18 /**
19  * \ingroup sframe_query_engine
20  * \addtogroup operators Logical Operators
21  * \{
22  */
23 
24 /**
25  * A "union-project" operator combines one or more input streams by horizontally
26  * concatenating selected columns. This is really a "zip" operator and not the SQL union.
27  */
28 template <>
29 struct operator_impl<planner_node_type::GENERALIZED_UNION_PROJECT_NODE> : public query_operator {
30  public:
31  DECL_CORO_STATE(execute);
32 
33  std::vector<std::shared_ptr<const sframe_rows> > input_v;
34 
35  std::vector<
36  std::vector<
37  std::shared_ptr<
38  std::vector<turi::flexible_type> > > > input_columns;
39 
40  planner_node_type type() const { return planner_node_type::GENERALIZED_UNION_PROJECT_NODE; }
41 
42  static std::string name() { return "union-project"; }
43 
44  static query_operator_attributes attributes() {
46  ret.attribute_bitfield = query_operator_attributes::LINEAR;
47  ret.num_inputs = -1;
48  return ret;
49  }
50 
51  inline operator_impl(size_t _num_inputs) : num_inputs(_num_inputs) {}
52 
53  inline std::shared_ptr<query_operator> clone() const {
54  return std::make_shared<operator_impl>(*this);
55  }
56 
57  inline bool coro_running() const {
58  return CORO_RUNNING(execute);
59  }
60  inline void execute(query_context& context) {
61  CORO_BEGIN(execute)
62  input_v.resize(num_inputs);
63  input_columns.resize(num_inputs);
64 
65  while(1) {
66  {
67  bool all_null = true, any_null = false;
68  for(size_t i = 0; i < num_inputs; ++i) {
69  input_v[i] = context.get_next(i);
70  if(input_v[i] == nullptr)
71  any_null = true;
72  else
73  all_null = false;
74  }
75 
76  if(any_null) {
77  ASSERT_TRUE(all_null);
78  break;
79  }
80 
81  for(size_t i = 0; i < num_inputs; ++i) {
82  input_columns[i] = std::move(input_v[i]->get_columns());
83  }
84 
85  auto out = context.get_output_buffer();
86  auto& out_columns = out->get_columns();
87  out_columns.clear();
88 
89  for(const std::pair<size_t, size_t>& p : index_map) {
90  out_columns.push_back(input_columns[p.first][p.second]);
91  }
92 
93  context.emit(out);
94  }
95  CORO_YIELD();
96  }
97  CORO_END
98  }
99 
100  private:
101  size_t num_inputs = 0;
102 
103  // List of (input, columns) making up the output column
104  std::vector<std::pair<size_t, size_t> > index_map;
105 
106  public:
107 
108  static std::shared_ptr<planner_node> make_planner_node(
109  const std::vector<std::shared_ptr<planner_node> >& inputs,
110  const std::vector<std::pair<size_t, size_t> >& index_mappings) {
111 
112  ASSERT_GE(inputs.size(), 1);
113 
114  flex_dict _index_map(index_mappings.begin(), index_mappings.end());
115 
117  planner_node_type::GENERALIZED_UNION_PROJECT_NODE,
118  { {"index_map", _index_map} },
119  std::map<std::string, any>(),
120  inputs);
121  }
122 
123  static std::shared_ptr<query_operator> from_planner_node(
124  std::shared_ptr<planner_node> pnode) {
125  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
126  ASSERT_GE(pnode->inputs.size(), 1);
127 
128  auto ret = std::make_shared<operator_impl>(pnode->inputs.size());
129 
130  const flex_dict& _index_map = pnode->operator_parameters.at("index_map").get<flex_dict>();
131  ret->index_map.assign(_index_map.begin(), _index_map.end());
132 
133  return ret;
134  }
135 
136  static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
137  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
138  ASSERT_GE(pnode->inputs.size(), 1);
139 
140  std::vector<std::vector<flex_type_enum> > raw_types(pnode->inputs.size());
141 
142  for(size_t i = 0; i < pnode->inputs.size(); ++i) {
143  raw_types[i] = infer_planner_node_type(pnode->inputs[i]);
144  }
145 
146  const flex_dict& _index_map = pnode->operator_parameters.at("index_map").get<flex_dict>();
147  std::vector<flex_type_enum> out_types(_index_map.size());
148 
149  for(size_t i = 0; i < _index_map.size(); ++i) {
150  out_types[i] = raw_types[_index_map[i].first.get<flex_int>()][_index_map[i].second.get<flex_int>()];
151  }
152 
153  return out_types;
154  }
155 
156  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
157  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::GENERALIZED_UNION_PROJECT_NODE);
158  return infer_planner_node_length(pnode->inputs[0]);
159  }
160 
161  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
162  std::ostringstream out;
163  out << "UP(";
164 
165  const flex_dict& _index_map = pnode->operator_parameters.at("index_map").get<flex_dict>();
166 
167  if(!_index_map.empty()) {
168 
169  std::vector<std::pair<size_t, std::vector<size_t> > >
170  groups{ {size_t(_index_map[0].first), { size_t(_index_map[0].second)} } };
171 
172  for(size_t i = 1; i < _index_map.size(); ++i) {
173  if(_index_map[i].first == groups.back().first
174  && _index_map[i].second == groups.back().second.back() + 1) {
175  groups.back().second.push_back(size_t(_index_map[i].second));
176  } else {
177  groups.push_back({_index_map[i].first, {_index_map[i].second} });
178  }
179  }
180 
181  bool is_first = true;
182  for(size_t i = 0; i < groups.size(); ++i) {
183  const auto& grp = groups[i];
184 
185  if(!is_first) {
186  if(groups[i - 1].first != groups[i].first)
187  out << ";";
188  else
189  out << ",";
190  }
191 
192  is_first = false;
193 
194  if(i == 0 || grp.first != groups[i-1].first)
195  out << get_tag(pnode->inputs[grp.first]) << ':';
196 
197  switch(grp.second.size()) {
198  case 1:
199  out << grp.second[0];
200  break;
201  case 2:
202  out << grp.second[0] << ',' << grp.second[1];
203  break;
204  case 3:
205  out << grp.second[0] << ',' << grp.second[1] << ',' << grp.second[2];
206  break;
207  default:
208  out << grp.second.front() << ",...," << grp.second.back();
209  break;
210  }
211  }
212  }
213 
214  out << ')';
215 
216  return out.str();
217  }
218 };
219 
221 
222 /// \}
223 } // query_eval
224 } // turi
225 
226 #endif // TURI_SFRAME_QUERY_MANAGER_GENERALIZED_UNION_PROJECT_NODE_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::vector< std::pair< flexible_type, flexible_type > > flex_dict