Turi Create  4.0
project_transforms.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_PROJECTION_TRANSFORMS_H_
7 #define TURI_SFRAME_QUERY_OPTIMIZATION_PROJECTION_TRANSFORMS_H_
8 
9 #include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp>
10 #include <core/storage/query_engine/planning/optimization_engine.hpp>
11 #include <core/storage/query_engine/operators/all_operators.hpp>
12 #include <core/storage/query_engine/planning/optimization_node_info.hpp>
13 #include <core/storage/query_engine/operators/operator_properties.hpp>
14 #include <core/data/flexible_type/flexible_type.hpp>
15 
16 #include <array>
17 
18 namespace turi {
19 namespace query_eval {
20 
21 class opt_project_transform : public opt_transform {
22  bool transform_applies(planner_node_type t) {
23  return (t == planner_node_type::PROJECT_NODE);
24  }
25 };
26 
27 /** Transform the projection on a source node to a source node.
28  */
29 class opt_project_on_source : public opt_project_transform {
30 
31  std::string description() { return "project(source) -> source"; }
32 
33  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
34 
35  if(n->inputs[0]->type != planner_node_type::SFRAME_SOURCE_NODE)
36  return false;
37 
38  auto flex_indices = n->p("indices").get<flex_list>();
39 
40  sframe old_sf = n->inputs[0]->any_p<sframe>("sframe");
41 
42  // Only apply this if the projection does not expand the number of
43  // columns present.
44  if(flex_indices.size() > old_sf.num_columns())
45  return false;
46 
47  std::vector<std::shared_ptr<sarray<flexible_type> > > columns;
48 
49  for(const auto& idx_f : flex_indices) {
50  size_t idx = idx_f;
51  DASSERT_LT(idx, old_sf.num_columns());
52  columns.push_back(old_sf.select_column(idx));
53  }
54 
55  DASSERT_TRUE(!columns.empty());
56 
57  size_t begin_index = n->inputs[0]->p("begin_index");
58  size_t end_index = n->inputs[0]->p("end_index");
59 
60  pnode_ptr new_pnode = op_sframe_source::make_planner_node(sframe(columns), begin_index, end_index);
61 
62  opt_manager->replace_node(n, new_pnode);
63  return true;
64  }
65 };
66 
67 
68 /** Eliminate unneeded projections.
69  */
70 class opt_eliminate_identity_project : public opt_project_transform {
71 
72  std::string description() { return "project(a, {0,1,...,num_columns(a)}) -> a"; }
73 
74  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
75 
76  const auto& flex_indices = n->p("indices").get<flex_list>();
77 
78  size_t nc = n->inputs[0]->num_columns();
79 
80  if(flex_indices.size() != nc)
81  return false;
82 
83  for(size_t idx = 0; idx < flex_indices.size(); ++idx) {
84  if(flex_indices[idx].get<flex_int>() != flex_int(idx))
85  return false;
86  }
87 
88  opt_manager->replace_node(n, n->inputs[0]->pnode);
89  return true;
90  }
91 };
92 
93 
94 /** Merge subsequent projections.
95  */
96 class opt_merge_projects : public opt_project_transform {
97 
98  std::string description() { return "project1(project2(a)) -> project3(a)"; }
99 
100  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
101 
102  DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
103 
104  if(n->inputs[0]->type != planner_node_type::PROJECT_NODE)
105  return false;
106 
107  const auto& iv_1 = n->inputs[0]->p("indices").get<flex_list>();
108  const auto& iv_2 = n->p("indices").get<flex_list>();
109 
110  // Do this under three cases -- if both are expansions, or both
111  // are contractions. But ignore them if an expansion is
112  // downstream of a contraction.
113 
114  bool first_is_contraction = (n->inputs[0]->inputs[0]->num_columns() > iv_1.size());
115  bool second_is_expansion = (iv_2.size() > iv_1.size());
116 
117  if(first_is_contraction && second_is_expansion)
118  return false;
119 
120  std::vector<size_t> iv_out;
121 
122  for(const auto& idx_f : iv_2) {
123  size_t idx = idx_f;
124  DASSERT_LT(idx, iv_1.size());
125  iv_out.push_back((size_t)(iv_1[idx]));
126  }
127 
128  pnode_ptr out = op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_out);
129 
130  opt_manager->replace_node(n, out);
131 
132  return true;
133  }
134 };
135 
136 /** Allows a projection to propegate through an append.
137  *
138  */
139 class opt_project_append_exchange : public opt_project_transform {
140 
141  std::string description() { return "project(append(a,b)) -> append(project(a), project(b))"; }
142 
143  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
144 
145  DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
146 
147  if(n->inputs[0]->type != planner_node_type::APPEND_NODE)
148  return false;
149 
150  // Determine what kind of transformation we can do. It's possible that we don't do any.
151  const auto& iv = n->p("indices").get<flex_list>();
152 
153  // Propegate downstream only if it's not an expansion.
154  if(iv.size() > n->inputs[0]->num_columns())
155  return false;
156 
157  std::vector<size_t> iv_s(iv.begin(), iv.end());
158 
160  op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_s),
161  op_project::make_planner_node(n->inputs[0]->inputs[1]->pnode, iv_s));
162 
163  opt_manager->replace_node(n, out);
164 
165  return true;
166  }
167 
168 };
169 
170 
171 class opt_project_logical_filter_exchange : public opt_project_transform {
172 
173  std::string description() { return "project(logical_filter(a), mask) -> logical_filter(project(a), mask)"; }
174 
175  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
176 
177  DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
178 
179  if(n->inputs[0]->type != planner_node_type::LOGICAL_FILTER_NODE)
180  return false;
181 
182  const auto& iv = n->p("indices").get<flex_list>();
183 
184  if(iv.size() > n->inputs[0]->num_columns())
185  return false;
186 
187  std::vector<size_t> iv_s(iv.begin(), iv.end());
188 
189  pnode_ptr out = op_logical_filter::make_planner_node(
190  op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_s),
191  n->inputs[0]->inputs[1]->pnode);
192 
193  opt_manager->replace_node(n, out);
194 
195  return true;
196  }
197 
198 };
199 
200 /** Selectively pass a project through a union.
201  *
202  * The goal of this operator is to pass a project through a union
203  * with the hope of pruning the tree before the union. However,
204  * order must be preserved. Therefore, this operator actually has a
205  * number of possible output transformations.
206  *
207  * 1. One side is eliminated. In this case, the union is dropped,
208  * and the projection simply is translated to the pre-union indices.
209  *
210  * 2. The projection operator maintains the partitioning between
211  * the two union inputs. In this case, it is replaced with
212  * a union of two projection nodes.
213  *
214  * 3. The projection reduces the number of input columns to the
215  * union. In this case, it is replaced with a union of two
216  * projections followed by a transposing projection operator. This
217  * allows eliminations to propegate up the tree.
218  *
219  */
220 class opt_union_project_exchange : public opt_project_transform {
221 
222  std::string description() { return "partitionable_project(union(a,...)) ?->? union(project1(a), ...)"; }
223 
224  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
225 
226  DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
227 
228  if(n->inputs[0]->type != planner_node_type::UNION_NODE)
229  return false;
230 
231  cnode_info_ptr u_node = n->inputs[0];
232 
233  DASSERT_TRUE(!u_node->inputs.empty());
234 
235  // Determine what kind of transformation we can do. It's possible
236  // that we don't do any.
237  const auto& out_iv = n->p("indices").get<flex_list>();
238 
239  std::vector<int> output_used(u_node->num_columns(), false);
240 
241  for(const auto& idx : out_iv) {
242  DASSERT_LT(idx, output_used.size());
243  output_used[idx.get<flex_int>()] = true;
244  }
245 
246  // See if there are any that can get eliminated. If not, we're done.
247  if(std::all_of(output_used.begin(), output_used.end(), [](int t) { return t; } ) )
248  return false;
249 
250  // Remap the indices
251  std::vector<size_t> remapped_indices(u_node->num_columns(), size_t(-1));
252 
253  std::vector<std::vector<size_t> > projections_by_input(u_node->inputs.size());
254  std::vector<bool> input_needs_projection(u_node->inputs.size(), false);
255 
256  ////////////////////////////////////////////////////////////////////////////////
257  // Build the projection mappings
258 
259  size_t current_offset = 0;
260  size_t current_input = 0;
261  size_t current_input_idx = 0;
262  for(size_t i = 0; i < u_node->num_columns(); ++i) {
263 
264  if(output_used[i]) {
265  projections_by_input[current_input].push_back(current_input_idx);
266  remapped_indices[i] = i - current_offset;
267  } else {
268  ++current_offset;
269  input_needs_projection[current_input] = true;
270  }
271 
272  ++current_input_idx;
273 
274  if(current_input_idx == u_node->inputs[current_input]->num_columns()) {
275  ++current_input;
276  current_input_idx = 0;
277  }
278  }
279 
280  ////////////////////////////////////////////////////////////////////////////////
281  // Remap the current inputs. Maybe they need projections, and
282  // maybe they don't.
283 
284  std::vector<pnode_ptr> inputs(u_node->inputs.size());
285 
286  for(size_t i = 0; i < u_node->inputs.size(); ++i) {
287  if(projections_by_input[i].empty()) {
288  inputs[i].reset();
289  } else if(input_needs_projection[i]) {
290  inputs[i] = op_project::make_planner_node(u_node->inputs[i]->pnode, projections_by_input[i]);
291  } else {
292  inputs[i] = u_node->inputs[i]->pnode;
293  }
294  }
295 
296  // Clear out the empty ones.
297  auto new_end = std::remove_if(inputs.begin(), inputs.end(),
298  [](const pnode_ptr& p) { return p == nullptr; });
299  inputs.resize(new_end - inputs.begin());
300 
301  ////////////////////////////////////////////////////////////////////////////////
302  // Rebuilding the new projection
303 
304  std::vector<size_t> new_projection_indices(out_iv.size());
305 
306  for(size_t i = 0; i < out_iv.size(); ++i) {
307  DASSERT_LT(out_iv[i], remapped_indices.size());
308  DASSERT_NE(remapped_indices[out_iv[i]], size_t(-1));
309 
310  new_projection_indices[i] = remapped_indices[out_iv[i]];
311  }
312 
313  // If the union isn't needed, don't use it.
314  if(inputs.size() == 1) {
315  opt_manager->replace_node(
316  n, op_project::make_planner_node(inputs[0], new_projection_indices));
317  } else {
318  // It's done, folks!
319  opt_manager->replace_node(
320  n, op_project::make_planner_node(
321  op_union::make_planner_node(inputs), new_projection_indices));
322  }
323 
324  return true;
325  }
326 };
327 
328 /** If a projection node results in more columns, but it is only
329  * choosing a subset of the columns, split it in to two. The
330  * optimizations are designed so that expansive projections move
331  * downstream, and contractive expansions move upstream.
332  */
333 class opt_split_contractive_expansive_projection : public opt_project_transform {
334 
335  std::string description() {
336  return "project(a, ...) ?->? expanding_project(contracting_project(a, ...), ...)";
337  }
338 
339  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
340 
341  DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
342 
343  // Determine what kind of transformation we can do. It's possible
344  // that we don't do any.
345  const auto& iv = n->p("indices").get<flex_list>();
346 
347  std::set<size_t> used_indices(iv.begin(), iv.end());
348 
349  size_t n_cols = n->inputs[0]->num_columns();
350 
351  if( ! (used_indices.size() < n_cols && iv.size() >= n_cols) )
352  return false;
353 
354  std::vector<size_t> indices_1;
355  indices_1.reserve(used_indices.size());
356  std::vector<size_t> indices_2(iv.size());
357 
358  std::map<size_t, size_t> idx_map;
359 
360  for(size_t i = 0; i < iv.size(); ++i) {
361  size_t idx = iv[i];
362 
363  auto it = idx_map.lower_bound(idx);
364 
365  if(it != idx_map.end() && it->first == idx) {
366  indices_2[i] = it->second;
367  } else {
368  size_t new_idx = indices_1.size();
369  idx_map.insert(it, {idx, new_idx});
370  indices_1.push_back(idx);
371  indices_2[i] = new_idx;
372  }
373  }
374 
375  pnode_ptr p_1 = op_project::make_planner_node(n->inputs[0]->pnode, indices_1);
376  pnode_ptr p_2 = op_project::make_planner_node(p_1, indices_2);
377 
378  opt_manager->replace_node(n, p_2);
379  return true;
380  }
381 };
382 
383 }}
384 
385 #endif /* _PROJECTION_TRANSFORMS_H_ */
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
static std::shared_ptr< planner_node > make_planner_node(std::shared_ptr< planner_node > left, std::shared_ptr< planner_node > right)
Definition: append.hpp:108