Turi Create  4.0
constant.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_CONSTANT_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_CONSTANT_HPP
8 #include <core/logging/assertions.hpp>
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/storage/query_engine/operators/operator.hpp>
11 #include <core/storage/query_engine/execution/query_context.hpp>
12 #include <core/storage/query_engine/operators/operator_properties.hpp>
13 #include <core/util/coro.hpp>
14 namespace turi {
15 namespace query_eval {
16 
17 /**
18  * \ingroup sframe_query_engine
19  * \addtogroup operators Logical Operators
20  * \{
21  */
22 
23 /**
24  * A "constant" operator which simply generates "len" instances of a particular
25  * value.
26  */
27 template <>
28 struct operator_impl<planner_node_type::CONSTANT_NODE> : public query_operator {
29  public:
30 
31  DECL_CORO_STATE(execute);
32  std::shared_ptr<sframe_rows> retbuf;
33  size_t i;
34  size_t len;
35 
36  planner_node_type type() const { return planner_node_type::CONSTANT_NODE; }
37 
38  static std::string name() { return "constant"; }
39 
40  static query_operator_attributes attributes() {
43  ret.num_inputs = 0;
44  return ret;
45  }
46 
47  inline operator_impl(flexible_type value, size_t len)
48  : m_value(value)
49  , m_len(len)
50  { }
51 
52  inline std::string print() const {
53  return std::string("constant(") + (std::string)m_value + ")";
54  }
55 
56  inline std::shared_ptr<query_operator> clone() const {
57  return std::make_shared<operator_impl>(m_value, m_len);
58  }
59 
60  inline bool coro_running() const {
61  return CORO_RUNNING(execute);
62  }
63  inline void execute(query_context& context) {
64  CORO_BEGIN(execute)
65  i = 0;
66  while(i < m_len) {
67  retbuf = context.get_output_buffer();
68  len = std::min(m_len - i, context.block_size());
69 
70  retbuf->resize(1, len);
71  for (auto& value: *(retbuf->get_columns()[0])) value = m_value;
72  context.emit(retbuf);
73  CORO_YIELD();
74  i += len;
75  }
76  CORO_END
77  }
78 
79  static std::shared_ptr<planner_node> make_planner_node(const flexible_type& val,
80  flex_type_enum type,
81  size_t count) {
82  ASSERT_TRUE(val.get_type() == type || val.get_type() == flex_type_enum::UNDEFINED);
83  return planner_node::make_shared(planner_node_type::CONSTANT_NODE,
84  {{"value", val},
85  {"type", (int)type},
86  {"begin_index", 0},
87  {"end_index", count}});
88  }
89 
90  static std::shared_ptr<query_operator> from_planner_node(
91  std::shared_ptr<planner_node> pnode) {
92 
93  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::CONSTANT_NODE);
94  ASSERT_TRUE(pnode->operator_parameters.count("value"));
95  ASSERT_TRUE(pnode->operator_parameters.count("begin_index"));
96  ASSERT_TRUE(pnode->operator_parameters.count("end_index"));
97  ASSERT_TRUE(pnode->operator_parameters.count("type"));
98  size_t count = (flex_int)pnode->operator_parameters["end_index"] -
99  (flex_int)pnode->operator_parameters["begin_index"];
100  return std::make_shared<operator_impl>(pnode->operator_parameters["value"], count);
101  }
102 
103  static std::vector<flex_type_enum> infer_type(
104  std::shared_ptr<planner_node> pnode) {
105  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::CONSTANT_NODE);
106  ASSERT_TRUE(pnode->operator_parameters.count("type"));
107  return {(flex_type_enum)(int)(pnode->operator_parameters["type"])};
108  }
109 
110  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
111  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::CONSTANT_NODE);
112  size_t count = (flex_int)pnode->operator_parameters["end_index"] -
113  (flex_int)pnode->operator_parameters["begin_index"];
114  return count;
115  }
116 
117  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
118  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::CONSTANT_NODE);
119  ASSERT_TRUE(pnode->operator_parameters.count("value"));
120  ASSERT_TRUE(pnode->operator_parameters.count("begin_index"));
121  ASSERT_TRUE(pnode->operator_parameters.count("end_index"));
122 
123  flexible_type v = pnode->operator_parameters["value"];
124  flex_int begin_index = pnode->operator_parameters["begin_index"];
125  flex_int end_index = pnode->operator_parameters["end_index"];
126 
127  std::ostringstream ss;
128 
129  ss << "Const(" << v << ")[" << begin_index << ":" << end_index << "]";
130 
131  return ss.str();
132  }
133 
134  private:
135  flexible_type m_value;
136  size_t m_len = 0;
137 };
138 
140 
141 /// \}
142 } // query_eval
143 } // turi
144 
145 #endif // TURI_SFRAME_QUERY_MANAGER_CONSTANT_HPP
int num_inputs
Number of inputs expected to the operator.
Definition: operator.hpp:56
size_t attribute_bitfield
A bitfield of the attribute enum.
Definition: operator.hpp:55
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
flex_type_enum get_type() const
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())