Turi Create  4.0
turi::query_eval::execution_node Class Reference

#include <core/storage/query_engine/execution/execution_node.hpp>

Public Member Functions

 execution_node (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs=std::vector< std::shared_ptr< execution_node >>())
 
void init (const std::shared_ptr< query_operator > &op, const std::vector< std::shared_ptr< execution_node > > &inputs=std::vector< std::shared_ptr< execution_node >>())
 
size_t register_consumer ()
 
std::shared_ptr< sframe_rows > get_next (size_t consumer_id, bool skip=false)
 
size_t num_inputs () const
 
void reset ()
 
bool exception_occurred () const
 
std::exception_ptr get_exception () const
 

Detailed Description

The execution node provides a wrapper around an operator. It

  • manages the coroutine context for the operator
  • manages the connections between the operator and its inputs and outputs
  • manages the buffering and transfer of information between the operator and its consumers

Essentially, calling a coroutine causes a context switch that starts the coroutine. Within the coroutine, a "sink()" function can be called, which context switches back and resumes execution where the coroutine was originally invoked.

The classic example is a producer-consumer queue:

void producer() {
  while(1) {
    a = new work
    consumer(a); // or sink(a) in the above syntax
  }
}
void consumer() {
  while(1) {
    a = producer();
    // do work on a
  }
}
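For concreteness, here is a minimal, standalone sketch of the same pattern using boost::coroutines2. It only illustrates how "sink()" corresponds to yielding a value back to the pulling side; it is not the actual coroutine machinery inside execution_node, which manages its context internally.

#include <boost/coroutine2/all.hpp>
#include <iostream>

int main() {
  using coro_t = boost::coroutines2::coroutine<int>;

  // The producer body runs inside the coroutine; each call to sink(i)
  // suspends it and hands the value back to whoever pulled from it.
  coro_t::pull_type producer([](coro_t::push_type& sink) {
    for (int i = 0; i < 5; ++i) {
      sink(i);   // context switch back to the consumer
    }
  });

  // The consumer drives the producer by pulling values one at a time.
  for (int value : producer) {
    std::cout << "consumed " << value << "\n";
  }
  return 0;
}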

Here, we use coroutines to connect query operators and communicate between them. For instance, here is a simple transform on a source:

void data_source() {
  while(data_source_has_rows) {
    rows = read_rows
    sink(rows);
  }
}
void transform() {
  while(1) {
    data = source()
    if (data == nullptr) {
      break;
    } else {
      transformed_data = apply_transform(data)
      sink(transformed_data);
    }
  }
}

While the context switch is relatively cheap (boost coroutines promise this at under 100 cycles or so), we still want to avoid performing a context switch for every row, so our unit of communication across coroutines is an sframe_rows object: a collection of rows, represented column-wise. Every communicated block must contain a constant number of rows (SFRAME_READ_BATCH_SIZE, e.g. 256), except for the last block, which may be smaller. Operators which perform filtering, for instance, must therefore buffer their output accordingly.
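To illustrate that buffering requirement, here is a hedged sketch of a generic filtering loop that only emits full blocks. All helper names (read_next_block, row_passes_filter, emit) are hypothetical and not part of this API, and plain int rows stand in for sframe_rows content.

#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

constexpr std::size_t BATCH_SIZE = 256;   // stands in for SFRAME_READ_BATCH_SIZE

// read_next_block returns nullptr when the upstream operator is exhausted;
// emit hands a full (or final, possibly short) block downstream.
void filter_operator(
    const std::function<std::shared_ptr<std::vector<int>>()>& read_next_block,
    const std::function<bool(int)>& row_passes_filter,
    const std::function<void(const std::vector<int>&)>& emit) {
  std::vector<int> buffer;
  buffer.reserve(BATCH_SIZE);

  while (auto block = read_next_block()) {
    for (int row : *block) {
      if (!row_passes_filter(row)) continue;
      buffer.push_back(row);
      if (buffer.size() == BATCH_SIZE) {   // only full blocks mid-stream
        emit(buffer);
        buffer.clear();
      }
    }
  }
  if (!buffer.empty()) emit(buffer);       // the last block may be smaller
}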

One key issue with any of the pipeline models (whether pull-based, as here, or push-based) is rate control. For instance, the operator graph corresponding to the following expression has issues:

logical_filter(source_A, selector_source) + logical_filter(source_B, selector_source)
  • To compute the "+", the left logical filter operator is invoked
  • To compute the left logical filter, source_A and selector_source are read, and they continue to be read until, say, 256 rows are generated. These are then sent to the "+" operator, which resumes execution.
  • The "+" operator then reads the right logical filter operator.
  • The right logical filter operator now needs to read source_B and selector_source.
  • However, selector_source has already advanced because it was partially consumed for the left logical filter.

A solution requires selector_source to buffer its reads while feeding the left logical_filter. We can either assume that all connected operators operate at exactly the same rate (which can be guaranteed with some care as to how the operator graph is constructed), or allow buffering. The buffering has to be somewhat intelligent, because it may require unbounded buffers.

An earlier version of this execution model used the former approach (assuming uniform rate); the current version uses the broadcast_queue to provide unbounded buffering.
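The essential idea behind a broadcast queue is that one producer feeds several consumers that may advance at different rates, so each consumer keeps its own cursor into a shared, growable buffer. The following is a simplified in-memory sketch of that idea only; the real turi::broadcast_queue has a different interface and additionally manages the buffered elements (e.g. trimming what every consumer has read).

#include <cstddef>
#include <deque>
#include <vector>

template <typename T>
class simple_broadcast_queue {
 public:
  explicit simple_broadcast_queue(std::size_t num_consumers)
      : m_cursors(num_consumers, 0) {}

  // Producer side: append a new element visible to all consumers.
  void push(T value) { m_buffer.push_back(std::move(value)); }

  // Consumer side: returns true and fills `out` if consumer `id`
  // still has unread elements.
  bool pop(std::size_t id, T& out) {
    if (m_cursors[id] >= m_buffer.size()) return false;
    out = m_buffer[m_cursors[id]++];
    return true;
  }

 private:
  std::deque<T> m_buffer;              // unbounded in this sketch
  std::vector<std::size_t> m_cursors;  // per-consumer read position
};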

The execution_node is not generally used directly (see the hierarchy of materialize functions). However, usage is not very complicated.

Given an execution_node graph with a tip you would like to consume data from:

// tip is a shared_ptr<execution_node>
// register a new consumer (aka myself)
size_t consumer_id = tip->register_consumer();
while(1) {
  auto rows = tip->get_next(consumer_id);
  // do stuff with rows. rows == nullptr on completion
}
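A slightly fuller sketch of the same consumption loop, also surfacing any exception captured via the exception-reporting members documented below. The process_rows callback is hypothetical; only the execution_node calls are from this interface.

#include <exception>
#include <memory>

#include <core/storage/query_engine/execution/execution_node.hpp>

// Drain an execution_node tip, forwarding each block of rows to a
// caller-supplied callback, then rethrow any exception the node captured.
template <typename RowCallback>
void consume(const std::shared_ptr<turi::query_eval::execution_node>& tip,
             RowCallback process_rows) {
  size_t consumer_id = tip->register_consumer();
  while (true) {
    auto rows = tip->get_next(consumer_id);
    if (rows == nullptr) break;     // nullptr signals completion
    process_rows(*rows);            // rows is a shared_ptr<sframe_rows>
  }
  if (tip->exception_occurred()) {
    std::rethrow_exception(tip->get_exception());
  }
}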

Definition at line 151 of file execution_node.hpp.

Constructor & Destructor Documentation

◆ execution_node()

turi::query_eval::execution_node::execution_node ( const std::shared_ptr< query_operator > &  op,
const std::vector< std::shared_ptr< execution_node > > &  inputs = std::vector< std::shared_ptr< execution_node >>() 
)
explicit

Initializes the execution node with an operator and inputs. Also resets the operator.

Member Function Documentation

◆ exception_occurred()

bool turi::query_eval::execution_node::exception_occurred ( ) const
inline

Returns true if an exception occurred while executing this node.

Definition at line 211 of file execution_node.hpp.

◆ get_exception()

std::exception_ptr turi::query_eval::execution_node::get_exception ( ) const
inline

If an exception occurred while executing this node, this returns the last exception. Otherwise returns an exception_ptr which compares equal to the null pointer.

Definition at line 220 of file execution_node.hpp.

◆ get_next()

std::shared_ptr<sframe_rows> turi::query_eval::execution_node::get_next ( size_t  consumer_id,
bool  skip = false 
)

Returns nullptr if there is no more data.

◆ init()

void turi::query_eval::execution_node::init ( const std::shared_ptr< query_operator > &  op,
const std::vector< std::shared_ptr< execution_node > > &  inputs = std::vector< std::shared_ptr< execution_node >>() 
)

Initializes the execution node with an operator and inputs. Also resets the operator.

◆ num_inputs()

size_t turi::query_eval::execution_node::num_inputs ( ) const
inline

Returns the number of inputs of the execution node

Definition at line 192 of file execution_node.hpp.

◆ register_consumer()

size_t turi::query_eval::execution_node::register_consumer ( )

Adds an execution consumer and returns an ID which the caller should use with get_next().

◆ reset()

void turi::query_eval::execution_node::reset ( )

Resets the state of this execution node. Note that this does NOT recursively reset all parents (since in a general graph this could cause multiple resets of the same vertex). The caller must ensure that all connected execution nodes are reset.
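Because reset() does not recurse, a caller resetting a whole operator graph has to walk the graph itself. A minimal sketch follows; the caller supplies the full node list (the public interface shown here does not expose a way to enumerate a node's inputs), and a visited set guards against resetting a shared vertex twice.

#include <memory>
#include <unordered_set>
#include <vector>

#include <core/storage/query_engine/execution/execution_node.hpp>

using turi::query_eval::execution_node;

// Reset every node in an operator graph exactly once.
void reset_all(const std::vector<std::shared_ptr<execution_node>>& nodes) {
  std::unordered_set<execution_node*> visited;
  for (const auto& node : nodes) {
    if (node && visited.insert(node.get()).second) {
      node->reset();
    }
  }
}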


The documentation for this class was generated from the following file:

  • core/storage/query_engine/execution/execution_node.hpp