Turi Create  4.0
execution_node.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_SFRAME_QUERY_ENGINE_OPERATORS_EXECUTION_NODE_HPP
#define TURI_SFRAME_QUERY_ENGINE_OPERATORS_EXECUTION_NODE_HPP

#include <memory>
#include <vector>
#include <queue>


#include <core/data/flexible_type/flexible_type.hpp>
#include <core/storage/query_engine/operators/operator.hpp>
#include <core/storage/query_engine/util/broadcast_queue.hpp>

namespace turi {
class sframe_rows;

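/**
 * Serializer specialization that tells \ref broadcast_queue how to save and
 * load std::shared_ptr<sframe_rows> elements: the pointed-to sframe_rows
 * value is serialized directly, and a fresh sframe_rows is allocated on load.
 */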
template <>
struct broadcast_queue_serializer<std::shared_ptr<sframe_rows>> {
  void save(oarchive& oarc, const std::shared_ptr<sframe_rows>& t) {
    oarc << (*t);
  }
  void load(iarchive& iarc, std::shared_ptr<sframe_rows>& t) {
    t = std::make_shared<sframe_rows>();
    iarc >> (*t);
  }
};

namespace query_eval {


class query_context;

/**
 * \ingroup sframe_query_engine
 * \addtogroup execution Execution
 * \{
 */

/**
 * The execution node provides a wrapper around an operator. It
 *  - manages the coroutine context for the operator,
 *  - manages the connections between the operator and its inputs and outputs,
 *  - manages the buffering and transfer of information between the operator
 *    and its consumers.
 *
 * \subsection execution_node_coroutines Coroutines
 *
 * Essentially, calling a coroutine causes a context switch that starts
 * the coroutine. Within the coroutine, a "sink()" function can then be
 * called, which context switches back and resumes execution *where the
 * coroutine was initially triggered*.
 *
 * The classical example is a producer-consumer queue:
 * \code
 * void producer() {
 *   while(1) {
 *     a = new work
 *     consumer(a); // or sink(a) in the above syntax
 *   }
 * }
 *
 * void consumer() {
 *   while(1) {
 *     a = producer();
 *     // do work on a
 *   }
 * }
 *
 * \endcode
 *
 * Here, we use coroutines to connect query operators and communicate
 * between them. For instance, here is a simple transform on a source:
 * \code
 * void data_source() {
 *   while(data_source_has_rows) {
 *     rows = read_rows
 *     sink(rows);
 *   }
 * }
 *
 * void transform() {
 *   while(1) {
 *     data = source()
 *     if (data == nullptr) {
 *       break;
 *     } else {
 *       transformed_data = apply_transform(data)
 *       sink(transformed_data);
 *     }
 *   }
 * }
 * \endcode
 *
 * While the context switch is relatively cheap (boost coroutines promise
 * < 100 cycles or so), we still want to avoid performing the context
 * switch for every row, so our unit of communication across coroutines
 * is an \ref sframe_rows object, which represents a collection of rows
 * stored column-wise. Every communicated block must have a constant
 * number of rows (i.e. SFRAME_READ_BATCH_SIZE, e.g. 256), except for the
 * last block, which may be smaller. Operators which perform filtering,
 * for instance, must therefore buffer their output accordingly, as in the
 * sketch below.
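 *
 * As a hedged illustration (not actual operator code; `row`, `predicate`,
 * and `make_block` are hypothetical helpers), a filtering operator could
 * accumulate surviving rows until a full block is available:
 * \code
 * std::vector<row> buffer;                    // rows that passed the filter
 * while (auto rows = source()) {              // pull one upstream block
 *   for (const auto& r : *rows) {
 *     if (predicate(r)) buffer.push_back(r);
 *     if (buffer.size() == SFRAME_READ_BATCH_SIZE) {
 *       sink(make_block(buffer));             // emit exactly one full block
 *       buffer.clear();
 *     }
 *   }
 * }
 * if (!buffer.empty()) sink(make_block(buffer)); // final, smaller block
 * \endcode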
 *
 * \subsection execution_node_rate_control Rate Control
 * One key issue with any of the pipeline models (whether pull-based, like
 * here, or push-based) is rate control. For instance, the operator graph
 * corresponding to the following expression has issues:
 * \code
 * logical_filter(source_A, selector_source) + logical_filter(source_B, selector_source)
 * \endcode
 *  - To compute the "+", the left logical filter operator is invoked.
 *  - To compute the left logical filter, source_A and selector_source are
 *    read, and they continue to be read until, say, 256 rows are generated.
 *    These are then sent to the "+" operator, which resumes execution.
 *  - The "+" operator then reads the right logical filter operator.
 *  - The right logical filter operator now needs to read source_B and
 *    selector_source.
 *  - However, selector_source has already advanced because it was partially
 *    consumed for the left logical filter.
 *
 * Handling this requires selector_source to buffer its reads while it is
 * feeding the left logical_filter. We can either assume that all connected
 * operators operate at exactly the same rate (we can guarantee this with
 * some care as to how the operator graph is constructed), or we can allow
 * buffering. This buffering has to be somewhat intelligent because it may
 * require unbounded buffers.
 *
 * An earlier version of this execution model used the former procedure
 * (assuming uniform rate); now we use the \ref broadcast_queue to provide
 * unbounded buffering, as sketched below.
 *
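 * For example, the shared selector_source node can then be consumed by its
 * two downstream filters at different rates (a hedged sketch using this
 * class's public API; `selector` is assumed to be an already-built node):
 * \code
 * std::shared_ptr<execution_node> selector = ...;
 * size_t left  = selector->register_consumer();
 * size_t right = selector->register_consumer();
 * auto a = selector->get_next(left);   // advances only the "left" cursor
 * auto b = selector->get_next(right);  // still sees the first block; the
 *                                      // broadcast_queue buffers it until
 *                                      // every consumer has read it
 * \endcode
 *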
 * \subsection execution_node_usage execution_node Usage
 *
 * The execution_node is not generally used directly (see the hierarchy of
 * materialize functions). However, usage is not very complicated.
 *
 * Given an execution_node graph with a tip you would like to consume data
 * from:
 * \code
 * // tip is a shared_ptr<execution_node>
 * // register a new consumer (i.e. myself)
 * size_t consumer_id = tip->register_consumer();
 *
 * while(1) {
 *   auto rows = tip->get_next(consumer_id);
 *   // do stuff. rows == nullptr on completion
 * }
 * \endcode
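 *
 * After the loop you may also want to check whether the node stopped
 * because of an error rather than normal completion (a hedged sketch using
 * the accessors declared below):
 * \code
 * if (tip->exception_occurred()) {
 *   std::rethrow_exception(tip->get_exception());
 * }
 * \endcode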
 */
class execution_node : public std::enable_shared_from_this<execution_node> {
 public:
  execution_node(){}

  /**
   * Initializes the execution node with an operator and inputs.
   * Also resets the operator.
   */
  explicit execution_node(const std::shared_ptr<query_operator>& op,
                          const std::vector<std::shared_ptr<execution_node> >& inputs
                              = std::vector<std::shared_ptr<execution_node>>());

  execution_node(execution_node&&) = default;
  execution_node& operator=(execution_node&&) = default;

  execution_node(const execution_node&) = delete;
  execution_node& operator=(const execution_node&) = delete;

  /**
   * Initializes the execution node with an operator and inputs.
   * Also resets the operator.
   */
  void init(const std::shared_ptr<query_operator>& op,
            const std::vector<std::shared_ptr<execution_node> >& inputs
                = std::vector<std::shared_ptr<execution_node>>());


  /**
   * Adds an execution consumer. This function call then
   * returns an ID which the caller should use with get_next().
   */
  size_t register_consumer();


  /** Returns the next block of rows for the given consumer, or nullptr if
   *  there is no more data.
   */
  std::shared_ptr<sframe_rows> get_next(size_t consumer_id, bool skip=false);

  /**
   * Returns the number of inputs of the execution node.
   */
  inline size_t num_inputs() const {
    return m_inputs.size();
  }

  inline std::shared_ptr<execution_node> get_input_node(size_t i) const {
    return m_inputs[i].m_node;
  }

  /**
   * Resets the state of this execution node. Note that this does NOT
   * recursively reset all parents (since in a general graph this could cause
   * multiple resets of the same vertex). The caller must ensure that all
   * connected execution nodes are reset.
   */
  void reset();

  /**
   * Returns true if an exception occurred while executing this node.
   */
  bool exception_occurred() const {
    return m_exception_occured;
  }

  /**
   * If an exception occurred while executing this node, this returns the
   * last exception. Otherwise returns an exception_ptr which compares
   * equal to the null pointer.
   */
  std::exception_ptr get_exception() const {
    return m_exception;
  }
 private:
  /**
   * Internal function used to add to the operator output.
   */
  void add_operator_output(const std::shared_ptr<sframe_rows>& rows);

  /**
   * Internal utility function that pulls the next batch of rows from an
   * input to this node.
   * This reads *exactly* one block.
   * If skip is true, nullptr is always returned.
   */
  std::shared_ptr<sframe_rows> get_next_from_input(size_t input_id, bool skip);

  /**
   * Starts the coroutines.
   */
  void start_coroutines();

  /// The operator implementation
  std::shared_ptr<query_operator> m_operator;

  std::shared_ptr<query_context> m_context;

  /**
   * The inputs to this execution node: which execution node each input
   * comes from, and the consumer ID to use when pulling data from that
   * node.
   */
  struct input_node {
    std::shared_ptr<execution_node> m_node;
    size_t m_consumer_id = 0;
  };
  std::vector<input_node> m_inputs;

  std::unique_ptr<broadcast_queue<std::shared_ptr<sframe_rows> > > m_output_queue;
  size_t m_head = 0;
  bool m_coroutines_started = false;
  bool m_skip_next_block = false;

  /// m_consumer_pos[i] is the index of the next block consumer i will consume.
  std::vector<size_t> m_consumer_pos;

  /// exception handling
  bool m_exception_occured = false;
  std::exception_ptr m_exception;
  bool supports_skipping;
  bool is_linear_operator;

  friend class query_context;
};

/// \}
} // namespace query_eval
} // namespace turi

#endif /* TURI_SFRAME_QUERY_ENGINE_OPERATORS_EXECUTION_NODE_HPP */