Turi Create  4.0
query_context.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SFRAME_QUERY_ENGINE_OPERATORS_QUERY_CONTEXT_HPP
7 #define TURI_SFRAME_QUERY_ENGINE_OPERATORS_QUERY_CONTEXT_HPP
8 #include <memory>
9 #include <vector>
10 #include <core/storage/sframe_data/sframe_rows.hpp>
11 namespace turi {
12 namespace query_eval {
13 class execution_node;
14 
15 
16 /**
17  * \ingroup sframe_query_engine
18  * \addtogroup execution Execution
19  * \{
20  */
21 
22 /**
23  * This is the object passed to the coroutine which allows the coroutine
24  * to read and write values. The expected usage pattern of the coroutine is:
25  *
26  * \code
27  * void fn(query_context& ctx) {
28  * while(1) {
29  * auto input = ctx.get_next(0); // from 1st input. get_next(1) for 2nd input, etc.
30  * auto output = get_output_buffer();
31  * // fill output buffer. Its just an sframe_rows
32  * ctx.emit(output); // only if output is non-empty.
33  * }
34  * }
35  * \endcode
36  */
38  public:
39  query_context();
40  query_context(const query_context&) = default;
41  query_context(query_context&&) = default;
42  query_context& operator=(const query_context&) = default;
43  query_context& operator=(query_context&&) = default;
44  ~query_context();
45  query_context(execution_node* exec_node,
46  size_t m_buffer_size);
47 
48  /**
49  * Requests for the next block for the given input.
50  */
51  std::shared_ptr<const sframe_rows> get_next(size_t input_number);
52 
53  /**
54  * Requests for the next block for the given input to the skipped.
55  */
56  void skip_next(size_t input_number);
57 
58  /**
59  * Returns a pointer to an output buffer.
60  */
61  std::shared_ptr<sframe_rows> get_output_buffer();
62 
63  /**
64  * Emits a collection of rows. The number of rows emitted
65  * MUST be the same as block_size(), except for the very last block
66  * of rows. Should yield immediately after emitting a block.
67  */
68  void emit(const std::shared_ptr<sframe_rows>& rows);
69 
70  /**
71  * Returns true if the operator should try to skip a block.
72  */
73  bool should_skip();
74 
75  /**
76  * The commmunication block size.
77  */
78  inline size_t block_size() const {
79  return m_max_buffer_size;
80  }
81  private:
82 
83  /// Maximum buffer size
84  size_t m_max_buffer_size = 256; // some arbitrary default
85 
86  // we only need 1 buffer and to cycle between both since the linear
87  // assumption means that at most one buffer may be used or given away at
88  // any one point.
89  std::shared_ptr<sframe_rows> m_buffers;
90  execution_node* m_exec_node;
91 };
92 
93 /// \}
94 } // query_eval
95 } // turicreate
96 #endif // TURI_SFRAME_QUERY_ENGINE_OPERATORS_QUERY_CONTEXT_HPP
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
void skip_next(size_t input_number)