Turi Create  4.0
range.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_RANGE_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_RANGE_HPP
8 #include <core/data/flexible_type/flexible_type.hpp>
9 #include <core/storage/query_engine/operators/operator.hpp>
10 #include <core/storage/query_engine/execution/query_context.hpp>
11 #include <core/storage/query_engine/operators/operator_properties.hpp>
12 #include <core/util/coro.hpp>
13 namespace turi {
14 namespace query_eval {
15 
16 /**
17  * \ingroup sframe_query_engine
18  * \addtogroup operators Logical Operators
19  * \{
20  */
21 
22 /**
23  * A "range" operator which simply generates a range of integer values.
24  */
25 template <>
26 struct operator_impl<planner_node_type::RANGE_NODE> : public query_operator {
27  public:
28  DECL_CORO_STATE(execute);
29  flex_int cur;
30 
31  inline planner_node_type type() const { return planner_node_type::RANGE_NODE; }
32 
/// Short textual name of this operator.
static std::string name() {
  return std::string("range");
}
34 
35  static query_operator_attributes attributes() {
38  ret.num_inputs = 0;
39  return ret;
40  }
41 
42  inline operator_impl(flex_int start, flex_int end)
43  : m_start(start)
44  , m_end(end)
45  {
46  ASSERT_LE(m_start, m_end);
47  }
48 
49  inline std::shared_ptr<query_operator> clone() const {
50  return std::make_shared<operator_impl>(m_start, m_end);
51  }
52  inline bool coro_running() const {
53  return CORO_RUNNING(execute);
54  }
55 
56  inline void execute(query_context& context) {
57 
58  CORO_BEGIN(execute)
59  cur = m_start;
60 
61  while(cur < m_end) {
62  {
63  auto ret = context.get_output_buffer();
64  size_t len = std::min<size_t>(m_end - cur, context.block_size());
65 
66  ret->resize(1, len);
67  for (auto& value: *(ret->get_columns()[0])) {
68  value = cur;
69  ++cur;
70  }
71  context.emit(ret);
72  }
73  CORO_YIELD();
74  }
75  CORO_END
76  }
77 
78  static std::shared_ptr<planner_node> make_planner_node(
79  flex_int start, flex_int end) {
80  return planner_node::make_shared(planner_node_type::RANGE_NODE,
81  {{"start", start},
82  {"begin_index", 0},
83  {"end_index", end - start}});
84  }
85 
86  static std::shared_ptr<query_operator> from_planner_node(
87  std::shared_ptr<planner_node> pnode) {
88  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::RANGE_NODE);
89  ASSERT_TRUE(pnode->operator_parameters.count("start"));
90  ASSERT_TRUE(pnode->operator_parameters.count("begin_index"));
91  ASSERT_TRUE(pnode->operator_parameters.count("end_index"));
92 
93  flex_int start = (flex_int)pnode->operator_parameters["start"];
94  size_t begin_index = pnode->operator_parameters["begin_index"];
95  size_t end_index = pnode->operator_parameters["end_index"];
96  return std::make_shared<operator_impl>(start + begin_index,
97  start + end_index);
98  }
99 
100  static std::vector<flex_type_enum> infer_type(
101  std::shared_ptr<planner_node> pnode) {
102  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::RANGE_NODE);
103  return {flex_type_enum::INTEGER};
104  }
105 
106 
107  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
108  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::RANGE_NODE);
109  size_t count = pnode->operator_parameters["end_index"] -
110  pnode->operator_parameters["begin_index"];
111  return count;
112  }
113 
114  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
115  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::RANGE_NODE);
116  ASSERT_TRUE(pnode->operator_parameters.count("start"));
117  ASSERT_TRUE(pnode->operator_parameters.count("begin_index"));
118  ASSERT_TRUE(pnode->operator_parameters.count("end_index"));
119 
120  flex_int start = pnode->operator_parameters["start"];
121  size_t begin_index = pnode->operator_parameters["begin_index"];
122  size_t end_index = pnode->operator_parameters["end_index"];
123 
124  std::ostringstream ss;
125 
126  ss << "Sequence(" << start << ")[" << begin_index << ":" << end_index << "]";
127 
128  return ss.str();
129  }
130 
131  private:
132  // m_start to m_end defines the range
133  flex_int m_start; // inclusive
134  flex_int m_end; // exclusive
135 };
136 
138 
139 /// \}
140 } // query_eval
141 } // turi
142 
143 #endif // TURI_SFRAME_QUERY_MANAGER_RANGE_HPP
std::shared_ptr< query_operator > clone() const
Definition: range.hpp:49
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())