/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef SFRAME_ALGORITHM_EC_SORT_HPP
#define SFRAME_ALGORITHM_EC_SORT_HPP

#include <vector>
#include <memory>

namespace turi {
class sframe;

namespace query_eval {

struct planner_node;

/**
 * \ingroup sframe_query_engine
 * \addtogroup Algorithms Algorithms
 * \{
 */
/**
 * External Memory Columnar Sort. See \ref ec_sort for details.
 *
 * The current sort algorithm (in \ref turi::query_eval::sort) implementation
 * has lasted us a while, and it is time to think about something better.
 *
 * A brief overview of the old sort algorithm
 * ==========================================
 *
 * The old algorithm, implemented in \ref sort, is essentially a bucket sort.
 *
 * Pivot generation
 * ----------------
 * - A random sample of elements from the key column is fed into a
 * quantile sketch.
 * - The quantile sketch is used to select a set of K-1 pivots (hence K
 * buckets).
 * - Each bucket is associated with an output file.
 *
 * Scatter
 * -------
 * - The entire sframe is materialized to a stream which consumes the sframe
 * row by row.
 * - For each row, the key is compared to the pivots, and the row is written
 * row-wise into the matching bucket.
 * - This process can be done in parallel.
 *
 * Sort
 * ----
 * - Each bucket is loaded into memory and an in-memory quicksort is
 * performed.
 * - This process can be done in parallel.
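 *
 * A rough in-memory sketch of this pivot/scatter/sort pipeline on plain
 * integers (a hypothetical helper, not the actual implementation; the real
 * algorithm draws its pivots from the quantile sketch and keeps each
 * bucket in an on-disk file):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Keys strictly below pivots[0] land in bucket 0, and so on; each
// bucket is then sorted independently, and concatenating the buckets
// yields the fully sorted result.
std::vector<int> bucket_sort(const std::vector<int>& keys,
                             const std::vector<int>& pivots) {  // K-1 pivots
  std::vector<std::vector<int>> buckets(pivots.size() + 1);     // K buckets
  for (int k : keys) {
    // Scatter: compare the key against the pivots to pick a bucket.
    size_t b = std::upper_bound(pivots.begin(), pivots.end(), k) -
               pivots.begin();
    buckets[b].push_back(k);
  }
  std::vector<int> out;
  for (auto& bucket : buckets) {  // in-memory sort, bucket by bucket
    std::sort(bucket.begin(), bucket.end());
    out.insert(out.end(), bucket.begin(), bucket.end());
  }
  return out;
}
```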
 *
 * Advantages
 * ----------
 *
 * 1. Exactly 2x write amplification: every tuple is written exactly twice.
 * (But see Issue 1.)
 * 2. Works with lazy sframe sources.
 *
 * Issues
 * ------
 *
 * 1. Much more than 2x *effective* write amplification: the buckets are not
 * well compressed since they are written row-wise. So while every tuple is
 * written exactly twice, the effective \#bytes written can be *much much*
 * larger.
 *
 * 2. Wide reads of SFrames are slow. If the SFrame to be sorted has a few
 * hundred columns on disk, things break.
 *
 * 3. Memory limits are hard to control. Image columns or very big
 * dictionaries / lists are problematic.
 *
 * The Proposed algorithm
 * =======================
 * Firstly, we assume that the sframe is read one value at a time, i.e.
 * we get a stream of *(column\_number, row\_number, value)* tuples, with
 * the assumption that the data is at least sequential within a column:
 * if I have just received *(c, r, v)* with *r > 0*, I must have already
 * received row *r-1* of column *c*.
 * The algorithm then proceeds as follows:
 *
 * Forward Map Generation
 * ----------------------
 *
 * - A column of row numbers is added to the key columns, and the key
 * columns are sorted (and then dropped). This gives the inverse map
 * (i.e. x[i] = j implies output row i is read from input row j).
 * - Row numbers are added again, and the result is sorted by the first set
 * of row numbers. This gives the forward map (i.e. y[i] = j implies
 * input row i is written to output row j).
 * - In SFrame pseudocode:
 *
 *       B = A[['key']].add_row_number('r1').sort('key')
 *       inverse_map = B['r1'] # we don't need this
 *       C = B.add_row_number('r2').sort('r1')
 *       forward_map = C['r2']
 *
 * The forward map is held as an SArray of integers.
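 *
 * A minimal in-memory sketch of the same construction (a hypothetical
 * helper, with std::stable_sort standing in for the external-memory sort
 * of the key column):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Build the forward map for a single integer key column.
std::vector<size_t> make_forward_map(const std::vector<int>& keys) {
  // inverse_map[i] = j : output row i is read from input row j.
  std::vector<size_t> inverse_map(keys.size());
  for (size_t i = 0; i < inverse_map.size(); ++i) inverse_map[i] = i;
  std::stable_sort(inverse_map.begin(), inverse_map.end(),
                   [&](size_t a, size_t b) { return keys[a] < keys[b]; });
  // Invert it: forward_map[j] = i : input row j is written to output row i.
  std::vector<size_t> forward_map(keys.size());
  for (size_t i = 0; i < inverse_map.size(); ++i)
    forward_map[inverse_map[i]] = i;
  return forward_map;
}
```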
 *
 * Pivot Generation
 * ----------------
 * - Now that we have a forward map, we can get exact buckets of N/K
 * length each: row r is written to bucket `Floor(K * forward_map(r) / N)`.
 *
 * Scatter
 * -------
 * - For each (c,r,v) in the data:
 *       Write (c,v) to bucket `Floor(K * forward_map(r) / N)`
 *
 * This requires a small modification to the sframe writer to allow
 * single-column writes (really, it already does this internally: it
 * transposes written rows to turn them into columns). This exploits the
 * property that (c,r-1,v') must be read before (c,r,v), hence the rows
 * are written to each bucket in the correct order (though how to do this
 * in parallel is an open question).
 *
 * We will also have to generate a per-bucket forward map using the same
 * scatter procedure.
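 *
 * A hypothetical in-memory sketch of this scatter, with nested vectors
 * standing in for the per-bucket column files of the sframe writer:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Values arrive column by column, and each value is appended to the
// bucket its output row falls in. Within a bucket, each column keeps
// the input ordering, which is what the final sort stage relies on.
std::vector<std::vector<std::vector<std::string>>> scatter(
    const std::vector<std::vector<std::string>>& columns,  // columns[c][r]
    const std::vector<size_t>& forward_map, size_t K) {
  const size_t N = forward_map.size();
  // buckets[b][c] is the ordered list of values of column c in bucket b.
  std::vector<std::vector<std::vector<std::string>>> buckets(
      K, std::vector<std::vector<std::string>>(columns.size()));
  for (size_t c = 0; c < columns.size(); ++c) {
    for (size_t r = 0; r < N; ++r) {        // sequential within a column
      size_t b = K * forward_map[r] / N;    // exact bucket of the output row
      buckets[b][c].push_back(columns[c][r]);
    }
  }
  return buckets;
}
```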
 *
 * This requires a little bit of intelligence in the caching of the
 * forward map SArray. If the forward map is small, we can keep it all
 * in memory; if it is large, a bit more work is needed, and some
 * intelligence is required in this data structure.
 *
 * Sort
 * ----
 *
 * For each bucket b:
 *     Allocate an output vector of (length of bucket) * (\#columns)
 *     Let S be the starting index of bucket b (i.e. b*N/K)
 *     Let T be the ending index of bucket b (i.e. (b+1)*N/K)
 *     Load forward_map[S:T] into memory
 *     For each (c,r,v) in bucket b:
 *         Output[per_bucket_forward_map(r) - S][c] = v
 *     Dump Output to an SFrame
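 *
 * Sketched in memory (hypothetical names; bucket_columns holds the values
 * of one bucket in scatter order, and bucket_forward_map is the per-bucket
 * forward map produced by the same scatter, whose values all lie in
 * [S, T)):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Every value lands directly at its final sorted position, so no
// comparison sort is needed in this last stage.
std::vector<std::vector<std::string>> sort_bucket(
    const std::vector<std::vector<std::string>>& bucket_columns,  // [c][i]
    const std::vector<size_t>& bucket_forward_map,  // global output rows
    size_t S) {                                     // bucket start index
  const size_t len = bucket_forward_map.size();
  std::vector<std::vector<std::string>> out(
      len, std::vector<std::string>(bucket_columns.size()));
  for (size_t c = 0; c < bucket_columns.size(); ++c)
    for (size_t i = 0; i < len; ++i)
      out[bucket_forward_map[i] - S][c] = bucket_columns[c][i];
  return out;
}
```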
 *
 * Advantages
 * ----------
 *
 * 1. Only sequential, block-wise sweeps over the input SFrame.
 * 2. It does not matter if there are a large number of columns.
 * 3. Memory limits are easier to control:
 *    a. Scatter has low memory requirements (apart from the write buffer
 *       stage).
 *    b. The forward map is all integers.
 *    c. The sort stage can happen a few columns at a time.
 *
 * Issues
 * ------
 *
 * 1. The block-wise reads do not work well on lazy SFrames. In theory this
 * is possible, since the algorithm is more general and will work even if
 * the (c,r,v) tuples are generated one row at a time (i.e. wide reads).
 * However, executing this on a lazy SFrame means that we *actually* perform
 * the wide reads, which are slow. (Example: if I have a really wide SFrame
 * on disk and I perform the read through the query execution pipeline, the
 * pipeline performs the wide read and that is slow. Whereas if I go to the
 * disk SFrame directly, I can completely avoid the wide read.)
 *
 * 2. Due to (1) it is better to materialize the SFrame and operate on
 * the physical SFrame. Hence we get up to 3x write amplification.
 * However, every intermediate bucket is fully compressed.
 *
 * Optimizations and Implementation Details
 * ----------------------------------------
 *
 * - One key aspect is the construction of the forward map. We still
 * need a second sort algorithm to make that work. We could use
 * our current sorting algorithm, or try to make something simpler since
 * the 'value' is always a single integer. As a start, I would just use
 * our current sort implementation and look for something better in the
 * future.
 *
 * - We need almost-random seeks on the forward map; i.e. access is mostly
 * sequential due to the column ordering. Some intelligence is needed to
 * decide how much of the forward map to hold in memory and how much to
 * keep on disk.
 *
 * - For particularly heavy columns (like super big dictionaries,
 * lists, images), we store the row number instead during the scatter
 * phase, and we use seeks against the original input to transfer values
 * straight from input to output. We could take a guess at the column
 * data size and use the seek strategy if it is too large.
 *
 * - There are 2 optimizations possible:
 *   - One is the above: we write the row number into the bucket, and we
 *   go back to the original input SFrame for the data. We can put a
 *   threshold size here of, say, 256 KB (about 100 MB/s * 2 ms).
 *   - The second is an optimization in the final sort of the bucket. We
 *   can perform seeks to fetch elements from the bucket. Unlike the
 *   above, we pay for an additional write + seek of the element, but it
 *   IS a smaller file.
 *
 * - Minimize full decoding of dictionary/list types. We should be able to
 * read/write dict/list columns in the input SFrame as strings. (Annoyingly
 * this is actually quite hard to achieve at the moment.)
 *
 * Notes on the current implementation
 * -----------------------------------
 * - The current implementation uses the old sort algorithm for both the
 * backward map generation and the forward map generation. A faster
 * algorithm could be used for the forward map sort since its input is now
 * very small; a dedicated sort for just sorting integers could be nice.
 *
 * \param sframe_planner_node The lazy sframe to be sorted
 * \param column_names The names of all the columns of the sframe
 * \param key_column_indices The indices of the key columns to sort by
 * \param sort_orders The sort order for each key column; true is ascending
 *
 * \return The sorted sframe
 */
std::shared_ptr<sframe> ec_sort(
    std::shared_ptr<planner_node> sframe_planner_node,
    const std::vector<std::string> column_names,
    const std::vector<size_t>& key_column_indices,
    const std::vector<bool>& sort_orders);

/// \}
} // query_eval
} // turi
#endif