6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_PROJECTION_TRANSFORMS_H_ 7 #define TURI_SFRAME_QUERY_OPTIMIZATION_PROJECTION_TRANSFORMS_H_ 9 #include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp> 10 #include <core/storage/query_engine/planning/optimization_engine.hpp> 11 #include <core/storage/query_engine/operators/all_operators.hpp> 12 #include <core/storage/query_engine/planning/optimization_node_info.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 14 #include <core/data/flexible_type/flexible_type.hpp> 19 namespace query_eval {
21 class opt_project_transform :
public opt_transform {
23 return (t == planner_node_type::PROJECT_NODE);
31 std::string description() {
return "project(source) -> source"; }
35 if(n->inputs[0]->type != planner_node_type::SFRAME_SOURCE_NODE)
38 auto flex_indices = n->p(
"indices").get<
flex_list>();
44 if(flex_indices.size() > old_sf.num_columns())
47 std::vector<std::shared_ptr<sarray<flexible_type> > > columns;
49 for(
const auto& idx_f : flex_indices) {
51 DASSERT_LT(idx, old_sf.num_columns());
52 columns.push_back(old_sf.select_column(idx));
57 size_t begin_index = n->inputs[0]->p(
"begin_index");
58 size_t end_index = n->inputs[0]->p(
"end_index");
60 pnode_ptr new_pnode = op_sframe_source::make_planner_node(
sframe(columns), begin_index, end_index);
62 opt_manager->replace_node(n, new_pnode);
72 std::string description() {
return "project(a, {0,1,...,num_columns(a)}) -> a"; }
76 const auto& flex_indices = n->p(
"indices").get<
flex_list>();
78 size_t nc = n->inputs[0]->num_columns();
80 if(flex_indices.size() != nc)
83 for(
size_t idx = 0; idx < flex_indices.size(); ++idx) {
84 if(flex_indices[idx].get<flex_int>() !=
flex_int(idx))
88 opt_manager->replace_node(n, n->inputs[0]->pnode);
98 std::string description() {
return "project1(project2(a)) -> project3(a)"; }
102 DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
104 if(n->inputs[0]->type != planner_node_type::PROJECT_NODE)
107 const auto& iv_1 = n->inputs[0]->p(
"indices").get<
flex_list>();
108 const auto& iv_2 = n->p(
"indices").get<
flex_list>();
114 bool first_is_contraction = (n->inputs[0]->inputs[0]->num_columns() > iv_1.size());
115 bool second_is_expansion = (iv_2.size() > iv_1.size());
117 if(first_is_contraction && second_is_expansion)
120 std::vector<size_t> iv_out;
122 for(
const auto& idx_f : iv_2) {
124 DASSERT_LT(idx, iv_1.size());
125 iv_out.push_back((
size_t)(iv_1[idx]));
128 pnode_ptr out = op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_out);
130 opt_manager->replace_node(n, out);
141 std::string description() {
return "project(append(a,b)) -> append(project(a), project(b))"; }
145 DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
147 if(n->inputs[0]->type != planner_node_type::APPEND_NODE)
151 const auto& iv = n->p(
"indices").get<
flex_list>();
154 if(iv.size() > n->inputs[0]->num_columns())
157 std::vector<size_t> iv_s(iv.begin(), iv.end());
160 op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_s),
161 op_project::make_planner_node(n->inputs[0]->inputs[1]->pnode, iv_s));
163 opt_manager->replace_node(n, out);
171 class opt_project_logical_filter_exchange :
public opt_project_transform {
173 std::string description() {
return "project(logical_filter(a), mask) -> logical_filter(project(a), mask)"; }
177 DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
179 if(n->inputs[0]->type != planner_node_type::LOGICAL_FILTER_NODE)
182 const auto& iv = n->p(
"indices").get<
flex_list>();
184 if(iv.size() > n->inputs[0]->num_columns())
187 std::vector<size_t> iv_s(iv.begin(), iv.end());
189 pnode_ptr out = op_logical_filter::make_planner_node(
190 op_project::make_planner_node(n->inputs[0]->inputs[0]->pnode, iv_s),
191 n->inputs[0]->inputs[1]->pnode);
193 opt_manager->replace_node(n, out);
222 std::string description() {
return "partitionable_project(union(a,...)) ?->? union(project1(a), ...)"; }
226 DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
228 if(n->inputs[0]->type != planner_node_type::UNION_NODE)
231 cnode_info_ptr u_node = n->inputs[0];
237 const auto& out_iv = n->p(
"indices").get<
flex_list>();
239 std::vector<int> output_used(u_node->num_columns(),
false);
241 for(
const auto& idx : out_iv) {
242 DASSERT_LT(idx, output_used.size());
243 output_used[idx.get<
flex_int>()] =
true;
247 if(std::all_of(output_used.begin(), output_used.end(), [](
int t) {
return t; } ) )
251 std::vector<size_t> remapped_indices(u_node->num_columns(), size_t(-1));
253 std::vector<std::vector<size_t> > projections_by_input(u_node->inputs.size());
254 std::vector<bool> input_needs_projection(u_node->inputs.size(),
false);
259 size_t current_offset = 0;
260 size_t current_input = 0;
261 size_t current_input_idx = 0;
262 for(
size_t i = 0; i < u_node->num_columns(); ++i) {
265 projections_by_input[current_input].push_back(current_input_idx);
266 remapped_indices[i] = i - current_offset;
269 input_needs_projection[current_input] =
true;
274 if(current_input_idx == u_node->inputs[current_input]->num_columns()) {
276 current_input_idx = 0;
284 std::vector<pnode_ptr> inputs(u_node->inputs.size());
286 for(
size_t i = 0; i < u_node->inputs.size(); ++i) {
287 if(projections_by_input[i].empty()) {
289 }
else if(input_needs_projection[i]) {
290 inputs[i] = op_project::make_planner_node(u_node->inputs[i]->pnode, projections_by_input[i]);
292 inputs[i] = u_node->inputs[i]->pnode;
297 auto new_end = std::remove_if(inputs.begin(), inputs.end(),
298 [](
const pnode_ptr& p) {
return p ==
nullptr; });
299 inputs.resize(new_end - inputs.begin());
304 std::vector<size_t> new_projection_indices(out_iv.size());
306 for(
size_t i = 0; i < out_iv.size(); ++i) {
307 DASSERT_LT(out_iv[i], remapped_indices.size());
308 DASSERT_NE(remapped_indices[out_iv[i]],
size_t(-1));
310 new_projection_indices[i] = remapped_indices[out_iv[i]];
314 if(inputs.size() == 1) {
315 opt_manager->replace_node(
316 n, op_project::make_planner_node(inputs[0], new_projection_indices));
319 opt_manager->replace_node(
320 n, op_project::make_planner_node(
321 op_union::make_planner_node(inputs), new_projection_indices));
335 std::string description() {
336 return "project(a, ...) ?->? expanding_project(contracting_project(a, ...), ...)";
341 DASSERT_TRUE(n->type == planner_node_type::PROJECT_NODE);
345 const auto& iv = n->p(
"indices").get<
flex_list>();
347 std::set<size_t> used_indices(iv.begin(), iv.end());
349 size_t n_cols = n->inputs[0]->num_columns();
351 if( ! (used_indices.size() < n_cols && iv.size() >= n_cols) )
354 std::vector<size_t> indices_1;
355 indices_1.reserve(used_indices.size());
356 std::vector<size_t> indices_2(iv.size());
358 std::map<size_t, size_t> idx_map;
360 for(
size_t i = 0; i < iv.size(); ++i) {
363 auto it = idx_map.lower_bound(idx);
365 if(it != idx_map.end() && it->first == idx) {
366 indices_2[i] = it->second;
368 size_t new_idx = indices_1.size();
369 idx_map.insert(it, {idx, new_idx});
370 indices_1.push_back(idx);
371 indices_2[i] = new_idx;
375 pnode_ptr p_1 = op_project::make_planner_node(n->inputs[0]->pnode, indices_1);
376 pnode_ptr p_2 = op_project::make_planner_node(p_1, indices_2);
378 opt_manager->replace_node(n, p_2);
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_planner_node(std::shared_ptr< planner_node > left, std::shared_ptr< planner_node > right)