6 #ifndef TURI_SGRAPH_SGRAPH_sgraph_engine_HPP 7 #define TURI_SGRAPH_SGRAPH_sgraph_engine_HPP 11 #include <type_traits> 12 #include <core/data/flexible_type/flexible_type.hpp> 14 #include <core/storage/sframe_data/sarray.hpp> 15 #include <core/storage/sgraph_data/sgraph.hpp> 16 #include <core/storage/sgraph_data/hilbert_parallel_for.hpp> 17 #include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp> 18 #include <core/util/cityhash_tc.hpp> 31 namespace sgraph_compute {
111 template <
typename T>
// Engine-local aliases for sgraph's partition-address and edge-direction
// types. graph_data_type (a std::vector of flexible_type) is the value type
// handed to the user gather / edge-map callbacks for vertex and edge data.
114 typedef sgraph::vertex_partition_address vertex_partition_address;
115 typedef sgraph::edge_partition_address edge_partition_address;
116 typedef sgraph::edge_direction edge_direction;
117 typedef std::vector<flexible_type> graph_data_type;
126 using const_gather_function_type = std::function<void(
const graph_data_type& center,
127 const graph_data_type& edge,
128 const graph_data_type& other,
129 edge_direction edgedir,
131 std::vector<std::shared_ptr<sarray<T>>> gather(
sgraph& graph,
132 const_gather_function_type gather,
133 const T& initial_value,
134 edge_direction edgedir = edge_direction::ANY_EDGE,
135 size_t central_group = 0,
136 std::unordered_set<size_t> sgraph_compute_group = {0},
137 size_t parallel_limit = (size_t)(-1)) {
138 if (parallel_limit == static_cast<size_t>(-1)) {
141 init_data_structures(graph, central_group, initial_value);
146 [&](std::vector<std::pair<size_t, size_t> > edgeparts) {
147 std::set<vertex_partition_address> vertex_partitions;
148 std::set<size_t> combine_partitions;
153 for(
auto edgepart: edgeparts) {
155 << edgepart.first <<
" " << edgepart.second << std::endl;
156 for(
size_t gather_vgroup: sgraph_compute_group) {
157 if (edgedir == edge_direction::ANY_EDGE ||
158 edgedir == edge_direction::IN_EDGE) {
163 edge_partition_address address(gather_vgroup, central_group,
164 edgepart.first, edgepart.second);
165 combine_partitions.insert(address.get_dst_vertex_partition().partition);
166 vertex_partitions.insert(address.get_src_vertex_partition());
167 vertex_partitions.insert(address.get_dst_vertex_partition());
169 if (edgedir == edge_direction::ANY_EDGE ||
170 edgedir == edge_direction::OUT_EDGE) {
175 edge_partition_address address(central_group, gather_vgroup,
176 edgepart.first, edgepart.second);
177 combine_partitions.insert(address.get_src_vertex_partition().partition);
178 vertex_partitions.insert(address.get_src_vertex_partition());
179 vertex_partitions.insert(address.get_dst_vertex_partition());
184 load_graph_vertex_blocks(graph, vertex_partitions);
185 load_combine_blocks(combine_partitions);
189 [&](std::pair<size_t, size_t> edgepart) {
196 for(
size_t gather_vgroup: sgraph_compute_group) {
197 edge_partition_address address;
199 address = edge_partition_address(gather_vgroup, central_group,
200 edgepart.first, edgepart.second);
202 compute_const_gather(edgeframe, address, central_group, edgedir, gather);
206 load_combine_blocks(std::set<size_t>());
207 return combine_sarrays;
// Signature of the per-edge callback accepted by parallel_for_edges().
// Arguments: the source vertex data (const), the edge data (non-const
// reference), and the target vertex data (const). The returned T is stored
// by compute_edge_map() into the output sarray<T> it builds for the edge
// partition being processed.
217 using const_edge_map_function_type = std::function<T(
const graph_data_type& source,
218 graph_data_type& edge,
219 const graph_data_type& target)>;
221 std::vector<std::shared_ptr<sarray<T>>> parallel_for_edges(
sgraph& graph,
222 const_edge_map_function_type map_fn,
224 size_t groupa = 0,
size_t groupb = 0,
225 size_t parallel_limit = (
size_t)(-1)) {
226 if (parallel_limit == static_cast<size_t>(-1)) {
234 std::vector<std::shared_ptr<sarray<T>>> return_edge_value(return_size);
239 [&](std::vector<std::pair<size_t, size_t> > edgeparts) {
240 std::set<vertex_partition_address> vertex_partitions;
241 std::set<size_t> combine_partitions;
246 for(
auto edgepart: edgeparts) {
248 << edgepart.first <<
" " << edgepart.second << std::endl;
253 edge_partition_address address(groupa, groupb, edgepart.first, edgepart.second);
254 combine_partitions.insert(address.get_dst_vertex_partition().partition);
255 vertex_partitions.insert(address.get_src_vertex_partition());
256 vertex_partitions.insert(address.get_dst_vertex_partition());
259 load_graph_vertex_blocks(graph, vertex_partitions);
263 [&](std::pair<size_t, size_t> edgepart) {
270 edge_partition_address address(groupa, groupb,
271 edgepart.first, edgepart.second);
274 return_edge_value[partid] = compute_edge_map(edgeframe, address, map_fn, ret_type);
276 return return_edge_value;
// Vertex-data blocks, indexed vertex_data[group][partition][vertex id].
// load_graph_vertex_blocks() unloads loaded partitions that were not
// requested and loads the requested ones if they are not already loaded.
281 std::vector<std::vector<vertex_block<sframe> > > vertex_data;
// Combine (accumulator) blocks of T, indexed combine_data[partition][vertex id];
// the per-vertex entry is passed as the last argument of the gather callback.
283 std::vector<vertex_block<sarray<T> > > combine_data;
// Backing storage for combine_data, one sarray<T> per partition. When a loaded
// combine block is unloaded, it is flushed into a freshly created sarray here.
// gather() returns this vector after flushing every loaded block.
284 std::vector<std::shared_ptr<sarray<T> > > combine_sarrays;
// Size of the mutex array used during gather: a vertex hash is reduced modulo
// this value to select the lock held while a gather updates that vertex's
// combine entry. NOTE(review): hash input presumably derives from the vertex
// address/id — the computing line is not visible here.
285 static constexpr
size_t LOCK_ARRAY_SIZE = 1024;
// Overload selected only when S is flexible_type: records the runtime type
// of `t` in m_return_type. load_combine_blocks() later passes m_return_type
// to set_type() on the sarrays it writes.
289 template <
typename S>
290 typename std::enable_if<std::is_same<S, flexible_type>::value>::type
291 set_return_type(
const S& t) {
292 m_return_type = t.get_type();
// Overload for every other S: a no-op, because only flexible_type carries a
// runtime type tag to record.
295 template <
typename S>
296 typename std::enable_if<!std::is_same<S, flexible_type>::value>::type
297 set_return_type(
const S& t) { }
303 void init_data_structures(
const sgraph& graph,
304 size_t sgraph_compute_group,
305 const T& initial_value) {
307 combine_data.clear();
308 combine_sarrays.clear();
309 set_return_type(initial_value);
327 combine_sarrays[i].reset(
new sarray<T>());
328 combine_sarrays[i]->open_for_write(1);
329 auto iter = combine_sarrays[i]->get_output_iterator(0);
331 (*iter) = initial_value;
334 combine_sarrays[i]->close();
343 void load_graph_vertex_blocks(
sgraph& graph,
344 const std::set<vertex_partition_address>& vertex_address) {
347 for (
size_t partition = 0; partition < vertex_data[
group].size(); ++partition) {
348 if (vertex_data[
group][partition].is_loaded() &&
349 vertex_address.count({
group, partition}) == 0) {
350 vertex_data[
group][partition].unload();
354 std::vector<vertex_partition_address> vertex_address_vec;
355 std::copy(vertex_address.begin(), vertex_address.end(),
356 std::inserter(vertex_address_vec, vertex_address_vec.end()));
360 vertex_address_vec.end(),
363 [&](vertex_partition_address& part) {
368 << part.group <<
" " << part.partition << std::endl;
369 vertex_data[part.group][part.partition].load_if_not_loaded(frame);
379 void load_combine_blocks(
const std::set<size_t> partitions) {
380 for (
size_t partition = 0; partition < combine_data.size(); ++partition) {
381 if (combine_data[partition].is_loaded() && partitions.count(partition) == 0) {
383 combine_sarrays[partition].reset(
new sarray<T>());
384 combine_sarrays[partition]->open_for_write(1);
386 combine_sarrays[partition]->set_type(m_return_type);
388 combine_data[partition].flush(*combine_sarrays[partition]);
390 combine_data[partition].unload();
394 std::vector<size_t> partitions_vec;
395 std::copy(partitions.begin(), partitions.end(),
396 std::inserter(partitions_vec, partitions_vec.end()));
400 partitions_vec.end(),
403 combine_data[part].load_if_not_loaded(*combine_sarrays[part]);
407 void compute_const_gather(
sframe& edgeframe,
408 edge_partition_address address,
409 size_t central_group,
410 edge_direction edgedir,
411 const_gather_function_type& gather) {
413 size_t row_start = 0;
414 size_t row_end = reader->num_rows();
415 size_t srcid_column = edgeframe.
column_index(sgraph::SRC_COLUMN_NAME);
416 size_t dstid_column = edgeframe.
column_index(sgraph::DST_COLUMN_NAME);
418 vertex_partition_address src_address = address.get_src_vertex_partition();
419 vertex_partition_address dst_address = address.get_dst_vertex_partition();
420 while (row_start < row_end) {
421 size_t nrows = std::min<size_t>(1024, row_end - row_start);
422 std::vector<std::vector<flexible_type> > all_edgedata;
423 reader->read_rows(row_start, row_start + nrows, all_edgedata);
424 for (
const auto& edgedata : all_edgedata) {
428 size_t srcid = edgedata[srcid_column];
429 size_t dstid = edgedata[dstid_column];
431 if (edgedir == edge_direction::IN_EDGE ||
432 edgedir == edge_direction::ANY_EDGE) {
433 DASSERT_EQ(address.dst_group, central_group);
436 std::unique_lock<turi::mutex> guard(lock_array[vertexhash % LOCK_ARRAY_SIZE]);
439 gather(vertex_data[dst_address.group][dst_address.partition][dstid],
441 vertex_data[src_address.group][src_address.partition][srcid],
442 edge_direction::IN_EDGE,
443 combine_data[dst_address.partition][dstid]);
445 if (edgedir == edge_direction::OUT_EDGE || edgedir == edge_direction::ANY_EDGE) {
446 DASSERT_EQ(address.src_group, central_group);
449 std::unique_lock<turi::mutex> guard(lock_array[vertexhash % LOCK_ARRAY_SIZE]);
452 gather(vertex_data[src_address.group][src_address.partition][srcid],
454 vertex_data[dst_address.group][dst_address.partition][dstid],
455 edge_direction::OUT_EDGE,
456 combine_data[src_address.partition][srcid]);
463 std::shared_ptr<sarray<T>> compute_edge_map(
sframe& edgeframe,
464 edge_partition_address address,
465 const_edge_map_function_type map_fn,
467 std::shared_ptr<sarray<T>> ret = std::make_shared<sarray<T>>();
468 ret->open_for_write(1);
469 ret->set_type(ret_type);
470 auto out = ret->get_output_iterator(0);
473 size_t row_start = 0;
474 size_t row_end = reader->num_rows();
475 size_t srcid_column = edgeframe.
column_index(sgraph::SRC_COLUMN_NAME);
476 size_t dstid_column = edgeframe.
column_index(sgraph::DST_COLUMN_NAME);
478 vertex_partition_address src_address = address.get_src_vertex_partition();
479 vertex_partition_address dst_address = address.get_dst_vertex_partition();
480 while (row_start < row_end) {
481 size_t nrows = std::min<size_t>(1024, row_end - row_start);
482 std::vector<std::vector<flexible_type> > all_edgedata;
483 reader->read_rows(row_start, row_start + nrows, all_edgedata);
484 for (
auto& edgedata : all_edgedata) {
485 size_t srcid = edgedata[srcid_column];
486 size_t dstid = edgedata[dstid_column];
487 *out = map_fn(vertex_data[src_address.group][src_address.partition][srcid],
489 vertex_data[dst_address.group][dst_address.partition][dstid]);
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
sframe group(sframe sframe_in, std::string key_column)
size_t column_index(const std::string &column_name) const
static uint64_t hash64_combine(uint64_t h1, uint64_t h2)
size_t get_num_groups() const
size_t num_rows() const
Returns the length of each sarray.
static size_t cpu_count()
size_t get_num_partitions() const
static uint64_t hash64(const char *s, size_t len)
void copy(const std::string src, const std::string dest)
sframe & edge_partition(size_t partition1, size_t partition2, size_t groupa=0, size_t groupb=0)
std::unique_ptr< reader_type > get_reader() const
void hilbert_blocked_parallel_for(size_t n, std::function< void(std::vector< std::pair< size_t, size_t > >) > preamble, std::function< void(std::pair< size_t, size_t >)> fn, size_t parallel_limit=SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS)
sframe & vertex_partition(size_t partition, size_t groupid=0)