Turi Create  4.0
sarray_source.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP
7 #define TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP
8 #include <sstream>
9 #include <core/storage/serialization/serialization_includes.hpp>
10 #include <core/data/flexible_type/flexible_type.hpp>
11 #include <core/storage/query_engine/operators/operator.hpp>
12 #include <core/storage/query_engine/execution/query_context.hpp>
13 #include <core/storage/query_engine/planning/planner_node.hpp>
14 #include <core/storage/query_engine/operators/operator_properties.hpp>
15 #include <core/storage/fileio/fs_utils.hpp>
16 #include <core/storage/sframe_data/sarray.hpp>
17 #include <core/util/coro.hpp>
18 
19 namespace turi {
20 namespace query_eval {
21 
22 
23 template <>
24 struct operator_impl<planner_node_type::SARRAY_SOURCE_NODE> : public query_operator{
25 
26 /**
27  * \ingroup sframe_query_engine
28  * \addtogroup operators Logical Operators
29  * \{
30  */
31 
32  /**
33  * A "sarray_source" operator generates values from a physical sarray.
34  */
35  public:
36  DECL_CORO_STATE(execute);
37  size_t start = 0;
38  size_t block_size = 0;
39  bool skip_next_block = false;
40  size_t end;
41  std::shared_ptr<sframe_rows> rows;
42 
43  planner_node_type type() const { return planner_node_type::SARRAY_SOURCE_NODE; }
44 
45  static std::string name() { return "sarray_source"; }
46 
47  inline operator_impl(std::shared_ptr<sarray<flexible_type> > source,
48  size_t begin_index = 0, size_t end_index = size_t(-1) )
49  : m_source(source)
50  , m_begin_index(begin_index)
51  , m_end_index(end_index == size_t(-1) ? m_source->size() : end_index)
52  { }
53 
54  static query_operator_attributes attributes() {
55  query_operator_attributes ret;
56  ret.attribute_bitfield = query_operator_attributes::SOURCE |
58  ret.num_inputs = 0;
59  return ret;
60  }
61 
62  inline std::shared_ptr<query_operator> clone() const {
63  return std::make_shared<operator_impl>(m_source);
64  }
65 
66  inline bool coro_running() const {
67  return CORO_RUNNING(execute);
68  }
69  inline void execute(query_context& context) {
70  CORO_BEGIN(execute)
71  if (!m_reader) m_reader = m_source->get_reader();
72  start = m_begin_index;
73  block_size = context.block_size();
74  skip_next_block = context.should_skip();
75 
76  while (start != m_end_index) {
77  rows = context.get_output_buffer();
78  end = std::min(start + block_size, m_end_index);
79  if (skip_next_block == false) {
80  m_reader->read_rows(start, end, *rows);
81  context.emit(rows);
82  CORO_YIELD();
83  } else {
84  context.emit(nullptr);
85  CORO_YIELD();
86  }
87  skip_next_block = context.should_skip();
88  start = end;
89  }
90  CORO_END
91  }
92 
93  static std::shared_ptr<planner_node> make_planner_node(
94  std::shared_ptr<sarray<flexible_type> > source, size_t begin_index = 0, size_t _end_index = -1) {
95  std::stringstream strm;
96  oarchive oarc(strm);
97  oarc << source->get_index_info();
98  auto type = source->get_type();
99 
100  size_t end_index = (_end_index == size_t(-1)) ? source->size() : _end_index;
101 
102  DASSERT_LE(begin_index, end_index);
103  DASSERT_LE(end_index, source->size());
104 
105  // we need to keep a copy of the source in the node for reference counting
106  // reasons.
107  return planner_node::make_shared(planner_node_type::SARRAY_SOURCE_NODE,
108  {{"index", strm.str()},
109  {"type", (flex_int)type},
110  {"begin_index", begin_index},
111  {"end_index", end_index}},
112  {{"sarray", any(source)}});
113  }
114 
115  static std::shared_ptr<query_operator> from_planner_node(
116  std::shared_ptr<planner_node> pnode) {
117  ASSERT_EQ((int)pnode->operator_type,
118  (int)planner_node_type::SARRAY_SOURCE_NODE);
119  ASSERT_TRUE(pnode->any_operator_parameters.count("sarray"));
120  auto source = pnode->any_operator_parameters["sarray"]
121  .as<std::shared_ptr<sarray<flexible_type>>>();
122 
123  size_t begin_index = pnode->operator_parameters.at("begin_index");
124  size_t end_index = pnode->operator_parameters.at("end_index");
125 
126  return std::make_shared<operator_impl>(source, begin_index, end_index);
127  }
128 
129  static std::vector<flex_type_enum> infer_type(
130  std::shared_ptr<planner_node> pnode) {
131  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::SARRAY_SOURCE_NODE);
132  flex_type_enum type =
133  (flex_type_enum)(flex_int)(pnode->operator_parameters["type"]);
134  return {type};
135  }
136 
137  static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
138  ASSERT_EQ((int)pnode->operator_type, (int)planner_node_type::SARRAY_SOURCE_NODE);
139  flex_int length = (pnode->operator_parameters.at("end_index")
140  - pnode->operator_parameters.at("begin_index"));
141  return length;
142  }
143 
144  /** Given an sarray, returns a small number uniquely associated with
145  * that sarray. This number is unique over the course of the
146  * program run.
147  */
148  static size_t unique_sarray_tag(const std::shared_ptr<sarray<flexible_type> >& sa) {
149  static mutex access_lock;
150  std::lock_guard<mutex> _lg(access_lock);
151 
152  static size_t current_number = 0;
153  static std::map<ptrdiff_t, std::pair<std::weak_ptr<sarray<flexible_type> >, size_t> > tracked_numbers;
154 
155  ptrdiff_t key = ptrdiff_t(sa.get());
156 
157  auto it = tracked_numbers.find(key);
158 
159  if(it != tracked_numbers.end()) {
160  if(!it->second.first.expired())
161  return it->second.second;
162  else
163  tracked_numbers.erase(it);
164  }
165 
166  ++current_number;
167 
168  // Purge out expired weak pointers if they are present
169  if(current_number % 256 == 0) {
170  for(auto it = tracked_numbers.begin(); it != tracked_numbers.end();) {
171  if(it->second.first.expired())
172  it = tracked_numbers.erase(it);
173  else
174  ++it;
175  }
176  }
177 
178  tracked_numbers[key] = {sa, current_number};
179  return current_number;
180  }
181 
182  static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
183  std::ostringstream out;
184 
185  auto source = pnode->any_operator_parameters["sarray"]
186  .as<std::shared_ptr<sarray<flexible_type> > >();
187 
188  out << "S" << unique_sarray_tag(source);
189 
190  size_t begin_index = pnode->operator_parameters.at("begin_index");
191  size_t end_index = pnode->operator_parameters.at("end_index");
192 
193  if(begin_index != 0 || end_index != source->size()) {
194  out << "[" << begin_index << "," << end_index << "]";
195  }
196  return out.str();
197  }
198 
199  private:
200  std::shared_ptr<sarray<flexible_type>> m_source;
201  size_t m_begin_index, m_end_index;
202  std::shared_ptr<sarray_reader<flexible_type>> m_reader;
203 };
204 
205 typedef operator_impl<planner_node_type::SARRAY_SOURCE_NODE> op_sarray_source;
206 
207 /// \}
208 } // query_eval
209 } // turi
210 
211 #endif // TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP
static size_t unique_sarray_tag(const std::shared_ptr< sarray< flexible_type > > &sa)
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())