Turi Create  4.0
append_transforms.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_APPEND_TRANSFORMS_H_
7 #define TURI_SFRAME_QUERY_OPTIMIZATION_APPEND_TRANSFORMS_H_
8 
9 #include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp>
10 #include <core/storage/query_engine/planning/optimization_engine.hpp>
11 #include <core/storage/query_engine/operators/all_operators.hpp>
12 #include <core/storage/query_engine/planning/optimization_node_info.hpp>
13 #include <core/storage/query_engine/operators/operator_properties.hpp>
14 #include <core/data/flexible_type/flexible_type.hpp>
15 
16 #include <array>
17 
18 namespace turi {
19 namespace query_eval {
20 
21 class opt_append_transform : public opt_transform {
22  bool transform_applies(planner_node_type t) {
23  return (t == planner_node_type::APPEND_NODE);
24  }
25 };
26 
27 /** Transform append(source, source) --> source
28  */
29 class opt_append_on_source : public opt_append_transform {
30 
31  std::string description() { return "append(source, source) -> source"; }
32 
33  std::pair<bool, sframe> try_sframe_append(cnode_info_ptr n) {
34  sframe new_sf;
35  for (const auto& input: n->inputs) {
36  if (input->type != planner_node_type::SFRAME_SOURCE_NODE) return {false, new_sf};
37  auto begin = input->p("begin_index");
38  auto end = input->p("end_index");
39 
40  const auto& sf = input->any_p<sframe>("sframe");
41 
42  if (begin == 0 && end == sf.size()) {
43  // stupidly we need the names to match for the append to work...
44  for (size_t i = 0; i < new_sf.num_columns(); ++i) {
45  new_sf.set_column_name(i, sf.column_name(i));
46  }
47 
48  if(begin != end)
49  new_sf = new_sf.append(sf);
50 
51  } else {
52  return {false, new_sf};
53  }
54  }
55 
56  if(new_sf.num_rows() == 0) {
57  new_sf = n->inputs[0]->any_p<sframe>("sframe");
58  }
59 
60  return {true, new_sf};
61  }
62 
63  std::pair<bool, sarray<flexible_type> > try_sarray_append(cnode_info_ptr n) {
64  sarray<flexible_type> new_sa;
65  for (const auto& input: n->inputs) {
66  if (input->type != planner_node_type::SARRAY_SOURCE_NODE) return {false, new_sa};
67  auto begin = input->p("begin_index");
68  auto end = input->p("end_index");
69 
70  auto sa_ptr = input->any_p<std::shared_ptr<sarray<flexible_type> > >("sarray");
71 
72  const auto& sa = *sa_ptr;
73 
74  if (begin == 0 && end == sa.size()) {
75  if(begin != end)
76  new_sa = new_sa.append(sa);
77  } else {
78  return {false, new_sa};
79  }
80  }
81 
82  if(new_sa.size() == 0) {
83  new_sa = *(n->inputs[0]->any_p<std::shared_ptr<sarray<flexible_type> > >("sarray"));
84  }
85 
86  return {true, new_sa};
87  }
88 
89  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
90  // only source nodes accepted
91  // and all have the same begin and end positions
92  ASSERT_NE(n->inputs.size(), 0);
93 
94  // Quickly fail if not dealing with two sframe/sarray sources
95  if(! ((n->inputs[0]->type == planner_node_type::SFRAME_SOURCE_NODE
96  || n->inputs[0]->type == planner_node_type::SARRAY_SOURCE_NODE)
97  &&
98  (n->inputs[1]->type == planner_node_type::SFRAME_SOURCE_NODE
99  || n->inputs[1]->type == planner_node_type::SARRAY_SOURCE_NODE))) {
100 
101  return false;
102  }
103 
104  // Try append as sframe
105  auto sframe_append_result = try_sframe_append(n);
106 
107  if (sframe_append_result.first) {
108  auto& new_sf = sframe_append_result.second;
109  // we can rewrite the current node.
110  pnode_ptr new_pnode = op_sframe_source::make_planner_node(new_sf,
111  0,
112  new_sf.num_rows());
113  opt_manager->replace_node(n, new_pnode);
114  return true;
115  }
116 
117  // Try append as sarray
118  auto sarray_append_result = try_sarray_append(n);
119  if (sarray_append_result.first) {
120  auto& new_sa = sarray_append_result.second;
121  // we can rewrite the current node.
122  pnode_ptr new_pnode = op_sarray_source::make_planner_node(std::make_shared<sarray<flexible_type> >(new_sa),
123  0,
124  new_sa.size());
125  opt_manager->replace_node(n, new_pnode);
126  return true;
127  }
128  return false;
129  }
130 };
131 
132 /** Eliminated by optimization to prune off an append of an empty
133  * transform.
134  */
135 class opt_eliminate_empty_append : public opt_append_transform {
136 
137  std::string description() { return "append(source, empty_source) -> source"; }
138 
139  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
140 
141  if(n->inputs[1]->length() == 0) {
142  opt_manager->replace_node(n, n->inputs[0]->pnode);
143  return true;
144  }
145 
146  if(n->inputs[0]->length() == 0) {
147  opt_manager->replace_node(n, n->inputs[1]->pnode);
148  return true;
149  }
150 
151  return false;
152  }
153 };
154 
155 
156 }}
157 
158 #endif /* TURI_SFRAME_QUERY_OPTIMIZATION_APPEND_TRANSFORMS_H_ */
void set_column_name(size_t column_id, const std::string &name)
size_t size() const
Definition: sarray.hpp:382
size_t num_rows() const
Returns the length of each sarray.
Definition: sframe.hpp:346
sarray append(const sarray &other) const
Definition: sarray.hpp:458
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
Definition: sframe.hpp:341
sframe append(const sframe &other) const