#ifndef _CANVAS_STREAMING_TRANSFORMATION
#define _CANVAS_STREAMING_TRANSFORMATION

#include <algorithm>
#include <memory>
#include <string>
#include <vector>

#include <core/data/flexible_type/flexible_type.hpp>
#include <core/parallel/lambda_omp.hpp>

namespace visualization {
class transformation_output {
  public:
    virtual ~transformation_output() = default;
    virtual std::string vega_column_data(bool sframe = false) const = 0;
};
class sframe_transformation_output : public transformation_output {
  public:
    // SFrame visualizations provide column data (as in vega_column_data above)
    // along with summary data.
    virtual std::string vega_summary_data() const = 0;
};
class transformation_base {
  public:
    virtual ~transformation_base() = default;
    virtual std::shared_ptr<transformation_output> get() = 0;
    virtual bool eof() const = 0;
    double get_percent_complete() const;
    virtual size_t get_batch_size() const = 0;
    virtual flex_int get_total_rows() const = 0;
    virtual flex_int get_rows_processed() const = 0;
};
class transformation_collection : public std::vector<std::shared_ptr<transformation_base>> {
  // ... (collection-specific members elided)
};
template<typename InputIterable,
         typename Output>
class transformation : public transformation_base {
  protected:
    InputIterable m_source;
    std::shared_ptr<Output> m_transformer;
    size_t m_currentIdx = 0;
    size_t m_batch_size = 0; // set by init()

  private:
    bool m_initialized = false;
  protected:
    void check_init(const char * msg, bool initialized) const {
      if (initialized != m_initialized) {
        log_and_throw(msg);
      }
    }

    void require_init() const {
      check_init("Transformer must be initialized before performing this operation.", true);
    }
    // Subclasses may override this to return something other than the shared transformer.
    virtual Output get_current() {
      return *m_transformer;
    }

    // Produces one transformer per worker thread; the per-thread results are
    // combined by merge_results after the parallel loop in get().
    virtual std::vector<Output> split_input(size_t num_threads) {
      return std::vector<Output>(num_threads);
    }
    virtual void merge_results(std::vector<Output>& transformers) = 0;
  public:
    virtual void init(const InputIterable& source, size_t batch_size) {
      check_init("Transformer is already initialized.", false);
      m_batch_size = batch_size;
      m_source = source;
      m_currentIdx = 0;
      m_transformer = std::make_shared<Output>();
      m_initialized = true;
    }
    virtual bool eof() const override {
      require_init();
      DASSERT_LE(m_currentIdx, m_source.size());
      return m_currentIdx == m_source.size();
    }
    virtual flex_int get_rows_processed() const override {
      require_init();
      DASSERT_LE(m_currentIdx, m_source.size());
      return m_currentIdx;
    }
    virtual flex_int get_total_rows() const override {
      require_init();
      return m_source.size();
    }
    virtual std::shared_ptr<transformation_output> get() override {
      require_init();
      if (eof()) {
        // Done streaming; return the transformer as-is.
        return m_transformer;
      }

      const size_t num_threads_reported = thread_pool::get_instance().size();
      const size_t start = m_currentIdx;
      const size_t input_size = std::min(m_batch_size, m_source.size() - m_currentIdx);
      const size_t end = start + input_size;
      auto transformers = this->split_input(num_threads_reported);
      const auto& source = this->m_source;
      in_parallel(
        [&transformers, &source, input_size, start]
        (size_t thread_idx, size_t num_threads) {
          DASSERT_LE(transformers.size(), num_threads);
          if (thread_idx >= transformers.size()) {
            // This operation is not parallel enough to use every thread;
            // nothing to do on this one.
            return;
          }

          // Each thread processes a contiguous slice of the current batch;
          // the last thread also takes any remainder.
          auto& transformer = transformers[thread_idx];
          size_t thread_input_size = input_size / transformers.size();
          size_t thread_start = start + (thread_idx * thread_input_size);
          size_t thread_end = thread_idx == transformers.size() - 1 ?
            start + input_size :
            thread_start + thread_input_size;
          DASSERT_LE(thread_end, start + input_size);
          for (const auto& value : source.range_iterator(thread_start, thread_end)) {
            transformer.add_element_simple(value);
          }
        });
      this->merge_results(transformers);
      m_currentIdx = end;
      return m_transformer;
    }
    virtual size_t get_batch_size() const override {
      return m_batch_size;
    }
};
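
// Illustrative sketch (not part of this header): one way a concrete streaming
// aggregator could plug into the `transformation` template above. The names
// `count_result` and `count_transformation` are hypothetical, and the sketch
// assumes the source's range_iterator yields flexible_type values. The Output
// type must be default-constructible, derive from transformation_output (so
// get() can return it), accept elements via add_element_simple, and be
// combinable in merge_results.
class count_result : public transformation_output {
  public:
    size_t m_count = 0;

    // Called once per element by transformation::get() on each worker thread.
    void add_element_simple(const flexible_type& value) {
      if (value.get_type() != flex_type_enum::UNDEFINED) {
        m_count++;
      }
    }

    virtual std::string vega_column_data(bool sframe = false) const override {
      return "{\"count\": " + std::to_string(m_count) + "}";
    }
};

template<typename InputIterable>
class count_transformation : public transformation<InputIterable, count_result> {
  public:
    // Fold the per-thread partial counts into the shared transformer.
    virtual void merge_results(std::vector<count_result>& transformers) override {
      for (const auto& partial : transformers) {
        this->m_transformer->m_count += partial.m_count;
      }
    }
};

// Typical driver loop (also a sketch): initialize once, then call get() until eof.
//
//   count_transformation<gl_sarray> t;
//   t.init(source, /* batch_size */ 100000);
//   while (!t.eof()) {
//     auto output = t.get();  // processes one batch in parallel
//   }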

} // namespace visualization

#endif // _CANVAS_STREAMING_TRANSFORMATION