Turi Create  4.0
project.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP
8 
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/storage/query_engine/operators/operator.hpp>
11 #include <core/storage/query_engine/execution/query_context.hpp>
12 #include <core/storage/query_engine/operators/operator_properties.hpp>
13 #include <core/util/coro.hpp>
14 
15 namespace turi {
16 namespace query_eval {
17 
18 /**
19  * \ingroup sframe_query_engine
20  * \addtogroup operators Logical Operators
21  * \{
22  */
23 
24 /**
25  * A "project" operator selects columns of its single input by index and
26  * emits them in the order in which the indices are given.
27  */
28 template <>
29 struct operator_impl<planner_node_type::PROJECT_NODE> : public query_operator {
30  public:
31  DECL_CORO_STATE(execute);
32 
33  inline planner_node_type type() const { return planner_node_type::PROJECT_NODE; }
34 
35  static std::string name() { return "project"; }
36 
37  static query_operator_attributes attributes() {
39  ret.attribute_bitfield = query_operator_attributes::LINEAR;
40  ret.num_inputs = 1;
41  return ret;
42  }
43 
44  inline operator_impl(const std::vector<size_t>& indices): m_indices(indices) {
45  ASSERT_GT(m_indices.size(), 0);
46  };
47 
48  inline std::string print() const {
49  std::stringstream str_indices;
50  for (size_t i : m_indices) str_indices << i << " ";
51  return name() + "(" + str_indices.str() + ")";
52  }
53 
54  inline std::shared_ptr<query_operator> clone() const {
55  return std::make_shared<operator_impl>(*this);
56  }
57 
58  inline bool coro_running() const {
59  return CORO_RUNNING(execute);
60  }
61  inline void execute(query_context& context) {
62  CORO_BEGIN(execute)
63  while (1) {
64  {
65  auto rows = context.get_next(0);
66  if (rows == nullptr)
67  break;
68 
69  auto out = context.get_output_buffer();
70  auto& rows_columns = rows->cget_columns();
71  auto& out_columns = out->get_columns();
72  out_columns.clear();
73  for (size_t i = 0;i < m_indices.size(); ++i) {
74  DASSERT_LT(i, m_indices.size());
75  out_columns.push_back(rows_columns[m_indices[i]]);
76  }
77  context.emit(out);
78  }
79  CORO_YIELD();
80  }
81  CORO_END
82  };
83 
84  static std::shared_ptr<planner_node> make_planner_node(
85  std::shared_ptr<planner_node> input,
86  const std::vector<size_t>& indices) {
87 
88  DASSERT_FALSE(indices.empty());
89 
90  std::vector<flexible_type> flex_indices(indices.begin(), indices.end());
91 
92 #ifndef NDEBUG
93  size_t num_columns = infer_planner_node_num_output_columns(input);
94  for(size_t col_idx : indices)
95  DASSERT_LT(col_idx, num_columns);
96 #endif
97 
98  return planner_node::make_shared(planner_node_type::PROJECT_NODE,
99  {{"indices", flex_indices}},
100  std::map<std::string, any>(),
101  {input});
102  }
103 
104  static std::shared_ptr<query_operator> from_planner_node(
105  std::shared_ptr<planner_node> pnode) {
106  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::PROJECT_NODE);
107  ASSERT_TRUE(pnode->operator_parameters.count("indices"));
108  auto flex_indices = pnode->operator_parameters.at("indices").get<flex_list>();
109  std::vector<size_t> indices(flex_indices.begin(), flex_indices.end());
110  return std::make_shared<operator_impl>(indices);
111  }
112 
113  static std::vector<flex_type_enum> infer_type(
114  std::shared_ptr<planner_node> pnode) {
115  ASSERT_EQ((int)pnode->operator_type,
116  (int)planner_node_type::PROJECT_NODE);
117  auto input_type = infer_planner_node_type(pnode->inputs[0]);
118  auto flex_indices = pnode->operator_parameters.at("indices").get<flex_list>();
119  std::vector<flex_type_enum> ret;
120  for (auto i: flex_indices) {
121  ret.push_back(input_type[i.get<flex_int>()]);
122  }
123  return ret;
124  }
125  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
126  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::PROJECT_NODE);
127  return infer_planner_node_length(pnode->inputs[0]);
128  }
129 
130  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& ) {
131  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::PROJECT_NODE);
132  ASSERT_TRUE(pnode->operator_parameters.count("indices"));
133  auto flex_indices = pnode->operator_parameters["indices"].get<flex_list>();
134 
135  std::ostringstream out;
136  out << "PR(";
137 
138  if(flex_indices.size() > 0) {
139 
140  // Now, make subsequent numbers the same
141  std::vector<std::vector<size_t> > groups{ {flex_indices[0]} };
142 
143  for(size_t i = 1; i < flex_indices.size();++i) {
144  if(flex_indices[i] == groups.back().back() + 1)
145  groups.back().push_back(size_t(flex_indices[i]));
146  else
147  groups.push_back({flex_indices[i]});
148  }
149 
150  bool is_first = true;
151  for(const std::vector<size_t>& grp : groups) {
152  if(!is_first)
153  out << ",";
154 
155  is_first = false;
156 
157  switch(grp.size()) {
158  case 1:
159  out << grp[0];
160  break;
161  case 2:
162  out << grp[0] << ',' << grp[1];
163  break;
164  case 3:
165  out << grp[0] << ',' << grp[1] << ',' << grp[2];
166  break;
167  default:
168  out << grp.front() << ",...," << grp.back();
169  break;
170  }
171  }
172  }
173 
174  out << ')';
175 
176  return out.str();
177  }
178 
179  private:
180  std::vector<size_t> m_indices;
181 };
182 
184 
185 /// \}
186 } // query_eval
187 } // turi
188 
189 #endif // TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
std::shared_ptr< query_operator > clone() const
Definition: project.hpp:54
size_t infer_planner_node_num_output_columns(std::shared_ptr< planner_node > pnode)
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define DASSERT_FALSE(cond)
Definition: assertions.hpp:365
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::vector< flexible_type > flex_list