6 #ifndef TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP 10 #include <core/data/flexible_type/flexible_type.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/operators/sarray_source.hpp> 13 #include <core/storage/query_engine/execution/query_context.hpp> 14 #include <core/storage/query_engine/operators/operator_properties.hpp> 15 #include <core/storage/sframe_data/sframe.hpp> 19 namespace query_eval {
33 DECL_CORO_STATE(execute);
35 std::shared_ptr<sframe_rows> rows;
36 size_t block_size = 0;
37 bool skip_next_block =
false;
42 static std::string name() {
return "sframe_source"; }
44 inline operator_impl(
sframe source,
size_t begin_index = 0,
size_t end_index =
size_t(-1) )
46 , m_begin_index(begin_index)
47 , m_end_index(end_index ==
size_t(-1) ? m_source.size() : end_index)
59 inline std::shared_ptr<query_operator>
clone()
const {
60 return std::make_shared<operator_impl>(m_source);
63 inline bool coro_running()
const {
64 return CORO_RUNNING(execute);
68 if (!m_reader) m_reader = m_source.get_reader();
69 start = m_begin_index;
73 while (start != m_end_index) {
75 end = std::min(start + block_size, m_end_index);
76 if (skip_next_block ==
false) {
77 m_reader->read_rows(start, end, *rows);
81 context.
emit(
nullptr);
90 static std::shared_ptr<planner_node> make_planner_node(
91 sframe source,
size_t begin_index = 0,
size_t _end_index = -1) {
92 std::stringstream strm;
97 size_t end_index = (_end_index == size_t(-1)) ? source.
size() : _end_index;
98 DASSERT_LE(begin_index, end_index);
99 DASSERT_LE(end_index, source.
size());
102 for (
size_t i = 0; i < types.size(); ++i) {
110 {{
"index", strm.str()},
111 {
"types", type_list},
112 {
"begin_index", begin_index},
113 {
"end_index", end_index}},
114 {{
"sframe",
any(source)}});
117 static std::shared_ptr<query_operator> from_planner_node(
118 std::shared_ptr<planner_node> pnode) {
119 ASSERT_EQ((
int)pnode->operator_type,
120 (
int)planner_node_type::SFRAME_SOURCE_NODE);
122 ASSERT_TRUE(pnode->any_operator_parameters.count(
"sframe"));
123 auto source = pnode->any_operator_parameters.at(
"sframe").as<
sframe>();
125 size_t begin_index = pnode->operator_parameters.at(
"begin_index");
126 size_t end_index = pnode->operator_parameters.at(
"end_index");
128 return std::make_shared<operator_impl>(source, begin_index, end_index);
131 static std::vector<flex_type_enum> infer_type(
132 std::shared_ptr<planner_node> pnode) {
133 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::SFRAME_SOURCE_NODE);
134 flex_list type = pnode->operator_parameters.at(
"types");
135 std::vector<flex_type_enum> ret;
140 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
141 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::SFRAME_SOURCE_NODE);
142 flex_int length = (pnode->operator_parameters.at(
"end_index")
143 - pnode->operator_parameters.at(
"begin_index"));
147 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
148 std::ostringstream out;
150 auto source = pnode->any_operator_parameters.at(
"sframe").as<
sframe>();
158 columns[i] = op_sarray_source::unique_sarray_tag(source.
select_column(i));
162 std::vector<std::vector<size_t> > groups{ {columns[0]} };
164 for(
size_t i = 1; i < columns.size();++i) {
165 if(columns[i] == groups.back().back() + 1)
166 groups.back().push_back(columns[i]);
168 groups.push_back({columns[i]});
171 bool is_first =
true;
172 for(
const std::vector<size_t>& grp : groups) {
180 out <<
"S" << grp[0];
183 out <<
"S" << grp[0] <<
",S" << grp[1];
186 out <<
"S" << grp.front() <<
",...,S" << grp.back();
194 size_t begin_index = pnode->operator_parameters.at(
"begin_index");
195 size_t end_index = pnode->operator_parameters.at(
"end_index");
197 if(begin_index != 0 || end_index != source.
num_rows()) {
198 out <<
"[" << begin_index <<
"," << end_index <<
"]";
206 size_t m_begin_index, m_end_index;
207 std::shared_ptr<sframe_reader> m_reader;
216 #endif // TURI_SFRAME_QUERY_MANAGER_SFRAME_SOURCE_HPP
int num_inputs
Number of inputs expected to the operator.
std::shared_ptr< query_operator > clone() const
size_t num_rows() const
Returns the length of each sarray.
size_t attribute_bitfield
A bitfield of the attribute enum.
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
const sframe_index_file_information get_index_info() const
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
#define ASSERT_TRUE(cond)
size_t block_size() const
size_t num_columns() const
Returns the number of columns in the SFrame. Does not throw.
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
void execute(query_context &context)
std::vector< flex_type_enum > column_types() const
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
std::vector< flexible_type > flex_list