Turi Create
4.0
Classes | |
struct | turi::query_eval::less_than_full_function |
struct | turi::query_eval::less_than_partial_function |
Functions | |
sframe | turi::query_eval::permute_sframe (sframe &values_sframe, std::shared_ptr< sarray< flexible_type > > forward_map) |
std::shared_ptr< sframe > | turi::query_eval::ec_sort (std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &key_column_indices, const std::vector< bool > &sort_orders) |
std::shared_ptr< sframe > | turi::query_eval::groupby_aggregate (const std::shared_ptr< planner_node > &source, const std::vector< std::string > &source_column_names, const std::vector< std::string > &keys, const std::vector< std::string > &output_column_names, const std::vector< std::pair< std::vector< std::string >, std::shared_ptr< group_aggregate_value >>> &groups) |
std::shared_ptr< sframe > | turi::query_eval::sort (std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders) |
std::shared_ptr< sframe > | turi::query_eval::sort_and_merge (const std::shared_ptr< sarray< std::pair< flex_list, std::string >>> &partition_array, const std::vector< bool > &partition_sorted, const std::vector< size_t > &partition_sizes, const std::vector< bool > &sort_orders, const std::vector< size_t > &permute_order, const std::vector< std::string > &column_names, const std::vector< flex_type_enum > &column_types) |
Sort and groupby algorithms that take a lazy input.
std::shared_ptr<sframe> turi::query_eval::ec_sort | ( | std::shared_ptr< planner_node > | sframe_planner_node, |
const std::vector< std::string > | column_names, | ||
const std::vector< size_t > & | key_column_indices, | ||
const std::vector< bool > & | sort_orders | ||
) |
External Memory Columnar Sort. See ec_sort for details.
The current sort implementation (in turi::query_eval::sort) has served us for a while, and it is time to think about something better.
The old algorithm implemented in sort is essentially a bucket sort.
First, we assume that the sframe is read one value at a time, i.e. we get a stream of *(column_number, row_number, value)* tuples, with the assumption that the data is at least sequential within each column. I.e. if I have just received *(c, r, v)* with *r > 0*, I must have already received the value for *(c, r-1)*.
The algorithm then proceeds as follows:
(In SFrame pseudocode:)
    B = A[['key']].add_row_number('r1').sort('key')
    inverse_map = B['r1'] # we don't need this
    C = B.add_row_number('r2').sort('r1')
    forward_map = C['r2']
The forward map is held as an SArray of integers.
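The pseudocode above can be mimicked on plain Python lists to see what the two maps contain. This is a minimal sketch, not the library's implementation: the function name `forward_map_from_keys` is hypothetical, the sort is assumed to be ascending and stable, and the reading "inverse_map[i] = j means output row i comes from input row j" is an interpretation of the pseudocode.

```python
def forward_map_from_keys(keys):
    """Return (inverse_map, forward_map) for a stable ascending sort of keys.

    Hypothetical helper for illustration only; it works on an in-memory list
    rather than on an SFrame.
    """
    # inverse_map[i] = j: output row i is read from input row j.
    # This mirrors add_row_number('r1') followed by sort('key').
    inverse_map = sorted(range(len(keys)), key=lambda j: keys[j])

    # forward_map[j] = i: input row j is written to output row i.
    # This mirrors add_row_number('r2') followed by sort('r1').
    forward_map = [0] * len(keys)
    for out_row, in_row in enumerate(inverse_map):
        forward_map[in_row] = out_row
    return inverse_map, forward_map


keys = ["d", "a", "c", "b"]
inverse_map, forward_map = forward_map_from_keys(keys)
print(inverse_map)  # [1, 3, 2, 0]
print(forward_map)  # [3, 0, 2, 1]
```

Only the forward map is needed by the later steps, which is why the pseudocode marks the inverse map as unused.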
With N rows and K buckets, row r is assigned to bucket Floor(K * forward_map(r) / N), so each value (c, r, v) is written to that bucket.
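As a small worked example of the bucket formula, the sketch below computes the bucket for every output position, assuming N is the number of rows and K the number of buckets. The helper name `bucket_of` is hypothetical. It also shows that when K does not divide N the buckets differ in size by at most one row.

```python
def bucket_of(forward_index, num_rows, num_buckets):
    """Bucket for a row whose sorted position is forward_index.

    Integer form of Floor(K * forward_map(r) / N); hypothetical helper.
    """
    return (num_buckets * forward_index) // num_rows


num_rows, num_buckets = 10, 3
assignment = [bucket_of(i, num_rows, num_buckets) for i in range(num_rows)]
print(assignment)  # [0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
```

Because the bucket depends only on the final sorted position, consecutive output rows always land in the same or adjacent buckets, which is what lets each bucket be finished independently.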
This requires a small modification to the sframe writer to allow single-column writes (it already does this internally: it transposes written rows to turn them into columns). This exploits the property that (c,r-1,v) must be read before (c,r,v), so the rows are written in the correct order. (How to do this in parallel is an open question.)
We will also have to generate a per-bucket forward_map using the same scatter procedure.
This requires some care in how the forward map SArray is cached. If the forward map is small, we can keep it all in memory. If it is large, more work is needed, and the data structure has to be smarter about what it holds in memory.
For each Bucket b:
    Allocate Output vector of (Length of bucket) * (#columns)
    Let S be the starting index of bucket b (i.e. b*N/K)
    Let T be the ending index of bucket b (i.e. (b+1)*N/K)
    Load forward_map[S:T] into memory
    For each (c,r,v) in bucket b
        Output[per_bucket_forward_map(r) - S][c] = v
    Dump Output to an SFrame
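The scatter and per-bucket permute steps can be sketched end to end on in-memory lists. This is an illustration under stated assumptions, not the real implementation: `ec_sort_sketch` is a hypothetical name, buckets are Python lists rather than files, and each scattered value carries its target row directly instead of consulting a separate per-bucket forward map. The bucket boundaries use a ceiling of b*N/K so that they agree with the Floor-based bucket assignment when K does not divide N.

```python
def ec_sort_sketch(columns, forward_map, num_buckets):
    """Permute column-oriented data into sorted row order via buckets.

    columns: list of equal-length lists, one list per column.
    forward_map[r]: output row for input row r.
    Hypothetical in-memory stand-in for the external-memory procedure.
    """
    num_rows = len(forward_map)
    num_cols = len(columns)

    # Scatter: stream (c, r, v) one column at a time into its bucket.
    buckets = [[] for _ in range(num_buckets)]
    for c, column in enumerate(columns):
        for r, v in enumerate(column):
            target_row = forward_map[r]
            b = (num_buckets * target_row) // num_rows
            buckets[b].append((c, target_row, v))

    # Permute: each bucket covers output rows [start, end).
    output_rows = []
    for b, bucket in enumerate(buckets):
        start = (b * num_rows + num_buckets - 1) // num_buckets
        end = ((b + 1) * num_rows + num_buckets - 1) // num_buckets
        out = [[None] * num_cols for _ in range(end - start)]
        for c, target_row, v in bucket:
            out[target_row - start][c] = v
        output_rows.extend(out)  # stands in for "Dump Output to an SFrame"
    return output_rows


columns = [[30, 10, 20], ["c", "a", "b"]]
forward_map = [2, 0, 1]  # sorted position of each row by the first column
sorted_rows = ec_sort_sketch(columns, forward_map, num_buckets=2)
print(sorted_rows)  # [[10, 'a'], [20, 'b'], [30, 'c']]
```

Only one bucket's output block needs to be resident at a time, which is the point of choosing K so that N/K rows fit in memory.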
sframe_planner_node | The lazy sframe to be sorted |
sort_column_names | The columns to be sorted |
sort_orders | The order for each column to be sorted, true is ascending |
std::shared_ptr<sframe> turi::query_eval::groupby_aggregate | ( | const std::shared_ptr< planner_node > & | source, |
const std::vector< std::string > & | source_column_names, | ||
const std::vector< std::string > & | keys, | ||
const std::vector< std::string > & | output_column_names, | ||
const std::vector< std::pair< std::vector< std::string >, std::shared_ptr< group_aggregate_value >>> & | groups | ||
) |
Groupby aggregate algorithm that operates on lazy input.
Identical to turi::groupby_aggregate but can take a lazy input.
source | The lazy input node |
source_column_names | The column names of the input source |
keys | An array of column names to generate the group on |
output_column_names | The output column names for each aggregate. This must be the same length as the 'groups' parameter. Output column names must be unique and must not duplicate any of the key column names. Any empty entries are assigned a name automatically. |
groups | A collection of {column_names, group operator} pairs describing the aggregates to generate. You can have multiple aggregators for each set of columns. Not every column in the source needs to be represented. This must be the same length as the 'output_column_names' parameter. |
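The relationship between `keys`, `groups` and `output_column_names` can be illustrated with a small in-memory sketch. Everything here is hypothetical: rows are plain dictionaries, each group operator is an ordinary Python callable rather than a `group_aggregate_value`, and automatic naming of empty output names is not handled.

```python
def groupby_aggregate_sketch(rows, keys, output_column_names, groups):
    """Group rows by keys and apply one aggregator per (columns, operator) pair.

    Hypothetical illustration of the parameter contract only.
    """
    if len(output_column_names) != len(groups):
        raise ValueError("output_column_names and groups must have equal length")
    if len(set(output_column_names)) != len(output_column_names):
        raise ValueError("output column names must be unique")
    if set(output_column_names) & set(keys):
        raise ValueError("output column names must not repeat key names")

    # Collect the rows belonging to each distinct key tuple.
    grouped = {}
    for row in rows:
        grouped.setdefault(tuple(row[k] for k in keys), []).append(row)

    result = []
    for key, members in grouped.items():
        out = dict(zip(keys, key))
        for name, (column_names, aggregate) in zip(output_column_names, groups):
            # Each aggregator sees one tuple of its input columns per row.
            out[name] = aggregate([tuple(m[c] for c in column_names) for m in members])
        result.append(out)
    return result


rows = [{"k": "a", "x": 1}, {"k": "b", "x": 5}, {"k": "a", "x": 3}]
groups = [(["x"], lambda vals: sum(v[0] for v in vals)), ([], len)]
aggregated = groupby_aggregate_sketch(rows, ["k"], ["x_sum", "n"], groups)
print(aggregated)
# [{'k': 'a', 'x_sum': 4, 'n': 2}, {'k': 'b', 'x_sum': 5, 'n': 1}]
```

The second group takes no input columns, which is how a count-style aggregate fits the same {column_names, operator} shape.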
sframe turi::query_eval::permute_sframe | ( | sframe & | values_sframe, |
std::shared_ptr< sarray< flexible_type > > | forward_map | ||
) |
Permutes an sframe by a forward map. forward_map has the same length as the sframe and must be a permutation of all the integers [0, len-1].
The input sframe is then permuted so that sframe row i is written to row forward_map[i] of the returned sframe.
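The permutation rule described above (row i goes to row forward_map[i]) can be shown on a plain list. This is a minimal sketch with a hypothetical function name, operating on in-memory rows rather than an sframe.

```python
def permute_rows(rows, forward_map):
    """Return a new list where rows[i] is placed at index forward_map[i]."""
    if sorted(forward_map) != list(range(len(rows))):
        raise ValueError("forward_map must be a permutation of 0..len-1")
    out = [None] * len(rows)
    for i, row in enumerate(rows):
        out[forward_map[i]] = row
    return out


permuted = permute_rows(["a", "b", "c"], [2, 0, 1])
print(permuted)  # ['b', 'c', 'a']
```

Note that this is a scatter, not a gather: the result is not `[rows[j] for j in forward_map]`, which would apply the inverse permutation instead.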
std::shared_ptr<sframe> turi::query_eval::sort | ( | std::shared_ptr< planner_node > | sframe_planner_node, |
const std::vector< std::string > | column_names, | ||
const std::vector< size_t > & | sort_column_indices, | ||
const std::vector< bool > & | sort_orders | ||
) |
Sort given SFrame.
The algorithm is like the following:
There are a few optimizations along the way:
See also ec_sort for another sort implementation.
sframe_planner_node | The lazy sframe to be sorted |
sort_column_names | The columns to be sorted |
sort_orders | The order for each column to be sorted, true is ascending |
std::shared_ptr<sframe> turi::query_eval::sort_and_merge | ( | const std::shared_ptr< sarray< std::pair< flex_list, std::string >>> & | partition_array, |
const std::vector< bool > & | partition_sorted, | ||
const std::vector< size_t > & | partition_sizes, | ||
const std::vector< bool > & | sort_orders, | ||
const std::vector< size_t > & | permute_order, | ||
const std::vector< std::string > & | column_names, | ||
const std::vector< flex_type_enum > & | column_types | ||
) |
The merge stage of sort.
The input is a partially sorted (partitioned) sframe, represented by an sarray<string> with N segments. Each segment holds one partitioned key range, and the segments are ordered according to the sort order of the keys.
Given the partially sorted sframe, this function sorts each partition in parallel and concatenates the results into the final sframe.
partition_array | the serialized input sframe, partially sorted |
partition_sorted | flag whether the partition is already sorted |
partition_sizes | the estimated size of each partition |
sort_orders | sort order of the keys |
permute_order | The output order of the keys. column {permute_order[i]} will be stored in column i of the final SFrame |
column_names | column names of the final sframe |
column_types | column types of the final sframe |
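The merge stage can be sketched as "sort each partition independently, then concatenate in partition order". The code below is an illustration under assumptions, not the library's code: partitions are in-memory lists of row tuples rather than serialized segments, the helper names are hypothetical, `key_indices` is an invented parameter naming the key columns, and column permutation, names and types are left out.

```python
from concurrent.futures import ThreadPoolExecutor


def sort_partition(rows, key_indices, sort_orders):
    """Sort one partition by several keys; True in sort_orders means ascending."""
    rows = list(rows)
    # Stable sorts applied from the last key to the first give a multi-key sort.
    for idx, ascending in reversed(list(zip(key_indices, sort_orders))):
        rows.sort(key=lambda row: row[idx], reverse=not ascending)
    return rows


def sort_and_merge_sketch(partitions, partition_sorted, key_indices, sort_orders):
    """Sort unsorted partitions in parallel and concatenate them in order."""
    def finish(args):
        rows, already_sorted = args
        if already_sorted:
            return list(rows)  # skip work for partitions flagged as sorted
        return sort_partition(rows, key_indices, sort_orders)

    with ThreadPoolExecutor() as pool:
        # Executor.map returns results in input order, so partition order holds.
        sorted_parts = list(pool.map(finish, zip(partitions, partition_sorted)))
    return [row for part in sorted_parts for row in part]


partitions = [[(2, "b"), (1, "a")], [(3, "c"), (4, "d")]]
merged = sort_and_merge_sketch(partitions, [False, True], [0], [True])
print(merged)  # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
```

No merge of interleaved runs is needed because the partitions already cover disjoint, ordered key ranges, so concatenation alone yields a fully sorted result.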