Turi Create  4.0
unity_sframe.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_SFRAME_HPP
7 #define TURI_UNITY_SFRAME_HPP
8 
9 #include <memory>
10 #include <string>
11 #include <vector>
12 #include <model_server/lib/api/unity_sframe_interface.hpp>
13 #include <core/storage/sframe_interface/unity_sarray.hpp>
14 #include <core/storage/sframe_data/group_aggregate_value.hpp>
15 #include <core/storage/sframe_data/sframe_rows.hpp>
16 #include <visualization/server/plot.hpp>
17 
18 namespace turi {
19 
20 // forward declarations
21 class sframe;
22 class dataframe;
23 class sframe_reader;
24 class sframe_iterator;
25 
26 namespace query_eval {
27 struct planner_node;
28 } // query_eval
29 
30 
31 /**
32  * This is the SFrame object exposed to Python. It represents an
33  * \ref sframe, which is a collection of named columns, each of flexible
34  * type. Row data is not modified in place once constructed, although
35  * add_column(), remove_column(), set_column_name() etc. change the columns.
36  * Shallow copies or sub-selections of columns can be created cheaply.
37  *
38  * Internally it holds a lazy query-plan node, the column names and a cached
39  * \ref sframe pointer. Contents are set by one of the construct_from_*() calls.
40  *
41  * \code
42  * unity_sframe frame;
43  * // construct
44  * frame.construct_from_sframe_index(index_file); // or another construct_from_*()
45  * // frame now holds data.
46  * \endcode
47  *
48  * The SFrame may require temporary on-disk storage which will be deleted
49  * on program termination. Temporary file names are obtained from
50  * \ref turi::get_temp_name
51  */
52 class unity_sframe : public unity_sframe_base {
53  public:
54  /**
55  * Default constructor. Does nothing.
56  */
57  unity_sframe();
58 
59  /**
60  * Destructor. Calls clear().
61  */
62  ~unity_sframe();
63 
64  /**
65  * Constructs an SFrame using a dataframe as input.
66  * The dataframe must not contain NaN values.
67  */
68  void construct_from_dataframe(const dataframe_t& df) override;
69 
70  /**
71  * Constructs an SFrame from an existing \ref sframe.
72  */
73  void construct_from_sframe(const sframe& sf);
74 
75  /**
76  * Constructs an SFrame from an existing directory on disk saved with
77  * save_frame() or an on-disk sframe index file (saved with
78  * save_frame_by_index_file()). This function will automatically detect if
79  * the location is a directory, or a file. The files will not be deleted on
80  * destruction. If the current object is already storing a frame, it is
81  * cleared (\ref clear()). May throw an exception on failure. If an exception
82  * occurs, the SFrame is left empty.
83  */
84  void construct_from_sframe_index(std::string index_file) override;
85 
86  /**
87  * Constructs an SFrame from one or more csv files.
88  * To keep the interface stable, the CSV parsing configuration is read from a
89  * map of string->flexible_type called parsing_config. The URL can be a single
90  * filename or a directory name. When passing in a directory and the pattern
91  * is non-empty, we will attempt to treat it as a glob pattern.
92  *
93  * The default parsing configuration is the following:
94  * \code
95  * bool use_header = true;
96  * tokenizer.delimiter = ",";
97  * tokenizer.comment_char = '\0';
98  * tokenizer.escape_char = '\\';
99  * tokenizer.double_quote = true;
100  * tokenizer.quote_char = '\"';
101  * tokenizer.skip_initial_space = true;
102  * \endcode
103  * column_type_hints presumably maps column names to their expected types.
104  * The fields in parsing config are:
105  * - use_header : True if not is_zero()
106  * - delimiter : The entire delimiter string
107  * - comment_char : First character if flexible_type is a string
108  * - escape_char : First character if flexible_type is a string
109  * - double_quote : True if not is_zero()
110  * - quote_char : First character if flexible_type is a string
111  * - skip_initial_space : True if not is_zero()
112  */
113  std::map<std::string, std::shared_ptr<unity_sarray_base>> construct_from_csvs(
114  std::string url,
115  std::map<std::string, flexible_type> parsing_config,
116  std::map<std::string, flex_type_enum> column_type_hints) override;
117  /// Constructs the SFrame from a lazy query-plan node and its column names.
118  void construct_from_planner_node(std::shared_ptr<query_eval::planner_node> node,
119  const std::vector<std::string>& column_names);
120 
121  /**
122  * Saves a copy of the current sframe into a directory.
123  * Does not modify the current sframe.
124  */
125  void save_frame(std::string target_directory) override;
126 
127  /**
128  * Performs an incomplete save of an existing SFrame into a directory.
129  * This saved SFrame may reference SFrames in other locations *in the same
130  * filesystem* for certain columns/segments/etc.
131  *
132  * Does not modify the current sframe.
133  */
134  void save_frame_reference(std::string target_directory) override;
135 
136 
137  /**
138  * Saves a copy of the current sframe into a target location defined by
139  * an index file. Does not modify the current sframe.
140  */
141  void save_frame_by_index_file(std::string index_file);
142 
143  /**
144  * Clears the contents of the SFrame.
145  */
146  void clear() override;
147 
148  /**
149  * Returns the number of rows in the SFrame. Returns 0 if the SFrame is empty.
150  */
151  size_t size() override;
152 
153  /**
154  * Returns the number of columns in the SFrame.
155  * Returns 0 if the sframe is empty.
156  */
157  size_t num_columns() override;
158 
159  /**
160  * Returns an array containing the datatype of each column. The length
161  * of the return array is equal to num_columns(). If the sframe is empty,
162  * this returns an empty array.
163  */
164  std::vector<flex_type_enum> dtype() override;
165 
166  /**
167  * Returns the dtype of the column at position column_index.
168  */
169  flex_type_enum dtype(size_t column_index);
170 
171  /**
172  * Returns the dtype of the column named column_name.
173  */
174  flex_type_enum dtype(const std::string& column_name);
175 
176  /**
177  * Returns an array containing the name of each column. The length
178  * of the return array is equal to num_columns(). If the sframe is empty,
179  * this returns an empty array.
180  */
181  std::vector<std::string> column_names() override;
182 
183  /**
184  * Returns the first nrows rows of the SFrame as a new SFrame.
185  * If nrows exceeds the number of rows in the SFrame ( \ref size() ), this
186  * returns only \ref size() rows.
187  */
188  std::shared_ptr<unity_sframe_base> head(size_t nrows) override;
189 
190  /**
191  * Returns the index of the column `name`.
192  */
193  size_t column_index(const std::string& name) override;
194 
195  /**
196  * Returns the name of the column in position `index`.
197  */
198  const std::string& column_name(size_t index);
199 
200  /**
201  * Returns true if the column is present in the sframe, and false
202  * otherwise.
203  */
204  bool contains_column(const std::string &name);
205 
206  /**
207  * Same as head(), but returns the rows as a dataframe_t.
208  */
209  dataframe_t _head(size_t nrows) override;
210 
211  /**
212  * Returns the last nrows rows of the SFrame as a new SFrame.
213  * If nrows exceeds the number of rows in the SFrame
214  * ( \ref size() ), this returns only \ref size() rows.
215  */
216  std::shared_ptr<unity_sframe_base> tail(size_t nrows) override;
217 
218  /**
219  * Same as tail(), but returns the rows as a dataframe_t.
220  */
221  dataframe_t _tail(size_t nrows) override;
222 
223  /**
224  * Returns an SArray with the column that corresponds to 'name'. Throws an
225  * exception if the name is not in the current SFrame.
226  */
227  std::shared_ptr<unity_sarray_base> select_column(const std::string &name) override;
228 
229  /**
230  * Returns an SArray with the column that corresponds to index idx. Throws an
231  * exception if idx is not a valid column index in the current SFrame.
232  */
233  std::shared_ptr<unity_sarray_base> select_column(size_t idx);
234 
235  /**
236  * Returns a new SFrame which is filtered by a given logical column.
237  * The index array must have the same length as the current SFrame. The
238  * returned SFrame contains only the rows whose corresponding element in
239  * the index array evaluates to true.
240  */
241  std::shared_ptr<unity_sframe_base> logical_filter(std::shared_ptr<unity_sarray_base> index) override;
242 
243  /**
244  * Returns a lazy sframe with the columns that have the given names. Throws an
245  * exception if ANY of the names given are not in the current SFrame.
246  */
247  std::shared_ptr<unity_sframe_base> select_columns(const std::vector<std::string> &names) override;
248 
249  /**
250  * Returns a lazy sframe with the columns given by the indices.
251  */
252  std::shared_ptr<unity_sframe_base> select_columns(const std::vector<size_t>& indices);
253 
254  /**
255  * Returns a lazy sframe which is a copy of the current one.
256  */
257  std::shared_ptr<unity_sframe_base> copy();
258 
259  /**
260  * Mutates the current SFrame by adding the given column.
261  *
262  * Throws an exception if:
263  * - The given column has a different number of rows than the SFrame.
264  */
265  void add_column(std::shared_ptr<unity_sarray_base >data, const std::string &name) override;
266 
267  /**
268  * Mutates the current SFrame by adding the given columns.
269  *
270  * Throws an exception if ANY given column cannot be added
271  * (for one of the reasons that add_column can fail).
272  *
273  * \note Currently leaves the SFrame in an unfinished state if one of the
274  * columns fails: the columns added before the failure will remain.
275  * This needs to be changed.
276  */
277  void add_columns(std::list<std::shared_ptr<unity_sarray_base>> data_list,
278  std::vector<std::string> name_vec) override;
279 
280  /**
281  * Returns a new sarray which is a transform of each row in the sframe
282  * using a Python lambda function pickled into a string.
283  */
284  std::shared_ptr<unity_sarray_base> transform(const std::string& lambda,
285  flex_type_enum type,
286  bool skip_undefined,
287  uint64_t seed) override;
288 
289  /**
290  * Returns a new sarray which is a transform of each row in the sframe
291  * using the native function described by a function_closure_info.
292  */
293  std::shared_ptr<unity_sarray_base> transform_native(const function_closure_info& lambda,
294  flex_type_enum type,
295  bool skip_undefined,
296  uint64_t seed) override;
297 
298  /**
299  * Returns a new sarray which is a transform of each row in the sframe
300  * using a C++ std::function that maps a row to a flexible_type.
301  */
302  std::shared_ptr<unity_sarray_base> transform_lambda(
303  std::function<flexible_type(const sframe_rows::row&)> lambda,
304  flex_type_enum type,
305  uint64_t seed);
306 
307  /**
308  * Returns a new sframe built by applying a Python lambda function, pickled
309  * into a string, to each row; output columns get the given names and types.
310  */
311  std::shared_ptr<unity_sframe_base> flat_map(const std::string& lambda,
312  std::vector<std::string> output_column_names,
313  std::vector<flex_type_enum> output_column_types,
314  bool skip_undefined,
315  uint64_t seed) override;
316 
317  /**
318  * Set the ith column name.
319  *
320  * Throws an exception if the index is out of bounds or the name already exists.
321  */
322  void set_column_name(size_t i, std::string name) override;
323 
324  /**
325  * Remove the ith column.
326  */
327  void remove_column(size_t i) override;
328 
329  /**
330  * Swap the ith and jth columns.
331  */
332  void swap_columns(size_t i, size_t j) override;
333 
334  /**
335  * Returns the underlying shared_ptr to the sframe object.
336  */
337  std::shared_ptr<sframe> get_underlying_sframe();
338 
339  /**
340  * Returns the underlying planner pointer.
341  */
342  std::shared_ptr<query_eval::planner_node> get_planner_node();
343 
344  /**
345  * Sets the private shared pointer to an sframe.
346  */
347  void set_sframe(const std::shared_ptr<sframe>& sf_ptr);
348 
349  /**
350  * Begin iteration through the SFrame.
351  *
352  * Works together with \ref iterator_get_next(). The usage pattern
353  * is as follows:
354  * \code
355  * sframe.begin_iterator();
356  * while(1) {
357  * auto ret = sframe.iterator_get_next(64);
358  * // do stuff
359  * if (ret.size() < 64) {
360  * // we are done
361  * break;
362  * }
363  * }
364  * \endcode
365  *
366  * Note that use of pretty much any of the other data-dependent SFrame
367  * functions will invalidate the iterator.
368  */
369  void begin_iterator() override;
370 
371  /**
372  * Obtains the next block of up to len rows from the SFrame.
373  * Works together with \ref begin_iterator(). See the code example
374  * in \ref begin_iterator() for details.
375  *
376  * This function will always return a vector of length 'len' unless
377  * at the end of the SFrame, or if an error has occurred.
378  *
379  * \param len The number of rows to return
380  * \returns The next collection of rows in the SFrame. Returns fewer than
381  * len rows on end of file or failure.
382  */
383  std::vector< std::vector<flexible_type> > iterator_get_next(size_t len) override;
384 
385  /**
386  * Save the sframe to url in csv format.
387  * To keep the interface stable, the CSV writing configuration is read from a
388  * map of string->flexible_type called writing_config.
389  *
390  * The default writing configuration is the following:
391  * \code
392  * writer.delimiter = ",";
393  * writer.escape_char = '\\';
394  * writer.double_quote = true;
395  * writer.quote_char = '\"';
396  * writer.use_quote_char = true;
397  * \endcode
398  *
399  * For details on the meaning of each config see \ref csv_writer
400  *
401  * The fields in writing_config are:
402  * - delimiter : First character if flexible_type is a string
403  * - escape_char : First character if flexible_type is a string
404  * - double_quote : True if not is_zero()
405  * - quote_char : First character if flexible_type is a string
406  * - use_quote_char : boolean (default true); presumably true if not is_zero()
407  */
408  void save_as_csv(const std::string& url,
409  std::map<std::string, flexible_type> writing_config) override;
410 
411  /**
412  * Randomly split the sframe into two parts, with ratio = percent, and seed = random_seed.
413  *
414  * Returns a list of size 2 of the unity_sframes resulting from the split.
415  */
416  std::list<std::shared_ptr<unity_sframe_base>> random_split(float percent, uint64_t random_seed, bool exact=false) override;
417 
418  /**
419  * Randomly shuffles the sframe.
420  *
421  * Returns a new unity_sframe containing the shuffled rows.
422  */
423  std::shared_ptr<unity_sframe_base> shuffle() override;
424 
425  /**
426  * Sample the rows of sframe uniformly with ratio = percent, and seed = random_seed.
427  *
428  * Returns a new unity_sframe containing the sampled rows.
429  */
430  std::shared_ptr<unity_sframe_base> sample(float percent, uint64_t random_seed, bool exact=false) override;
431 
432  /**
433  * Materializes the sframe. Unlike the save functions, this is a temporary persist of
434  * all sarrays underneath the sframe to speed up some computations (for example, lambdas).
435  * This will NOT create a new unity_sframe.
436  **/
437  void materialize() override;
438 
439  /**
440  * Returns whether or not this sframe is materialized.
441  **/
442  bool is_materialized() override;
443 
444  /**
445  * Return the query plan as a string representation of a dot graph.
446  */
447  std::string query_plan_string() override;
448 
449  /**
450  * Return true if the sframe size is known.
451  */
452  bool has_size() override;
453 
454  /**
455  * Returns a new SFrame with one row for each unique combination of values
456  * in key_columns. The group_* vectors presumably run in parallel: entry i
457  * holds the input columns, output column name and builtin operator name
458  * of the i-th aggregation (NOTE(review): confirm against implementation).
459  */
460  std::shared_ptr<unity_sframe_base> groupby_aggregate(
461  const std::vector<std::string>& key_columns,
462  const std::vector<std::vector<std::string>>& group_columns,
463  const std::vector<std::string>& group_output_columns,
464  const std::vector<std::string>& group_operations) override;
465 
466  /**
467  * \overload
468  */
469  std::shared_ptr<unity_sframe_base> groupby_aggregate(
470  const std::vector<std::string>& key_columns,
471  const std::vector<std::vector<std::string>>& group_columns,
472  const std::vector<std::string>& group_output_columns,
473  const std::vector<std::shared_ptr<group_aggregate_value>>& group_operations);
474 
475  /**
476  * Returns a new SFrame which contains all rows combined from current SFrame and "other".
477  * The "other" SFrame has to have the same number of columns with the same column names
478  * and same column types as "this" SFrame.
479  */
480  std::shared_ptr<unity_sframe_base> append(std::shared_ptr<unity_sframe_base> other) override;
481  /// Joins with `right`; same as join_with_custom_name() with no alternative names.
482  inline std::shared_ptr<unity_sframe_base> join(std::shared_ptr<unity_sframe_base >right,
483  const std::string join_type,
484  const std::map<std::string,std::string>& join_keys) override
485  { return join_with_custom_name(right, join_type, join_keys, {}); }
486  /// Joins with `right`; alternative_names presumably renames clashing output columns.
487  std::shared_ptr<unity_sframe_base> join_with_custom_name(std::shared_ptr<unity_sframe_base >right,
488  const std::string join_type,
489  const std::map<std::string,std::string>& join_keys,
490  const std::map<std::string,std::string>& alternative_names) override;
491  /// Sorts by sort_keys; sort_ascending[i] presumably gives the order of sort_keys[i].
492  std::shared_ptr<unity_sframe_base> sort(const std::vector<std::string>& sort_keys,
493  const std::vector<int>& sort_ascending) override;
494 
495  /**
496  * Packs a subset of the columns of the current SFrame into one column. For a
497  * dictionary result, each packed column contributes one key (see
498  * dict_key_names) mapped to its value. Only the packed column is returned,
499  * as a new SArray; the non-packed columns are not part of the result.
500  * Missing values presumably do not show up in a packed dictionary value
501  * (NOTE(review): confirm, and how this interacts with fill_na).
502 
503  * \param pack_column_names : list of column names to pack
504  * \param dict_key_names : dictionary key names to use in the packed dictionary
505  * \param dtype: the result SArray type. For non-dict types, missing values
506  * are presumably kept, and can be filled with fill_na if it is specified.
507  * \param fill_na: the value to fill in when a missing value is encountered
508 
509  * Returns a new SArray that contains the newly packed column.
510  **/
511  std::shared_ptr<unity_sarray_base> pack_columns(
512  const std::vector<std::string>& pack_column_names,
513  const std::vector<std::string>& dict_key_names,
514  flex_type_enum dtype,
515  const flexible_type& fill_na) override;
516 
517  /**
518  * Convert a dictionary column of the SFrame to two columns with first column
519  * as the key for the dictionary and second column as the value for the
520  * dictionary. Returns a new SFrame with the two newly created columns, plus
521  * all columns other than the stacked column. The values from those columns
522  * are duplicated for all rows created from the same original row.
523 
524  * \param column_name: string
525  The column to stack. The name must come from current SFrame and must be of dict type
526 
527  * \param new_column_names: a list of str, optional
528  Must be of length two. The two column names to stack the dict value to.
529  If not given, the names are automatically generated.
530 
531  * \param new_column_types: a list of types, optional
532  Must be of length two. The types for the newly created columns. If not
533  given, they default to [str, int].
534 
535  * \param drop_na if true, missing values from dictionary will be ignored. If false,
536  for a missing dict value, one row will be created with the two new columns'
537  values being missing values.
538 
539  * Returns a new unity_sframe with stacked columns.
540  **/
541  std::shared_ptr<unity_sframe_base> stack(
542  const std::string& column_name,
543  const std::vector<std::string>& new_column_names,
544  const std::vector<flex_type_enum>& new_column_types,
545  bool drop_na) override;
546 
547  /**
548  * Extracts a range of rows from an SFrame as a new SFrame.
549  * This will extract rows beginning at start (inclusive) and ending at
550  * end (exclusive) in steps of "step".
551  * step must be at least 1.
552  */
553  std::shared_ptr<unity_sframe_base> copy_range(size_t start, size_t step, size_t end) override;
554 
555  /**
556  * Returns a new SFrame with missing values dropped.
557  *
558  * Missing values are only searched for in the columns specified in the
559  * 'column_names'. If this vector is empty, all columns will be considered.
560  * If 'all' is true, a row is only dropped if all specified columns contain a
561  * missing value. If false, the row is dropped if any of the specified
562  * columns contain a missing value.
563  *
564  * If 'split' is true, this function returns two SFrames, the first being the
565  * SFrame with missing values dropped, and the second consisting of all the
566  * rows removed.
567  *
568  * If 'recursive' is true, the `nan` element check will be performed in
569  * a recursive manner to check each unit in a container-like flexible-typed
570  * cell in SFrame.
571  *
572  * Throws if the column names are not in this SFrame, or if too many are given.
573  */
574  std::list<std::shared_ptr<unity_sframe_base>> drop_missing_values(
575  const std::vector<std::string>& column_names, bool all, bool split,
576  bool recursive) override;
577  /// Converts the SFrame to a dataframe_t.
578  dataframe_t to_dataframe() override;
579  /// Serializes this object into the output archive oarc.
580  void save(oarchive& oarc) const override;
581  /// Deserializes this object from the input archive iarc.
582  void load(iarchive& iarc) override;
583 
584  void delete_on_close() override;
585 
586  /**
587  * Similar to logical filter, but return both positive and negative rows.
588  *
589  * \param logical_filter_array is an sarray of the same size, and has only
590  * zeros and ones as value.
591  *
592  * Returns a list of two sframes: rows whose filter value is one go to the
593  * first one and rows whose filter value is zero go to the second one.
594  */
595  std::list<std::shared_ptr<unity_sframe_base>> logical_filter_split(
596  std::shared_ptr<unity_sarray_base> logical_filter_array);
597 
598  void explore(const std::string& path_to_client, const std::string& title) override;
599  void show(const std::string& path_to_client) override;
600  std::shared_ptr<model_base> plot() override;
601 
602  private:
603  /**
604  * Pointer to the lazy evaluator logical operator node. Should never be NULL.
605  * (There is no set_planner_node(); presumably set by the construct_from_*() calls.)
606  */
607  std::shared_ptr<query_eval::planner_node> m_planner_node;
608  /// Names of the columns of this SFrame.
609  std::vector<std::string> m_column_names;
610  /// Cached sframe (presumably the materialized result of m_planner_node).
611  std::shared_ptr<sframe> m_cached_sframe;
612 
613  /**
614  * Supports \ref begin_iterator() and \ref iterator_get_next().
615  * The next segment I will read. (i.e. the current segment I am reading
616  * is iterator_next_segment_id - 1)
617  */
618  size_t iterator_next_segment_id = 0;
619 
620  /**
621  * Reader over a copy of the current SFrame. This allows iteration and other
622  * SFrame operations to operate together safely without collisions.
623  */
624  std::unique_ptr<sframe_reader> iterator_sframe_ptr;
625 
626  /**
627  * Supports \ref begin_iterator() and \ref iterator_get_next().
628  * The begin iterator of the current segment I am reading.
629  */
630  std::unique_ptr<sframe_iterator> iterator_current_segment_iter;
631 
632  /**
633  * Supports \ref begin_iterator() and \ref iterator_get_next().
634  * The end iterator of the current segment I am reading.
635  */
636  std::unique_ptr<sframe_iterator> iterator_current_segment_enditer;
637 
638 
639  private:
640  // Helper functions
641 
642  /**
643  * Convert column names to column indices.
644  *
645  * If input column_names is empty, return 0,1,2,...num_columns-1
646  *
647  * Throw if column_names has duplicates, or some column name does not exist.
648  */
649  std::vector<size_t> _convert_column_names_to_indices(const std::vector<std::string>& column_names);
650 
651  /**
652  * Generate a new column name.
653  *
654  * New column names are of the form X1, X2, X3, ...
655  * In case of a conflict, .1, .2, ... is appended until the conflict is resolved.
656  *
657  * \par Example
658  * (each generated name is presumably added as a column before the next call)
659  * Given current sframe column names: a, b, c
660  * Next 3 generated names are: X4, X5, X6
661  *
662  * Given current sframe column names: X4, X5.1, X6.2
663  * Next 3 generated names are: X4.1, X5, X6.1 (NOTE(review): by the rule above the third looks like X6; confirm)
664  */
665  std::string generate_next_column_name();
666 };
667 
668 
669 
670 }
671 
672 #endif
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
void copy_range(S &&input, T &&output, size_t start, size_t step, size_t end)
Definition: algorithm.hpp:599
std::vector< sframe > shuffle(sframe sframe_in, size_t n, std::function< size_t(const std::vector< flexible_type > &)> hash_fn, std::function< void(const std::vector< flexible_type > &, size_t)> emit_call_back=std::function< void(const std::vector< flexible_type > &, size_t)>())
void copy(Iterator begin, Iterator end, SWriter &&writer)
Definition: algorithm.hpp:416
std::shared_ptr< sframe > groupby_aggregate(const std::shared_ptr< planner_node > &source, const std::vector< std::string > &source_column_names, const std::vector< std::string > &keys, const std::vector< std::string > &output_column_names, const std::vector< std::pair< std::vector< std::string >, std::shared_ptr< group_aggregate_value >>> &groups)
void split(S &&input, T &&output1, T &&output2, FilterFn filterfn, size_t random_seed=std::time(NULL))
Definition: algorithm.hpp:293
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80
void transform(S &&input, T &&output, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >())
Definition: algorithm.hpp:64