Turi Create  4.0
sgraph_vertex_apply.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_SGRAPH_SGRAPH_VERTEX_APPLY_HPP
#define TURI_SGRAPH_SGRAPH_VERTEX_APPLY_HPP

#include <vector>
#include <tuple>
#include <type_traits>
#include <functional>
#include <core/data/flexible_type/flexible_type.hpp>
#include <core/storage/sframe_data/sarray.hpp>
#include <core/storage/sgraph_data/sgraph.hpp>
#include <core/storage/sgraph_data/hilbert_parallel_for.hpp>
#include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp>
#include <core/util/cityhash_tc.hpp>
namespace turi {

/**
 * \ingroup sgraph_physical
 * \addtogroup sgraph_compute SGraph Compute
 * \{
 */

/**
 * Graph Computation Functions
 */
namespace sgraph_compute {
/**
 * Performs a map operation combining one external array (other) with the
 * graph vertex data. other must have the same length as the vertex data.
 * Abstractly performs the following computation:
 * \code
 * for each vertex i:
 *   out[i] = fn(vertex_data[i], other[i])
 * \endcode
 * out must be of the result_type specified.
 *
 * The function must take a vector<flexible_type> as its first argument and
 * a T as its second, and must return an object castable to a flexible_type
 * of the result_type specified.
 *
 * For instance, to compute the change in pagerank between the existing
 * pagerank column and a newly computed column:
 *
 * \code
 * const size_t pr_idx = g.get_vertex_field_id("pagerank");
 *
 * // compute the change in pagerank
 * auto delta = sgraph_compute::vertex_apply(
 *     g,
 *     new_pagerank, // a vector<shared_ptr<sarray<flexible_type>>>
 *     flex_type_enum::FLOAT,
 *     [&](const std::vector<flexible_type>& vdata, const flexible_type& y) {
 *       return std::abs((double)(vdata[pr_idx]) - (double)(y));
 *     });
 * \endcode
 *
 * Note that if the apply is only going to access one column, the alternative
 * overload will be more efficient.
 */
template <typename Fn, typename T>
std::vector<std::shared_ptr<sarray<flexible_type>>>
vertex_apply(sgraph& g,
             std::vector<std::shared_ptr<sarray<T>>>& other,
             flex_type_enum result_type,
             Fn fn) {
  ASSERT_EQ(g.get_num_partitions(), other.size());
  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(g.get_num_partitions());
  // get all the vertex partitions.
  const std::vector<sframe>& vdata = g.vertex_group();
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
    ret_partition->open_for_write(1);
    ret_partition->set_type(result_type);
    binary_transform(vdata[i], *other[i], *ret_partition, fn);
    ret_partition->close();
    ret[i] = ret_partition;
  });
  return ret;
}
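
/*
 * Usage sketch (not part of the original header; "num_follows" is a
 * hypothetical numeric vertex field): `other` must mirror the graph's
 * vertex partitioning, one sarray per partition, which is most easily
 * obtained as the output of another vertex_apply call, such as the
 * whole-row overload below.
 *
 * \code
 * const size_t f_idx = g.get_vertex_field_id("num_follows");
 *
 * // first pass: extract the field into a partitioned external array
 * auto follows = sgraph_compute::vertex_apply(
 *     g,
 *     flex_type_enum::FLOAT,
 *     [&](const std::vector<flexible_type>& vdata) {
 *       return vdata[f_idx];
 *     });
 *
 * // second pass: combine the external array back with the vertex data
 * auto ratio = sgraph_compute::vertex_apply(
 *     g,
 *     follows,
 *     flex_type_enum::FLOAT,
 *     [&](const std::vector<flexible_type>& vdata, const flexible_type& f) {
 *       return (double)f / (1.0 + (double)vdata[f_idx]);
 *     });
 * \endcode
 */
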
/**
 * Performs a map operation on the graph vertex data.
 * Abstractly performs the following computation:
 * \code
 * for each vertex i:
 *   out[i] = fn(vertex_data[i])
 * \endcode
 * out must be of the result_type specified.
 *
 * The function must take a vector<flexible_type> as its only argument,
 * and must return an object castable to a flexible_type of the
 * result_type specified.
 *
 * For instance, to compute a "normalized" pagerank:
 *
 * \code
 * double pagerank_sum = [...from reduce function below...]
 *
 * const size_t pr_idx = g.get_vertex_field_id("pagerank");
 *
 * auto normalized = sgraph_compute::vertex_apply(
 *     g,
 *     flex_type_enum::FLOAT,
 *     [&](const std::vector<flexible_type>& vdata) {
 *       return vdata[pr_idx] / pagerank_sum;
 *     });
 * \endcode
 *
 * Note that if the apply is only going to access one column, the alternative
 * overload will be more efficient.
 */
template <typename Fn>
std::vector<std::shared_ptr<sarray<flexible_type>>>
vertex_apply(sgraph& g,
             flex_type_enum result_type,
             Fn fn) {
  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(g.get_num_partitions());
  // get all the vertex partitions.
  const std::vector<sframe>& vdata = g.vertex_group();
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
    ret_partition->open_for_write(1);
    ret_partition->set_type(result_type);
    transform(vdata[i], *ret_partition, fn);
    ret_partition->close();
    ret[i] = ret_partition;
  });
  return ret;
}
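
/*
 * Usage sketch (not part of the original header; "in_degree" and
 * "out_degree" are hypothetical vertex fields): because fn receives the
 * whole vertex row, this overload can combine several columns in one pass.
 *
 * \code
 * const size_t in_idx  = g.get_vertex_field_id("in_degree");
 * const size_t out_idx = g.get_vertex_field_id("out_degree");
 *
 * auto total_degree = sgraph_compute::vertex_apply(
 *     g,
 *     flex_type_enum::INTEGER,
 *     [&](const std::vector<flexible_type>& vdata) {
 *       return (flex_int)vdata[in_idx] + (flex_int)vdata[out_idx];
 *     });
 * \endcode
 */
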
/**
 * Performs a map operation combining one external array (other) with one
 * column of the graph vertex data. other must have the same length as the
 * vertex data. Abstractly performs the following computation:
 * \code
 * for each vertex i:
 *   out[i] = fn(vertex_data[column_name][i], other[i])
 * \endcode
 * out must be of the result_type specified.
 *
 * The function must take a flexible_type as its first argument and a T as
 * its second, and must return an object castable to a flexible_type of the
 * result_type specified.
 *
 * For instance, to compute the change in pagerank between the existing
 * pagerank column and a newly computed column:
 *
 * \code
 * // compute the change in pagerank
 * auto delta = sgraph_compute::vertex_apply(
 *     g,
 *     "pagerank",
 *     new_pagerank, // a vector<shared_ptr<sarray<flexible_type>>>
 *     flex_type_enum::FLOAT,
 *     [&](const flexible_type& vdata, const flexible_type& y) {
 *       return std::abs((double)(vdata) - (double)(y));
 *     });
 * \endcode
 */
template <typename Fn, typename T>
std::vector<std::shared_ptr<sarray<flexible_type>>>
vertex_apply(sgraph& g,
             std::string column_name,
             std::vector<std::shared_ptr<sarray<T>>>& other,
             flex_type_enum result_type,
             Fn fn) {
  ASSERT_EQ(g.get_num_partitions(), other.size());
  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(g.get_num_partitions());
  // get all the vertex partitions.
  const std::vector<sframe>& vdata = g.vertex_group();
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
    std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
    ret_partition->open_for_write(1);
    ret_partition->set_type(result_type);
    binary_transform(*graph_field, *other[i], *ret_partition, fn);
    ret_partition->close();
    ret[i] = ret_partition;
  });
  return ret;
}
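
/*
 * Usage sketch (not part of the original header; "rank_old" is a
 * hypothetical vertex field and new_rank a previously computed partitioned
 * array): flag vertices whose value changed beyond a tolerance, reading
 * only the one column involved.
 *
 * \code
 * auto changed = sgraph_compute::vertex_apply(
 *     g,
 *     "rank_old",
 *     new_rank, // a vector<shared_ptr<sarray<flexible_type>>>
 *     flex_type_enum::INTEGER,
 *     [](const flexible_type& oldv, const flexible_type& newv) {
 *       return (flex_int)(std::abs((double)oldv - (double)newv) > 1e-9);
 *     });
 * \endcode
 */
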
/**
 * Performs a map operation on one column of the graph vertex data.
 * Abstractly performs the following computation:
 * \code
 * for each vertex i:
 *   out[i] = fn(vertex_data[column_name][i])
 * \endcode
 * out must be of the result_type specified.
 *
 * The function must take a flexible_type as its only argument, and must
 * return an object castable to a flexible_type of the result_type
 * specified.
 *
 * For instance, to compute a "normalized" pagerank:
 *
 * \code
 * double pagerank_sum = [...from reduce function below...]
 *
 * auto normalized = sgraph_compute::vertex_apply(
 *     g,
 *     "pagerank",
 *     flex_type_enum::FLOAT,
 *     [&](const flexible_type& y) {
 *       return y / pagerank_sum;
 *     });
 * \endcode
 */
template <typename Fn>
std::vector<std::shared_ptr<sarray<flexible_type>>>
vertex_apply(sgraph& g,
             std::string column_name,
             flex_type_enum result_type,
             Fn fn) {
  std::vector<std::shared_ptr<sarray<flexible_type>>> ret(g.get_num_partitions());
  // get all the vertex partitions.
  const std::vector<sframe>& vdata = g.vertex_group();
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
    std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
    ret_partition->open_for_write(1);
    ret_partition->set_type(result_type);
    transform(*graph_field, *ret_partition, fn);
    ret_partition->close();
    ret[i] = ret_partition;
  });
  return ret;
}
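
/*
 * Usage sketch (not part of the original header; "name" is a hypothetical
 * string vertex field, and flexible_type's get<flex_string>() accessor is
 * assumed): single-column applies avoid reading the remaining vertex
 * columns entirely.
 *
 * \code
 * auto name_lengths = sgraph_compute::vertex_apply(
 *     g,
 *     "name",
 *     flex_type_enum::INTEGER,
 *     [](const flexible_type& name) {
 *       return (flex_int)name.get<flex_string>().size();
 *     });
 * \endcode
 */
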
/**
 * Performs a reduction over the graph data. If you are only reducing
 * over one column, see the alternative overload.
 *
 * The vertex data is partitioned into small chunks. Within each chunk,
 * the reducer function is called on every element using init as the initial
 * value. This accomplishes a collection of partial reductions.
 * Finally, the combine function is used to merge all the partial reductions,
 * and the combined result is returned.
 *
 * Abstractly performs the following computation:
 * \code
 * total_reduction = init
 * for each partition:
 *   partial_reduction[partition] = init
 *   for each vertex i in partition:
 *     reducer(vertex_data[i], partial_reduction[partition])
 *   combiner(partial_reduction[partition], total_reduction)
 * return total_reduction
 * \endcode
 *
 * Example: here we compute the sum of the pagerank field of every vertex.
 * \code
 * const size_t pr_idx = g.get_vertex_field_id("pagerank");
 * total_pagerank =
 *     sgraph_compute::vertex_reduce<double>(g,
 *         [&](const std::vector<flexible_type>& vdata, double& acc) {
 *           // run on each vertex data row
 *           acc += (flex_float)vdata[pr_idx];
 *         },
 *         [](const double& v, double& acc) {
 *           // partial combiner.
 *           acc += v;
 *         });
 * \endcode
 *
 * Note that if the reduction is only going to access one column, the
 * alternative overload will be more efficient.
 */
template <typename ResultType, typename Reducer, typename Combiner>
// The enable_if removes this overload from consideration whenever the
// argument after g is convertible to std::string, so that calls passing a
// column name unambiguously resolve to the single-column overload below.
typename std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type
/*ResultType*/ vertex_reduce(sgraph& g,
                             Reducer fn,
                             Combiner combine,
                             ResultType init = ResultType()) {
  const std::vector<sframe>& vdata = g.vertex_group();
  mutex lock;
  ResultType ret = init;
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::vector<ResultType> result =
        turi::reduce(vdata[i],
                     [&](const std::vector<flexible_type>& left, ResultType& right) {
                       fn(left, right);
                       return true;
                     }, init);

    std::unique_lock<mutex> result_lock(lock);
    for (ResultType& res: result) {
      combine(res, ret);
    }
  });
  return ret;
}
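
/*
 * Usage sketch (not part of the original header): any commutative,
 * associative merge works, not just sums. Here we compute the maximum
 * pagerank, seeding both the per-partition reductions and the final
 * combine through init.
 *
 * \code
 * const size_t pr_idx = g.get_vertex_field_id("pagerank");
 * double max_pr = sgraph_compute::vertex_reduce<double>(
 *     g,
 *     [=](const std::vector<flexible_type>& vdata, double& acc) {
 *       acc = std::max(acc, (double)(flex_float)vdata[pr_idx]);
 *     },
 *     [](const double& v, double& acc) {
 *       acc = std::max(acc, v);
 *     },
 *     std::numeric_limits<double>::lowest());
 * \endcode
 */
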
/**
 * Performs a reduction over a single column of the graph data.
 *
 * The vertex data is partitioned into small chunks. Within each chunk,
 * the reducer function is called on every element using init as the initial
 * value. This accomplishes a collection of partial reductions.
 * Finally, the combine function is used to merge all the partial reductions,
 * and the combined result is returned.
 *
 * Abstractly performs the following computation:
 * \code
 * total_reduction = init
 * for each partition:
 *   partial_reduction[partition] = init
 *   for each vertex i in partition:
 *     reducer(vertex_data[column_name][i], partial_reduction[partition])
 *   combiner(partial_reduction[partition], total_reduction)
 * return total_reduction
 * \endcode
 *
 * Example: here we compute the sum of the pagerank field of every vertex.
 * \code
 * total_pagerank =
 *     sgraph_compute::vertex_reduce<double>(g,
 *         "pagerank",
 *         [](const flexible_type& pr, double& acc) {
 *           // run on each vertex value
 *           acc += (flex_float)pr;
 *         },
 *         [](const double& v, double& acc) {
 *           // partial combiner.
 *           acc += v;
 *         });
 * \endcode
 */
template <typename ResultType, typename Reducer, typename Combiner>
ResultType vertex_reduce(sgraph& g,
                         std::string column_name,
                         Reducer fn,
                         Combiner combine,
                         ResultType init = ResultType()) {
  const std::vector<sframe>& vdata = g.vertex_group();
  mutex lock;
  ResultType ret = init;
  parallel_for((size_t)(0), (size_t)g.get_num_partitions(), [&](size_t i) {
    std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
    std::vector<ResultType> result =
        turi::reduce(*graph_field,
                     [&](const flexible_type& left, ResultType& right) {
                       fn(left, right);
                       return true;
                     }, init);
    std::unique_lock<mutex> result_lock(lock);
    for (ResultType& res: result) {
      combine(res, ret);
    }
  });
  return ret;
}
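
/*
 * Usage sketch (not part of the original header; threshold is arbitrary):
 * count the vertices whose pagerank exceeds 1.0. The string literal routes
 * the call to this single-column overload, thanks to the enable_if on the
 * whole-row overload above.
 *
 * \code
 * size_t n_above = sgraph_compute::vertex_reduce<size_t>(
 *     g,
 *     "pagerank",
 *     [](const flexible_type& pr, size_t& acc) {
 *       if ((flex_float)pr > 1.0) ++acc;
 *     },
 *     [](const size_t& v, size_t& acc) {
 *       acc += v;
 *     });
 * \endcode
 */
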
} // end of sgraph_compute namespace

/// \}
} // end of turi namespace
#endif