/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_SGRAPH_SGRAPH_sgraph_engine_HPP
#define TURI_SGRAPH_SGRAPH_sgraph_engine_HPP

#include <vector>
#include <tuple>
#include <type_traits>
#include <functional>
#include <set>
#include <unordered_set>
#include <mutex>
#include <typeinfo>
#include <core/data/flexible_type/flexible_type.hpp>
#include <core/storage/sframe_data/sarray.hpp>
#include <core/storage/sgraph_data/sgraph.hpp>
#include <core/storage/sgraph_data/hilbert_parallel_for.hpp>
#include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp>
#include <core/util/cityhash_tc.hpp>

namespace turi {
/**
 * \ingroup sgraph_physical
 * \addtogroup sgraph_compute SGraph Compute
 * \{
 */

/**
 * Graph Computation Functions
 */
namespace sgraph_compute {

/**
 * PowerGraph computation.
 *
 * Two central graph computation operations are provided by this class:
 * - Vertex Gather
 * - Parallel For Edges
 *
 * ### Vertex Gather ###
 * Given a graph, this class computes, for each vertex, a generalized "sum"
 * over the neighborhood of that vertex. You need to define a function which
 * is then given
 * - the data on the central vertex,
 * - the data on the edge,
 * - the data on the other vertex,
 * - the direction of the edge.
 *
 * The function then performs some computation and aggregates the result
 * into a combiner.
 *
 * \code
 * for v \in vertices:
 *   output[v] = initial_value
 *
 * for v \in vertices:
 *   for u \in neighbors of v:
 *     gather(data[v], data[v,u], data[u], edge_direction_of_v_u, output[v])
 *
 * return output
 * \endcode
 *
 * At completion, a vector of shared_ptr to sarrays will be returned. Each
 * sarray corresponds to one vertex partition, and each sarray row corresponds
 * to the result of summing across the neighborhood of the vertex.
 *
 * This is easiest to explain with an example. Pagerank for instance:
 * \code
 * const size_t degree_idx = g.get_vertex_field_id("out_degree");
 * const size_t pr_idx = g.get_vertex_field_id("pagerank");
 *
 * // declare a sgraph_engine computation object of a particular return type.
 * sgraph_compute::sgraph_engine<flexible_type> ga;
 *
 * auto ret = ga.gather(g,
 *     [=](const graph_data_type& center,
 *         const graph_data_type& edge,
 *         const graph_data_type& other,
 *         edge_direction edgedir,
 *         flexible_type& combiner) {
 *       combiner = combiner + (1 - random_jump_prob) * (other[pr_idx] / other[degree_idx]);
 *     },
 *     flexible_type(random_jump_prob), // initial value
 *     edge_direction::IN_EDGE);        // edges to sum over
 * \endcode
 *
 * ### Parallel For Edges ###
 *
 * The parallel_for_edges() function performs the following operation on the
 * graph. Abstractly, given a function
 * edge_map: (source_data, edge_data, target_data) -> T:
 * \code
 * for e \in edges:
 *   output[e] = edge_map(e.source.data(), e.data(), e.target.data())
 * \endcode
 *
 * At completion it returns a vector of shared_ptr to sarrays.
 * Each sarray corresponds to one edge partition, and each row corresponds to
 * the result of the map operation on the corresponding edge.
 *
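 * For example, here is a minimal sketch (reusing g and pr_idx from the gather
 * example above) that emits, for every edge, the sum of the pageranks of its
 * two endpoint vertices:
 * \code
 * sgraph_compute::sgraph_engine<flexible_type> ga;
 * auto ret = ga.parallel_for_edges(g,
 *     [=](const graph_data_type& source,
 *         graph_data_type& edge,
 *         const graph_data_type& target) -> flexible_type {
 *       return source[pr_idx] + target[pr_idx];
 *     },
 *     flex_type_enum::FLOAT); // runtime type of the mapped values
 * \endcode
 *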
 * \note
 * Refactoring is required in the future for this class.
 * Essentially, this class should provide the mechanism to iterate over edges
 * with vertex data properly loaded. Then the user-facing "gather" and
 * "parallel for edges" will be simple functions that use this class.
 */
template <typename T>
class sgraph_engine {
 public:
  typedef sgraph::vertex_partition_address vertex_partition_address;
  typedef sgraph::edge_partition_address edge_partition_address;
  typedef sgraph::edge_direction edge_direction;
  typedef std::vector<flexible_type> graph_data_type;

  sgraph_engine() { }

  /**************************************************************************/
  /*                                                                        */
  /*                                 Gather                                 */
  /*                                                                        */
  /**************************************************************************/
  using const_gather_function_type =
      std::function<void(const graph_data_type& center,
                         const graph_data_type& edge,
                         const graph_data_type& other,
                         edge_direction edgedir,
                         T& combiner)>;

  std::vector<std::shared_ptr<sarray<T>>> gather(
      sgraph& graph,
      const_gather_function_type gather,
      const T& initial_value,
      edge_direction edgedir = edge_direction::ANY_EDGE,
      size_t central_group = 0,
      std::unordered_set<size_t> sgraph_compute_group = {0},
      size_t parallel_limit = (size_t)(-1)) {
    if (parallel_limit == static_cast<size_t>(-1)) {
      parallel_limit = thread::cpu_count();
    }
    init_data_structures(graph, central_group, initial_value);
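    // hilbert_blocked_parallel_for sweeps the grid of edge partitions in a
    // Hilbert-curve order (see hilbert_parallel_for.hpp); the intent is that
    // consecutive blocks share vertex partitions, minimizing the loads and
    // unloads done in the preamble below.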
    hilbert_blocked_parallel_for(
        graph.get_num_partitions(),
        // The preamble to each parallel for block.
        // This is the collection of edges that will be executed in the next pass.
        [&](std::vector<std::pair<size_t, size_t> > edgeparts) {
          std::set<vertex_partition_address> vertex_partitions;
          std::set<size_t> combine_partitions;
          // For each partition requested, figure out exactly which
          // partition / group I need to load. That depends on the edge
          // direction I am executing.
          for (auto edgepart : edgeparts) {
            logstream(LOG_INFO) << "Planning Execution on Edge Partition: "
                                << edgepart.first << " " << edgepart.second << std::endl;
            for (size_t gather_vgroup : sgraph_compute_group) {
              if (edgedir == edge_direction::ANY_EDGE ||
                  edgedir == edge_direction::IN_EDGE) {
                // This is the edge partition I will read when I have to run
                // this edge set. These are IN-edges, so the src group is
                // gather_vgroup and the dst group is the central group.
                // The partition is as defined by edgepart.
                edge_partition_address address(gather_vgroup, central_group,
                                               edgepart.first, edgepart.second);
                combine_partitions.insert(address.get_dst_vertex_partition().partition);
                vertex_partitions.insert(address.get_src_vertex_partition());
                vertex_partitions.insert(address.get_dst_vertex_partition());
              }
              if (edgedir == edge_direction::ANY_EDGE ||
                  edgedir == edge_direction::OUT_EDGE) {
                // This is the edge partition I will read when I have to run
                // this edge set. These are OUT-edges, so the dst group is
                // gather_vgroup and the src group is the central group.
                // The partition is as defined by edgepart.
                edge_partition_address address(central_group, gather_vgroup,
                                               edgepart.first, edgepart.second);
                combine_partitions.insert(address.get_src_vertex_partition().partition);
                vertex_partitions.insert(address.get_src_vertex_partition());
                vertex_partitions.insert(address.get_dst_vertex_partition());
              }
            }
          }
          // request loading of all the vertex partitions I need
          load_graph_vertex_blocks(graph, vertex_partitions);
          load_combine_blocks(combine_partitions);
        },
        // This is the actual parallel for, and this is the block I am to
        // be executing.
        [&](std::pair<size_t, size_t> edgepart) {
          // At this stage we are in parallel, and we are guaranteed to have
          // all the required vertices in memory. So we just need to sweep
          // the edge set. The trick is that edge partition (i,j) of group
          // (a,b) only holds the edges going from group a to group b. So
          // depending on the requested edge direction, we have to be a
          // little careful about which edge sets we are actually loading.
          for (size_t gather_vgroup : sgraph_compute_group) {
            edge_partition_address address;
            // TODO: revisit the code when we actually have vertex groups
            address = edge_partition_address(gather_vgroup, central_group,
                                             edgepart.first, edgepart.second);
            sframe& edgeframe = graph.edge_partition(address);
            compute_const_gather(edgeframe, address, central_group, edgedir, gather);
          }
        });
    // flush the combine blocks
    load_combine_blocks(std::set<size_t>());
    return combine_sarrays;
  }

  /**************************************************************************/
  /*                                                                        */
  /*                           Parallel For Edges                           */
  /*                                                                        */
  /**************************************************************************/

  using const_edge_map_function_type =
      std::function<T(const graph_data_type& source,
                      graph_data_type& edge,
                      const graph_data_type& target)>;

  std::vector<std::shared_ptr<sarray<T>>> parallel_for_edges(
      sgraph& graph,
      const_edge_map_function_type map_fn,
      flex_type_enum ret_type,
      size_t groupa = 0, size_t groupb = 0,
      size_t parallel_limit = (size_t)(-1)) {
    if (parallel_limit == static_cast<size_t>(-1)) {
      parallel_limit = thread::cpu_count();
    }
    vertex_data.clear();
    vertex_data.resize(graph.get_num_groups());
    for (auto& v : vertex_data) v.resize(graph.get_num_partitions());

    size_t return_size = graph.get_num_partitions() * graph.get_num_partitions();
    std::vector<std::shared_ptr<sarray<T>>> return_edge_value(return_size);
    hilbert_blocked_parallel_for(
        graph.get_num_partitions(),
        // The preamble to each parallel for block.
        // This is the collection of edges that will be executed in the next pass.
        [&](std::vector<std::pair<size_t, size_t> > edgeparts) {
          std::set<vertex_partition_address> vertex_partitions;
          std::set<size_t> combine_partitions;
          // For each partition requested, figure out exactly which
          // partition / group I need to load.
          for (auto edgepart : edgeparts) {
            logstream(LOG_INFO) << "Planning Execution on Edge Partition: "
                                << edgepart.first << " " << edgepart.second << std::endl;
            // This is the edge partition I will read when I have to run this
            // edge set. Edges here go from groupa to groupb; the partition is
            // as defined by edgepart.
            edge_partition_address address(groupa, groupb, edgepart.first, edgepart.second);
            combine_partitions.insert(address.get_dst_vertex_partition().partition);
            vertex_partitions.insert(address.get_src_vertex_partition());
            vertex_partitions.insert(address.get_dst_vertex_partition());
          }
          // request loading of all the vertex partitions I need
          load_graph_vertex_blocks(graph, vertex_partitions);
        },
        // This is the actual parallel for, and this is the block I am to
        // be executing.
        [&](std::pair<size_t, size_t> edgepart) {
          // At this stage we are in parallel, and we are guaranteed to have
          // all the required vertices in memory. So we just need to sweep
          // the edge set. Edge partition (i,j) of group (a,b) only holds the
          // edges going from group a to group b.
          edge_partition_address address(groupa, groupb,
                                         edgepart.first, edgepart.second);
          sframe& edgeframe = graph.edge_partition(address);
          size_t partid = edgepart.first * graph.get_num_partitions() + edgepart.second;
          return_edge_value[partid] = compute_edge_map(edgeframe, address, map_fn, ret_type);
        });
    return return_edge_value;
  }

 private:
  // vertex_data[group][partition]
  std::vector<std::vector<vertex_block<sframe> > > vertex_data;
  // combine_data[partition]
  std::vector<vertex_block<sarray<T> > > combine_data;
  std::vector<std::shared_ptr<sarray<T> > > combine_sarrays;
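  // Lock striping: combine targets are hashed into this fixed pool of
  // mutexes, serializing concurrent gathers into the same vertex without
  // paying for one lock per vertex.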
  static constexpr size_t LOCK_ARRAY_SIZE = 1024;
  turi::mutex lock_array[LOCK_ARRAY_SIZE];
  flex_type_enum m_return_type;

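  // SFINAE dispatch: when T is flexible_type, remember the runtime type of
  // the initial value so the output sarrays can be typed accordingly; for
  // any other T this is a no-op.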
  template <typename S>
  typename std::enable_if<std::is_same<S, flexible_type>::value>::type
  set_return_type(const S& t) {
    m_return_type = t.get_type();
  }

  template <typename S>
  typename std::enable_if<!std::is_same<S, flexible_type>::value>::type
  set_return_type(const S& t) { }

  /**
   * Initializes the temporary data structures, and the accumulation sarrays
   * we need to do the computation.
   */
  void init_data_structures(const sgraph& graph,
                            size_t sgraph_compute_group,
                            const T& initial_value) {
    vertex_data.clear();
    combine_data.clear();
    combine_sarrays.clear();
    set_return_type(initial_value);

    /*
     * A *very* sparsely populated buffer of vertex data.
     * vertex_data[group][partition][row]
     */
    vertex_data.resize(graph.get_num_groups());
    for (auto& v : vertex_data) v.resize(graph.get_num_partitions());
    // create the gather data
    combine_sarrays.resize(graph.get_num_partitions());
    combine_data.resize(graph.get_num_partitions());
    // shape up the combine_sarrays.
    // Create gather SArrays of the correct size
    parallel_for(size_t(0),
                 graph.get_num_partitions(),
                 [&](size_t i) {
                   const sframe& frame = graph.vertex_partition(i, sgraph_compute_group);
                   size_t rows = frame.num_rows();
                   combine_sarrays[i].reset(new sarray<T>());
                   combine_sarrays[i]->open_for_write(1);
                   auto iter = combine_sarrays[i]->get_output_iterator(0);
                   while (rows--) {
                     (*iter) = initial_value;
                     ++iter;
                   }
                   combine_sarrays[i]->close();
                 });
  }

  /**
   * This function will unload all graph vertex blocks which are not in the
   * set vertex_address, and load all vertex blocks which are in the set.
   */
  void load_graph_vertex_blocks(sgraph& graph,
                                const std::set<vertex_partition_address>& vertex_address) {
    // Look at all loaded blocks; if they are not in the vertex_address set,
    // unload them.
    for (size_t group = 0; group < vertex_data.size(); ++group) {
      for (size_t partition = 0; partition < vertex_data[group].size(); ++partition) {
        if (vertex_data[group][partition].is_loaded() &&
            vertex_address.count({group, partition}) == 0) {
          vertex_data[group][partition].unload();
        }
      }
    }
    std::vector<vertex_partition_address> vertex_address_vec;
    std::copy(vertex_address.begin(), vertex_address.end(),
              std::inserter(vertex_address_vec, vertex_address_vec.end()));

    // now, for each element in vertex_address, if it is not loaded, load it
    parallel_for(vertex_address_vec.begin(),
                 vertex_address_vec.end(),
                 // parallel for callback. called with each vertex address I
                 // need to load
                 [&](vertex_partition_address& part) {
                   // get the frame for the vertex partition
                   const sframe& frame = graph.vertex_partition(part.partition, part.group);
                   // load it into the vertex data.
                   logstream(LOG_INFO) << "Loading Vertex Partition: "
                                       << part.group << " " << part.partition << std::endl;
                   vertex_data[part.group][part.partition].load_if_not_loaded(frame);
                 });
  }

  /**
   * This function will unload all gather blocks not in the set partitions,
   * writing them back out to the sarray if modified. It will then load
   * all the gather blocks that are in the set.
   */
  void load_combine_blocks(const std::set<size_t> partitions) {
    for (size_t partition = 0; partition < combine_data.size(); ++partition) {
      if (combine_data[partition].is_loaded() && partitions.count(partition) == 0) {
        // reset the existing sarray and save the gather data to it.
        combine_sarrays[partition].reset(new sarray<T>());
        combine_sarrays[partition]->open_for_write(1);
        if (typeid(T) == typeid(flexible_type)) {
          combine_sarrays[partition]->set_type(m_return_type);
        }
        combine_data[partition].flush(*combine_sarrays[partition]);
        // we need to do a save here.
        combine_data[partition].unload();
      }
    }

    std::vector<size_t> partitions_vec;
    std::copy(partitions.begin(), partitions.end(),
              std::inserter(partitions_vec, partitions_vec.end()));

    // now, for each element in partitions, if it is not loaded, load it
    parallel_for(partitions_vec.begin(),
                 partitions_vec.end(),
                 [&](size_t part) {
                   logstream(LOG_INFO) << "Loading Combine Partition: " << part << std::endl;
                   combine_data[part].load_if_not_loaded(*combine_sarrays[part]);
                 });
  }

  void compute_const_gather(sframe& edgeframe,
                            edge_partition_address address,
                            size_t central_group,
                            edge_direction edgedir,
                            const_gather_function_type& gather) {
    auto reader = edgeframe.get_reader();
    size_t row_start = 0;
    size_t row_end = reader->num_rows();
    size_t srcid_column = edgeframe.column_index(sgraph::SRC_COLUMN_NAME);
    size_t dstid_column = edgeframe.column_index(sgraph::DST_COLUMN_NAME);

    vertex_partition_address src_address = address.get_src_vertex_partition();
    vertex_partition_address dst_address = address.get_dst_vertex_partition();
    while (row_start < row_end) {
      size_t nrows = std::min<size_t>(1024, row_end - row_start);
      std::vector<std::vector<flexible_type> > all_edgedata;
      reader->read_rows(row_start, row_start + nrows, all_edgedata);
      for (const auto& edgedata : all_edgedata) {
        // Edges here go from address.src_group to address.dst_group, and we
        // are gathering into central_group. It is an in-edge if the dst
        // group is the central vertex group.
        size_t srcid = edgedata[srcid_column];
        size_t dstid = edgedata[dstid_column];

        if (edgedir == edge_direction::IN_EDGE ||
            edgedir == edge_direction::ANY_EDGE) {
          DASSERT_EQ(address.dst_group, central_group);
          // acquire lock on the combine target
          size_t vertexhash = hash64_combine(hash64(dst_address.partition), hash64(dstid));
          std::unique_lock<turi::mutex> guard(lock_array[vertexhash % LOCK_ARRAY_SIZE]);
          // perform the gather
          // recall vertex_data[group][partition][row]
          gather(vertex_data[dst_address.group][dst_address.partition][dstid],
                 edgedata,
                 vertex_data[src_address.group][src_address.partition][srcid],
                 edge_direction::IN_EDGE,
                 combine_data[dst_address.partition][dstid]);
        }
        if (edgedir == edge_direction::OUT_EDGE || edgedir == edge_direction::ANY_EDGE) {
          DASSERT_EQ(address.src_group, central_group);
          // acquire lock on the combine target
          size_t vertexhash = hash64_combine(hash64(src_address.partition), hash64(srcid));
          std::unique_lock<turi::mutex> guard(lock_array[vertexhash % LOCK_ARRAY_SIZE]);
          // perform the gather
          // recall vertex_data[group][partition][row]
          gather(vertex_data[src_address.group][src_address.partition][srcid],
                 edgedata,
                 vertex_data[dst_address.group][dst_address.partition][dstid],
                 edge_direction::OUT_EDGE,
                 combine_data[src_address.partition][srcid]);
        }
      }
      row_start += nrows;
    }
  }

  std::shared_ptr<sarray<T>> compute_edge_map(sframe& edgeframe,
                                              edge_partition_address address,
                                              const_edge_map_function_type map_fn,
                                              flex_type_enum ret_type) {
    std::shared_ptr<sarray<T>> ret = std::make_shared<sarray<T>>();
    ret->open_for_write(1);
    ret->set_type(ret_type);
    auto out = ret->get_output_iterator(0);

    auto reader = edgeframe.get_reader();
    size_t row_start = 0;
    size_t row_end = reader->num_rows();
    size_t srcid_column = edgeframe.column_index(sgraph::SRC_COLUMN_NAME);
    size_t dstid_column = edgeframe.column_index(sgraph::DST_COLUMN_NAME);

    vertex_partition_address src_address = address.get_src_vertex_partition();
    vertex_partition_address dst_address = address.get_dst_vertex_partition();
    while (row_start < row_end) {
      size_t nrows = std::min<size_t>(1024, row_end - row_start);
      std::vector<std::vector<flexible_type> > all_edgedata;
      reader->read_rows(row_start, row_start + nrows, all_edgedata);
      for (auto& edgedata : all_edgedata) {
        size_t srcid = edgedata[srcid_column];
        size_t dstid = edgedata[dstid_column];
        *out = map_fn(vertex_data[src_address.group][src_address.partition][srcid],
                      edgedata,
                      vertex_data[dst_address.group][dst_address.partition][dstid]);
        ++out;
      }
      row_start += nrows;
    }
    ret->close();
    return ret;
  }
}; // end sgraph_engine

} // end sgraph_compute

/// \}
} // end turi
#endif