Turi Create  4.0
sframe_source.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP
8 
9 #include <sstream>
10 #include <core/data/flexible_type/flexible_type.hpp>
11 #include <core/storage/query_engine/operators/operator.hpp>
12 #include <core/storage/query_engine/operators/sarray_source.hpp>
13 #include <core/storage/query_engine/execution/query_context.hpp>
14 #include <core/storage/query_engine/operators/operator_properties.hpp>
15 #include <core/storage/sframe_data/sframe.hpp>
16 #include <core/util/coro.hpp>
17 
18 namespace turi {
19 namespace query_eval {
20 
21 /**
22  * \ingroup sframe_query_engine
23  * \addtogroup operators Logical Operators
24  * \{
25  */
26 
27 /**
28  * A "sframe_source" operator generates rows from a physical sframe.
 *
 * The operator declares zero inputs. execute() walks the row range
 * begin_index .. end_index of the source sframe and emits it in chunks of
 * at most context.block_size() rows, yielding after every emitted chunk.
29  */
30 template <>
31 struct operator_impl<planner_node_type::SFRAME_SOURCE_NODE> : public query_operator {
32  public:
33  DECL_CORO_STATE(execute);
 // Working state of execute(): current chunk start/end, the output buffer,
 // the chunk size and the "skip" flag read from the query context.
 // NOTE(review): presumably these are members (not locals) so their values
 // survive across CORO_YIELD -- confirm against core/util/coro.hpp.
34  size_t start = 0;
35  std::shared_ptr<sframe_rows> rows;
36  size_t block_size = 0;
37  bool skip_next_block = false;
38  size_t end = 0;
39 
 /// Returns the planner node type implemented by this operator.
40  planner_node_type type() const { return planner_node_type::SFRAME_SOURCE_NODE; }
41 
 /// Returns the operator name, "sframe_source".
42  static std::string name() { return "sframe_source"; }
43 
 /**
  * Constructs the operator over rows begin_index .. end_index of source.
  * An end_index of size_t(-1) (the default) is replaced by source.size().
  */
44  inline operator_impl(sframe source, size_t begin_index = 0, size_t end_index = size_t(-1) )
45  : m_source(source)
46  , m_begin_index(begin_index)
47  , m_end_index(end_index == size_t(-1) ? m_source.size() : end_index)
48  { }
49 
 /**
  * Returns the operator attributes; num_inputs is 0.
  * NOTE(review): listing lines 51-53 are absent from this excerpt, so the
  * declaration of `ret` and any other fields set on it are not visible here.
  */
50  static query_operator_attributes attributes() {
54  ret.num_inputs = 0;
55  return ret;
56  }
57 
58 
 /**
  * Returns a new operator instance built from the same source sframe.
  * NOTE(review): only m_source is passed to the constructor, so the clone
  * gets the default range (0 .. source.size()) rather than this instance's
  * m_begin_index / m_end_index -- confirm this is intended.
  */
59  inline std::shared_ptr<query_operator> clone() const {
60  return std::make_shared<operator_impl>(m_source);
61  }
62 
 /// Returns CORO_RUNNING(execute) for the execute() coroutine.
63  inline bool coro_running() const {
64  return CORO_RUNNING(execute);
65  }
 /**
  * Coroutine body: emits the configured row range chunk by chunk.
  *
  * The reader is obtained from the source on first use. For every chunk an
  * output buffer is requested from the context; if the context did not ask
  * to skip, the rows start .. end are read into it and emitted, otherwise
  * nullptr is emitted instead. The coroutine yields after each emit, and
  * context.should_skip() is queried again before the next chunk.
  */
66  inline void execute(query_context& context) {
67  CORO_BEGIN(execute)
68  if (!m_reader) m_reader = m_source.get_reader();
69  start = m_begin_index;
70  block_size = context.block_size();
71  skip_next_block = context.should_skip();
72 
 // Loop ends once `start` has been advanced to exactly m_end_index.
73  while (start != m_end_index) {
74  rows = context.get_output_buffer();
 // Clamp the chunk so it never runs past m_end_index.
75  end = std::min(start + block_size, m_end_index);
76  if (skip_next_block == false) {
77  m_reader->read_rows(start, end, *rows);
78  context.emit(rows);
79  CORO_YIELD();
80  } else {
 // Skipped chunk: nothing is read, a null block is emitted in its place.
81  context.emit(nullptr);
82  CORO_YIELD();
83  }
84  skip_next_block = context.should_skip();
85  start = end;
86  }
87  CORO_END
88  }
89 
 /**
  * Builds a planner node describing a source over rows
  * begin_index .. _end_index of the given sframe.
  *
  * An _end_index of size_t(-1) (the default) is replaced by source.size().
  * The node stores the serialized index info ("index"), the column types as
  * a list of integers ("types"), the resolved "begin_index" / "end_index",
  * and a copy of the sframe itself under the any-parameter "sframe".
  */
90  static std::shared_ptr<planner_node> make_planner_node(
91  sframe source, size_t begin_index = 0, size_t _end_index = -1) {
 // Serialize the sframe's index information into a string.
92  std::stringstream strm;
93  oarchive oarc(strm);
94  oarc << source.get_index_info();
95  auto types = source.column_types();
96 
97  size_t end_index = (_end_index == size_t(-1)) ? source.size() : _end_index;
98  DASSERT_LE(begin_index, end_index);
99  DASSERT_LE(end_index, source.size());
100 
 // Store each column type enum as a flex_int so it fits in a flex_list.
101  flex_list type_list(types.size());
102  for (size_t i = 0; i < types.size(); ++i) {
103  type_list[i] = flex_int(types[i]);
104  }
105 
106 
107  // we need to keep a copy of the source in the node for reference counting
108  // reasons.
109  return planner_node::make_shared(planner_node_type::SFRAME_SOURCE_NODE,
110  {{"index", strm.str()},
111  {"types", type_list},
112  {"begin_index", begin_index},
113  {"end_index", end_index}},
114  {{"sframe", any(source)}});
115  }
116 
 /**
  * Creates the operator from a planner node produced by make_planner_node().
  * Asserts that the node has the SFRAME_SOURCE_NODE type and carries the
  * "sframe" any-parameter, then reads "begin_index" and "end_index".
  */
117  static std::shared_ptr<query_operator> from_planner_node(
118  std::shared_ptr<planner_node> pnode) {
119  ASSERT_EQ((int)pnode->operator_type,
120  (int)planner_node_type::SFRAME_SOURCE_NODE);
121 
122  ASSERT_TRUE(pnode->any_operator_parameters.count("sframe"));
123  auto source = pnode->any_operator_parameters.at("sframe").as<sframe>();
124 
125  size_t begin_index = pnode->operator_parameters.at("begin_index");
126  size_t end_index = pnode->operator_parameters.at("end_index");
127 
128  return std::make_shared<operator_impl>(source, begin_index, end_index);
129  }
130 
 /**
  * Returns the output column types, decoded from the integer list stored
  * under the "types" parameter of the node.
  */
131  static std::vector<flex_type_enum> infer_type(
132  std::shared_ptr<planner_node> pnode) {
133  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::SFRAME_SOURCE_NODE);
134  flex_list type = pnode->operator_parameters.at("types");
135  std::vector<flex_type_enum> ret;
136  for (auto t: type) ret.push_back((flex_type_enum)(flex_int)(t));
137  return ret;
138  }
139 
 /**
  * Returns the number of rows the node produces, computed as the node's
  * "end_index" parameter minus its "begin_index" parameter.
  */
140  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
141  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::SFRAME_SOURCE_NODE);
142  flex_int length = (pnode->operator_parameters.at("end_index")
143  - pnode->operator_parameters.at("begin_index"));
144  return length;
145  }
146 
 /**
  * Returns a short textual description of the node of the form
  * "SF(<column tags>)", followed by "[begin,end]" when the node does not
  * cover the whole sframe.
  *
  * Each column contributes the tag returned by
  * op_sarray_source::unique_sarray_tag(); runs of consecutive tag values are
  * printed compactly: "S<a>" for one, "S<a>,S<b>" for two, and
  * "S<first>,...,S<last>" for longer runs.
  */
147  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
148  std::ostringstream out;
149 
150  auto source = pnode->any_operator_parameters.at("sframe").as<sframe>();
151 
152  out << "SF(";
153 
154  if(source.num_columns() > 0) {
155  std::vector<size_t> columns(source.num_columns());
156 
157  for(size_t i = 0; i < source.num_columns(); ++i) {
158  columns[i] = op_sarray_source::unique_sarray_tag(source.select_column(i));
159  }
160 
161  // Group runs of consecutive tag values (each exactly previous + 1) together
162  std::vector<std::vector<size_t> > groups{ {columns[0]} };
163 
164  for(size_t i = 1; i < columns.size();++i) {
165  if(columns[i] == groups.back().back() + 1)
166  groups.back().push_back(columns[i]);
167  else
168  groups.push_back({columns[i]});
169  }
170 
 // Print the groups separated by commas.
171  bool is_first = true;
172  for(const std::vector<size_t>& grp : groups) {
173  if(!is_first)
174  out << ",";
175 
176  is_first = false;
177 
178  switch(grp.size()) {
179  case 1:
180  out << "S" << grp[0];
181  break;
182  case 2:
183  out << "S" << grp[0] << ",S" << grp[1];
184  break;
185  default:
186  out << "S" << grp.front() << ",...,S" << grp.back();
187  break;
188  }
189  }
190 
191  }
192  out << ")";
193 
194  size_t begin_index = pnode->operator_parameters.at("begin_index");
195  size_t end_index = pnode->operator_parameters.at("end_index");
196 
 // Only show the row range when it differs from the full sframe.
197  if(begin_index != 0 || end_index != source.num_rows()) {
198  out << "[" << begin_index << "," << end_index << "]";
199  }
200 
201  return out.str();
202  }
203 
204  private:
 // The source sframe, the row range to emit, and the reader that execute()
 // obtains from the source on first use.
205  sframe m_source;
206  size_t m_begin_index, m_end_index;
207  std::shared_ptr<sframe_reader> m_reader;
208 };
209 
211 
212 /// \}
213 } // query_eval
214 } // turicreate
215 
216 #endif // TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t size() const
Definition: sframe.hpp:354
size_t num_rows() const
Returns the length of each sarray.
Definition: sframe.hpp:346
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
const sframe_index_file_information get_index_info() const
Definition: sframe.hpp:472
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
Definition: sframe.hpp:341
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > column_types() const
Definition: sframe.hpp:407
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
std::vector< flexible_type > flex_list