6 #ifndef TURI_SFRAME_QUERY_OPTIMIZATION_SOURCE_TRANSFORMS_H_ 7 #define TURI_SFRAME_QUERY_OPTIMIZATION_SOURCE_TRANSFORMS_H_ 9 #include <core/storage/query_engine/planning/optimizations/optimization_transforms.hpp> 10 #include <core/storage/query_engine/planning/optimization_engine.hpp> 11 #include <core/storage/query_engine/operators/all_operators.hpp> 12 #include <core/storage/query_engine/planning/optimization_node_info.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 14 #include <core/data/flexible_type/flexible_type.hpp> 19 namespace query_eval {
40 std::string description() {
return "source_a, ..., source_a -> source_a"; }
44 return (t == planner_node_type::IDENTITY_NODE);
47 void fill_source_sets(
const cnode_info_ptr& n,
48 std::set<const node_info*>& seen, std::vector<cnode_info_ptr>& source_nodes) {
50 auto it = seen.lower_bound(n.get());
52 if(it != seen.end() && *it == n.get())
55 seen.insert(it, n.get());
57 if(n->is_source_node())
58 source_nodes.push_back(n);
60 for(
const auto& nn : n->inputs) {
61 fill_source_sets(nn, seen, source_nodes);
69 std::vector<cnode_info_ptr> source_nodes;
70 std::set<const node_info*> seen_nodes;
71 fill_source_sets(n, seen_nodes, source_nodes);
79 bool operator<(
const source_id& s)
const {
80 return (std::make_pair(ptr_key, std::make_pair(begin_index, end_index))
81 < std::make_pair(s.ptr_key, std::make_pair(s.begin_index, s.end_index)));
86 cnode_info_ptr src_node;
90 std::map<source_id, std::vector<source_out> > all_sources;
92 for(
const cnode_info_ptr& sn : source_nodes) {
95 case planner_node_type::SFRAME_SOURCE_NODE: {
96 size_t begin_index = sn->p(
"begin_index");
97 size_t end_index = sn->p(
"end_index");
103 id.begin_index = begin_index;
104 id.end_index = end_index;
108 out.column_index = i;
110 all_sources[id].push_back(out);
116 case planner_node_type::SARRAY_SOURCE_NODE: {
117 auto sa = sn->any_p<std::shared_ptr<sarray<flexible_type> > >(
"sarray");
120 id.ptr_key = ptrdiff_t(sa.get());
121 id.begin_index = sn->p(
"begin_index");
122 id.end_index = sn->p(
"end_index");
126 out.column_index = 0;
128 all_sources[id].push_back(out);
133 case planner_node_type::RANGE_NODE: {
137 id.ptr_key = int(planner_node_type::RANGE_NODE);
138 id.begin_index = sn->p(
"start") + sn->p(
"begin_index");
139 id.end_index = sn->p(
"start") + sn->p(
"end_index");
143 out.column_index = 0;
145 all_sources[id].push_back(out);
156 bool change_occured =
false;
157 for(
auto p : all_sources) {
159 const source_id&
id = p.first;
160 const std::vector<source_out>& out_v = p.second;
162 if(out_v.size() == 1) {
170 size_t sarray_count = 0;
171 size_t sframe_count = 0;
172 size_t range_node_count = 0;
174 for(
const source_out& out : out_v) {
175 switch(out.src_node->type) {
178 case planner_node_type::SFRAME_SOURCE_NODE: {
179 if(out.src_node->num_columns() == 1)
186 case planner_node_type::SARRAY_SOURCE_NODE:
190 case planner_node_type::RANGE_NODE:
198 if(range_node_count > 0) {
200 DASSERT_EQ(sarray_count, 0);
201 DASSERT_EQ(sframe_count, 0);
203 for(
size_t i = 1; i < out_v.size(); ++i) {
204 opt_manager->replace_node(out_v[i].src_node, out_v[0].src_node->pnode);
208 }
else if(sframe_count == 0) {
210 DASSERT_EQ(sarray_count, out_v.size());
212 for(
size_t i = 1; i < out_v.size(); ++i) {
213 opt_manager->replace_node(out_v[i].src_node, out_v[0].src_node->pnode);
215 change_occured =
true;
217 }
else if(sframe_count == 1) {
220 size_t sframe_index = 0;
222 for(
size_t i = 1; i < out_v.size(); ++i) {
223 if(out_v[i].src_node->type == planner_node_type::SFRAME_SOURCE_NODE
224 && out_v[i].src_node->num_columns() > 1) {
230 pnode_ptr out_project = op_project::make_planner_node(
231 out_v[sframe_index].src_node->pnode, {out_v[sframe_index].column_index});
233 for(
size_t i = 0; i < out_v.size(); ++i) {
234 if(i != sframe_index)
235 opt_manager->replace_node(out_v[i].src_node, out_project);
238 change_occured =
true;
247 std::map<void*, size_t> idx_map;
249 std::vector<std::vector<size_t> > projections(out_v.size());
250 std::vector<std::shared_ptr<sarray<flexible_type> > > new_columns;
252 auto get_index = [&](
const std::shared_ptr<sarray<flexible_type> >& s) ->
size_t {
253 auto it = idx_map.lower_bound(s.get());
254 if(it != idx_map.end() && it->first == s.get()) {
257 size_t idx = idx_map.size();
258 idx_map.insert(it, {s.get(), idx});
259 new_columns.push_back(s);
264 for(
size_t i = 0; i < out_v.size(); ++i) {
265 if(out_v[i].src_node->type == planner_node_type::SFRAME_SOURCE_NODE) {
274 }
else if(out_v[i].src_node->type == planner_node_type::SARRAY_SOURCE_NODE) {
275 auto sa = out_v[i].src_node->any_p<std::shared_ptr<sarray<flexible_type> > >(
"sarray");
277 projections[i] = {get_index(sa)};
285 pnode_ptr sf_src = op_sframe_source::make_planner_node(
sframe(new_columns),
id.begin_index,
id.end_index);
287 for(
size_t i = 0; i < out_v.size(); ++i) {
288 pnode_ptr rep_node = op_project::make_planner_node(sf_src, projections[i]);
289 opt_manager->replace_node(out_v[i].src_node, rep_node);
297 return change_occured;
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
std::shared_ptr< planner_node > pnode_ptr
A handy typedef.
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
#define DASSERT_TRUE(cond)