Turi Create
4.0
#include <core/storage/query_engine/execution/execution_node.hpp>
Public Member Functions

execution_node (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs = std::vector< std::shared_ptr< execution_node > >())
void init (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs = std::vector< std::shared_ptr< execution_node > >())
size_t register_consumer ()
std::shared_ptr< sframe_rows > get_next (size_t consumer_id, bool skip = false)
size_t num_inputs () const
void reset ()
bool exception_occurred () const
std::exception_ptr get_exception () const
The execution node provides a wrapper around an operator. It exposes a pull-based, stream-like interface (get_next()) over the operator's output, using coroutines to communicate between operators.
Essentially, calling a coroutine causes a context switch that starts the coroutine. Within the coroutine, a "sink()" function can be called, which context switches back and resumes execution where the coroutine was initially triggered.
The classical example is a producer-consumer queue.
Here, we use coroutines to attach query operators and communicate between them: for instance, a simple transform applied to a source.
While the context switch is relatively cheap (boost coroutines promise < 100 cycles or so), we still want to avoid performing a context switch for every row, so our unit of communication across coroutines is an sframe_rows object, which represents a collection of rows stored column-wise. Every communicated block must have a constant number of rows (SFRAME_READ_BATCH_SIZE, e.g. 256), except for the last block, which may be smaller. Operators that perform filtering, for instance, must therefore buffer accordingly.
One key issue with any pipeline model (whether pull-based, as here, or push-based) is rate control. For instance, the operator graph corresponding to the following expression has issues:
Solving this requires the selector_source to buffer its reads while feeding the left logical_filter. We can either assume that all connected operators operate at exactly the same rate (this can be guaranteed with some care in how the operator graph is constructed), or we can allow buffering. The buffering has to be somewhat intelligent, because it may require unbounded buffers.
An earlier version of this execution model used the former approach (assuming uniform rate); we now use the broadcast_queue to provide unbounded buffering.
The execution_node is not generally used directly (see the hierarchy of materialize functions). However, its usage is not very complicated.
Given an execution_node graph, with a tip you would like to consume data from:
Definition at line 151 of file execution_node.hpp.
explicit turi::query_eval::execution_node::execution_node (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs = std::vector< std::shared_ptr< execution_node > >())

Initializes the execution node with an operator and inputs. Also resets the operator.
bool turi::query_eval::execution_node::exception_occurred ( ) const [inline]

Returns true if an exception occurred while executing this node.

Definition at line 211 of file execution_node.hpp.
std::exception_ptr turi::query_eval::execution_node::get_exception ( ) const [inline]

If an exception occurred while executing this node, this returns the last exception. Otherwise returns an exception_ptr which compares equal to the null pointer.

Definition at line 220 of file execution_node.hpp.
std::shared_ptr< sframe_rows > turi::query_eval::execution_node::get_next (size_t consumer_id, bool skip = false)
Returns nullptr if there is no more data.
void turi::query_eval::execution_node::init (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs = std::vector< std::shared_ptr< execution_node > >())
Initializes the execution node with an operator and inputs. Also resets the operator.
size_t turi::query_eval::execution_node::num_inputs ( ) const [inline]

Returns the number of inputs of the execution node.

Definition at line 192 of file execution_node.hpp.
size_t turi::query_eval::execution_node::register_consumer | ( | ) |
Adds an execution consumer, returning an ID which the caller should use with get_next().
void turi::query_eval::execution_node::reset | ( | ) |
Resets the state of this execution node. Note that this does NOT recursively reset all parents (since in a general graph this could cause multiple resets of the same vertex). The caller must ensure that all connected execution nodes are also reset.