Turi Create  4.0
union_transforms.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_UNION_TRANSFORMS_H_
7 #define TURI_SFRAME_QUERY_OPTIMIZATION_UNION_TRANSFORMS_H_
8 
#include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp>
#include <core/storage/query_engine/planning/optimization_engine.hpp>
#include <core/storage/query_engine/operators/all_operators.hpp>
#include <core/storage/query_engine/planning/optimization_node_info.hpp>
#include <core/storage/query_engine/operators/operator_properties.hpp>
#include <core/data/flexible_type/flexible_type.hpp>

#include <array>
#include <functional>
#include <map>
#include <memory>
#include <numeric>
#include <set>
#include <utility>
#include <vector>
17 
18 namespace turi {
19 namespace query_eval {
20 
// Base class for transforms that apply only to union nodes.
22 
23 class opt_union_transform : public opt_transform {
24  bool transform_applies(planner_node_type t) {
25  return t == planner_node_type::UNION_NODE;
26  }
27 };
28 
29 class opt_union_merge : public opt_union_transform {
30 
31  std::string description() { return "union(a, union(b,c)) -> union(a,b,c)"; }
32 
33  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
34 
35  DASSERT_TRUE(n->type == planner_node_type::UNION_NODE);
36 
37  if(! n->input_type_present(planner_node_type::UNION_NODE))
38  return false;
39 
40  std::vector<pnode_ptr> inputs;
41 
42  std::function<void(const cnode_info_ptr&)> add;
43 
44  add = [&](const cnode_info_ptr& nn) {
45  if(nn->type == planner_node_type::UNION_NODE) {
46  for(const auto& nnn : nn->inputs) add(nnn);
47  } else {
48  inputs.push_back(nn->pnode);
49  }
50  };
51 
52  add(n);
53 
54  pnode_ptr new_pnode = op_union::make_planner_node(inputs);
55  opt_manager->replace_node(n, new_pnode);
56 
57  return true;
58  }
59 };
60 
/** Transform a union of two source nodes into a source node.
 *
 *  When several of the union's inputs are SFRAME_SOURCE / SARRAY_SOURCE
 *  nodes covering the same row range (same begin_index, end_index, and
 *  total size), their columns are packed together into a single sframe
 *  source node.  If the merge permutes the output column order, the
 *  rewritten union is followed by a project node that restores the
 *  original ordering.
 */
class opt_union_on_source : public opt_union_transform {

  std::string description() { return "union(source, source) -> source"; }

  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {

    DASSERT_TRUE(n->type == planner_node_type::UNION_NODE);

    // Do a quick check to see if this transformation is applicable.
    size_t num_sources_present = 0;
    for(size_t i = 0; i < n->inputs.size(); ++i) {
      auto t = n->inputs[i]->type;
      if(t == planner_node_type::SFRAME_SOURCE_NODE
         || t == planner_node_type::SARRAY_SOURCE_NODE) {
        ++num_sources_present;
      }
    }

    // At least two source inputs are needed for any merge to be possible.
    if(num_sources_present < 2)
      return false;

    ////////////////////////////////////////////////////////////////////////////////
    // Now, do a second pass to make sure that at least some of the
    // sources can be merged. This isn't always the case.

    // Key identifying a mergable row range: (begin_index, end_index, size).
    typedef std::array<size_t, 3> key_type;

    std::vector<key_type> input_keys(n->inputs.size());

    // Go though and extract the keys, and test for uniqueness.
    {
      std::set<key_type> distinct_input_ranges;

      for(size_t i = 0; i < n->inputs.size(); ++i) {
        auto t = n->inputs[i]->type;
        if(t == planner_node_type::SFRAME_SOURCE_NODE
           || t == planner_node_type::SARRAY_SOURCE_NODE) {

          size_t begin_index = n->inputs[i]->p("begin_index");
          size_t end_index = n->inputs[i]->p("end_index");

          size_t size = 0;

          if(t == planner_node_type::SFRAME_SOURCE_NODE) {
            // Get the size of the sframe
            size = n->inputs[i]->any_p<sframe>("sframe").num_rows();
          } else {
            // Get the size of the sarray
            size = n->inputs[i]->any_p<std::shared_ptr<sarray<flexible_type> > >("sarray")->size();
          }

          // Store the key, since it's somewhat expensive to extract.
          input_keys[i] = key_type{{begin_index, end_index, size}};

          distinct_input_ranges.insert(input_keys[i]);
        }
      }

      // If every source covers a distinct range, no two sources can be
      // merged; bail out without touching the plan.
      if(distinct_input_ranges.size() == num_sources_present) {
        return false;
      }
    }

    ////////////////////////////////////////////////////////////////////////////////
    // Now, we know we'll end up doing something. Let's go for it.

    // Bookkeeping for one entry in the rewritten union's input list.
    struct merge_info {
      // True when this entry packs source columns; false when it passes
      // a non-source input through unchanged.
      bool is_sarray = false;

      // Non sarray inputs get mapped to this.
      size_t input_index;
      size_t output_index_start;
      size_t output_index_end;

      // If it's an sarray, then this is the one.
      size_t begin_index = 0;
      size_t end_index = 0;

      // Pair is (original output index, sarray).
      std::vector<std::pair<size_t, std::shared_ptr<sarray<flexible_type> > > > sa_v;
    };

    std::vector<merge_info> map_info;
    // Maps a row-range key to its entry in map_info, so sources with
    // identical ranges land in the same merge group.
    std::map<key_type, size_t> merge_groups;

    size_t current_output_idx = 0;
    for(size_t i = 0; i < n->inputs.size(); ++i) {
      auto t = n->inputs[i]->type;
      if(t == planner_node_type::SFRAME_SOURCE_NODE
         || t == planner_node_type::SARRAY_SOURCE_NODE) {

        size_t begin_index = n->inputs[i]->p("begin_index");
        size_t end_index = n->inputs[i]->p("end_index");

        const auto& key = input_keys[i];

        auto it = merge_groups.find(key);

        size_t map_idx;
        if(it == merge_groups.end()) {
          // First source seen with this range: open a new merge group.
          merge_groups[key] = map_idx = map_info.size();
          map_info.emplace_back();
          map_info.back().is_sarray = true;
          map_info.back().begin_index = begin_index;
          map_info.back().end_index = end_index;
        } else {
          map_idx = it->second;
        }

        // Add this to the appropriate info struct

        if(t == planner_node_type::SFRAME_SOURCE_NODE) {

          sframe sf = n->inputs[i]->any_p<sframe>("sframe");

          // NOTE(review): the inner `i` shadows the outer input index;
          // only sf / current_output_idx are used in this loop body, so
          // the shadowing appears benign — worth renaming eventually.
          for(size_t i = 0; i < sf.num_columns(); ++i) {
            map_info[map_idx].sa_v.push_back({current_output_idx, sf.select_column(i)});
            ++current_output_idx;
          }
        } else {
          map_info[map_idx].sa_v.push_back(
              {current_output_idx,
               n->inputs[i]->any_p<std::shared_ptr<sarray<flexible_type> > >("sarray")});
          ++current_output_idx;
        }

      } else {
        // Non-source input: passed through unchanged, occupying a
        // contiguous run of output columns.
        map_info.emplace_back();
        map_info.back().is_sarray = false;
        map_info.back().input_index = i;
        map_info.back().output_index_start = current_output_idx;
        current_output_idx += n->inputs[i]->num_columns();
        map_info.back().output_index_end = current_output_idx;
      }
    }

    size_t num_outputs = current_output_idx;

    ////////////////////////////////////////////////////////////////////////////////
    // First, take care of the case where it's only one output.

    if(map_info.size() == 1) {
      const auto& m = map_info.front();
      DASSERT_TRUE(m.is_sarray);

      std::vector<std::shared_ptr<sarray<flexible_type> > > columns(m.sa_v.size());
      for(size_t i = 0; i < columns.size(); ++i) {
        columns[i] = m.sa_v[i].second;
        // Debug-only check that the original output indices are
        // consecutive, so no reordering projection is needed here.
        if(i + 1 < columns.size()) {
          DASSERT_EQ(m.sa_v[i].first + 1, m.sa_v[i+1].first);
        }
      }

      pnode_ptr rep = op_sframe_source::make_planner_node(sframe(columns), m.begin_index, m.end_index);
      opt_manager->replace_node(n, rep);
      return true;
    }

    ////////////////////////////////////////////////////////////////////////////////
    // Now, it's possible we have to actually follow this with a
    // projection operator to reorder the columns. Thus keep track of
    // these output indices.

    // output_projection_indices[orig_idx] = column index in the new union.
    std::vector<size_t> output_projection_indices(num_outputs, size_t(-1));

    std::vector<pnode_ptr> inputs;
    inputs.reserve(map_info.size());

    current_output_idx = 0;

    for(const auto& m : map_info) {
      if(m.is_sarray) {
        size_t n_columns = m.sa_v.size();

        std::vector<std::shared_ptr<sarray<flexible_type> > > columns(n_columns);

        for(size_t i = 0; i < n_columns; ++i) {
          output_projection_indices[m.sa_v[i].first] = current_output_idx;
          columns[i] = m.sa_v[i].second;
          ++current_output_idx;
        }

        inputs.push_back(op_sframe_source::make_planner_node(sframe(columns), m.begin_index, m.end_index));
      } else {

        inputs.push_back(n->inputs[m.input_index]->pnode);

        for(size_t i = m.output_index_start; i < m.output_index_end; ++i) {
          output_projection_indices[i] = current_output_idx;
          ++current_output_idx;
        }
      }
    }

#ifndef NDEBUG
    // Sanity check: every original output column was assigned a slot.
    for(size_t i : output_projection_indices) {
      DASSERT_NE(i, size_t(-1));
      DASSERT_LT(i, num_outputs);
    }
#endif

    ////////////////////////////////////////////////////////////////////////////////
    // Construct the union node

    pnode_ptr new_pnode = op_union::make_planner_node(inputs);

    ////////////////////////////////////////////////////////////////////////////////
    // Do we need to follow it by a project node?

    bool all_consecutive = true;
    for(size_t i = 0; i + 1 < output_projection_indices.size(); ++i) {
      if(output_projection_indices[i] + 1 != output_projection_indices[i + 1]) {
        all_consecutive = false;
        break;
      }
    }

    // Only add the reordering projection when the merge actually
    // permuted the output columns.
    if(!all_consecutive)
      new_pnode = op_project::make_planner_node(new_pnode, output_projection_indices);

    opt_manager->replace_node(n, new_pnode);
    return true;
  }
};
288 
289 class opt_eliminate_singleton_union : public opt_union_transform {
290 
291  std::string description() { return "union(a) -> a"; }
292 
293  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
294 
295  DASSERT_TRUE(n->type == planner_node_type::UNION_NODE);
296 
297  if(n->inputs.size() == 1) {
298  opt_manager->replace_node(n, n->inputs[0]->pnode);
299  return true;
300  } else {
301  return false;
302  }
303  }
304 };
305 
306 
/**
 * Merges a connected subtree of unions into a minimal set of unions.
 * For instance:
 *
 * U(P(A, [1,2]), U(P(A, [3,4]), B)) --> U(P(A, [1,2,3,4]), B)
 *
 * It does so by collapsing to a list of sources. Then rebuilds the list
 * with a minimal set of unions.
 */
class opt_union_project_merge : public opt_union_transform {

  std::string description() { return "union(project1(a), ..., project2(a)) -> union(project3(a...), ...)"; }

  bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {

    // Must be at least 2 projects here for this to apply.
    if(! n->input_type_present(planner_node_type::PROJECT_NODE, 2))
      return false;

    {
      // Do a quick check to see if any inputs can be merged.  Two
      // project inputs are mergable only when they project from the
      // same underlying node.
      std::set<cnode_info_ptr> project_inputs;
      bool mergable_projection_present = false;
      for(const auto& nn : n->inputs) {
        if(nn->type == planner_node_type::PROJECT_NODE) {
          if(project_inputs.count(nn->inputs[0])) {
            mergable_projection_present = true;
            break;
          } else {
            project_inputs.insert(nn->inputs[0]);
          }
        }
      }

      if(!mergable_projection_present)
        return false;
    }

    ////////////////////////////////////////////////////////////////////////////////
    // Now, go through and build out the projection points

    struct input_info {
      // If a projection, this is the node it's projecting from. If
      // not, it's just that node.
      cnode_info_ptr in;

      // If projection
      // Column indices selected from `in`.
      std::vector<size_t> indices;
      // For each selected column, its position in the original
      // union's output.
      std::vector<size_t> output_indices;
    };

    std::vector<input_info> input_v;
    input_v.reserve(n->inputs.size());

    // Collect a list of the project nodes that can be merged.
    std::map<cnode_info_ptr, size_t> mergable_inputs;

    size_t current_output_index_start = 0;
    for(cnode_info_ptr nn : n->inputs) {

      size_t current_output_index_end = current_output_index_start + nn->num_columns();

      if(nn->type == planner_node_type::PROJECT_NODE) {
        auto it = mergable_inputs.find(nn->inputs[0]);

        if(it != mergable_inputs.end()) {
          // Another projection from the same base node was already
          // seen; append this one's columns to that group.
          auto& ii = input_v[it->second];
          DASSERT_TRUE(nn->inputs[0] == ii.in);

          const auto& p_iv = nn->p("indices").get<flex_list>();
          ii.indices.insert(ii.indices.end(), p_iv.begin(), p_iv.end());

          for(size_t out_idx = current_output_index_start; out_idx < current_output_index_end; ++out_idx) {
            ii.output_indices.push_back(out_idx);
          }
        } else {
          // First projection from this base node: start a new group.
          input_info ii;
          ii.in = nn->inputs[0];

          const auto& p_iv = nn->p("indices").get<flex_list>();
          ii.indices.assign(p_iv.begin(), p_iv.end());

          ii.output_indices.resize(nn->num_columns());
          std::iota(ii.output_indices.begin(), ii.output_indices.end(), current_output_index_start);

          mergable_inputs[nn->inputs[0]] = input_v.size();
          input_v.push_back(std::move(ii));
        }
      } else {
        auto it = mergable_inputs.find(nn);

        if(it != mergable_inputs.end()) {
          // This non-project input also appears as the base of an
          // earlier group; treat it as projecting all of its columns.
          auto& ii = input_v[it->second];
          DASSERT_TRUE(nn == ii.in);

          for(size_t i = 0; i < nn->num_columns(); ++i) {
            ii.indices.push_back(i);
            ii.output_indices.push_back(current_output_index_start + i);
          }
        } else {
          // Fresh non-project input: record an identity projection
          // over all of its columns.
          input_info ii;
          ii.in = nn;

          ii.indices.resize(nn->num_columns());
          std::iota(ii.indices.begin(), ii.indices.end(), 0);

          ii.output_indices.resize(nn->num_columns());
          std::iota(ii.output_indices.begin(), ii.output_indices.end(), current_output_index_start);

          mergable_inputs[nn] = input_v.size();
          input_v.push_back(std::move(ii));
        }
      }

      current_output_index_start = current_output_index_end;
    }

    ////////////////////////////////////////////////////////////////////////////////
    // Now that we have all of the info, remap all of the inputs

    std::vector<pnode_ptr> new_inputs;
    new_inputs.reserve(input_v.size());

    // final_projection[orig_output_idx] = column index in the new union.
    std::vector<size_t> final_projection(n->num_columns(), size_t(-1));

    size_t mapped_idx = 0;
    for(const auto& ii : input_v) {
      for(size_t out_idx : ii.output_indices) {
        DASSERT_EQ(final_projection[out_idx], size_t(-1));
        final_projection[out_idx] = mapped_idx;
        ++mapped_idx;
      }

      // A group that selects a consecutive run of all of its base's
      // columns needs no project node at all.
      bool is_contiguous = true;
      if(ii.indices.size() == ii.in->num_columns()) {
        for(size_t i = 0; i + 1 < ii.indices.size(); ++i) {
          if(ii.indices[i] + 1 != ii.indices[i+1]) {
            is_contiguous = false;
            break;
          }
        }
      } else {
        is_contiguous = false;
      }

      if(is_contiguous) {
        new_inputs.push_back(ii.in->pnode);
      } else {
        new_inputs.push_back(op_project::make_planner_node(ii.in->pnode, ii.indices));
      }
    }

#ifndef NDEBUG
    {
      // Debug-only validation: every output column got mapped, and no
      // further mergable projections remain among the new inputs.
      DASSERT_FALSE(new_inputs.empty());
      std::set<size_t> idx_set;
      for(size_t idx : final_projection) {
        DASSERT_NE(idx, size_t(-1));
        idx_set.insert(idx);
      }

      {
        // Do a quick check to see if any inputs can be merged.
        std::set<pnode_ptr> project_inputs;
        bool mergable_projection_present = false;
        for(const auto& nn : new_inputs) {
          if(nn->operator_type == planner_node_type::PROJECT_NODE) {
            if(project_inputs.count(nn->inputs[0])) {
              mergable_projection_present = true;
              break;
            } else {
              project_inputs.insert(nn->inputs[0]);
            }
          }
        }

        DASSERT_FALSE(mergable_projection_present);
      }
    }
#endif

    ////////////////////////////////////////////////////////////////////////////////
    // Now, it's time to dump it out.

    pnode_ptr in_node;
    if(new_inputs.size() == 1)
      in_node = new_inputs[0];
    else
      in_node = op_union::make_planner_node(new_inputs);

    // The trailing project restores the union's original column order.
    opt_manager->replace_node(n, op_project::make_planner_node(in_node, final_projection));
    return true;
  }
};
501 
502 }}
503 
#endif /* TURI_SFRAME_QUERY_OPTIMIZATION_UNION_TRANSFORMS_H_ */
#define DASSERT_FALSE(cond)
Definition: assertions.hpp:365
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
Definition: sframe.hpp:341
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364