6 #ifndef TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP 9 #include <core/storage/serialization/serialization_includes.hpp> 10 #include <core/data/flexible_type/flexible_type.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/execution/query_context.hpp> 13 #include <core/storage/query_engine/planning/planner_node.hpp> 14 #include <core/storage/query_engine/operators/operator_properties.hpp> 15 #include <core/storage/fileio/fs_utils.hpp> 16 #include <core/storage/sframe_data/sarray.hpp> 20 namespace query_eval {
// Query operator specialization for SARRAY_SOURCE_NODE: a source operator that
// reads rows from a single sarray<flexible_type>, restricted to the row range
// m_begin_index .. m_end_index, and emits them in blocks.
24 struct operator_impl<
planner_node_type::SARRAY_SOURCE_NODE> :
public query_operator{
// Coroutine state for execute(); queried by coro_running(). Presumably lets
// execute() suspend and resume between emitted blocks (CORO_* macros are
// defined elsewhere) — confirm against the coroutine macro header.
36 DECL_CORO_STATE(execute);
// Rows per block; refreshed from context.block_size() at the start of execute().
38 size_t block_size = 0;
// Cached result of context.should_skip(); when true execute() does not read
// the next block from the reader.
39 bool skip_next_block =
false;
// Output buffer from context.get_output_buffer() for the block being produced.
41 std::shared_ptr<sframe_rows> rows;
/// Returns the operator's identifying name, "sarray_source".
static std::string name() {
  static const char kOperatorName[] = "sarray_source";
  return std::string(kOperatorName);
}
// Constructs a source over `source` that covers rows begin_index .. end_index.
// end_index == size_t(-1) (the default) means "through the end of the array",
// i.e. m_source->size().
// The m_end_index initializer dereferences m_source. That is only valid because
// m_source is declared before m_end_index among the members (members are
// initialized in declaration order) and is initialized from `source`.
// NOTE(review): unlike make_planner_node(), nothing here checks that
// begin_index <= end_index. execute() loops on `start != m_end_index`, so an
// inverted range would likely never terminate. Consider a DASSERT_LE here.
47 inline operator_impl(std::shared_ptr<sarray<flexible_type> > source,
48 size_t begin_index = 0,
size_t end_index =
size_t(-1) )
50 , m_begin_index(begin_index)
51 , m_end_index(end_index == size_t(-1) ? m_source->size() : end_index)
// Returns the static query_operator_attributes describing this operator to the
// planner/executor; the attributes are accumulated in `ret`.
54 static query_operator_attributes attributes() {
55 query_operator_attributes ret;
62 inline std::shared_ptr<query_operator> clone()
const {
63 return std::make_shared<operator_impl>(m_source);
66 inline bool coro_running()
const {
67 return CORO_RUNNING(execute);
// Streams the source's rows to `context` in blocks of context.block_size() rows.
// The sarray reader is created lazily on the first call and reused afterwards.
// NOTE(review): the loop runs while `start != m_end_index`. If m_begin_index
// ever exceeded m_end_index, it would presumably never terminate; `<` would be
// more defensive.
69 inline void execute(query_context& context) {
71 if (!m_reader) m_reader = m_source->get_reader();
72 start = m_begin_index;
73 block_size = context.block_size();
74 skip_next_block = context.should_skip();
76 while (start != m_end_index) {
77 rows = context.get_output_buffer();
// The last block is clipped to m_end_index.
78 end = std::min(start + block_size, m_end_index);
// Rows are read only when the context did not ask to skip this block.
79 if (skip_next_block ==
false) {
80 m_reader->read_rows(start, end, *rows);
// Emitting nullptr appears to be the skip path; the matching emit of `rows`
// is presumably in the non-skip branch — confirm.
84 context.emit(
nullptr);
// Re-query the skip flag for the next block.
87 skip_next_block = context.should_skip();
// Builds the SARRAY_SOURCE_NODE planner node describing a read of `source`
// over rows begin_index .. end_index.
// _end_index == size_t(-1) (the default; the literal -1 converts to SIZE_MAX)
// selects source->size().
// The node records three operator parameters:
//   "index"       - source->get_index_info(), written through `oarc` into `strm`
//                   (presumably an oarchive over strm)
//   "begin_index"
//   "end_index"
// The sarray itself is carried in the "sarray" any-parameter.
93 static std::shared_ptr<planner_node> make_planner_node(
94 std::shared_ptr<sarray<flexible_type> > source,
size_t begin_index = 0,
size_t _end_index = -1) {
95 std::stringstream strm;
97 oarc << source->get_index_info();
98 auto type = source->get_type();
100 size_t end_index = (_end_index == size_t(-1)) ? source->size() : _end_index;
// Debug-only range checks; the range must be ordered and within the array.
102 DASSERT_LE(begin_index, end_index);
103 DASSERT_LE(end_index, source->size());
108 {{
"index", strm.str()},
110 {
"begin_index", begin_index},
111 {
"end_index", end_index}},
112 {{
"sarray", any(source)}});
// Reconstructs the executable operator from a SARRAY_SOURCE_NODE planner node.
// It asserts the node type and the presence of the "sarray" any-parameter.
// It then passes the sarray and the stored "begin_index"/"end_index" values
// to the constructor.
115 static std::shared_ptr<query_operator> from_planner_node(
116 std::shared_ptr<planner_node> pnode) {
117 ASSERT_EQ((
int)pnode->operator_type,
118 (
int)planner_node_type::SARRAY_SOURCE_NODE);
119 ASSERT_TRUE(pnode->any_operator_parameters.count(
"sarray"));
120 auto source = pnode->any_operator_parameters[
"sarray"]
121 .as<std::shared_ptr<sarray<flexible_type>>>();
// The indices are stored as flexible_type parameters and converted to size_t.
123 size_t begin_index = pnode->operator_parameters.at(
"begin_index");
124 size_t end_index = pnode->operator_parameters.at(
"end_index");
126 return std::make_shared<operator_impl>(source, begin_index, end_index);
// Infers the output column type(s) of a SARRAY_SOURCE_NODE; the node type is
// asserted first.
129 static std::vector<flex_type_enum> infer_type(
130 std::shared_ptr<planner_node> pnode) {
131 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::SARRAY_SOURCE_NODE);
// Infers the node's output length as end_index - begin_index, both read from
// the node's operator_parameters; the node type is asserted first.
137 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
138 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::SARRAY_SOURCE_NODE);
139 flex_int length = (pnode->operator_parameters.at(
"end_index")
140 - pnode->operator_parameters.at(
"begin_index"));
// Body of unique_sarray_tag(sa): maps each sarray to a small integer tag, used
// by repr() to print "S<tag>".
// Entries are keyed by the sarray's address. Each entry keeps a weak_ptr, so
// an address reused after the old sarray died is detected (expired) and gets
// a new tag. All state is function-local static, and every call is serialized
// by access_lock.
149 static mutex access_lock;
150 std::lock_guard<mutex> _lg(access_lock);
152 static size_t current_number = 0;
153 static std::map<ptrdiff_t, std::pair<std::weak_ptr<sarray<flexible_type> >,
size_t> > tracked_numbers;
155 ptrdiff_t key = ptrdiff_t(sa.get());
157 auto it = tracked_numbers.find(key);
// If the address is known and still refers to a live sarray, reuse its tag;
// otherwise drop the stale entry and assign a fresh tag below.
159 if(it != tracked_numbers.end()) {
160 if(!it->second.first.expired())
161 return it->second.second;
163 tracked_numbers.erase(it);
// Every 256th tag, sweep out entries whose sarray has expired so the map does
// not grow without bound. current_number is presumably advanced before this
// check — confirm.
169 if(current_number % 256 == 0) {
170 for(
auto it = tracked_numbers.begin(); it != tracked_numbers.end();) {
171 if(it->second.first.expired())
172 it = tracked_numbers.erase(it);
178 tracked_numbers[key] = {sa, current_number};
179 return current_number;
// Short textual form of the node: "S<tag>", where the tag comes from
// unique_sarray_tag(). When the node covers less than the full array,
// "[begin_index,end_index]" is appended.
182 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger&) {
183 std::ostringstream out;
// NOTE(review): if any_operator_parameters is a std::map, operator[]
// default-inserts when "sarray" is absent. from_planner_node() asserts presence
// first; consider the same check (or .at()) here.
185 auto source = pnode->any_operator_parameters[
"sarray"]
186 .as<std::shared_ptr<sarray<flexible_type> > >();
188 out <<
"S" << unique_sarray_tag(source);
190 size_t begin_index = pnode->operator_parameters.at(
"begin_index");
191 size_t end_index = pnode->operator_parameters.at(
"end_index");
// Only print the range when it is not the whole array.
193 if(begin_index != 0 || end_index != source->size()) {
194 out <<
"[" << begin_index <<
"," << end_index <<
"]";
// The array this operator reads. It is declared first so it is initialized
// before m_end_index, whose constructor initializer reads m_source->size().
200 std::shared_ptr<sarray<flexible_type>> m_source;
// Row range read by execute(): rows from m_begin_index up to m_end_index.
201 size_t m_begin_index, m_end_index;
// Reader over m_source; null until the first execute() creates it.
202 std::shared_ptr<sarray_reader<flexible_type>> m_reader;
205 typedef operator_impl<planner_node_type::SARRAY_SOURCE_NODE> op_sarray_source;
211 #endif // TURI_SFRAME_QUERY_MANAGER_SARRAY_SOURCE_HPP
// NOTE(review): these three lines sit after the include guard's #endif and
// outside any namespace. They look like tooling residue rather than real code:
//   - a bodiless redeclaration of unique_sarray_tag;
//   - an empty ASSERT_TRUE macro, which, if compiled, would redefine
//     ASSERT_TRUE to nothing for any code that follows;
//   - a declaration of planner_node::make_shared.
// Confirm and remove.
static size_t unique_sarray_tag(const std::shared_ptr< sarray< flexible_type > > &sa)
#define ASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())