Turi Create  4.0
sgraph_synchronize.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SGRAPH_SGRAPH_SYNCHRONIZE_HPP
7 #define TURI_SGRAPH_SGRAPH_SYNCHRONIZE_HPP
8 
9 #include<core/storage/sgraph_data/sgraph_synchronize_interface.hpp>
10 #include<core/logging/assertions.hpp>
11 
12 namespace turi {
13 /**
14  * \internal
15  * \ingroup sgraph_physical
16  * \addtogroup sgraph_compute_internal SGraph Compute Internal
17  * \{
18  */
19 
20 /**
21  * Graph Computation Functions
22  */
23 namespace sgraph_compute {
24 
25 /**
26  * An implementation of \ref sgraph_synchronize_interface to exchange
27  * information about an sgraph.
28  *
29  * The main application of this is for communication of graph information.
30  *
31  * \ref vertex_partition_exchange is the struct that holds a subset of
32  * the data fields for a subset of the vertices of an sgraph partition.
33  *
34  * The choice for sparse vertices packing is motivated by the
35  * "triple_apply" computation pattern:
36  * as we process each edge partition, the associated vertex partitions
37  * are sparsely visited and updated.
38  *
39  * \ref sgraph_synchronize_interface is used for both ends of the communication
40  * to deal with initialization, sending and receiving the vertex and edge exchange data.
41  *
42  * Example:
43  * \code
44  *
45  * //// Worker side ////
46  * sgraph_synchronize_interface* worker_graph_sync; // a client side implementation
47  *
48  * // initialize vertex data in partition 0, using all_vertices sent from server.
49  * worker_graph_sync->load_vertex_partition(0, all_vertices_from_server);
50  *
51  * // receive a vertex_exchange from server, let's update the local vertices
52  * worker_graph_sync->update_vertex_partition(vexchange_from_server);
53  *
54  * // do some work to update vertex data in partition 0, and obtain a set of changed vertex data..
55  * vertex_partition_exchange updated_vertices = worker_graph_sync->get_vertex_partition_exchange(0, changed_vertex_ids, changed_field_ids);
56  *
57  * // now we can send updated_vertices to the server.
58  *
59  * //// Server side ////
60  * sgraph_synchronize_interface* server_graph_sync; // a server side implementation
61  *
62  * // call to worker to initialize vertex partition.
63  *
64  * // receive a vertex_exchange from the worker, let's update the local vertices
65  * server_graph_sync->update_vertex_partition(vexchange_from_client);
66  *
67  * // do some work to update vertex data in partition 0, and obtain a set of changed vertex data..
68  * vertex_partition_exchange updated_vertices = server_graph_sync->get_vertex_partition_exchange(0, changed_vertex_ids, changed_field_ids);
69  *
70  * // now we can send updated_vertices to the worker.
71  *
72  * \endcode
73  *
74  * \note This is really an implementation detail; it exists to support graph
75  * computation methods that are implemented in Python.
76  */
78  public:
79 
80  sgraph_synchronize() { }
81 
82  sgraph_synchronize(size_t num_partitions) {
83  init(num_partitions);
84  }
85 
86  void init(size_t num_partitions) {
87  m_vertex_partitions.clear();
88  m_is_partition_loaded.clear();
89 
90  m_num_partitions = num_partitions;
91  m_vertex_partitions.resize(num_partitions);
92  m_is_partition_loaded.resize(num_partitions, false);
93  }
94 
95  inline void load_vertex_partition(size_t partition_id, std::vector<sgraph_vertex_data>& vertices) {
96  DASSERT_LT(partition_id, m_num_partitions);
97  DASSERT_FALSE(m_is_partition_loaded[partition_id]);
98  m_vertex_partitions[partition_id] = &vertices;
99  m_is_partition_loaded[partition_id] = true;
100  }
101 
102  inline void update_vertex_partition(vertex_partition_exchange& vpartition_exchange) {
103  DASSERT_TRUE(m_is_partition_loaded[vpartition_exchange.partition_id]);
104 
105  auto& vertex_partition = *(m_vertex_partitions[vpartition_exchange.partition_id]);
106  auto& update_field_index = vpartition_exchange.field_ids;
107 
108  for (auto& vid_data_pair : vpartition_exchange.vertices) {
109  size_t id = vid_data_pair.first;
110  sgraph_vertex_data& vdata = vid_data_pair.second;
111  for (size_t i = 0; i < update_field_index.size(); ++i) {
112  size_t fid = vpartition_exchange.field_ids[i];
113  vertex_partition[id][fid] = vdata[i];
114  }
115  }
116  }
117 
118  inline vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set<size_t>& vertex_ids, const std::vector<size_t>& field_ids) {
119  DASSERT_TRUE(m_is_partition_loaded[partition_id]);
121  ret.partition_id = partition_id;
122  ret.field_ids = field_ids;
123  auto& vertex_partition = *(m_vertex_partitions[partition_id]);
124  for (size_t vid: vertex_ids) {
125  auto& vdata = vertex_partition[vid];
126  sgraph_vertex_data vdata_subset;
127  for (auto fid: field_ids) vdata_subset.push_back(vdata[fid]);
128  ret.vertices.push_back({vid, std::move(vdata_subset)});
129  }
130  return ret;
131  }
132 
133  private:
134  std::vector<std::vector<sgraph_vertex_data>*> m_vertex_partitions;
135  std::vector<bool> m_is_partition_loaded;
136  size_t m_num_partitions;
137 };
138 
139 } // end sgraph_compute
140 } // end turi
141 
142 #endif
void load_vertex_partition(size_t partition_id, std::vector< sgraph_vertex_data > &vertices)
Given a vector of all vertices of partition, initialize the local vertex storage. ...
void update_vertex_partition(vertex_partition_exchange &vpartition_exchange)
Given a vertex exchange object, update the local vertex storage.
std::vector< std::pair< size_t, sgraph_vertex_data > > vertices
#define DASSERT_FALSE(cond)
Definition: assertions.hpp:365
std::vector< flexible_type > sgraph_vertex_data
std::vector< size_t > field_ids
A subset of field ids the vertex data contain.
vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set< size_t > &vertex_ids, const std::vector< size_t > &field_ids)
Obtain a vertex exchange object containing a subset of vertices and fields.
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
size_t partition_id
id of the partition that vertices belong to.