6 #ifndef TURI_SFRAME_QUERY_ENGINE_OPERATORS_EXECUTION_NODE_HPP 7 #define TURI_SFRAME_QUERY_ENGINE_OPERATORS_EXECUTION_NODE_HPP 14 #include <core/data/flexible_type/flexible_type.hpp> 15 #include <core/storage/query_engine/operators/operator.hpp> 16 #include <core/storage/query_engine/util/broadcast_queue.hpp> 22 struct broadcast_queue_serializer<
std::shared_ptr<sframe_rows>> {
23 void save(oarchive& oarc,
const std::shared_ptr<sframe_rows>& t) {
26 void load(iarchive& iarc, std::shared_ptr<sframe_rows>& t) {
27 t = std::make_shared<sframe_rows>();
32 namespace query_eval {
/**
 * Constructs an execution node from a query operator and a list of input
 * execution nodes.
 *
 * \param op     Shared pointer to the query operator for this node.
 * \param inputs Input execution nodes; defaults to an empty vector when
 *               omitted.
 *
 * Declared `explicit`, so a lone shared_ptr<query_operator> is never
 * implicitly converted into an execution_node.
 */
159 explicit execution_node(
const std::shared_ptr<query_operator>& op,
160 const std::vector<std::shared_ptr<execution_node> >& inputs
161 = std::vector<std::shared_ptr<execution_node>>());
/**
 * Initializes the node with an operator and a list of input nodes. Takes
 * exactly the same parameters (and the same empty-vector default) as the
 * constructor.
 *
 * \param op     Shared pointer to the query operator for this node.
 * \param inputs Input execution nodes; defaults to an empty vector.
 *
 * NOTE(review): presumably this lets an existing node be (re)configured
 * after construction -- confirm against the implementation file.
 */
173 void init(
const std::shared_ptr<query_operator>& op,
174 const std::vector<std::shared_ptr<execution_node> >& inputs
175 = std::vector<std::shared_ptr<execution_node>>());
/**
 * Registers a consumer of this node and returns a size_t value.
 *
 * NOTE(review): the returned value is presumably the consumer id expected
 * by get_next(consumer_id, ...) -- confirm against the implementation.
 */
182 size_t register_consumer();
/**
 * Returns a block of rows (shared_ptr<sframe_rows>) for a given consumer.
 *
 * \param consumer_id Identifies the caller; presumably a value obtained from
 *                    register_consumer() -- TODO confirm.
 * \param skip        Defaults to false. NOTE(review): looks like a request
 *                    to skip the block instead of reading it; the exact
 *                    return value in that case is not visible here.
 */
187 std::shared_ptr<sframe_rows> get_next(
size_t consumer_id,
bool skip=
false);
193 return m_inputs.size();
196 inline std::shared_ptr<execution_node> get_input_node(
size_t i)
const {
197 return m_inputs[i].m_node;
212 return m_exception_occured;
/**
 * Accepts a block of rows by const reference to a shared_ptr<sframe_rows>.
 *
 * NOTE(review): presumably called with rows emitted by the operator so they
 * can be placed on the node's output queue -- confirm in the implementation.
 */
227 void add_operator_output(
const std::shared_ptr<sframe_rows>& rows);
/**
 * Returns a block of rows (shared_ptr<sframe_rows>) associated with one of
 * this node's inputs.
 *
 * \param input_id Selects the input; presumably an index into m_inputs --
 *                 TODO confirm bounds handling in the implementation.
 * \param skip     No default here, unlike get_next(); the caller must state
 *                 explicitly whether the block is to be skipped.
 */
235 std::shared_ptr<sframe_rows> get_next_from_input(
size_t input_id,
bool skip);
/**
 * Starts the node's coroutines; takes no arguments and returns nothing.
 *
 * NOTE(review): presumably guarded by the m_coroutines_started flag so it
 * only takes effect once -- confirm in the implementation.
 */
240 void start_coroutines();
// Query operator held by this node (same type as the `op` argument of the
// constructor and init()).
243 std::shared_ptr<query_operator> m_operator;
// Shared query_context object; how it is created and used is not visible
// in this header fragment.
245 std::shared_ptr<query_context> m_context;
// NOTE(review): get_input_node() reads m_inputs[i].m_node, so m_node (and
// presumably m_consumer_id) are fields of the input_node element type whose
// enclosing declaration is not visible here.
253 std::shared_ptr<execution_node> m_node;
// Defaults to 0. Presumably the id this node holds as a consumer of m_node
// -- TODO confirm.
254 size_t m_consumer_id = 0;
// One input_node entry per input; num_inputs() returns m_inputs.size().
256 std::vector<input_node> m_inputs;
// Uniquely-owned broadcast queue whose elements are shared_ptr<sframe_rows>.
258 std::unique_ptr<broadcast_queue<std::shared_ptr<sframe_rows> > > m_output_queue;
// Starts out false; presumably set once start_coroutines() has run.
260 bool m_coroutines_started =
false;
// Starts out false. NOTE(review): looks like a pending "skip" request for
// the next block -- confirm in the implementation.
261 bool m_skip_next_block =
false;
// One size_t per entry; presumably a per-consumer position -- TODO confirm.
264 std::vector<size_t> m_consumer_pos;
// Exception flag, false by default (spelling "occured" is the existing
// member name and is referenced elsewhere, so it is left untouched).
267 bool m_exception_occured =
false;
// Stored exception handle; default-constructed std::exception_ptr is null.
268 std::exception_ptr m_exception;
// NOTE(review): unlike the bools above, the next two flags have no default
// member initializer. Verify that the constructor / init() always assigns
// them before they are read.
269 bool supports_skipping;
270 bool is_linear_operator;
size_t num_inputs() const
bool exception_occurred() const
std::exception_ptr get_exception() const