// Turi Create 4.0 — sgraph_edge_apply.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SGRAPH_SGRAPH_EDGE_APPLY_HPP
7 #define TURI_SGRAPH_SGRAPH_EDGE_APPLY_HPP
8 
9 #include <vector>
10 #include <tuple>
11 #include <type_traits>
12 #include <core/data/flexible_type/flexible_type.hpp>
13 #include <functional>
14 #include <core/storage/sframe_data/sarray.hpp>
15 #include <core/storage/sgraph_data/sgraph.hpp>
16 #include <core/storage/sgraph_data/hilbert_parallel_for.hpp>
17 #include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp>
18 #include <core/util/cityhash_tc.hpp>
19 
20 namespace turi {
21 
22 
23 /**
24  * \ingroup sgraph_physical
25  * \addtogroup sgraph_compute SGraph Compute
26  * \{
27  */
28 
29 /**
30  * Graph Computation Functions
31  */
32 namespace sgraph_compute {
33 
34 /**
35  * Performs a map operation combining one external array (other) with the
36  * graph edge data. other must be the same length as the edge data.
37  * Abstractly performs the following computation:
38  * \code
39  * for each edge i:
40  * out[i] = fn(edge[i], other[i])
41  * \endcode
42  * out must be of the result_type specified.
43  *
44  * The function must take as the first argument, a vector<flexible_type>
45  * and the second argument, T, and must return an object castable to a
46  * flexible_type of the result_type specified.
47  *
48  * For instance, if I am going to compute a change in message value between
49  * the existing column, and a new computed column:
50  *
51  * \code
52  * const size_t msg_idx = g.get_edge_field_id("message");
53  *
54  * // compute the change in message
55  * auto delta = sgraph_compute::edge_apply(
56  * g,
57  * new_message, // a vector<shared_ptr<sarray<flexible_type>>>
58  * flex_type_enum::FLOAT,
59  * [&](const std::vector<flexible_type>& edata, const flexible_type& y) {
60  * return std::abs((double)(edata[msg_idx]) - (double)(y));
61  * });
62  * \endcode
63  *
64  * Note that if the apply is only going to access one column, the alternative
65  * overload will be more efficient.
66  */
67 template <typename Fn, typename T>
68 std::vector<std::shared_ptr<sarray<flexible_type>>>
70  std::vector<std::shared_ptr<sarray<T>>> & other,
71  flex_type_enum result_type,
72  Fn fn) {
73  size_t len = g.get_num_partitions() * g.get_num_partitions();
74  ASSERT_EQ(len, other.size());
75  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
76  // get all the edge partitions.
77  const std::vector<sframe>& edata = g.edge_group();
78  parallel_for((size_t)(0), len, [&](size_t i) {
79  std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
80  ret_partition->open_for_write(1);
81  ret_partition->set_type(result_type);
82  binary_transform(edata[i], *other[i], *ret_partition, fn);
83  ret_partition->close();
84  ret[i] = ret_partition;
85  });
86  return ret;
87 }
88 
89 /**
90  * Performs a map operation on graph edge_data.
91  * Abstractly performs the following computation:
92  * \code
93  * for each edge i:
94  * out[i] = fn(edge_data[i])
95  * \endcode
96  * out must be of the result_type specified.
97  *
98  * The function must take as the only argument, a vector<flexible_type>.
99  * and must return an object castable to a flexible_type of the result_type
100  * specified.
101  *
102  * For instance, if I am going to compute a "normalized" sampling vector.
103  *
104  * \code
105  * auto normalized = sgraph_compute::edge_apply(
106  * g,
107  * flex_type_enum::FLOAT,
108  * [&](std::vector<flexible_type>& edata) {
109  * double sum = 0.0;
110  * for (const auto& v : edata) sum += v;
111  * for (auto& v : edata): v /= sum;
112  * return edata;
113  * });
114  * \endcode
115  *
116  * Note that if the apply is only going to access one column, the alternative
117  * overload will be more efficient.
118  */
119 template <typename Fn>
120 std::vector<std::shared_ptr<sarray<flexible_type>>>
122  flex_type_enum result_type,
123  Fn fn) {
124  size_t len = g.get_num_partitions() * g.get_num_partitions();
125  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
126  // get all the edge partitions.
127  const std::vector<sframe>& edata = g.edge_group();
128  parallel_for((size_t)(0), len, [&](size_t i) {
129  std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
130  ret_partition->open_for_write(1);
131  ret_partition->set_type(result_type);
132  transform(edata[i], *ret_partition, fn);
133  ret_partition->close();
134  ret[i] = ret_partition;
135  });
136  return ret;
137 }
138 
139 
140 /**
141  * Performs a map operation combining one external array (other) with one
142  * column of the graph edge data. other must be the same length as the edge data.
143  * Abstractly performs the following computation:
144  * \code
145  * for each edge i:
146  * out[i] = fn(edge_data[column_name][i], other[i])
147  * \endcode
148  * out must be of the result_type specified.
149  *
150  * The function must take as the first argument, a flexible_type
151  * and the second argument, T, and must return an object castable to a
152  * flexible_type of the result_type specified.
153  *
154  * For instance, if I am going to compute a change in message value between
155  * the existing column, and a new computed column:
156  *
157  * \code
158  * // compute the change in message
159  * auto delta = sgraph_compute::edge_apply(
160  * g,
161  * "message",
162  * new_message, // a vector<shared_ptr<sarray<flexible_type>>>
163  * flex_type_enum::FLOAT,
164  * [&](const flexible_type& edata, const flexible_type& y) {
165  * return std::abs((double)(edata) - (double)(y));
166  * });
167  * \endcode
168  */
169 template <typename Fn, typename T>
170 std::vector<std::shared_ptr<sarray<flexible_type>>>
172  std::string column_name,
173  std::vector<std::shared_ptr<sarray<T>>> & other,
174  flex_type_enum result_type,
175  Fn fn) {
176  size_t len = g.get_num_partitions() * g.get_num_partitions();
177  ASSERT_EQ(other.size(), len);
178  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
179  // get all the edge partitions.
180  const std::vector<sframe>& edata = g.edge_group();
181  parallel_for((size_t)(0), len, [&](size_t i) {
182  std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
183  std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
184  ret_partition->open_for_write(1);
185  ret_partition->set_type(result_type);
186  binary_transform(*graph_field, *other[i], *ret_partition, fn);
187  ret_partition->close();
188  ret[i] = ret_partition;
189  });
190  return ret;
191 }
192 
193 /**
194  * Performs a map operation on one column of the graph edge_data.
195  * Abstractly performs the following computation:
196  * \code
197  * for each edge i:
198  * out[i] = fn(edge_data[column_name][i])
199  * \endcode
200  * out must be of the result_type specified.
201  *
202  * The function must take as the only argument, a flexible_type.
203  * and must return an object castable to a flexible_type of the result_type
204  * specified.
205  *
206  * For instance, if I am going to compute the log of the message column.
207  *
208  * \code
209  * auto normalized = sgraph_compute::edge_apply(
210  * g,
211  * "message",
212  * flex_type_enum::FLOAT,
213  * [&](const flexible_type& y) {
214  * return log(y);
215  * });
216  * \endcode
217  */
218 template <typename Fn>
219 std::vector<std::shared_ptr<sarray<flexible_type>>>
221  std::string column_name,
222  flex_type_enum result_type,
223  Fn fn) {
224  size_t len = g.get_num_partitions()*g.get_num_partitions();
225  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
226  // get all the edge partitions.
227  const std::vector<sframe>& edata = g.edge_group();
228  parallel_for((size_t)(0), len, [&](size_t i) {
229  std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
230  std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
231  ret_partition->open_for_write(1);
232  ret_partition->set_type(result_type);
233  transform(*graph_field, *ret_partition, fn);
234  ret_partition->close();
235  ret[i] = ret_partition;
236  });
237  return ret;
238 }
239 
240 
241 /**
242  * Performs a reduction over the graph data. If you are only reducing
243  * over one column, see the alternative overload.
244  *
245  * The edge data is partitioned into small chunks. Within each chunk,
246  * the reducer function is called on every element using init as the initial
247  * value. This accomplishes a collection of partial reductions.
248  * Finally, the combine function is used to merge all the partial reductions
249  * which is then returned.
250  *
251  * Abstractly performs the following computation:
252  * \code
253  * total_reduction = init
254  * for each partition:
255  * partial_reduction[partition] = init
256  * for each edge i in partition:
257  * reducer(edge_data[i], partial_reduction[partition])
258  * combiner(partial_reduction[partition], total_reduction)
259  * return total_reduction
260  * \endcode
261  *
262  * Example. Here were compute the sum of the triangle_count field of every edge.
263  * \code
264  * const size_t triangle_idx = g.get_edge_field_id("triangle_counts");
265  * total_triangles =
266  * sgraph_compute::reduce<double>(g,
267  * [](const std::vector<flexible_type>& edata, double& acc) {
268  * // ran on each edge data
269  * acc += (flex_int)edata[triangle_idx];
270  * },
271  * [](const double& v, double& acc) {
272  * // partial combiner.
273  * acc += v;
274  * });
275  * \endcode
276  *
277  * Note that if the apply is only going to access one column, the alternative
278  * overload will be more efficient.
279  */
280 template <typename ResultType, typename Reducer, typename Combiner>
281 typename std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type
282 /*ResultType*/ edge_reduce(sgraph& g,
283  Reducer fn,
284  Combiner combine,
285  ResultType init = ResultType()) {
286  const std::vector<sframe>& edata = g.edge_group();
287  size_t len = g.get_num_partitions() * g.get_num_partitions();
288  mutex lock;
289  ResultType ret = init;
290  parallel_for((size_t)(0), len, [&](size_t i) {
291  std::vector<ResultType> result =
292  turi::reduce(edata[i],
293  [&](const std::vector<flexible_type>& left, ResultType& right) {
294  fn(left, right);
295  return true;
296  }, init);
297 
298  std::unique_lock<mutex> result_lock(lock);
299  for (ResultType& res: result) {
300  combine(res, ret);
301  }
302  });
303  return ret;
304 }
305 
306 
307 /**
308  * Performs a reduction over a single column of the graph data.
309  *
310  * The edge data is partitioned into small chunks. Within each chunk,
311  * the reducer function is called on every element using init as the initial
312  * value. This accomplishes a collection of partial reductions.
313  * Finally, the combine function is used to merge all the partial reductions
314  * which is then returned.
315  *
316  *
317  * Abstractly performs the following computation:
318  * \code
319  * total_reduction = init
320  * for each partition:
321  * partial_reduction[partition] = init
322  * for each edge i in partition:
323  * reducer(edge_data[columnname][i], partial_reduction[partition])
324  * combiner(partial_reduction[partition], total_reduction)
325  * return total_reduction
326  * \endcode
327  *
328  * Example. Here were compute the sum of the triangle field of every edge.
329  * \code
330  * total_triangles =
331  * sgraph_compute::reduce<double>(g,
332  * "triangle",
333  * [](const flexible_type& tr, double& acc) {
334  * // ran on each edge data
335  * acc += (flex_int)tr;
336  * },
337  * [](const double& v, double& acc) {
338  * // partial combiner.
339  * acc += v;
340  * });
341  * \endcode
342  */
343 template <typename ResultType, typename Reducer, typename Combiner>
344 ResultType edge_reduce(sgraph& g,
345  std::string column_name,
346  Reducer fn,
347  Combiner combine,
348  ResultType init = ResultType()) {
349  const std::vector<sframe>& edata = g.edge_group();
350  size_t len = g.get_num_partitions() * g.get_num_partitions();
351  mutex lock;
352  ResultType ret = init;
353  parallel_for((size_t)(0), len, [&](size_t i) {
354  std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
355  std::vector<ResultType> result =
356  turi::reduce(*graph_field,
357  [&](const flexible_type& left, ResultType& right) {
358  fn(left, right);
359  return true;
360  }, init);
361  std::unique_lock<mutex> result_lock(lock);
362  for (ResultType& res: result) {
363  combine(res, ret);
364  }
365  });
366  return ret;
367 }
368 
369 } // end of sgraph
370 
371 /// \}
372 } // end of turicreate
373 #endif
/* Cross-reference index (extraction artifact from generated documentation):
 *   void parallel_for(size_t begin, size_t end, const FunctionType& fn)
 *       — lambda_omp.hpp:93
 *   std::vector<ResultType> reduce(S&& input, FunctionType f, ResultType init = ResultType())
 *       — algorithm.hpp:491
 *   std::vector<std::shared_ptr<sarray<flexible_type>>> edge_apply(sgraph& g,
 *       std::vector<std::shared_ptr<sarray<T>>>& other, flex_type_enum result_type, Fn fn)
 *   size_t get_num_partitions() const — sgraph.hpp:492
 *   std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type
 *       edge_reduce(sgraph& g, Reducer fn, Combiner combine, ResultType init = ResultType())
 *   std::vector<sframe>& edge_group(size_t groupa = 0, size_t groupb = 0) — sgraph.hpp:301
 *   void binary_transform(S1&& input1, S2&& input2, T&& output, TransformFn transformfn)
 *       — algorithm.hpp:543
 *   void transform(S&& input, T&& output, TransformFn transformfn,
 *       std::set<size_t> constraint_segments = std::set<size_t>()) — algorithm.hpp:64
 */