#ifndef TURI_DML_DATA_COLUMN_STATISTICS_H_
#define TURI_DML_DATA_COLUMN_STATISTICS_H_

#include <core/data/flexible_type/flexible_type.hpp>
#include <core/logging/assertions.hpp>
#include <core/storage/serialization/serialization_includes.hpp>
#include <ml/ml_data/ml_data_column_modes.hpp>
#include <core/parallel/pthread_tools.hpp>
#include <boost/thread/lock_algorithms.hpp>

namespace turi {
namespace ml_data_internal {
extern size_t ML_DATA_STATS_PARALLEL_ACCESS_THRESHOLD;
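/**
 * column_statistics accumulates the per-column statistics -- counts, means,
 * and standard deviations -- of a single ml_data column while the data is
 * filled in parallel.  Element indices below parallel_threshhold are
 * accumulated in per-thread arrays with no locking; larger indices fall back
 * to shared global arrays guarded by a small pool of striped spinlocks.
 *
 * Rough call pattern (illustrative only; the constructor and the exact
 * finalize entry point are not shown in this excerpt, so those details are
 * assumptions):
 *
 *   column_statistics cs;                 // assumed default-constructible
 *   cs.initialize();
 *   // on each worker thread, once per row:
 *   cs.update_numeric_statistics(thread_idx, row_values);
 *   // after all rows, a single-threaded finalize pass (see the
 *   // _finalize_threadlocal / _finalize_global helpers below) folds the
 *   // per-thread accumulators together; then:
 *   double m = cs.mean(0);
 *   double s = cs.stdev(0);
 */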
class column_statistics {
 public:

  /** Returns the number of rows seen by the statistics-collecting methods
   *  below. */
  size_t num_observations() const {
    return total_row_count;
  }
  /** The count of observations for a given index.  For numeric columns this
   *  is simply the total number of rows; otherwise index is the categorical
   *  index of the value within the column. */
  size_t count(size_t index = 0) const {
    if(mode == ml_column_mode::NUMERIC
       || mode == ml_column_mode::NUMERIC_VECTOR) {
      return total_row_count;
    }

    size_t ret = index < counts.size() ? counts[index] : 0;
    DASSERT_LE(ret, total_row_count);
    return ret;
  }
  /** The mean of the value at a given index.  For a categorical column this
   *  is the fraction of rows in which the category appears. */
  double mean(size_t index = 0) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double p = double(count(index)) / (std::max(1.0, double(total_row_count)));
      return p;
    }

    return index < statistics.size() ? statistics[index].mean : 0;
  }
  /** The standard deviation of the value at a given index.  For a categorical
   *  column this is the sample standard deviation of the 0/1 indicator of
   *  that category. */
  double stdev(size_t index = 0) const {
    if(mode == ml_column_mode::CATEGORICAL
       || mode == ml_column_mode::CATEGORICAL_VECTOR) {

      double p = mean(index);
      double stdev = 0;

      if (total_row_count > 1) {
        stdev = std::sqrt(total_row_count * p * (1 - p) / (total_row_count - 1));
      }

      return stdev;
    }

    if(total_row_count) {
      // (body elided in this excerpt)
    }

    return index < statistics.size() ? statistics[index].stdev : 0;
  }
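  // Note on the categorical branch above: with n = total_row_count rows and a
  // category present in a fraction p of them, the 0/1 indicator for that
  // category has unbiased sample variance n * p * (1 - p) / (n - 1), which is
  // exactly the quantity under the square root.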
  /** Update categorical statistics for a batch of categorical indices. */
  void update_categorical_statistics(size_t thread_idx,
                                     const std::vector<size_t>& cat_index_vect) GL_HOT {
    DASSERT_TRUE(mode == ml_column_mode::CATEGORICAL
                 || mode == ml_column_mode::CATEGORICAL_VECTOR);

    std::vector<size_t>& counts = by_thread_element_counts[thread_idx];
    DASSERT_TRUE(std::is_sorted(cat_index_vect.begin(), cat_index_vect.end()));

    // Indices below parallel_threshhold go to this thread's local counters.
    size_t cv_idx = 0;
    for(; cv_idx < cat_index_vect.size()
          && cat_index_vect[cv_idx] < parallel_threshhold; ++cv_idx) {
      size_t idx = cat_index_vect[cv_idx];
      check_local_array_size(idx, counts);

      // Count each distinct category at most once per row.
      if(cv_idx == 0 || (idx != cat_index_vect[cv_idx - 1]) ) {
        ++counts[idx];
      }
    }

    // Remaining (large) indices go to the shared global counters.
    if(cv_idx != cat_index_vect.size() ) {
      for(; cv_idx < cat_index_vect.size(); ++cv_idx) {
        size_t idx = cat_index_vect[cv_idx] - parallel_threshhold;
        check_global_array_sizes(idx, global_element_counts);

        // Compare un-shifted indices so in-row duplicates are counted once.
        if(cv_idx == 0 || (cat_index_vect[cv_idx] != cat_index_vect[cv_idx - 1])) {
          std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);
          ++(global_element_counts[idx]);
        }
      }
    }

    ++by_thread_row_counts[thread_idx];
  }
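  // The `idx != previous` checks above exist because cat_index_vect holds the
  // sorted category indices of a single row: a category repeated within one
  // row still contributes only one count.  by_thread_row_counts, in contrast,
  // is bumped exactly once per row.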
  /** Update statistics for a batch of real values (one row of a numeric or
   *  numeric-vector column). */
  void update_numeric_statistics(size_t thread_idx,
                                 const std::vector<double>& value_vect) GL_HOT {
    DASSERT_TRUE(mode == ml_column_mode::NUMERIC
                 || mode == ml_column_mode::NUMERIC_VECTOR
                 || mode == ml_column_mode::NUMERIC_ND_VECTOR);

    if(value_vect.empty()) {
      return;
    }

    size_t& count = by_thread_row_counts[thread_idx];
    auto& stats = by_thread_mean_var_acc[thread_idx];

    if(stats.empty()) {
      // First row seen on this thread: seed the accumulators directly.
      DASSERT_EQ(size_t(count), 0);
      stats.resize(value_vect.size());

      for(size_t i = 0; i < value_vect.size(); ++i) {
        stats[i].mean = value_vect[i];
        stats[i].var_sum = 0;
      }
    } else {
      DASSERT_EQ(stats.size(), value_vect.size());

      // Streaming update of the running mean and sum of squared deviations.
      for(size_t i = 0; i < value_vect.size(); i++) {
        double& mean = stats[i].mean;
        double& var_sum = stats[i].var_sum;

        double old_mean = mean;
        double v = value_vect[i];

        mean += (v - old_mean) / (count + 1);
        var_sum += (v - old_mean) * (v - mean);
      }
    }

    ++count;
  }
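  // The loop above is the standard single-pass (Welford-style) recurrence:
  //   mean_k = mean_{k-1} + (v - mean_{k-1}) / k,
  //   M2_k   = M2_{k-1} + (v - mean_{k-1}) * (v - mean_k),
  // with var_sum playing the role of M2, so a variance estimate can later be
  // recovered as var_sum / (k - 1) (sample) or var_sum / k (population).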
  /** Update statistics after observing a dictionary: a sparse row of
   *  (index, value) pairs, sorted by index. */
  void update_dict_statistics(size_t thread_idx,
                              const std::vector<std::pair<size_t, double> >& dict) GL_HOT {
    DASSERT_TRUE(mode == ml_column_mode::DICTIONARY);
    DASSERT_TRUE(std::is_sorted(dict.begin(), dict.end()));

    size_t& row_count = by_thread_row_counts[thread_idx];

    // Per-element update: bump the element count, then apply the same
    // streaming mean / variance recurrence used for numeric columns.
    auto update_f = [&](size_t& count, double& mean, double& var_sum, double v) {
      ++count;
      double old_mean = mean;
      mean += (v - old_mean) / count;
      var_sum += (v - old_mean) * (v - mean);
    };

    std::vector<size_t>& counts = by_thread_element_counts[thread_idx];
    std::vector<element_statistics_accumulator>& stats = by_thread_mean_var_acc[thread_idx];

    // Keys below parallel_threshhold are accumulated thread-locally.
    size_t d_idx = 0;
    for(; d_idx < dict.size() && dict[d_idx].first < parallel_threshhold; ++d_idx) {
      size_t idx = dict[d_idx].first;
      double v = dict[d_idx].second;

      check_local_array_size(idx, counts);
      check_local_array_size(idx, stats);

      update_f(counts[idx], stats[idx].mean, stats[idx].var_sum, v);
    }

    // Remaining keys go to the shared global accumulators under a striped lock.
    if(d_idx != dict.size()) {
      for(; d_idx < dict.size(); ++d_idx) {
        size_t idx = dict[d_idx].first - parallel_threshhold;
        double v = dict[d_idx].second;

        check_global_array_sizes(idx, global_element_counts, global_mean_var_acc);

        std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);

        update_f(global_element_counts[idx],
                 global_mean_var_acc[idx].mean,
                 global_mean_var_acc[idx].var_sum, v);
      }
    }

    ++row_count;
  }
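  // Two different counters are maintained above: update_f bumps a per-element
  // count (how many rows contained that key), while row_count is bumped once
  // per call, i.e. once per observed row.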
  void reindex(const std::vector<size_t>& new_index_map, size_t new_column_size);
  std::pair<bool, bool> _get_using_flags() const;

  // Internal helpers that fold the per-thread and global accumulators into
  // the final statistics.
  void _finalize_threadlocal(size_t, bool, bool);
  void _finalize_global(size_t, bool, bool);
#ifndef NDEBUG
  void _debug_check_is_approx_equal(std::shared_ptr<column_statistics> other) const;
#else
  void _debug_check_is_approx_equal(std::shared_ptr<column_statistics> other) const {}
#endif
  // Column metadata.
  std::string column_name;
  ml_column_mode mode;

  // Per-category counts; counts[i] is the number of rows in which category i
  // appears.
  std::vector<size_t> counts;
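  // Assumed shape of the per-element summary struct (its definition is not
  // shown in this excerpt); the .mean / .stdev accesses in the accessors
  // above suggest something like the following POD-serializable struct:
  struct element_statistics : public turi::IS_POD_TYPE {
    double mean = 0;    /**< Mean of the element. */
    double stdev = 0;   /**< Standard deviation of the element. */
  };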
  std::vector<element_statistics> statistics;
  size_t total_row_count = 0;
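  // Likewise, an assumed shape for the streaming accumulator used by the
  // per-thread and global arrays below; .mean and .var_sum are the fields the
  // update_* methods above write to:
  struct element_statistics_accumulator : public turi::IS_POD_TYPE {
    double mean = 0;      /**< Running mean. */
    double var_sum = 0;   /**< Running sum of squared deviations. */
  };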
  // Parallel accumulation state.  Indices below this threshold are tracked in
  // the per-thread arrays; indices at or above it use the global arrays.
  size_t parallel_threshhold = 1024*1024;

  std::vector<size_t> by_thread_row_counts;
  std::vector<std::vector<size_t> > by_thread_element_counts;
  std::vector<std::vector<element_statistics_accumulator> > by_thread_mean_var_acc;
  static constexpr size_t n_locks = 64;

  std::array<simple_spinlock, n_locks> global_element_locks;
  std::vector<size_t> global_element_counts;
  std::vector<element_statistics_accumulator> global_mean_var_acc;

  volatile size_t global_size = 0;
  volatile size_t global_array_buffer_size = 0;
  std::mutex _array_resize_lock;
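  // Locking protocol for the global arrays above: per-element updates take
  // exactly one of the n_locks striped spinlocks (chosen by get_lock_index),
  // while a resize first takes _array_resize_lock and then acquires all of
  // the spinlocks before reallocating, so element updates never race with a
  // buffer reallocation.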
  /** Maps a global element index to one of the striped spinlocks. */
  inline size_t get_lock_index(size_t idx) const {
    size_t lock_idx = (idx % n_locks);
    return lock_idx;
  }
  /** Ensures the thread-local array v is large enough to index v[idx],
   *  growing its capacity geometrically up to parallel_threshhold. */
  template <typename T>
  inline void check_local_array_size(size_t idx, std::vector<T>& v) {
    DASSERT_LT(idx, parallel_threshhold);

    if(idx >= v.size() ) {
      if(UNLIKELY(v.capacity() < idx + 1)) {
        size_t new_capacity = std::min(parallel_threshhold, 3 * (idx + 1) / 2);

        // Once past half the threshold, jump straight to the cap to avoid
        // repeated reallocations.
        if(new_capacity > parallel_threshhold / 2)
          new_capacity = parallel_threshhold;

        v.reserve(new_capacity);
      }

      v.resize(idx + 1);
    }
  }
  /** Ensures the shared global arrays can hold element idx, triggering a
   *  locked resize when the current buffer is too small. */
  template <typename... V>
  inline void check_global_array_sizes(size_t idx, V&... vv) {
    if(UNLIKELY(idx >= global_size)) {
      // (bookkeeping of the new logical size is elided in this excerpt)
      if(UNLIKELY(idx >= global_array_buffer_size)) {
        resize_global_arrays(idx, vv...);
      }
    }
  }
  /** Resizes one global array; the caller must hold all the element locks. */
  template <typename T>
  inline void __resize_global_array(size_t new_size, std::vector<T>& v) {
    DASSERT_EQ(v.size(), global_array_buffer_size);
    v.resize(new_size);
  }
  template <typename V1, typename... VV>
  inline void __resize_global_array(size_t new_size, V1& v, VV&... other_v) {
    __resize_global_array(new_size, v);
    __resize_global_array(new_size, other_v...);
  }
  /** Grows all of the global arrays so they can hold element idx.
   *  Over-allocates well past the needed size so that resizes stay rare. */
  template <typename... V>
  void resize_global_arrays(size_t idx, V&... vv) {

    size_t new_size = 2 * (parallel_threshhold + idx + 1);

    std::lock_guard<std::mutex> lg(_array_resize_lock);

    // Another thread may have already grown the buffers while we waited.
    if(global_array_buffer_size <= idx) {
      std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;

      for(size_t i = 0; i < n_locks; ++i) {
        all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);
      }

      // Acquire every element lock (deadlock-free) so no updates run while
      // the arrays are reallocated.
      boost::lock(all_locks.begin(), all_locks.end());

      __resize_global_array(new_size, vv...);

      global_array_buffer_size = new_size;
    }
  }
};  // class column_statistics

}  // namespace ml_data_internal
}  // namespace turi

// Out-of-place serializers for std::shared_ptr<column_statistics>: a presence
// flag is written first so that a null pointer round-trips correctly, followed
// by the version and the object payload.
BEGIN_OUT_OF_PLACE_SAVE(arc, std::shared_ptr<ml_data_internal::column_statistics>, m) {
  if(m == nullptr) {
    arc << false;
  } else {
    arc << true;
    size_t version = m->get_version();
    arc << version;
    m->save_impl(arc);
  }
} END_OUT_OF_PLACE_SAVE()

BEGIN_OUT_OF_PLACE_LOAD(arc, std::shared_ptr<ml_data_internal::column_statistics>, m) {
  bool is_not_nullptr;
  arc >> is_not_nullptr;
  if(is_not_nullptr) {
    size_t version;
    arc >> version;
    m.reset(new ml_data_internal::column_statistics);
    m->load_version(arc, version);
  } else {
    m = std::shared_ptr<ml_data_internal::column_statistics>(nullptr);
  }
} END_OUT_OF_PLACE_LOAD()

#endif  // TURI_DML_DATA_COLUMN_STATISTICS_H_