6 #ifndef TURI_ML2_DATA_BASIC_COLUMN_STATISTICS_H_ 7 #define TURI_ML2_DATA_BASIC_COLUMN_STATISTICS_H_ 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/logging/assertions.hpp> 11 #include <core/storage/serialization/serialization_includes.hpp> 12 #include <toolkits/ml_data_2/ml_data_column_modes.hpp> 13 #include <core/parallel/pthread_tools.hpp> 14 #include <boost/thread/lock_algorithms.hpp> 17 namespace turi {
namespace v2 {
namespace ml_data_internal {
19 extern size_t ML_DATA_STATS_PARALLEL_ACCESS_THRESHOLD;
37 return total_row_count;
43 size_t count(
size_t index)
const {
45 if(mode == ml_column_mode::NUMERIC
46 || mode == ml_column_mode::NUMERIC_VECTOR) {
48 return total_row_count;
50 return index < counts.size() ? counts[index] : 0;
57 double mean(
size_t index)
const {
58 if(mode == ml_column_mode::CATEGORICAL
59 || mode == ml_column_mode::CATEGORICAL_VECTOR) {
61 double p = double(count(index)) / (std::max(1.0,
double(total_row_count)));
69 return index < statistics.size() ? statistics[index].mean : 0;
76 double stdev(
size_t index)
const {
77 if(mode == ml_column_mode::CATEGORICAL
78 || mode == ml_column_mode::CATEGORICAL_VECTOR) {
81 double p = mean(index);
83 if (total_row_count > 1) {
84 stdev = std::sqrt(total_row_count * p * (1 - p) / (total_row_count-1));
96 return index < statistics.size() ? statistics[index].stdev : 0;
122 void _finalize_threadlocal(
size_t,
bool,
bool);
123 void _finalize_global(
size_t,
bool,
bool);
131 size_t get_version()
const {
return 2; }
149 std::shared_ptr<column_statistics> create_cleared_copy()
const;
162 std::vector<size_t> counts;
169 std::vector<element_statistics> statistics;
170 size_t total_row_count = 0;
178 size_t parallel_threshhold = 1024*1024;
180 std::vector<size_t> by_thread_row_counts;
187 std::vector<std::vector<size_t> > by_thread_element_counts;
188 std::vector<std::vector<element_statistics_accumulator> > by_thread_mean_var_acc;
191 static constexpr
size_t n_locks = 64;
193 std::array<simple_spinlock, n_locks> global_element_locks;
194 std::vector<size_t> global_element_counts;
195 std::vector<element_statistics_accumulator> global_mean_var_acc;
197 volatile size_t global_size = 0;
202 inline size_t get_lock_index(
size_t idx)
const {
203 size_t lock_idx = (idx % n_locks);
209 template <
typename T>
210 inline void check_local_array_size(
size_t idx, std::vector<T>& v) {
211 DASSERT_LT(idx, parallel_threshhold);
214 if(idx >= v.size() ) {
216 if(UNLIKELY(v.capacity() < idx + 1)) {
217 size_t new_capacity = std::min(parallel_threshhold, 3 * (idx + 1) / 2);
220 if(new_capacity > parallel_threshhold / 2)
221 new_capacity = parallel_threshhold;
223 v.reserve(new_capacity);
232 template <
typename T>
233 inline void check_global_array_size(
size_t idx, std::vector<T>& v) {
236 if(global_size <= idx) {
239 size_t g = global_size;
253 if(UNLIKELY(idx >= v.size() )) {
256 size_t new_size = 2 * (parallel_threshhold + idx + 1);
259 std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;
260 for(
size_t i = 0; i < n_locks; ++i)
261 all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);
266 boost::lock(all_locks.begin(), all_locks.end());
269 if(v.size() < idx + 1)
287 void set_data(
const std::map<std::string, variant_type>& params);
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
void update_categorical_statistics(size_t thread_idx, const std::vector< size_t > &cat_index_vect) GL_HOT_FLATTEN
Update categorical statistics for a batch of categorical indices.
void update_dict_statistics(size_t thread_idx, const std::vector< std::pair< size_t, double > > &dict) GL_HOT_FLATTEN
Update statistics after observing a dictionary.
Inheriting from this type will force the serializer to treat the derived type as a POD type...
size_t num_observations() const
bool atomic_compare_and_swap(T &a, T oldval, T newval)
void initialize()
Initialize the statistics – counting, mean, and stdev.
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
void update_numeric_statistics(size_t thread_idx, const std::vector< double > &value_vect) GL_HOT_FLATTEN
Update categorical statistics for a batch of real values.
#define DASSERT_TRUE(cond)