Turi Create  4.0
join_impl.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #include <cstdio>
7 #include <unordered_set>
8 #include <unordered_map>
9 
10 #include <core/storage/sframe_data/sframe.hpp>
11 
12 //TODO: What happens if a join key (or part of one) is NULL?
13 enum join_type_t {INNER_JOIN = 0, LEFT_JOIN, RIGHT_JOIN, FULL_JOIN}; // Join kind accepted by hash_join_executor's constructor. Declared at global scope, before namespace turi opens.
14 
15 namespace turi {
16 
17 
18 /**
19  * \ingroup sframe_physical
20  * \addtogroup Join Join
21  * \{
22  */
23 
24 /**
25  * Join Implementation Detail
26  */
27 namespace join_impl {
28 
29 size_t hash_key(const std::vector<flexible_type>& key); // Maps a key (vector of flexible_type) to a size_t, presumably its hash value; only declared in this header.
30 size_t compute_hash_from_row(const std::vector<flexible_type> &row,
31  const std::vector<size_t> &positions); // NOTE(review): presumably hashes only the entries of `row` at `positions` -- confirm against the definition.
32 
33 typedef struct {
34  std::vector<std::vector<flexible_type>> rows; // Rows held by this entry; each row is a vector of flexible_type.
35  bool matched; // Whether this entry's join key has been "matched" (see the get_matching_rows documentation).
36 } hash_join_row_t; // Entry type used by join_hash_table (aliased there as value_type).
37 
38 /**
39  * This class is the keeper of an in-memory hash table for use in a join
40  * algorithm. Its methods facilitate hashing on the given join keys by taking
41  * a vector of the positions at which these keys appear in a row.
42  *
43  * NOTE:Could be templated for hash_type, row_type, but I don't see a need now.
44  */
46  public:
47  typedef hash_join_row_t value_type; // Entry type stored in the table and returned by get_matching_rows().
48 
49  /**
50  * Constructor. Takes a vector of hash positions, which are the column
51  * numbers in each row that hold the values the join is on (i.e. the join
52  * keys), for the frame that each row is added from. `hp` is taken by value and copied into _hash_positions.
53  */
54  join_hash_table(std::vector<size_t> hp) : _hash_positions(hp) {}
55 
56  /**
57  * Add a row to the hash table. Each row must come from the same frame, or
58  * else join results will not make sense. NOTE(review): the meaning of the bool result is not documented here -- confirm against the definition.
59  */
60  bool add_row(const std::vector<flexible_type> &row);
61 
62  /**
63  * Returns (by const reference) the rows whose join keys match the given row's join keys.
64  *
65  * The return object is a hash_join_row_t, a structure containing a list of
66  * rows and whether this join key was "matched" or not. The optional mark_match
67  * argument (default true) marks all of the matching rows as "matched", which is
68  * usually used for completing a left join, in deciding which rows need to be joined with NULL
69  * values and emitted into the result set. NOTE(review): hash_positions presumably gives the join-key columns of `row` -- confirm.
70  */
71  const value_type &get_matching_rows(const std::vector<flexible_type> &row,
72  const std::vector<size_t> &hash_positions,
73  bool mark_match=true);
74 
75  /**
76  * Size and iteration accessors for the hash table.
77  */
78  size_t num_stored_rows(); // NOTE(review): presumably the number of rows added so far; an earlier comment described this as printing stats to the log, which does not match the signature -- confirm.
79  std::unordered_map<size_t, std::list<value_type>>::const_iterator cbegin(); // Const iterator of the same map type as _hash_table.
80  std::unordered_map<size_t, std::list<value_type>>::const_iterator cend(); // Counterpart of cbegin() with the same iterator type.
81 
82 
83  private:
84  /**
85  * Does an itemwise check to see if two rows have matching join keys.
86  */
87  bool join_values_equal(const std::vector<flexible_type> &row,
88  const std::vector<flexible_type> &other,
89  const std::vector<size_t> &hash_positions); // NOTE(review): presumably hash_positions indexes `row` while _hash_positions indexes `other` -- confirm against the definition.
90 
91  // Maps a size_t hash key to the list of entries stored under it. Each entry is
92  // a value_type (hash_join_row_t): a vector of rows plus a "matched" flag. If the join
93  // key holds a unique primary key, each entry's `rows` vector will have 1 element.
94  // We can't guarantee this though, so `rows` is a vector of rows.
95  std::unordered_map<size_t, std::list<value_type>> _hash_table;
96  // The positions in the rows that we store that make up the hash key
97  std::vector<size_t> _hash_positions;
98  const static value_type empty_vt; // NOTE(review): presumably what get_matching_rows returns when nothing matches -- confirm.
99 };
100 
101 /**
102  * The hash_join_executor class executes a hash join. It is only meant
103  * to perform one join. Could eventually inherit from a generic join class
104  * and have further OOP design patterns to choose the algorithm (or just
105  * different function calls). Only one algorithm implemented thus far,
106  * so no need for that yet.
107  */
109  public:
110  //TODO: Perhaps combine the sframe and the join positions into a struct?
111  hash_join_executor(const sframe &left,
112  const sframe &right,
113  const std::vector<size_t> &left_join_positions,
114  const std::vector<size_t> &right_join_positions,
115  join_type_t join_type,
116  const std::map<std::string,std::string>& alter_names,
117  size_t max_buffer_size); // NOTE(review): the *_join_positions are presumably the join-key column indices of each frame; alter_names and max_buffer_size are undocumented -- confirm against the definition.
118 
119  ~hash_join_executor() {}
120 
121  sframe grace_hash_join(); // Returns the join result as an sframe; an executor instance is meant to perform only one join.
122 
123  private:
124  // The original frames we were passed (held by value, not by reference)
125  sframe _left_frame;
126  sframe _right_frame;
127  std::vector<size_t> _left_join_positions;
128  std::vector<size_t> _right_join_positions;
129  size_t _max_buffer_size;
130  bool _left_join; // NOTE(review): _left_join/_right_join are presumably derived from the join_type_t given to the constructor -- confirm.
131  bool _right_join;
132  std::unordered_map<size_t,size_t> _right_to_left_join_positions;
133  std::unordered_map<size_t,size_t> _left_to_right_join_positions;
134  std::vector<std::string> _output_column_names;
135  std::vector<flex_type_enum> _output_column_types;
136  bool _reverse_output_column_order;
137  std::unordered_map<size_t, size_t> _reverse_to_original;
138  bool _frames_partitioned;
139  std::map<std::string, std::string> _alter_names_right; // NOTE(review): presumably built from the constructor's alter_names argument -- confirm.
140 
141  /**
142  * Partition the left and right frames for the GRACE hash join algorithm and
143  * write these partitions out to disk. NOTE(review): the returned pair is presumably (left, right) -- confirm.
144  */
145  std::pair<std::shared_ptr<sframe>,std::shared_ptr<sframe>> grace_partition_frames();
146 
147  /**
148  * Partition one SFrame for the GRACE hash join algorithm.
149  * NOTE(review): presumably splits `sf` into `num_partitions` parts keyed on the columns `join_col_nums` -- confirm.
150  * Used by grace_partition_frames().
151  */
152  std::shared_ptr<sframe> grace_partition_frame(const sframe &sf, const std::vector<size_t> &join_col_nums, size_t num_partitions);
153 
154  /**
155  * Return the number of cells (rows * cols) of an sframe.
156  */
157  size_t get_num_cells(const sframe &sf);
158 
159  /**
160  * Estimates how many partitions this SFrame should be divided into for the
161  * GRACE hash join. The goal is for each partition to fit into memory.
162  * NOTE(review): presumably derived from get_num_cells() and _max_buffer_size -- confirm against the definition.
163  */
164  size_t choose_number_of_grace_partitions(const sframe &sf);
165 
166  /**
167  * Set up `result_frame` (an out-parameter, taken by non-const reference) as an empty SFrame
168  * that includes the columns of both left and right frames without duplicating the join columns.
169  */
170  void init_result_frame(sframe &result_frame);
171 
172  /**
173  * Join a vector of rows from the left frame with a vector of rows from the
174  * right frame and write to the given output iterator (result_iter).
175  *
176  * If both vectors have size > 0, this will perform a cross product and write
177  * the result. If one vector is empty and the other is not, each row from the
178  * non-empty vector is joined with 'NULL' values, making sure that each join
179  * column is not 'NULL'.
180  */
181  void merge_rows_for_output(sframe &result_frame,
182  sframe::iterator result_iter,
183  const std::vector<std::vector<flexible_type>> &left_rows,
184  const std::vector<std::vector<flexible_type>> &right_rows);
185 
186  std::vector<flexible_type> unpack_row(std::string val, size_t num_cols); // NOTE(review): undocumented; `val` is taken by value. Presumably decodes a packed row of num_cols cells -- confirm.
187 };
188 
189 } // end of join_impl
190 
191 /// \}
192 } // end of namespace turi
const value_type & get_matching_rows(const std::vector< flexible_type > &row, const std::vector< size_t > &hash_positions, bool mark_match=true)
bool add_row(const std::vector< flexible_type > &row)
join_hash_table(std::vector< size_t > hp)
Definition: join_impl.hpp:54