6 #ifndef TURI_TRANSFORM_UTILS_H_ 7 #define TURI_TRANSFORM_UTILS_H_ 9 #include <boost/regex.hpp> 11 #include <core/data/sframe/gl_sframe.hpp> 12 #include <core/util/try_finally.hpp> 13 #include <core/parallel/lambda_omp.hpp> 15 #include <core/data/sframe/gl_sframe.hpp> 16 #include <core/data/sframe/gl_sarray.hpp> 18 #include <toolkits/feature_engineering/topk_indexer.hpp> 19 #include <toolkits/feature_engineering/statistics_tracker.hpp> 22 namespace transform_utils{
// Checks the requested feature columns against the dataset's columns and
// reports a failure by calling log_and_throw.
//
// data_column_names     Names of the columns present in the dataset.
// feature_column_names  Names of the columns requested as features.
//
// log_and_throw is called when feature_column_names is empty. It is also
// called, with a message naming every entry of `result`, when `result` is
// non-empty and `verbose` is true.
// NOTE(review): the declaration of `verbose` and the first input range of the
// set operation that fills `result` are not visible in this listing;
// presumably `verbose` is a trailing bool parameter and the first range is
// feature_column_set -- confirm against the full header.
34 inline void validate_feature_columns(
35 const std::vector<std::string>& data_column_names,
36 const std::vector<std::string>& feature_column_names,
39 if(feature_column_names.empty()) {
40 log_and_throw(
"No input features are specified.");
// Ordered, duplicate-free copies of both name lists.
43 std::set<std::string> data_column_set(data_column_names.begin(),
44 data_column_names.end());
45 std::set<std::string> feature_column_set(feature_column_names.begin(),
46 feature_column_names.end());
48 std::vector<std::string> result;
50 data_column_set.begin(), data_column_set.end(),
51 inserter(result, result.end()));
// result.size() > 0 is tested first, so result.size()-1 below cannot wrap.
53 if (result.size() > 0 && verbose){
54 std::stringstream err_msg;
55 err_msg <<
"Feature(s) ";
// Every name except the last is followed by ", ".
56 for (
size_t i=0; i < result.size()-1; i++) {
57 err_msg << result[i] <<
", ";
59 err_msg << result[result.size()-1]
60 <<
" are missing from the dataset." << std::endl;
61 log_and_throw(err_msg.str());
// Derives a column name based on output_column_name that does not appear in
// feature_columns.
//
// feature_columns     Names that the candidate is compared against.
// output_column_name  Preferred name; used unchanged when it is not taken.
//
// While the candidate is found in feature_columns it is replaced by
// output_column_name + "." + <counter>.
// NOTE(review): the declaration and update of `counter` and the return
// statement are not visible in this listing, so the first suffix that is tried
// cannot be stated from here; presumably output_name is returned -- confirm.
76 inline std::string get_unique_feature_name(
77 const std::vector<std::string>& feature_columns,
78 const std::string& output_column_name) {
80 std::string output_name = output_column_name;
// Linear search over feature_columns on every iteration.
82 while (std::find(feature_columns.begin(), feature_columns.end(),
83 output_name) != feature_columns.end()) {
85 output_name = output_column_name +
"." + std::to_string(counter);
// Compares, for every name in feature_names, the type stored in feature_types
// with the type of the same-named column in `data`.
//
// feature_names  Columns to check.
// feature_types  Expected type per column name; looked up with at(), so a
//                name missing from the map raises std::out_of_range.
// data           Frame whose column dtypes are inspected.
//
// On a mismatch log_and_throw is called with a message that begins
// "Column '<name>' was of type ".
// NOTE(review): the rest of the message and the end of the function are not
// visible in this listing.
100 inline void validate_feature_types(
101 const std::vector<std::string>& feature_names,
102 const std::map<std::string, flex_type_enum>& feature_types,
103 const gl_sframe& data) {
105 for (
auto& col_name : feature_names){
107 auto fit_type = feature_types.at(col_name);
108 auto transform_type = data[col_name].dtype();
110 if (fit_type != transform_type) {
111 log_and_throw(
"Column '" + col_name +
"' was of type " +
// Resolves the list of column names to work on from a user-supplied
// `feature_columns` value.
//
// data             Frame whose column names are consulted.
// feature_columns  flexible_type describing the requested names; an empty
//                  flex_list selects every column of `data`.
//
// Returns feature_columns_vector, or on the path that builds `exclude_set`
// the names collected in `result`.
// NOTE(review): the first branch condition, the argument of
// variant_get_value, the condition guarding the exclusion path and the first
// input range of the set operation are not visible in this listing.
// Presumably the exclusion path is selected by a bool parameter and yields
// total_set minus exclude_set -- confirm against the full header.
141 inline std::vector<std::string> get_column_names(
const gl_sframe& data,
143 const flexible_type& feature_columns) {
145 std::vector<std::string> feature_columns_vector;
147 feature_columns_vector = data.column_names();
148 }
else if (!feature_columns.get<
flex_list>().size()) {
149 feature_columns_vector = data.column_names();
151 feature_columns_vector = variant_get_value<std::vector<std::string>>(
156 std::vector<std::string> data_column_names = data.column_names();
157 std::set<std::string> total_set(data_column_names.begin(),
158 data_column_names.end());
159 std::set<std::string> exclude_set(feature_columns_vector.begin(),
160 feature_columns_vector.end());
161 std::set<std::string> result;
163 exclude_set.begin(), exclude_set.end(),
164 inserter(result, result.begin()));
// `result` is a std::set, so names returned from this path are sorted and
// unique rather than in the frame's column order.
165 return std::vector<std::string>(result.begin(),result.end());
167 return feature_columns_vector;
// Keeps the entries of feature_columns that are also columns of `data`.
//
// data             Frame whose column names are consulted.
// feature_columns  Requested feature names.
//
// `result` holds the intersection of the two name sets. When its size differs
// from feature_columns.size() a message is emitted saying that only some
// feature columns were present during transform() and that the missing ones
// are ignored. `ret` is then filled by walking feature_columns in order and
// testing membership in `result`.
// NOTE(review): `result` is a de-duplicated set while feature_columns is a
// vector, so duplicate entries in feature_columns also make the size check
// fire. The stream receiving the message, the statement inside the final
// `if` and the return are not visible in this listing; presumably `ret` is
// returned -- confirm.
180 inline std::vector<std::string> select_feature_subset(
const gl_sframe& data,
181 const std::vector<std::string>& feature_columns) {
183 std::vector<std::string> data_column_names = data.column_names();
184 std::set<std::string> total_set(data_column_names.begin(),
185 data_column_names.end());
186 std::set<std::string> feature_set(feature_columns.begin(),
187 feature_columns.end());
188 std::set<std::string> result;
189 std::set_intersection(total_set.begin(), total_set.end(),
190 feature_set.begin(), feature_set.end(),
191 inserter(result, result.begin()));
193 if (result.size() != feature_columns.size()) {
195 << feature_columns.size() <<
" feature columns but only " 196 << result.size() <<
" were present during transform(). " 197 <<
"Proceeding with transform by ignoring the missing columns." 202 std::vector<std::string> ret;
203 ret.reserve(result.size());
// Iterate the caller's list (not the set) so the caller's ordering drives the
// order in which names are considered.
205 for(
const auto& s : feature_columns) {
206 if(result.count(s)) {
// Converts a single flexible_type value into a flex_dict (a vector of
// key/value pairs).
//
// in  Value to convert.
//
// The visible assignments produce these shapes:
//   (0, in)            -- one entry keyed by 0 holding the value itself;
//   (in, 1)            -- one entry keyed by the value with 1 as its value;
//   (i, list[i])       -- one entry per element of `list`, keyed by index;
//   (i, vec[i])        -- one entry per element of `vec`, keyed by index.
// NOTE(review): the declaration of `out`, the definitions of `list` and `vec`,
// every branch condition except the final is_numeric_type() test, and the
// return are not visible in this listing, so which input type reaches which
// shape must be confirmed against the full header.
226 inline flex_dict flexible_type_to_flex_dict(
const flexible_type& in){
232 out[0] = std::make_pair(0,in);
235 out[0] = std::make_pair(in,1);
238 out.resize(list.size());
239 for (
size_t i = 0 ; i < list.size(); i++){
240 out[i] = std::make_pair(i,list[i]);
244 out.resize(vec.size());
245 for (
size_t i = 0 ; i < vec.size(); i++){
246 out[i] = std::make_pair(i,vec[i]);
248 }
else if (is_numeric_type(in.get_type())){
250 out[0] = std::make_pair(0, in);
// Selects columns of training_data according to feature_columns.
//
// training_data    Frame to select from.
// feature_columns  Requested names, taken by non-const reference: when it is
//                  empty it is overwritten with every column name of
//                  training_data, so the caller observes the change.
//
// Returns training_data.select_columns(feature_columns), or on the path that
// builds `exclude_set` a selection of the names collected in `result`.
// NOTE(review): a parameter declared after feature_columns, the condition
// guarding the exclusion path and the first input range of the set operation
// are not visible in this listing. Presumably the exclusion path is selected
// by a bool parameter and yields total_set minus exclude_set -- confirm.
259 inline gl_sframe extract_columns(
const gl_sframe& training_data,
260 std::vector<std::string>& feature_columns,
262 if (!feature_columns.size()) {
263 feature_columns = training_data.column_names();
266 std::vector<std::string> training_data_column_names =
267 training_data.column_names();
268 std::set<std::string> total_set(training_data_column_names.begin(),
269 training_data_column_names.end());
270 std::set<std::string> exclude_set(feature_columns.begin(),
271 feature_columns.end());
273 std::set<std::string> result;
275 exclude_set.begin(), exclude_set.end(),
276 inserter(result, result.begin()));
// `result` is a std::set, so columns selected on this path are requested in
// sorted name order.
277 return training_data.select_columns(
278 std::vector<std::string>(result.begin(),result.end()));
280 return training_data.select_columns(feature_columns);
// Filters `features` down to the columns whose dtype is listed in
// valid_feature_types.
//
// features             Candidate column names; each is looked up in `dataset`.
// valid_feature_types  Accepted column types.
// verbose              Defaults to true.
//
// Returns the accepted names in the order they appear in `features`.
// A column whose type is not accepted is reported with a message containing
// "' is excluded due to invalid column type (".
// NOTE(review): the name of the first parameter (used as `dataset` below), the
// stream receiving the message and any use of `verbose` are not visible in
// this listing. col_type_map is filled but not read in the visible lines.
305 inline std::vector<std::string> select_valid_features_nothrow(
const gl_sframe&
307 const std::vector<std::string>& features,
308 const std::vector<flex_type_enum>& valid_feature_types,
309 bool verbose =
true){
// First pass: record the dtype of every candidate column.
312 std::vector<flex_type_enum> col_types;
313 std::map<std::string, flex_type_enum> col_type_map;
314 for (
size_t i=0; i < features.size(); i++) {
315 col_types.push_back(dataset[features[i]].dtype());
316 col_type_map[features[i]] = dataset[features[i]].dtype();
// Second pass: keep the columns whose recorded dtype is accepted.
320 std::vector<std::string> valid_features;
321 for (
size_t i=0; i < features.size(); i++) {
322 auto col = features[i];
323 auto coltype = col_types[i];
// std::find returning end() means the type is not in the accepted list.
326 if (std::find(valid_feature_types.begin(), valid_feature_types.end(),
327 coltype) == valid_feature_types.end()) {
330 <<
"' is excluded due to invalid column type (" 335 valid_features.push_back(col);
339 return valid_features;
// Throwing counterpart of select_valid_features_nothrow().
//
// dataset              Frame whose columns are inspected.
// features             Candidate column names.
// valid_feature_types  Accepted column types.
// verbose              Defaults to true; forwarded to the nothrow variant and
//                      also gates the error below.
//
// Returns the names accepted by select_valid_features_nothrow(). When none is
// accepted and `verbose` is true, log_and_throw is called with a message that
// starts with the text below and is extended with entries of
// valid_feature_types.
// NOTE(review): the expressions appended to err_msg inside and after the loop
// are not visible in this listing. If the statement after the loop reads the
// last element of valid_feature_types, an empty list still needs a guard
// there -- confirm against the full header.
363 inline std::vector<std::string> select_valid_features(
const gl_sframe& dataset,
364 const std::vector<std::string>& features,
365 const std::vector<flex_type_enum>& valid_feature_types,
366 bool verbose =
true){
369 std::vector<std::string> valid_features =
370 select_valid_features_nothrow(
371 dataset, features, valid_feature_types, verbose);
374 if (valid_features.size() == 0 && verbose ) {
375 std::string err_msg =
"The input data does not contain any usable feature" 376 " columns. This model only supports features of type: ";
// Visit every entry except the last. Written as `k + 1 < size()` because
// `size() - 1` wraps to a huge value when valid_feature_types is empty, which
// would index far past the end of the vector.
377 for (
size_t k = 0; k + 1 < valid_feature_types.size(); ++k){
378 err_msg += std::string(
381 err_msg += std::string(
383 log_and_throw(err_msg);
385 return valid_features;
// Feeds the values of `src` into `indexer`, in parallel.
//
// src      Column whose values are indexed.
// indexer  Receives the values; initialize() is called on it first.
//
// The row range [0, src.size()) is divided among the threads supplied by
// in_parallel. Each thread passes its own thread_idx to insert_or_update.
// Three insertion forms are visible: the row value `v` itself, every element
// of a sequence `vv`, and for every pair of `dv` a key `out_key` derived from
// that pair.
// NOTE(review): the type dispatch that chooses between these forms, the
// definitions of `vv` and `dv`, the expression assigned to `out_key` and any
// finalisation after the parallel section are not visible in this listing.
413 inline void create_topk_index_mapping(
const gl_sarray& src,
414 std::shared_ptr<topk_indexer> indexer) {
420 indexer->initialize();
421 size_t src_size = src.size();
424 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
// Thread t handles rows [src_size*t/num_threads, src_size*(t+1)/num_threads).
427 size_t start_idx = src_size * thread_idx / num_threads;
428 size_t end_idx = src_size * (thread_idx + 1) / num_threads;
430 for (
const auto& v: src.range_iterator(start_idx, end_idx)) {
436 indexer->insert_or_update(v, thread_idx);
443 size_t n_values = vv.size();
445 for(
size_t k = 0; k < n_values; ++k) {
446 indexer->insert_or_update(vv[k], thread_idx);
455 size_t n_values = dv.size();
457 for(
size_t k = 0; k < n_values; ++k) {
458 const std::pair<flexible_type, flexible_type>& kvp = dv[k];
459 flexible_type out_key =
461 indexer->insert_or_update(out_key, thread_idx);
// Checks that the list/vector values of a column agree on one length.
//
// src          Column to scan.
// column_name  Name used in error messages.
//
// The row range [0, src.size()) is divided among the threads supplied by
// in_parallel. Each thread tracks `length` / `old_length` for its rows and
// stores its `length` in length_list[thread_idx]. The per-thread results are
// then compared through total_length / old_total_length.
// log_and_throw is called when lengths disagree, either within a thread or
// across threads, and when no usable value is found.
//
// The error messages now separate the column name from the surrounding text
// and quote it; previously the pieces were concatenated with no spaces and one
// message contained the literal text "column_name".
// NOTE(review): the declaration of length_list, the comparisons that guard
// each log_and_throw and the return statement are not visible in this
// listing; presumably the common length is returned -- confirm.
486 inline size_t validate_list_vec_length(
const gl_sarray& src,
const std::string& column_name){
488 size_t src_size = src.size();
492 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
// Thread t handles rows [src_size*t/num_threads, src_size*(t+1)/num_threads).
494 size_t start_idx = src_size * thread_idx / num_threads;
495 size_t end_idx = src_size * (thread_idx + 1) / num_threads;
497 flexible_type length = flex_undefined();
498 flexible_type old_length = flex_undefined();
500 for (
const auto& v: src.range_iterator(start_idx, end_idx)){
507 log_and_throw(
"All list/vectors in column '" + column_name +
"' must be of same length or None.");
514 length_list[thread_idx] = length;
// Reconcile the per-thread lengths.
517 flexible_type total_length = flex_undefined();
518 flexible_type old_total_length = flex_undefined();
519 for (
const auto& l: length_list){
524 log_and_throw(
"All list/vectors in column '" + column_name +
"' must be of same length or None.");
526 old_total_length = total_length;
531 log_and_throw(
"At least one value in column '" + column_name +
"' must have" 532 " a non-None value");
// Collects the dictionary keys that occur in a column into out_set.
//
// src          Column to scan.
// column_name  Not used in the visible lines.
// out_set      Output; log_and_throw is called if it is still empty at the end.
//
// The row range [0, src.size()) is divided among the threads supplied by
// in_parallel. Each thread inserts the first element of every pair of `dd`
// into its own slot threadlocal_key_set[thread_idx], so the threads do not
// share a set. The slots are walked afterwards, key by key.
// NOTE(review): the sizing of threadlocal_key_set (it must have one slot per
// thread before the parallel section starts), the definition of `dd` and the
// statement inside the merge loop are not visible in this listing; presumably
// each key `k` is inserted into out_set -- confirm.
547 inline void num_sparse_features(
const gl_sarray& src,
const std::string& column_name, std::set<flexible_type>& out_set){
550 size_t src_size = src.size();
551 std::vector<std::set<flexible_type>> threadlocal_key_set;
554 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
// Thread t handles rows [src_size*t/num_threads, src_size*(t+1)/num_threads).
556 size_t start_idx = src_size * thread_idx / num_threads;
557 size_t end_idx = src_size * (thread_idx + 1) / num_threads;
561 for (
const auto& d: src.range_iterator(start_idx, end_idx)){
564 for (
const auto& kvp: dd){
565 threadlocal_key_set[thread_idx].insert(kvp.first);
// Walk the per-thread sets once the parallel section is over.
571 for (
const auto& t: threadlocal_key_set){
572 for (
const auto& k: t){
577 if(out_set.size() == 0 ){
578 log_and_throw(
"There must be at least one non-None value in dictionary" 579 " column for mean imputation");
// Feeds the values of a column into a statistics_tracker, in parallel.
//
// src          Column to scan.
// column_name  Forwarded to validate_list_vec_length() and
//              num_sparse_features().
// tracker      Receives the values; initialize() is called first and
//              finalize(src.size()) last.
//
// The row range [0, src.size()) is divided among the threads supplied by
// in_parallel, and each thread passes its own thread_idx to
// insert_or_update. The visible insertion forms are:
//   * a row value `v` under the fixed key 0;
//   * for sequences `vv`, one insert per position k < vec_list_length, with
//     either vv[k] or flex_undefined() as the value;
//   * for dictionaries `dv`, one insert per (key, value) pair;
//   * one insert of flex_undefined() for every key in sparse_features.
// Non-numeric list elements or dictionary values lead to log_and_throw.
// NOTE(review): the type dispatch choosing between these forms, the
// conditions that select flex_undefined(), the second half of each numeric
// check and the tails of the error messages are not visible in this listing.
594 inline void create_mean_mapping(
const gl_sarray& src,
595 const std::string& column_name,
596 std::shared_ptr<statistics_tracker> tracker) {
602 tracker->initialize();
603 size_t src_size = src.size();
// vec_list_length has no initializer here; it is assigned from
// validate_list_vec_length() below. NOTE(review): the condition guarding that
// assignment is not visible -- confirm it covers every path that reads it.
604 size_t vec_list_length;
605 std::set<flexible_type> sparse_features;
610 vec_list_length = validate_list_vec_length(src, column_name);
612 num_sparse_features(src, column_name, sparse_features);
616 in_parallel([&](
size_t thread_idx,
size_t num_threads) {
// Thread t handles rows [src_size*t/num_threads, src_size*(t+1)/num_threads).
619 size_t start_idx = src_size * thread_idx / num_threads;
620 size_t end_idx = src_size * (thread_idx + 1) / num_threads;
622 for (
const auto& v: src.range_iterator(start_idx, end_idx)) {
// Whole-value form: everything is tracked under the single key 0.
627 flexible_type key = 0;
628 tracker->insert_or_update(key, v, thread_idx);
// Positional form with a per-element numeric check.
638 size_t n_values = vec_list_length;
639 for(
size_t k = 0; k < n_values; ++k) {
641 if (!transform_utils::is_numeric_type(vv[k].get_type())
643 log_and_throw(
"All list elements must be numeric for mean" 646 tracker->insert_or_update(k, vv[k], thread_idx);
648 tracker->insert_or_update(k, flex_undefined(), thread_idx);
// Positional form without a visible per-element check.
660 size_t n_values = vec_list_length;
661 for(
size_t k = 0; k < n_values; ++k) {
663 tracker->insert_or_update(k, vv[k], thread_idx);
665 tracker->insert_or_update(k, flex_undefined(), thread_idx);
// Dictionary form: keyed by the pair's first element.
677 size_t n_values = dv.size();
679 for(
size_t k = 0; k < n_values; ++k) {
680 const std::pair<flexible_type, flexible_type>& kvp = dv[k];
681 if (!transform_utils::is_numeric_type(kvp.second.get_type())
683 log_and_throw(
"All dictionary entries must be numeric for mean" 686 tracker->insert_or_update(kvp.first,kvp.second, thread_idx);
690 for(
const auto& v : sparse_features){
691 tracker->insert_or_update(v, flex_undefined(), thread_idx);
707 tracker->finalize(src_size);
// Predicate over a token string; returns a bool.
715 typedef std::function<bool(const std::string&)> string_filter_condition;
// Ordered list of (regular expression, predicate) pairs.
717 typedef std::vector<std::pair<boost::regex, string_filter_condition> > string_filter_list;
// Built-in filter list with two (regex, predicate) entries.
//
// Entry 1 -- the predicate always returns true. The regex is an alternation of:
//   ([+.-]?[0-9]+([.,()-]+[0-9]+)*)  digit runs with an optional leading
//                                    + . or -, joined by runs of . , ( ) -
//   ([^\w\s])                        one character that is neither a word
//                                    character nor whitespace
//   (\b[^\w\s]+)                     a run of such characters after a word
//                                    boundary
//   ([\w]([^\s]*[\w])?)              a run of non-whitespace that starts and
//                                    ends with a word character
//   ([^\w\s]+\b)                     a run of non-word, non-space characters
//                                    before a word boundary
// Entry 2 -- the predicate returns true only for strings containing an
// apostrophe. The regex matches either an apostrophe with an optional
// preceding n/N and trailing word characters (e.g. "n't", "'s"), or the text
// before it.
// NOTE(review): "ptb" presumably refers to Penn Treebank style tokenization --
// confirm. The expression wrapping each concatenated pattern is not visible in
// this listing. Declared `static const` at namespace scope in a header, so
// each translation unit including it gets its own copy.
730 static const string_filter_list ptb_filters = {
733 std::string(
"([+.-]?[0-9]+([.,()-]+[0-9]+)*)|") +
734 std::string(
"([^\\w\\s])|") +
735 std::string(
"(\\b[^\\w\\s]+)|") +
736 std::string(
"([\\w]([^\\s]*[\\w])?)|") +
737 std::string(
"([^\\w\\s]+\\b)")),
738 [](
const std::string& current){
return true;}),
741 std::string(
"([nN]?'\\w*)|([^\\s']*[^nN\\s'])")),
742 [](
const std::string& current){
return current.find(
"'") != std::string::npos;})
// Splits a string into tokens by applying a list of regex filters in order.
//
// to_tokenize  Input text.
// filter_list  (regex, predicate) pairs, applied in sequence.
// to_lower     Name suggests it requests lower-casing of the input first.
//
// `previous` holds the current token list. For every filter a new list
// `current` is built: a token for which the filter's predicate returns true is
// replaced by all matches of the filter's regex within it (sub-match 0, i.e.
// the whole match); the other visible push_back keeps the token as is.
// `current` then becomes `previous` for the next filter.
//
// Lower-casing goes through `unsigned char`: passing a negative `char` (any
// byte >= 0x80, e.g. inside UTF-8 text) straight to tolower is undefined
// behaviour.
// NOTE(review): the declaration of `previous`, the condition that tests
// `to_lower`, the branch used when lower-casing is off and the return are not
// visible in this listing; presumably `previous` is returned -- confirm.
762 inline flex_list tokenize_string(
const std::string& to_tokenize,
763 const string_filter_list& filter_list,
764 const bool to_lower) {
// Default-constructed iterator acts as the end-of-matches sentinel.
766 boost::sregex_token_iterator end;
769 std::string str_copy = std::string(to_tokenize);
770 std::transform(str_copy.begin(), str_copy.end(), str_copy.begin(), [](unsigned char c) { return static_cast<char>(::tolower(c)); });
771 previous = {str_copy};
784 for (
const auto& filter : filter_list)
786 std::vector<std::string> current;
787 boost::smatch matches;
788 const boost::regex& expr = filter.first;
// Bound by reference: copying a std::function per filter is unnecessary.
789 const string_filter_condition& condition = filter.second;
791 for (
const auto& token : previous)
793 const std::string& token_string = token.to<std::string>();
795 if ( condition(token_string) )
797 boost::sregex_token_iterator match(token_string.begin(), token_string.end(), expr, 0);
799 for ( ; match != end; ++match)
801 current.push_back(*match);
806 current.push_back(token);
810 previous =
flex_list(current.begin(), current.end());
std::vector< double > flex_vec
const char * flex_type_enum_to_name(flex_type_enum en)
static size_t cpu_count()
#define logprogress_stream
variant_type to_variant(const T &f)
std::set< T > set_difference(const std::set< T > &a, const std::set< T > &b)
void in_parallel(const std::function< void(size_t thread_id, size_t num_threads)> &fn)
std::vector< std::pair< flexible_type, flexible_type > > flex_dict
std::vector< flexible_type > flex_list
#define DASSERT_TRUE(cond)