Turi Create  4.0
union.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_UNION_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_UNION_HPP
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/storage/query_engine/operators/operator.hpp>
11 #include <core/storage/query_engine/execution/query_context.hpp>
12 #include <core/storage/query_engine/operators/operator_properties.hpp>
13 #include <core/util/coro.hpp>
14 
15 namespace turi {
16 namespace query_eval {
17 
18 /**
19  * \ingroup sframe_query_engine
20  * \addtogroup operators Logical Operators
21  * \{
22  */
23 
24 /**
25  * A "union" operator combine two input stream by horizontally concat
26  * the values. This is really a "zip" operator and not the SQL union.
27  */
28 template <>
29 struct operator_impl<planner_node_type::UNION_NODE> : public query_operator {
30  public:
31  DECL_CORO_STATE(execute);
32  std::vector<std::shared_ptr<const sframe_rows> > input_v;
33 
34  planner_node_type type() const { return planner_node_type::UNION_NODE; }
35 
36  static std::string name() { return "union"; }
37 
38  static query_operator_attributes attributes() {
40  ret.attribute_bitfield = query_operator_attributes::LINEAR;
41  ret.num_inputs = -1;
42  return ret;
43  }
44 
45  inline operator_impl(size_t _num_inputs = 2) : num_inputs(_num_inputs) {}
46 
47  inline std::shared_ptr<query_operator> clone() const {
48  return std::make_shared<operator_impl>(*this);
49  }
50 
51  inline bool coro_running() const {
52  return CORO_RUNNING(execute);
53  }
54  inline void execute(query_context& context) {
55 
56  CORO_BEGIN(execute)
57  input_v.resize(num_inputs);
58 
59  while(1) {
60  {
61  bool all_null = true, any_null = false;
62  for(size_t i = 0; i < num_inputs; ++i) {
63  input_v[i] = context.get_next(i);
64  if(input_v[i] == nullptr)
65  any_null = true;
66  else
67  all_null = false;
68  }
69 
70  if(any_null) {
71  ASSERT_TRUE(all_null);
72  break;
73  }
74 
75  auto out = context.get_output_buffer();
76  auto& out_columns = out->get_columns();
77  out_columns.clear();
78 
79  for(size_t i = 0; i < num_inputs; ++i) {
80  std::copy(input_v[i]->get_columns().begin(), input_v[i]->get_columns().end(),
81  std::back_inserter(out_columns));
82  }
83 
84  context.emit(out);
85  }
86  CORO_YIELD();
87  }
88  CORO_END
89  }
90 
91  private:
92  size_t num_inputs = 0;
93 
94  public:
95  static std::shared_ptr<planner_node> make_planner_node(
96  std::shared_ptr<planner_node> left,
97  std::shared_ptr<planner_node> right) {
98  return planner_node::make_shared(planner_node_type::UNION_NODE,
99  std::map<std::string, flexible_type>(),
100  std::map<std::string, any>(),
101  {left, right});
102  }
103 
104  static std::shared_ptr<planner_node> make_planner_node(
105  const std::vector<std::shared_ptr<planner_node> >& nodes) {
106 
107  ASSERT_GE(nodes.size(), 1);
108 
109  return planner_node::make_shared(planner_node_type::UNION_NODE,
110  std::map<std::string, flexible_type>(),
111  std::map<std::string, any>(),
112  nodes);
113  }
114 
115  static std::shared_ptr<query_operator> from_planner_node(
116  std::shared_ptr<planner_node> pnode) {
117  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::UNION_NODE);
118  ASSERT_GE(pnode->inputs.size(), 1);
119 
120  return std::make_shared<operator_impl>(pnode->inputs.size());
121  }
122 
123  static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
124  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::UNION_NODE);
125  ASSERT_GE(pnode->inputs.size(), 1);
126 
127  std::vector<flex_type_enum> types = infer_planner_node_type(pnode->inputs[0]);
128 
129  for(size_t i = 1; i < pnode->inputs.size(); ++i) {
130  auto input_types = infer_planner_node_type(pnode->inputs[i]);
131  types.insert(types.end(), input_types.begin(), input_types.end());
132  }
133 
134  return types;
135  }
136 
137  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
138  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::UNION_NODE);
139  return infer_planner_node_length(pnode->inputs[0]);
140  }
141 
142  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
143  std::ostringstream ss;
144  ss << "Union(";
145  bool is_first = true;
146  for(const auto& pn : pnode->inputs) {
147  if(!is_first) ss << ',';
148  is_first = false;
149  ss << get_tag(pn);
150  }
151  ss << ')';
152 
153  return ss.str();
154  }
155 };
156 
158 
159 /// \}
160 } // query_eval
161 } // turi
162 
163 #endif // TURI_SFRAME_QUERY_MANAGER_UNION_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
void copy(const std::string src, const std::string dest)
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::shared_ptr< query_operator > clone() const
Definition: union.hpp:47