6 #ifndef TURI_SFRAME_GROUPBY_HPP 7 #define TURI_SFRAME_GROUPBY_HPP 9 #include <core/parallel/mutex.hpp> 12 #include<core/storage/sframe_data/sarray.hpp> 13 #include<core/storage/sframe_data/sframe.hpp> 29 sframe
group(sframe sframe_in, std::string key_column);
86 typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
92 comparator_type comparator = std::less<value_type>()
94 sarray_sink.reset(
new sink_type());
95 sarray_sink->open_for_write(num_buckets);
96 for (
size_t i = 0; i < num_buckets; ++i) {
109 void add(
const value_type& val,
size_t bucketid) {
110 DASSERT_LT(bucketid, buckets.size());
111 buckets[bucketid]->add(val);
115 template<
typename SIterableType>
116 void sort_and_write(SIterableType& out) {
117 parallel_for (0, num_buckets(), [&](
size_t i) { buckets[i]->flush();} );
118 sarray_sink->close();
119 typedef typename SIterableType::iterator OutIterator;
120 DASSERT_EQ(out.num_segments(), buckets.size());
123 buckets[i]->template sort_and_write<OutIterator>(out.get_output_iterator(i));
129 size_t num_buckets()
const {
return buckets.size(); }
135 const size_t buffer_size = 1024 * 1024;
138 std::vector<hash_bucket<value_type>*> buckets;
141 std::shared_ptr<sarray<std::string>> sarray_sink;
153 typedef T value_type;
156 typedef std::function<bool(const value_type&, const value_type&)> comparator_type;
161 std::shared_ptr<sink_type> sink,
163 comparator_type comparator,
164 bool deduplicate =
false);
171 void add(
const value_type& val);
172 void add(value_type&& val);
179 template<
typename OutIterator>
180 void sort_and_write(OutIterator out);
185 void save_buffer(std::vector<value_type>& swap_buffer);
191 std::shared_ptr<sarray<std::string>> sink;
194 sink_iterator_type out_iter;
197 std::vector<size_t> chunk_size;
203 std::vector<value_type> buffer;
216 comparator_type comparator;
222 inline value_type deserialize(
const std::string& buf) {
224 iarchive iarc(buf.c_str(), buf.length());
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
The serialization input archive: given a reference to an istream, it reads from that stream to provide deserialization.
sframe group(sframe sframe_in, std::string key_column)
hash_bucket_container(size_t num_buckets, comparator_type comparator=std::less< value_type >())
Constructs a container with num_buckets buckets and a comparator used to sort the values.