6 #ifndef TURI_LAMBDA_GRAPH_PYLAMBDA_EVALUATOR_HPP 7 #define TURI_LAMBDA_GRAPH_PYLAMBDA_EVALUATOR_HPP 8 #include <core/system/lambda/graph_lambda_interface.hpp> 9 #include <core/data/flexible_type/flexible_type.hpp> 11 #include <core/parallel/mutex.hpp> 29 void init(
size_t num_partitions,
const std::vector<std::string>& vertex_keys);
46 inline std::vector<sgraph_vertex_data>& get_partition(
size_t partition_id) {
47 DASSERT_LT(partition_id, m_num_partitions);
49 return m_vertex_partitions[partition_id];
52 inline bool is_loaded(
size_t partition_id) {
53 DASSERT_LT(partition_id, m_num_partitions);
54 return m_is_partition_loaded[partition_id];
58 m_vertex_partitions.clear();
59 m_is_partition_loaded.clear();
60 m_vertex_keys.clear();
65 std::vector<std::vector<sgraph_vertex_data>> m_vertex_partitions;
66 std::vector<bool> m_is_partition_loaded;
67 std::vector<std::string> m_vertex_keys;
68 size_t m_num_partitions;
72 class graph_pylambda_evaluator :
public graph_lambda_evaluator_interface {
80 graph_pylambda_evaluator();
82 ~graph_pylambda_evaluator();
84 void init(
const std::string& lambda,
85 size_t num_partitions,
86 const std::vector<std::string>& vertex_fields,
87 const std::vector<std::string>& edge_fields,
88 size_t src_column_id,
size_t dst_column_id);
102 logstream(
LOG_INFO) <<
"graph_lambda_worker load partition " << partition_id << std::endl;
103 m_graph_sync.load_vertex_partition(partition_id, vertices);
106 inline bool is_loaded(
size_t partition_id) {
107 return m_graph_sync.is_loaded(partition_id);
116 m_graph_sync.update_vertex_partition(vpartition_exchange);
123 logstream(
LOG_INFO) <<
"graph_lambda_worker get partition " << partition_id << std::endl;
124 return m_graph_sync.get_vertex_partition_exchange(partition_id, vertex_ids, field_ids);
139 std::vector<sgraph_edge_data> eval_triple_apply(
const std::vector<sgraph_edge_data>& all_edge_data,
140 size_t src_partition,
size_t dst_partition,
141 const std::vector<size_t>& mutated_edge_field_ids = {});
146 size_t m_lambda_id = size_t(-1);
148 std::vector<std::string> m_vertex_keys;
149 std::vector<std::string> m_edge_keys;
150 size_t m_srcid_column;
151 size_t m_dstid_column;
void load_vertex_partition(size_t partition_id, std::vector< sgraph_vertex_data > &vertices)
void update_vertex_partition(vertex_partition_exchange &vpartition_exchange)
vertex_partition_exchange get_vertex_partition_exchange(size_t partition_id, const std::unordered_set< size_t > &vertex_ids, const std::vector< size_t > &field_ids)
#define DASSERT_TRUE(cond)
size_t partition_id
id of the partition that vertices belong to.