6 #ifndef TURI_SFRAME_QUERY_MANAGER_REDUCE_HPP 7 #define TURI_SFRAME_QUERY_MANAGER_REDUCE_HPP 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/storage/sframe_data/group_aggregate_value.hpp> 11 #include <core/storage/query_engine/operators/operator.hpp> 12 #include <core/storage/query_engine/execution/query_context.hpp> 13 #include <core/storage/query_engine/operators/operator_properties.hpp> 18 namespace query_eval {
33 DECL_CORO_STATE(execute);
// Returns the fixed string "reduce" identifying this operator.
37 static std::string name() {
return "reduce"; }
// Constructs the operator, storing the given aggregator in m_aggregator and
// the output type in m_output_type.
// NOTE(review): the declaration of the `output_type` parameter is not visible
// in this listing; only its use in the initializer list is.
39 inline operator_impl(std::shared_ptr<group_aggregate_value> aggregator,
41 : m_aggregator(aggregator)
42 , m_output_type(output_type) { }
// Returns a new operator_impl built from m_aggregator->new_instance() and the
// same m_output_type. The clone therefore holds its own aggregator pointer
// rather than sharing m_aggregator (presumably a fresh, empty aggregator --
// confirm against group_aggregate_value::new_instance()).
51 inline std::shared_ptr<query_operator>
clone()
const {
52 std::shared_ptr<group_aggregate_value> agg(m_aggregator->new_instance());
53 return std::make_shared<operator_impl>(agg, m_output_type);
// Returns the value of CORO_RUNNING(execute).
// NOTE(review): presumably true while the `execute` coroutine declared via
// DECL_CORO_STATE(execute) is mid-run -- confirm against the macro definition.
56 inline bool coro_running()
const {
57 return CORO_RUNNING(execute);
66 for (
const auto& row : *rows) {
68 if (row.size() == 1) m_aggregator->add_element_simple(row[0]);
69 else m_aggregator->add_element(std::vector<flexible_type>(row));
74 (*out)[0][0] = m_aggregator->emit();
81 static std::shared_ptr<planner_node> make_planner_node(
82 std::shared_ptr<planner_node> source,
85 std::shared_ptr<group_aggregate_value> agg(aggregator.
new_instance());
87 {{
"output_type", (int)(output_type)}},
88 {{
"aggregator",
any(agg)}},
// Builds a reduce operator from a planner node.
//
// Asserts that the node's operator_type is REDUCE_NODE, that it has exactly
// one input, and that both the "output_type" entry (operator_parameters) and
// the "aggregator" entry (any_operator_parameters) are present.
// NOTE(review): the lines that derive `output_type` from the node are not
// visible in this listing.
92 static std::shared_ptr<query_operator> from_planner_node(
94 std::shared_ptr<planner_node> pnode) {
95 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::REDUCE_NODE);
96 ASSERT_EQ(pnode->inputs.size(), 1);
97 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
98 ASSERT_TRUE(pnode->any_operator_parameters.count(
"aggregator"));
// Retrieve the aggregator stored on the planner node.
100 std::shared_ptr<group_aggregate_value> aggregator =
101 pnode->any_operator_parameters[
"aggregator"].as<std::shared_ptr<group_aggregate_value>>();
// The operator is given a new_instance() of the stored aggregator, not the
// stored pointer itself.
105 std::shared_ptr<group_aggregate_value> agg(aggregator->new_instance());
106 return std::make_shared<operator_impl>(agg, output_type);
// Returns a one-element vector containing the node's "output_type" parameter,
// converted through int to flex_type_enum.
// Asserts that the node is a REDUCE_NODE and that "output_type" is present.
109 static std::vector<flex_type_enum> infer_type(std::shared_ptr<planner_node> pnode) {
110 ASSERT_EQ((
int)pnode->operator_type, (
int)planner_node_type::REDUCE_NODE);
111 ASSERT_TRUE(pnode->operator_parameters.count(
"output_type"));
112 return {(
flex_type_enum)(
int)(pnode->operator_parameters[
"output_type"])};
115 static int64_t infer_length(std::shared_ptr<planner_node> pnode) {
// Returns a short description of the node: "Agg." followed by the name() of
// the aggregator stored under the "aggregator" key.
// Asserts that the "aggregator" entry is present in any_operator_parameters.
119 static std::string print(std::shared_ptr<planner_node> pnode) {
120 ASSERT_TRUE(pnode->any_operator_parameters.count(
"aggregator"));
122 std::shared_ptr<group_aggregate_value> aggregator =
123 pnode->any_operator_parameters[
"aggregator"].as<std::shared_ptr<group_aggregate_value> >();
125 return std::string(
"Agg.") + aggregator->
name();
129 std::shared_ptr<group_aggregate_value> m_aggregator;
139 #endif // TURI_SFRAME_QUERY_MANAGER_REDUCE_HPP
int num_inputs
Number of inputs expected to the operator.
size_t attribute_bitfield
A bitfield of the attribute enum.
std::shared_ptr< const sframe_rows > get_next(size_t input_number)
virtual group_aggregate_value * new_instance() const =0
void emit(const std::shared_ptr< sframe_rows > &rows)
std::shared_ptr< sframe_rows > get_output_buffer()
std::shared_ptr< query_operator > clone() const
#define ASSERT_TRUE(cond)
void execute(query_context &context)
static std::shared_ptr< planner_node > make_shared(planner_node_type operator_type, const std::map< std::string, flexible_type > &operator_parameters=std::map< std::string, flexible_type >(), const std::map< std::string, any > &any_operator_parameters=std::map< std::string, any >(), const std::vector< std::shared_ptr< planner_node >> &inputs=std::vector< std::shared_ptr< planner_node >>())
virtual std::string name() const =0