6 #ifndef TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP 7 #define TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP 13 #include <unordered_set> 14 #include <core/storage/sframe_data/sframe.hpp> 15 #include <core/util/cityhash_tc.hpp> 16 #include <core/parallel/mutex.hpp> 17 #include <core/storage/sframe_data/group_aggregate_value.hpp> 18 #include <core/generics/hopscotch_map.hpp> 33 namespace groupby_aggregate_impl {
56 std::vector<flexible_type>
key;
58 mutable std::vector<std::unique_ptr<group_aggregate_value> >
values;
71 const std::vector<group_descriptor>& group_desc);
79 const std::vector<group_descriptor>& group_desc);
86 void init(std::vector<flexible_type> group_key,
87 const std::vector<group_descriptor>& group_desc);
98 const std::vector<group_descriptor>& group_desc);
121 template <
typename T>
122 void add_element(
const T& val,
123 const std::vector<group_descriptor>& group_desc)
const;
125 static size_t hash_key(
const std::vector<flexible_type>& key);
127 static size_t hash_key(
const std::vector<flexible_type>& key,
size_t keylen);
158 return element.hash();
174 namespace groupby_aggregate_impl {
204 size_t num_segments);
217 std::shared_ptr<group_aggregate_value>
aggregator);
220 void add(
const std::vector<flexible_type>& val,
228 void group_and_write(
sframe& out);
237 inline void throw_if_not_initialized()
const {
238 if (UNLIKELY(!tss_.init_))
239 log_and_throw(
"group_aggregate_container is not initialized");
243 void flush_segment(
size_t segmentid);
245 void merge_local_buffer_set();
249 std::vector<group_descriptor> group_descriptors;
251 const size_t max_buffer_size;
252 const size_t num_segments;
254 struct segment_information {
260 using vec_segment_t = std::vector<segment_information>;
261 using vec_chunk_t = std::vector<size_t>;
263 struct tls_segment_set {
266 vec_segment_t segments_{};
272 static thread_local tls_segment_set tss_;
275 sa_buffer_t() =
default;
277 sa_buffer_t(
size_t num_segments)
279 sa_seg_chunks_(num_segments),
280 sa_seg_locks_(num_segments),
284 std::unique_ptr<sarray<std::string>> sa_buffer_ptr_;
285 std::vector<vec_chunk_t> sa_seg_chunks_;
286 std::vector<turi::simple_spinlock> sa_seg_locks_;
287 turi::atomic<unsigned> refctr_;
290 std::vector<sa_buffer_t> local_buffer_set_;
292 std::atomic<unsigned> merry_go_round_;
299 std::vector<turi::mutex> gl_lock_pool_;
301 std::vector<vec_chunk_t> gl_chunk_size_set_;
303 std::unique_ptr<sarray<std::string>::reader_type> reader;
306 void group_and_write_segment(
sframe& out,
318 #endif // TURI_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
The serialization input archive. Given a reference to an istream, it reads from that stream to provide deserialization.
size_t hash_val
A cache of the hash of the key.
std::vector< flexible_type > key
The key of this aggregation.
std::vector< size_t > column_numbers
The column numbers to operate on.
std::vector< std::unique_ptr< group_aggregate_value > > values
All the aggregated values.
The serialization output archive. Given a reference to an ostream, it writes to that stream to provide serialization.
std::shared_ptr< group_aggregate_value > aggregator
The aggregator.