Turi Create  4.0

Classes

struct  turi::query_eval::less_than_full_function
 
struct  turi::query_eval::less_than_partial_function
 

Functions

sframe turi::query_eval::permute_sframe (sframe &values_sframe, std::shared_ptr< sarray< flexible_type > > forward_map)
 
std::shared_ptr< sframe > turi::query_eval::ec_sort (std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &key_column_indices, const std::vector< bool > &sort_orders)
 
std::shared_ptr< sframe > turi::query_eval::groupby_aggregate (const std::shared_ptr< planner_node > &source, const std::vector< std::string > &source_column_names, const std::vector< std::string > &keys, const std::vector< std::string > &output_column_names, const std::vector< std::pair< std::vector< std::string >, std::shared_ptr< group_aggregate_value >>> &groups)
 
std::shared_ptr< sframe > turi::query_eval::sort (std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
 
std::shared_ptr< sframe > turi::query_eval::sort_and_merge (const std::shared_ptr< sarray< std::pair< flex_list, std::string >>> &partition_array, const std::vector< bool > &partition_sorted, const std::vector< size_t > &partition_sizes, const std::vector< bool > &sort_orders, const std::vector< size_t > &permute_order, const std::vector< std::string > &column_names, const std::vector< flex_type_enum > &column_types)
 

Detailed Description

Sort, Groupby algorithms that take a lazy input.

Function Documentation

◆ ec_sort()

std::shared_ptr<sframe> turi::query_eval::ec_sort ( std::shared_ptr< planner_node >  sframe_planner_node,
const std::vector< std::string >  column_names,
const std::vector< size_t > &  key_column_indices,
const std::vector< bool > &  sort_orders 
)

External Memory Columnar Sort. See ec_sort for details.

The current sort implementation (in turi::query_eval::sort) has lasted us a while, and it is time to think about something better.

A brief overview of the old sort algorithm

The old algorithm implemented in sort is essentially a bucket sort.

Pivot generation

  • A random fraction of the elements in the key column is sampled to fill a quantile sketch
  • The quantile sketch is used to select a set of K-1 pivots (hence K buckets)
  • Each bucket is associated with an output file

Scatter

  • The entire sframe is materialized to a stream which consumes the sframe row by row.
  • For each row, the key is compared to the pivots, and the row is written row-wise into the bucket.
  • This process can be done in parallel

Sort

  • Each bucket is loaded into memory and an in memory quicksort is performed.
  • This process can be done in parallel
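The three phases above can be sketched in a few lines of plain Python. This is only an in-memory illustration under stated assumptions: the function name `bucket_sort`, the `sample_fraction` parameter, and the use of a sorted random sample in place of a real quantile sketch are all hypothetical and are not part of the Turi implementation, which writes each bucket to its own output file and processes buckets in parallel.

```python
import bisect
import random


def bucket_sort(rows, key, num_buckets, sample_fraction=0.1, seed=0):
    """Sort a list of tuples by rows[i][key] via pivots, scatter, per-bucket sort.

    Hypothetical in-memory stand-in for the old sort; not the Turi API.
    """
    if not rows:
        return []
    rng = random.Random(seed)

    # Pivot generation: a sorted random sample stands in for the quantile
    # sketch; K-1 evenly spaced sample quantiles define K buckets.
    sample_size = max(1, int(len(rows) * sample_fraction))
    sample = sorted(row[key] for row in rng.sample(rows, sample_size))
    pivots = [sample[(i * len(sample)) // num_buckets]
              for i in range(1, num_buckets)]

    # Scatter: each row is appended, row-wise, to the bucket whose key range
    # contains its key. Bucket b holds keys k with pivots[b-1] <= k < pivots[b].
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bisect.bisect_right(pivots, row[key])].append(row)

    # Sort: each bucket is sorted independently; because the key ranges of the
    # buckets are disjoint and ordered, concatenation is globally sorted.
    output = []
    for bucket in buckets:
        output.extend(sorted(bucket, key=lambda row: row[key]))
    return output
```

Since the pivots only decide which bucket a row lands in, a poor sample affects bucket balance (and therefore memory use per bucket) but not correctness.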

Advantages

  1. Exactly 2x write amplification. Every tuple is only written exactly twice. (But see Issue 1)
  2. Works with Lazy sframe sources

Issues

  1. Much more than 2x write amplification in practice. The buckets are not well compressed since they are written row-wise. So while every tuple is written exactly twice, the effective number of bytes written can be much larger.
  2. Wide reads of SFrames are slow. If the SFrame to be sorted has a few hundred columns on disk, things break.
  3. Memory limits are hard to control. Image columns or very big dictionaries / lists are problematic.

The Proposed algorithm

Firstly, we assume that the sframe is read one value at a time, i.e. we get a stream of *(column_number, row_number, value)*, with the assumption that the data is at least sequential within each column. I.e. if I just received *(c, r, v)* with *r > 0*, I must have already received the value for *(c, r-1)*.

The algorithm then proceeds as follows:

Forward Map Generation

  • Row numbers are added to the key columns, the result is sorted by key, and the key columns are then dropped. This gives the inverse map (i.e. x[i] = j implies output row i is read from input row j).
  • Row numbers are added again, and the result is sorted by the first set of row numbers. This gives the forward map (i.e. y[i] = j implies input row i is written to output row j).
  • In SFrame pseudocode:

    B = A[['key']].add_row_number('r1').sort('key')
    inverse_map = B['r1']  # we don't need this
    C = B.add_row_number('r2').sort('r1')
    forward_map = C['r2']

The forward map is held as an SArray of integers.
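To make the two maps concrete, here is a minimal plain-Python sketch. It is not the SFrame API: `inverse_and_forward_maps` is a hypothetical helper that works on an in-memory list of keys, and it inverts the permutation with a direct scatter rather than the second sort described above, which gives the same result when everything fits in memory.

```python
def inverse_and_forward_maps(keys):
    """Return (inverse_map, forward_map) for sorting `keys` ascending.

    inverse_map[i] = j means output row i is read from input row j.
    forward_map[j] = i means input row j is written to output row i.
    Hypothetical helper for illustration; not part of Turi Create.
    """
    n = len(keys)
    # Attach row numbers and sort by key (a stable sort, so ties keep
    # their input order). The sorted row numbers are the inverse map.
    inverse_map = sorted(range(n), key=lambda j: keys[j])

    # Invert the permutation to obtain the forward map.
    forward_map = [0] * n
    for output_row, input_row in enumerate(inverse_map):
        forward_map[input_row] = output_row
    return inverse_map, forward_map


keys = ["d", "a", "c", "b"]
inverse_map, forward_map = inverse_and_forward_maps(keys)
# inverse_map == [1, 3, 2, 0]: output row 0 comes from input row 1 ("a").
# forward_map == [3, 0, 2, 1]: input row 0 ("d") goes to output row 3.
```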

Pivot Generation

  • Now that we have a forward map, we can get exact buckets of length N/K, i.e. row r is written to bucket Floor(K * forward_map(r) / N)

Scatter

  • For each (c,r,v) in data: Write (c,v) to bucket Floor(K * forward_map(r) / N)

This requires a little modification to the sframe writer to allow single-column writes (really, it already does this internally: it transposes written rows to turn them into columns). This exploits the property that (c,r-1,v) must be read before (c,r,v). Hence the rows are written in the correct order (though how to do this in parallel is a question).

We will also have to generate a per-bucket forward_map using the same scatter procedure.

This requires a little bit of intelligence in the caching of the forward map SArray. If the forward map is small, we can keep it all in memory. If it is large, a bit more work is needed, and the data structure has to be smarter about what it caches.
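As an illustration of the scatter step, the following plain-Python sketch routes a column-major input into K buckets and builds the per-bucket forward map with the same routing rule. The names `bucket_of` and `scatter` and the list-of-columns input are assumptions made for this example; the real implementation writes to SFrame segments on disk rather than Python lists.

```python
def bucket_of(forward_index, num_rows, num_buckets):
    """Bucket that output position `forward_index` falls into: Floor(K * f / N)."""
    return (num_buckets * forward_index) // num_rows


def scatter(columns, forward_map, num_buckets):
    """Scatter column-major data into buckets (hypothetical in-memory sketch).

    columns[c][r] is the value of column c at input row r.
    Returns (buckets, bucket_forward) where buckets[b][c] is the list of
    values of column c routed to bucket b, and bucket_forward[b] is the
    forward map restricted to the rows of bucket b, in the same order.
    """
    n = len(forward_map)
    buckets = [[[] for _ in columns] for _ in range(num_buckets)]
    bucket_forward = [[] for _ in range(num_buckets)]

    # The per-bucket forward map is produced by the same scatter procedure.
    for r in range(n):
        b = bucket_of(forward_map[r], n, num_buckets)
        bucket_forward[b].append(forward_map[r])

    # Each column is swept sequentially, so within a bucket the values of
    # every column appear in the same (input row) order.
    for c, column in enumerate(columns):
        for r, v in enumerate(column):
            b = bucket_of(forward_map[r], n, num_buckets)
            buckets[b][c].append(v)
    return buckets, bucket_forward
```

Because every column is routed with the same forward map and read in row order, position i of `buckets[b][c]` and position i of `bucket_forward[b]` always refer to the same input row.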

Sort

For each Bucket b:
    Allocate Output vector of (Length of bucket) * (#columns)
    Let S be the starting index of bucket b (i.e. b*N/K)
    Let T be the ending index of bucket b (i.e. (b+1)*N/K)
    Load forward_map[S:T] into memory
    For each (c,r,v) in bucket b
        Output[per_bucket_forward_map(r) - S][c] = v
    Dump Output to an SFrame
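The inner loop of the pseudocode above might look like this in plain Python. `sort_bucket` is a hypothetical name, and the inputs are assumed to be in-memory lists: `bucket_columns[c]` holds the scattered values of column c, `bucket_forward_map[r]` holds the global output position of the r-th row in the bucket, and `start` is S, the first output position covered by the bucket.

```python
def sort_bucket(bucket_columns, bucket_forward_map, start):
    """Place every value of one bucket at its final position.

    Returns the bucket's rows in sorted order as a list of lists.
    Hypothetical in-memory sketch; the real code dumps the output
    to an SFrame instead of returning it.
    """
    length = len(bucket_forward_map)
    num_columns = len(bucket_columns)

    # Allocate Output of (length of bucket) * (#columns).
    output = [[None] * num_columns for _ in range(length)]

    # No comparison is needed: the forward map already says where each
    # row belongs, so this is a pure permutation of the bucket.
    for c, column in enumerate(bucket_columns):
        for r, v in enumerate(column):
            output[bucket_forward_map[r] - start][c] = v
    return output
```

Since columns are handled independently, the outer loop could be restricted to a few columns at a time to bound memory use.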

Advantages

  1. Only sequential sweeps on the input SFrames, block-wise.
  2. Does not matter if there are a large number of columns.
  3. Memory limits can be easier to control.
     a. Scatter has little memory requirements (apart from the write buffer stage).
     b. The forward map is all integers.
     c. The Sort stage can happen a few columns at a time.

Issues

  1. The block-wise reads do not work well on lazy SFrames. In theory this is possible, since the algorithm is technically more general and will work even if the (c,r,v) tuples are generated one row at a time (i.e. wide reads). However, executing this on a lazy SFrame means that we actually perform the wide reads, which are slow. (Example: if I have a really wide SFrame on disk and I perform the read through the query execution pipeline, the pipeline performs the wide read and that is slow. Whereas if I go to the disk SFrame directly, I can completely avoid the wide read.)
  2. Due to (1) it is better to materialize the SFrame and operate on the physical SFrame. Hence we get up to 3x write amplification. However, every intermediate bucket is fully compressed.

Optimizations And implementation Details

  • One key aspect is the construction of the forward map. We still need to have a second sort algorithm to make that work. We could use our current sorting algo, or try to make something simpler since the 'value' is always a single integer. As a start, I would just use our current sort implementation. And look for something better in the future.
  • We really need almost random seeks on the forward map. I.e. it is mostly sequential due to the column ordering. Some intelligence is needed to decide how much of the forward map to hold in memory, and how much to keep on disk.
  • For particularly heavy columns (like very big dictionaries, lists, or images), we store the row number instead during the scatter phase, and we use seeks against the original input to transfer the value straight from input to output. We could take a guess about the column data size and use the seek strategy if it is too large.
  • There are 2 optimizations possible.
    • One is the above: we write the row number into the bucket, and we go back to the original input SFrame for the data. We can put a threshold size here of, say, 256 KB, which is about 100 MBps * 2 ms.
    • The second is optimization in the final sort of the bucket. We can perform seeks to fetch elements from the bucket. Unlike the above, we pay for an additional write + seek of the element, but it IS a smaller file.
  • Minimize full decode of dictionary/list type. We should be able to read/write dict/list columns in the input SFrame as strings. (Annoyingly this is actually quite hard to achieve at the moment)
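The threshold arithmetic mentioned above can be written out as a small sketch. The interpretation is an assumption on my part: the product of sequential throughput and seek time is read as the value size at which one extra seek costs about as much as copying the value through a bucket. The function names and the default figures (100 MB/s, 2 ms) are illustrative only.

```python
def seek_break_even_bytes(throughput_bytes_per_s=100_000_000, seek_time_ms=2):
    """Bytes that can be streamed sequentially in the time of one seek."""
    return throughput_bytes_per_s * seek_time_ms // 1000


def should_store_row_number(value_size_bytes, threshold_bytes=256 * 1024):
    """Hypothetical policy: values above the threshold are fetched by seek.

    Returns True when the scatter phase would write only the row number
    and later seek into the original input for the value itself.
    """
    return value_size_bytes > threshold_bytes


break_even = seek_break_even_bytes()  # 100 MB/s * 2 ms = 200,000 bytes
```

With these figures the break-even size is 200,000 bytes, which is in the same range as the 256 KB threshold suggested in the text.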

Notes on the current implementation

  • The current implementation uses the old sort algorithm for both the backward map generation and the forward map generation. A faster algorithm could be used for the forward map sort since it is now very small. A dedicated sort for just sorting integers could be nice.

Parameters
sframe_planner_node  The lazy sframe to be sorted
sort_column_names    The columns to be sorted
sort_orders          The order for each column to be sorted; true is ascending
Returns
The sorted sframe

◆ groupby_aggregate()

std::shared_ptr<sframe> turi::query_eval::groupby_aggregate ( const std::shared_ptr< planner_node > &  source,
const std::vector< std::string > &  source_column_names,
const std::vector< std::string > &  keys,
const std::vector< std::string > &  output_column_names,
const std::vector< std::pair< std::vector< std::string >, std::shared_ptr< group_aggregate_value >>> &  groups 
)

Groupby aggregate algorithm that operates on lazy input.

Identical to turi::groupby_aggregate but can take a lazy input.

Parameters
source               The lazy input node
source_column_names  The column names of the input source
keys                 An array of column names to generate the group on
output_column_names  The output column names for each aggregate. This must be the same length as the 'groups' parameter. Output column names must be unique and must not collide with any of the key column names. If any entries are empty, their names will be assigned automatically.
groups               A collection of {column_names, group operator} pairs describing the aggregates to generate. You can have multiple aggregators for each set of columns. You do not need every column in the source to be represented. This must be the same length as the 'output_column_names' parameter.
Examples:
/build/src/core/storage/sframe_interface/unity_sframe.hpp.
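To illustrate how `keys`, `output_column_names` and `groups` relate to each other, here is a plain-Python sketch of the same shape of computation. It is not the C++ API: rows are tuples, each aggregator is an ordinary callable standing in for a `group_aggregate_value`, and automatic naming of empty output columns is not modelled.

```python
from collections import defaultdict


def groupby_aggregate(rows, column_names, keys, output_column_names, groups):
    """Group `rows` by `keys` and apply one aggregator per entry of `groups`.

    groups is a list of (input_column_names, aggregator) pairs; the
    aggregator is a callable that receives a list of tuples, one tuple of
    the selected columns per row of the group. Hypothetical sketch only.
    """
    if len(output_column_names) != len(groups):
        raise ValueError("output_column_names and groups must have equal length")
    if len(set(output_column_names)) != len(output_column_names):
        raise ValueError("output column names must be unique")
    if set(output_column_names) & set(keys):
        raise ValueError("output column names must not collide with keys")

    index = {name: i for i, name in enumerate(column_names)}

    # Collect the rows of each group, keyed by the tuple of key values.
    grouped = defaultdict(list)
    for row in rows:
        grouped[tuple(row[index[k]] for k in keys)].append(row)

    # Emit one output record per group: key columns, then one value per aggregate.
    result = []
    for key_values, members in grouped.items():
        record = dict(zip(keys, key_values))
        for name, (cols, aggregator) in zip(output_column_names, groups):
            record[name] = aggregator(
                [tuple(r[index[c]] for c in cols) for r in members])
        result.append(record)
    return result


rows = [("a", 1), ("b", 2), ("a", 3)]
totals = groupby_aggregate(
    rows, ["k", "v"], ["k"], ["total", "count"],
    [(["v"], lambda values: sum(v[0] for v in values)), (["v"], len)])
```

Here the same input column `v` feeds two aggregators, which mirrors the note that multiple aggregators may be attached to the same set of columns.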

◆ permute_sframe()

sframe turi::query_eval::permute_sframe ( sframe values_sframe,
std::shared_ptr< sarray< flexible_type > >  forward_map 
)

Permutes an sframe by a forward map. forward_map has the same length as the sframe and must be a permutation of all the integers [0, len-1].

The input sframe is then permuted so that sframe row i is written to row forward_map[i] of the returned sframe.

Note
The forward_map is not checked to be a valid permutation. If this constraint is not met, either an exception will be thrown or the result is ill-defined.
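The semantics of the permutation can be shown with a short plain-Python sketch. `permute_rows` is a hypothetical stand-in that works on a list of rows; unlike the function documented here, it validates the forward map explicitly so that the constraint is visible in code.

```python
def permute_rows(rows, forward_map):
    """Return a new list where rows[i] is placed at index forward_map[i].

    Hypothetical sketch of the permutation semantics; unlike the documented
    function, this version checks that forward_map is a valid permutation
    of [0, len-1] and raises ValueError otherwise.
    """
    n = len(rows)
    if len(forward_map) != n:
        raise ValueError("forward_map must have the same length as rows")

    output = [None] * n
    seen = [False] * n
    for i, j in enumerate(forward_map):
        if not (0 <= j < n) or seen[j]:
            raise ValueError("forward_map is not a permutation of [0, len-1]")
        seen[j] = True
        output[j] = rows[i]  # input row i is written to output row j
    return output


permuted = permute_rows(["a", "b", "c"], [2, 0, 1])
# "a" goes to row 2, "b" to row 0, "c" to row 1.
```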

◆ sort()

std::shared_ptr<sframe> turi::query_eval::sort ( std::shared_ptr< planner_node >  sframe_planner_node,
const std::vector< std::string >  column_names,
const std::vector< size_t > &  sort_column_indices,
const std::vector< bool > &  sort_orders 
)

Sort given SFrame.

The algorithm works as follows:

  • First, do a quantile sketch over all sort columns and use the quantile sketch to figure out the partition keys that we will use to split the sframe rows into small chunks, so that the chunks are relatively sorted with respect to each other. Each chunk is small enough that we can sort it in memory.
  • Scatter-partition the sframe according to the above partition keys. The resulting value is persisted. Each partition is stored as one segment in an sarray.
  • The sorted result is then lazily materialized through the le_sort operator.

There are a few optimizations along the way:

  • if all sorting keys are the same, then there is no need to sort
  • if the sframe is small enough to fit into memory, then we simply do an in-memory sort
  • if all rows in a partition of the sframe have the same sorting key, then that partition will not be sorted

Also see ec_sort for another sort implementation

Parameters
sframe_planner_node  The lazy sframe to be sorted
sort_column_names    The columns to be sorted
sort_orders          The order for each column to be sorted; true is ascending
Returns
The sorted sframe
Examples:
/build/src/core/storage/sframe_interface/unity_sframe.hpp.

◆ sort_and_merge()

std::shared_ptr<sframe> turi::query_eval::sort_and_merge ( const std::shared_ptr< sarray< std::pair< flex_list, std::string >>> &  partition_array,
const std::vector< bool > &  partition_sorted,
const std::vector< size_t > &  partition_sizes,
const std::vector< bool > &  sort_orders,
const std::vector< size_t > &  permute_order,
const std::vector< std::string > &  column_names,
const std::vector< flex_type_enum > &  column_types 
)

The merge stage of sort.

The input is a partially sorted (partitioned) sframe, represented by an sarray<string> with N segments. Each segment is a partitioned key range, and segments are ordered by key order.

Given the partially sorted sframe, this function sorts each partition in parallel and concatenates the results into the final sframe.
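A sequential plain-Python sketch of this merge stage is shown below. The names `sort_rows` and `sort_and_merge`, and the representation of partitions as lists of tuples, are assumptions for illustration; the real function deserializes each segment, sorts partitions in parallel, and also applies `permute_order` to the output columns, none of which is modelled here.

```python
def sort_rows(rows, key_indices, sort_orders):
    """Sort rows by several key columns; sort_orders[i] True means ascending.

    Uses repeated stable sorts from the least significant key to the most
    significant one, which allows a different direction per key.
    """
    for idx, ascending in reversed(list(zip(key_indices, sort_orders))):
        rows = sorted(rows, key=lambda row: row[idx], reverse=not ascending)
    return rows


def sort_and_merge(partitions, partition_sorted, key_indices, sort_orders):
    """Sort each partition unless it is flagged as sorted, then concatenate.

    Assumes the partitions are already ordered by key range, so simple
    concatenation yields a globally sorted result. Hypothetical sketch.
    """
    output = []
    for rows, already_sorted in zip(partitions, partition_sorted):
        output.extend(rows if already_sorted
                      else sort_rows(rows, key_indices, sort_orders))
    return output


merged = sort_and_merge(
    [[(2, "x"), (1, "y")], [(3, "z"), (3, "w")]],
    [False, True],       # the second partition holds a single key value
    key_indices=[0],
    sort_orders=[True])
```

The second partition is passed through untouched, which corresponds to the case where a partition is flagged as already sorted.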

Parameters
partition_array   the serialized input sframe, partially sorted
partition_sorted  flag indicating whether each partition is already sorted
partition_sizes   the estimated size of each partition
sort_orders       sort order of the keys
permute_order     The output order of the keys. Column {permute_order[i]} will be stored in column i of the final SFrame
column_names      column names of the final sframe
column_types      column types of the final sframe
Returns
a sorted sframe.