Turi Create  4.0
append.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_APPEND_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_APPEND_HPP
8 
9 #include <core/logging/assertions.hpp>
10 #include <core/data/flexible_type/flexible_type.hpp>
11 #include <core/storage/query_engine/operators/operator.hpp>
12 #include <core/storage/query_engine/execution/query_context.hpp>
13 #include <core/storage/query_engine/operators/operator_properties.hpp>
14 #include <core/util/coro.hpp>
15 
16 namespace turi {
17 
18 namespace query_eval {
19 /**
20  * \ingroup sframe_query_engine
21  * \addtogroup operators Logical Operators
22  * \{
23  */
24 
25 /**
26  * A append operator append two input streams; typedefed \ref op_append.
27  */
28 template<>
29 class operator_impl<planner_node_type::APPEND_NODE> : public query_operator {
30  public:
31  DECL_CORO_STATE(execute);
32 
33 
34  // state
35  std::shared_ptr<sframe_rows> out;
36  size_t outidx;
37  size_t input;
38  std::shared_ptr<const sframe_rows> input_rows;
39  sframe_rows::const_iterator row_iter_begin;
40  sframe_rows::const_iterator row_iter_end;
41 
42  planner_node_type type() const { return planner_node_type::APPEND_NODE; }
43  static std::string name() { return "append"; }
44 
45  inline operator_impl() { };
46 
47  static query_operator_attributes attributes() {
50  ret.num_inputs = 2;
51  return ret;
52  }
53 
54 
55  inline std::shared_ptr<query_operator> clone() const {
56  return std::make_shared<operator_impl>();
57  }
58 
59  inline bool coro_running() const {
60  return CORO_RUNNING(execute);
61  }
62 
63  inline void execute(query_context& context) {
64  CORO_BEGIN(execute)
65  outidx = 0;
66  for (input = 0; input < 2; ++input) {
67  // read some input
68  input_rows = context.get_next(input);
69  if (input_rows != nullptr && out == nullptr) {
70  // we have input, but we don't have an output buffer. Make one
71  out = context.get_output_buffer();
72  out->resize(input_rows->num_columns(), context.block_size());
73  outidx = 0;
74  }
75  // keep looping over this input
76  while(input_rows != nullptr) {
77  row_iter_begin = input_rows->cbegin();
78  row_iter_end = input_rows->cend();
79  while(row_iter_begin != row_iter_end) {
80  (*out)[outidx] = *row_iter_begin;
81  ++outidx;
82  // output buffer is full. give it away and acquire a new one
83  if (outidx == context.block_size()) {
84  context.emit(out);
85  CORO_YIELD();
86  out = context.get_output_buffer();
87  out->resize(input_rows->num_columns(), context.block_size());
88  outidx = 0;
89  }
90  ++row_iter_begin;
91  }
92  input_rows = context.get_next(input);
93  }
94  }
95  if (outidx && out) {
96  out->resize(out->num_columns(), outidx);
97  context.emit(out);
98  CORO_YIELD();
99  }
100  CORO_END
101  }
102 
103  ////////////////////////////////////////////////////////////////////////////////
104 
105  /**
106  * Creates a logical append node that appends the left and right nodes.
107  */
108  static std::shared_ptr<planner_node> make_planner_node(std::shared_ptr<planner_node> left,
109  std::shared_ptr<planner_node> right) {
110  return planner_node::make_shared(planner_node_type::APPEND_NODE,
111  std::map<std::string, flexible_type>(),
112  std::map<std::string, any>(),
113  {left, right});
114  }
115 
116  static std::shared_ptr<query_operator> from_planner_node(
117  std::shared_ptr<planner_node> pnode) {
118  ASSERT_EQ(pnode->inputs.size(), 2);
119  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::APPEND_NODE);
120  return std::make_shared<operator_impl>();
121  }
122 
123  static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
124  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::APPEND_NODE);
125  std::vector<std::vector<flex_type_enum>> types;
126  for (auto input: pnode->inputs) {
127  types.push_back(infer_planner_node_type(input));
128  }
129  ASSERT_MSG(types.size() != 0, "Append with no input");
130  // check that all types are equal
131  // check that they all line up with types[0]
132  for (size_t i = 1; i < types.size(); ++i) {
133  ASSERT_EQ(types[i].size(), types[0].size());
134  for (size_t j = 0; j < types[i].size(); ++j) {
135  ASSERT_EQ((int)types[i][j], (int)types[0][j]);
136  }
137  }
138  return types[0];
139  }
140 
141  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
142  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::APPEND_NODE);
143  int64_t ret_length = 0;
144  for (auto input: pnode->inputs) {
145  int64_t input_length = infer_planner_node_length(input);
146  if (input_length == -1) return -1;
147  ret_length += input_length;
148  }
149  return ret_length;
150  }
151 
152  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& get_tag) {
153  ASSERT_EQ(pnode->inputs.size(), 2);
154  return std::string("Append(") + get_tag(pnode->inputs[0]) + "," + get_tag(pnode->inputs[1]) + ")";
155  }
156 
157 };
158 
160 
161 /// \}
162 
163 } // query_eval
164 } // turicreate
165 
166 #endif // TURI_SFRAME_QUERY_MANAGER_APPEND_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
std::shared_ptr< query_operator > clone() const
Definition: append.hpp:55
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
static std::shared_ptr< planner_node > make_planner_node(std::shared_ptr< planner_node > left, std::shared_ptr< planner_node > right)
Definition: append.hpp:108