6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_UNION_TRANSFORMS_H_ 7 #define TURI_SFRAME_QUERY_OPTIMIZATION_UNION_TRANSFORMS_H_ 9 #include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp> 10 #include <core/storage/query_engine/planning/optimization_engine.hpp> 11 #include <core/storage/query_engine/operators/all_operators.hpp> 12 #include <core/storage/query_engine/planning/optimization_node_info.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 14 #include <core/data/flexible_type/flexible_type.hpp> 19 namespace query_eval {
23 class opt_union_transform :
public opt_transform {
25 return t == planner_node_type::UNION_NODE;
29 class opt_union_merge :
public opt_union_transform {
31 std::string description() {
return "union(a, union(b,c)) -> union(a,b,c)"; }
33 bool apply_transform(optimization_engine *opt_manager, cnode_info_ptr n) {
37 if(! n->input_type_present(planner_node_type::UNION_NODE))
40 std::vector<pnode_ptr> inputs;
42 std::function<void(const cnode_info_ptr&)> add;
44 add = [&](
const cnode_info_ptr& nn) {
45 if(nn->type == planner_node_type::UNION_NODE) {
46 for(
const auto& nnn : nn->inputs) add(nnn);
48 inputs.push_back(nn->pnode);
54 pnode_ptr new_pnode = op_union::make_planner_node(inputs);
55 opt_manager->replace_node(n, new_pnode);
66 std::string description() {
return "union(source, source) -> source"; }
73 size_t num_sources_present = 0;
74 for(
size_t i = 0; i < n->inputs.size(); ++i) {
75 auto t = n->inputs[i]->type;
76 if(t == planner_node_type::SFRAME_SOURCE_NODE
77 || t == planner_node_type::SARRAY_SOURCE_NODE) {
78 ++num_sources_present;
82 if(num_sources_present < 2)
89 typedef std::array<size_t, 3> key_type;
91 std::vector<key_type> input_keys(n->inputs.size());
95 std::set<key_type> distinct_input_ranges;
97 for(
size_t i = 0; i < n->inputs.size(); ++i) {
98 auto t = n->inputs[i]->type;
99 if(t == planner_node_type::SFRAME_SOURCE_NODE
100 || t == planner_node_type::SARRAY_SOURCE_NODE) {
102 size_t begin_index = n->inputs[i]->p(
"begin_index");
103 size_t end_index = n->inputs[i]->p(
"end_index");
107 if(t == planner_node_type::SFRAME_SOURCE_NODE) {
109 size = n->inputs[i]->any_p<
sframe>(
"sframe").num_rows();
112 size = n->inputs[i]->any_p<std::shared_ptr<sarray<flexible_type> > >(
"sarray")->size();
116 input_keys[i] = key_type{{begin_index, end_index, size}};
118 distinct_input_ranges.insert(input_keys[i]);
122 if(distinct_input_ranges.size() == num_sources_present) {
131 bool is_sarray =
false;
135 size_t output_index_start;
136 size_t output_index_end;
139 size_t begin_index = 0;
140 size_t end_index = 0;
143 std::vector<std::pair<size_t, std::shared_ptr<sarray<flexible_type> > > > sa_v;
146 std::vector<merge_info> map_info;
147 std::map<key_type, size_t> merge_groups;
149 size_t current_output_idx = 0;
150 for(
size_t i = 0; i < n->inputs.size(); ++i) {
151 auto t = n->inputs[i]->type;
152 if(t == planner_node_type::SFRAME_SOURCE_NODE
153 || t == planner_node_type::SARRAY_SOURCE_NODE) {
155 size_t begin_index = n->inputs[i]->p(
"begin_index");
156 size_t end_index = n->inputs[i]->p(
"end_index");
158 const auto& key = input_keys[i];
160 auto it = merge_groups.find(key);
163 if(it == merge_groups.end()) {
164 merge_groups[key] = map_idx = map_info.size();
165 map_info.emplace_back();
166 map_info.back().is_sarray =
true;
167 map_info.back().begin_index = begin_index;
168 map_info.back().end_index = end_index;
170 map_idx = it->second;
175 if(t == planner_node_type::SFRAME_SOURCE_NODE) {
180 map_info[map_idx].sa_v.push_back({current_output_idx, sf.
select_column(i)});
181 ++current_output_idx;
184 map_info[map_idx].sa_v.push_back(
186 n->inputs[i]->any_p<std::shared_ptr<sarray<flexible_type> > >(
"sarray")});
187 ++current_output_idx;
191 map_info.emplace_back();
192 map_info.back().is_sarray =
false;
193 map_info.back().input_index = i;
194 map_info.back().output_index_start = current_output_idx;
195 current_output_idx += n->inputs[i]->num_columns();
196 map_info.back().output_index_end = current_output_idx;
200 size_t num_outputs = current_output_idx;
205 if(map_info.size() == 1) {
206 const auto& m = map_info.front();
209 std::vector<std::shared_ptr<sarray<flexible_type> > > columns(m.sa_v.size());
210 for(
size_t i = 0; i < columns.size(); ++i) {
211 columns[i] = m.sa_v[i].second;
212 if(i + 1 < columns.size()) {
213 DASSERT_EQ(m.sa_v[i].first + 1, m.sa_v[i+1].first);
217 pnode_ptr rep = op_sframe_source::make_planner_node(
sframe(columns), m.begin_index, m.end_index);
218 opt_manager->replace_node(n, rep);
227 std::vector<size_t> output_projection_indices(num_outputs,
size_t(-1));
229 std::vector<pnode_ptr> inputs;
230 inputs.reserve(map_info.size());
232 current_output_idx = 0;
234 for(
const auto& m : map_info) {
236 size_t n_columns = m.sa_v.size();
238 std::vector<std::shared_ptr<sarray<flexible_type> > > columns(n_columns);
240 for(
size_t i = 0; i < n_columns; ++i) {
241 output_projection_indices[m.sa_v[i].first] = current_output_idx;
242 columns[i] = m.sa_v[i].second;
243 ++current_output_idx;
246 inputs.push_back(op_sframe_source::make_planner_node(
sframe(columns), m.begin_index, m.end_index));
249 inputs.push_back(n->inputs[m.input_index]->pnode);
251 for(
size_t i = m.output_index_start; i < m.output_index_end; ++i) {
252 output_projection_indices[i] = current_output_idx;
253 ++current_output_idx;
259 for(
size_t i : output_projection_indices) {
260 DASSERT_NE(i,
size_t(-1));
261 DASSERT_LT(i, num_outputs);
268 pnode_ptr new_pnode = op_union::make_planner_node(inputs);
273 bool all_consecutive =
true;
274 for(
size_t i = 0; i + 1 < output_projection_indices.size(); ++i) {
275 if(output_projection_indices[i] + 1 != output_projection_indices[i + 1]) {
276 all_consecutive =
false;
282 new_pnode = op_project::make_planner_node(new_pnode, output_projection_indices);
284 opt_manager->replace_node(n, new_pnode);
289 class opt_eliminate_singleton_union :
public opt_union_transform {
291 std::string description() {
return "union(a) -> a"; }
297 if(n->inputs.size() == 1) {
298 opt_manager->replace_node(n, n->inputs[0]->pnode);
318 std::string description() {
return "union(project1(a), ..., project2(a)) -> union(project3(a...), ...)"; }
323 if(! n->input_type_present(planner_node_type::PROJECT_NODE, 2))
328 std::set<cnode_info_ptr> project_inputs;
329 bool mergable_projection_present =
false;
330 for(
const auto& nn : n->inputs) {
331 if(nn->type == planner_node_type::PROJECT_NODE) {
332 if(project_inputs.count(nn->inputs[0])) {
333 mergable_projection_present =
true;
336 project_inputs.insert(nn->inputs[0]);
341 if(!mergable_projection_present)
354 std::vector<size_t> indices;
355 std::vector<size_t> output_indices;
358 std::vector<input_info> input_v;
359 input_v.reserve(n->inputs.size());
362 std::map<cnode_info_ptr, size_t> mergable_inputs;
364 size_t current_output_index_start = 0;
365 for(cnode_info_ptr nn : n->inputs) {
367 size_t current_output_index_end = current_output_index_start + nn->num_columns();
369 if(nn->type == planner_node_type::PROJECT_NODE) {
370 auto it = mergable_inputs.find(nn->inputs[0]);
372 if(it != mergable_inputs.end()) {
373 auto& ii = input_v[it->second];
376 const auto& p_iv = nn->p(
"indices").get<
flex_list>();
377 ii.indices.insert(ii.indices.end(), p_iv.begin(), p_iv.end());
379 for(
size_t out_idx = current_output_index_start; out_idx < current_output_index_end; ++out_idx) {
380 ii.output_indices.push_back(out_idx);
384 ii.in = nn->inputs[0];
386 const auto& p_iv = nn->p(
"indices").get<
flex_list>();
387 ii.indices.assign(p_iv.begin(), p_iv.end());
389 ii.output_indices.resize(nn->num_columns());
390 std::iota(ii.output_indices.begin(), ii.output_indices.end(), current_output_index_start);
392 mergable_inputs[nn->inputs[0]] = input_v.size();
393 input_v.push_back(std::move(ii));
396 auto it = mergable_inputs.find(nn);
398 if(it != mergable_inputs.end()) {
399 auto& ii = input_v[it->second];
402 for(
size_t i = 0; i < nn->num_columns(); ++i) {
403 ii.indices.push_back(i);
404 ii.output_indices.push_back(current_output_index_start + i);
410 ii.indices.resize(nn->num_columns());
411 std::iota(ii.indices.begin(), ii.indices.end(), 0);
413 ii.output_indices.resize(nn->num_columns());
414 std::iota(ii.output_indices.begin(), ii.output_indices.end(), current_output_index_start);
416 mergable_inputs[nn] = input_v.size();
417 input_v.push_back(std::move(ii));
421 current_output_index_start = current_output_index_end;
427 std::vector<pnode_ptr> new_inputs;
428 new_inputs.reserve(input_v.size());
430 std::vector<size_t> final_projection(n->num_columns(), size_t(-1));
432 size_t mapped_idx = 0;
433 for(
const auto& ii : input_v) {
434 for(
size_t out_idx : ii.output_indices) {
435 DASSERT_EQ(final_projection[out_idx],
size_t(-1));
436 final_projection[out_idx] = mapped_idx;
440 bool is_contiguous =
true;
441 if(ii.indices.size() == ii.in->num_columns()) {
442 for(
size_t i = 0; i + 1 < ii.indices.size(); ++i) {
443 if(ii.indices[i] + 1 != ii.indices[i+1]) {
444 is_contiguous =
false;
449 is_contiguous =
false;
453 new_inputs.push_back(ii.in->pnode);
455 new_inputs.push_back(op_project::make_planner_node(ii.in->pnode, ii.indices));
462 std::set<size_t> idx_set;
463 for(
size_t idx : final_projection) {
464 DASSERT_NE(idx,
size_t(-1));
470 std::set<pnode_ptr> project_inputs;
471 bool mergable_projection_present =
false;
472 for(
const auto& nn : new_inputs) {
473 if(nn->operator_type == planner_node_type::PROJECT_NODE) {
474 if(project_inputs.count(nn->inputs[0])) {
475 mergable_projection_present =
true;
478 project_inputs.insert(nn->inputs[0]);
492 if(new_inputs.size() == 1)
493 in_node = new_inputs[0];
495 in_node = op_union::make_planner_node(new_inputs);
497 opt_manager->replace_node(n, op_project::make_planner_node(in_node, final_projection));
#define DASSERT_FALSE(cond)
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)