Turi Create  4.0
graph_pylambda.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_LAMBDA_GRAPH_PYLAMBDA_EVALUATOR_HPP
7 #define TURI_LAMBDA_GRAPH_PYLAMBDA_EVALUATOR_HPP
8 #include <core/system/lambda/graph_lambda_interface.hpp>
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <vector>
11 #include <core/parallel/mutex.hpp>
12 #include <atomic>
13 
14 namespace turi {
15 
16 namespace lambda {
17 
18 /**
19  * Implements sgraph synchronizing interface for graph pylambda worker.
20  *
21  * In the pylambda environment, vertex data are stored locally as Python dictionary objects.
22  * This class provides the following synchronization functionality:
23  * 1) Load a python object vertex partition given the flexible_type vertex partition.
24  * 2) Update the python object vertex partition given a vertex_partition_exchange.
25  * 3) Create a vertex_partition_exchange object from the stored python object vertex partition.
26  */
28  public:
29  void init(size_t num_partitions, const std::vector<std::string>& vertex_keys);
30 
31  /**
32  * Load a python object vertex partition given the flexible_type vertex partition.
33  */
34  void load_vertex_partition(size_t partition_id, std::vector<sgraph_vertex_data>& vertices);
35 
36  /**
37  * Update the python object vertex partition given a vertex_partition_exchange.
38  */
39  void update_vertex_partition(vertex_partition_exchange& vpartition_exchange);
40 
41  /**
42  * Create a vertex_partition_exchange object from the stored python object vertex partition.
43  */
44  vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set<size_t>& vertex_ids, const std::vector<size_t>& field_ids);
45 
46  inline std::vector<sgraph_vertex_data>& get_partition(size_t partition_id) {
47  DASSERT_LT(partition_id, m_num_partitions);
48  DASSERT_TRUE(is_loaded(partition_id));
49  return m_vertex_partitions[partition_id];
50  }
51 
52  inline bool is_loaded(size_t partition_id) {
53  DASSERT_LT(partition_id, m_num_partitions);
54  return m_is_partition_loaded[partition_id];
55  }
56 
57  inline void clear() {
58  m_vertex_partitions.clear();
59  m_is_partition_loaded.clear();
60  m_vertex_keys.clear();
61  m_num_partitions = 0;
62  }
63 
64  private:
65  std::vector<std::vector<sgraph_vertex_data>> m_vertex_partitions;
66  std::vector<bool> m_is_partition_loaded;
67  std::vector<std::string> m_vertex_keys;
68  size_t m_num_partitions;
69 };
70 
71 
72 class graph_pylambda_evaluator : public graph_lambda_evaluator_interface {
73  public:
74 
75  /**************************************************************************/
76  /* */
77  /* Constructor and Destructor */
78  /* */
79  /**************************************************************************/
80  graph_pylambda_evaluator();
81 
82  ~graph_pylambda_evaluator();
83 
84  void init(const std::string& lambda,
85  size_t num_partitions,
86  const std::vector<std::string>& vertex_fields,
87  const std::vector<std::string>& edge_fields,
88  size_t src_column_id, size_t dst_column_id);
89 
90  void clear();
91 
92 
93  /**************************************************************************/
94  /* */
95  /* Communication Interface */
96  /* */
97  /**************************************************************************/
98  /**
99  * Load a python object vertex partition given the flexible_type vertex partition.
100  */
101  inline void load_vertex_partition(size_t partition_id, std::vector<sgraph_vertex_data>& vertices) {
102  logstream(LOG_INFO) << "graph_lambda_worker load partition " << partition_id << std::endl;
103  m_graph_sync.load_vertex_partition(partition_id, vertices);
104  }
105 
106  inline bool is_loaded(size_t partition_id) {
107  return m_graph_sync.is_loaded(partition_id);
108  }
109 
110  /**
111  * Update the pyobject vertex partition with the vertex_partition_exchange.
112  */
113  inline void update_vertex_partition(vertex_partition_exchange& vpartition_exchange) {
114  logstream(LOG_INFO) << "graph_lambda_worker update partition "
115  << vpartition_exchange.partition_id << std::endl;
116  m_graph_sync.update_vertex_partition(vpartition_exchange);
117  }
118 
119  /**
120  * Create a vertex_partition_exchange object from the python vertex partition.
121  */
122  inline vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set<size_t>& vertex_ids, const std::vector<size_t>& field_ids) {
123  logstream(LOG_INFO) << "graph_lambda_worker get partition " << partition_id << std::endl;
124  return m_graph_sync.get_vertex_partition_exchange(partition_id, vertex_ids, field_ids);
125  }
126 
127  /**************************************************************************/
128  /* */
129  /* Compute Interface */
130  /* */
131  /**************************************************************************/
132  /**
133  * Apply the lambda function on a vector of edge data, and return
134  * the vector of mutated edge data.
135  *
136  * The return vector is aligned with the input vector, but each element only contains
137  * the fields specified mutated_edge_field_ids.
138  */
139  std::vector<sgraph_edge_data> eval_triple_apply(const std::vector<sgraph_edge_data>& all_edge_data,
140  size_t src_partition, size_t dst_partition,
141  const std::vector<size_t>& mutated_edge_field_ids = {});
142 
143  private:
144  mutex m_mutex;
145 
146  size_t m_lambda_id = size_t(-1);
147 
148  std::vector<std::string> m_vertex_keys;
149  std::vector<std::string> m_edge_keys;
150  size_t m_srcid_column;
151  size_t m_dstid_column;
152 
153  pysgraph_synchronize m_graph_sync;
154 };
155 
156 } // end of lambda
157 } // end of turi
158 
159 #endif
#define logstream(lvl)
Definition: logger.hpp:276
#define LOG_INFO
Definition: logger.hpp:101
void load_vertex_partition(size_t partition_id, std::vector< sgraph_vertex_data > &vertices)
void update_vertex_partition(vertex_partition_exchange &vpartition_exchange)
vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set< size_t > &vertex_ids, const std::vector< size_t > &field_ids)
#define DASSERT_TRUE(cond)
Definition: assertions.hpp:364
size_t partition_id
id of the partition that vertices belong to.