6 #ifndef TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/storage/query_engine/operators/operator.hpp> 11 #include <core/storage/query_engine/execution/query_context.hpp> 12 #include <core/storage/query_engine/operators/operator_properties.hpp> 16 namespace query_eval {
// Coroutine bookkeeping for "execute"; CORO_RUNNING(execute) in
// coro_running() refers to the same name. The macro is defined outside this
// file, so its exact expansion is presumed, not verified here.
31 DECL_CORO_STATE(execute);
// Short textual identifier of this operator. print() uses it as the prefix
// of its output.
static std::string name() {
  std::string operator_name("project");
  return operator_name;
}
// Constructs the operator from a list of column indices.
// `indices` is copied into m_indices; the list must be non-empty
// (enforced by ASSERT_GT below).
44 inline operator_impl(
const std::vector<size_t>& indices): m_indices(indices) {
45 ASSERT_GT(m_indices.size(), 0);
// Returns a printable description: name() followed by the indices in
// parentheses. Every index is followed by a single space, so the text
// inside the parentheses ends with a trailing space, e.g. "project(0 2 )".
48 inline std::string
print()
const {
49 std::stringstream str_indices;
// Append each stored index followed by one space.
50 for (
size_t i : m_indices) str_indices << i <<
" ";
51 return name() +
"(" + str_indices.str() +
")";
// Returns a new operator_impl, copy-constructed from *this and held in a
// shared_ptr typed as the query_operator base.
54 inline std::shared_ptr<query_operator>
clone()
const {
55 return std::make_shared<operator_impl>(*this);
// Reports the result of CORO_RUNNING for the "execute" coroutine state.
// Presumably true while execute() is suspended mid-run; the macro is
// defined elsewhere -- confirm there.
58 inline bool coro_running()
const {
59 return CORO_RUNNING(execute);
// Column containers of the input block (`rows`) and the output block (`out`).
// Presumably cget_columns() is the read-only accessor and get_columns() the
// mutable one -- confirm against sframe_rows.
70 auto& rows_columns = rows->cget_columns();
71 auto& out_columns = out->get_columns();
// Append the selected input columns to the output, in the order given by
// m_indices.
73 for (
size_t i = 0;i < m_indices.size(); ++i) {
// NOTE(review): this assertion repeats the loop condition and can never
// fire. It looks like the intended check is that m_indices[i] is a valid
// position in rows_columns -- confirm and tighten.
74 DASSERT_LT(i, m_indices.size());
75 out_columns.push_back(rows_columns[m_indices[i]]);
// Builds the planner node describing a projection of `input` onto the
// columns listed in `indices`.
//   input   - planner node whose output columns are being selected from.
//   indices - column positions to keep.
84 static std::shared_ptr<planner_node> make_planner_node(
85 std::shared_ptr<planner_node> input,
86 const std::vector<size_t>& indices) {
// Convert the indices to flexible_type so they can be stored as an
// operator parameter.
90 std::vector<flexible_type> flex_indices(indices.begin(), indices.end());
// Debug-only bounds check of every requested column against num_columns
// (num_columns is computed on a line not visible here).
94 for(
size_t col_idx : indices)
95 DASSERT_LT(col_idx, num_columns);
// Operator parameters: the index list is stored under the key "indices";
// the map of `any` parameters is left empty.
99 {{
"indices", flex_indices}},
100 std::map<std::string, any>(),
// Reconstructs the executable operator from a planner node.
// Asserts that `pnode` is a PROJECT_NODE carrying an "indices" parameter,
// reads that parameter as a flex_list, converts it to std::vector<size_t>,
// and returns a new operator_impl built from it.
104 static std::shared_ptr<query_operator> from_planner_node(
105 std::shared_ptr<planner_node> pnode) {
106 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::PROJECT_NODE);
107 ASSERT_TRUE(pnode->operator_parameters.count(
"indices"));
108 auto flex_indices = pnode->operator_parameters.at(
"indices").get<
flex_list>();
// Each flexible_type element is converted to size_t by the range constructor.
109 std::vector<size_t> indices(flex_indices.begin(), flex_indices.end());
110 return std::make_shared<operator_impl>(indices);
// Computes the output column types of a PROJECT_NODE: for each stored index,
// the type at that position in input_type is appended to the result.
// input_type is defined on a line not visible here; presumably it holds the
// column types of the node's input -- confirm.
113 static std::vector<flex_type_enum> infer_type(
114 std::shared_ptr<planner_node> pnode) {
115 ASSERT_EQ((
int)pnode->operator_type,
116 (
int)planner_node_type::PROJECT_NODE);
// .at() is used here without a preceding count() assertion, unlike
// from_planner_node() and repr().
118 auto flex_indices = pnode->operator_parameters.at(
"indices").get<
flex_list>();
119 std::vector<flex_type_enum> ret;
120 for (
auto i: flex_indices) {
121 ret.push_back(input_type[i.get<
flex_int>()]);
// Returns the inferred length for a PROJECT_NODE as an int64_t.
// Only the node-type assertion is visible here; how the length is derived
// is on lines not shown.
125 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
126 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::PROJECT_NODE);
// Builds a compact textual representation of a PROJECT_NODE's index list.
// Runs of consecutive indices are collected into groups and each group is
// written to `out`. The second parameter (pnode_tagger&) is unnamed and
// therefore unused.
130 static std::string repr(std::shared_ptr<planner_node> pnode, pnode_tagger& ) {
131 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::PROJECT_NODE);
132 ASSERT_TRUE(pnode->operator_parameters.count(
"indices"));
// NOTE(review): this lookup uses operator[] while from_planner_node() and
// infer_type() use .at(). The assertion above guards the key's presence,
// but .at() would be consistent -- confirm and align.
133 auto flex_indices = pnode->operator_parameters[
"indices"].get<
flex_list>();
135 std::ostringstream out;
138 if(flex_indices.size() > 0) {
// Start with a single group holding the first index.
141 std::vector<std::vector<size_t> > groups{ {flex_indices[0]} };
143 for(
size_t i = 1; i < flex_indices.size();++i) {
// An index exactly one greater than the last member of the current group
// extends that group.
144 if(flex_indices[i] == groups.back().back() + 1)
145 groups.back().push_back(
size_t(flex_indices[i]));
// Start a new group with this index (presumably the else-branch of the
// test above; the `else` itself is on a line not shown).
147 groups.push_back({flex_indices[i]});
150 bool is_first =
true;
151 for(
const std::vector<size_t>& grp : groups) {
// Three output forms follow. The conditions selecting between them are on
// lines not shown; presumably they depend on the group's size.
// Form "a,b":
162 out << grp[0] <<
',' << grp[1];
// Form "a,b,c":
165 out << grp[0] <<
',' << grp[1] <<
',' << grp[2];
// Form "first,...,last":
168 out << grp.front() <<
",...," << grp.back();
// Column indices selected by this operator. Copied from the constructor
// argument and read by print() and by the column-copy loop that fills the
// output block.
180 std::vector<size_t> m_indices;
189 #endif // TURI_SFRAME_QUERY_MANAGER_PROJECT_HPP
int64_t infer_planner_node_length(std::shared_ptr< planner_node > pnode)
int num_inputs
Number of inputs expected to the operator.
std::shared_ptr< query_operator > clone() const
size_t infer_planner_node_num_output_columns(std::shared_ptr< planner_node > pnode)
size_t attribute_bitfield
A bitfield of the attribute enum.
void execute(query_context &context)
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
void emit(const std::shared_ptr< sframe_rows > &rows)
std::string print() const
std::shared_ptr< sframe_rows > get_output_buffer()
#define DASSERT_FALSE(cond)
#define ASSERT_TRUE(cond)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
std::vector< flex_type_enum > infer_planner_node_type(std::shared_ptr< planner_node > pnode)
std::vector< flexible_type > flex_list