Turi Create  4.0
turi::sgraph_compute Namespace Reference

Classes

class  edge_scope
 
class  fast_edge_scope
 
class  sgraph_engine
 
class  sgraph_synchronize
 
class  sgraph_synchronize_interface
 
class  vertex_block
 
struct  vertex_partition_exchange
 

Functions

void hilbert_blocked_parallel_for (size_t n, std::function< void(std::vector< std::pair< size_t, size_t > >) > preamble, std::function< void(std::pair< size_t, size_t >)> fn, size_t parallel_limit=SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS)
 
void hilbert_parallel_for (size_t n, std::function< void(std::vector< std::pair< size_t, size_t > >) > preamble, std::function< void(std::pair< size_t, size_t >)> fn)
 
template<typename Fn , typename T >
std::vector< std::shared_ptr< sarray< flexible_type > > > edge_apply (sgraph &g, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
 
template<typename Fn >
std::vector< std::shared_ptr< sarray< flexible_type > > > edge_apply (sgraph &g, flex_type_enum result_type, Fn fn)
 
template<typename Fn , typename T >
std::vector< std::shared_ptr< sarray< flexible_type > > > edge_apply (sgraph &g, std::string column_name, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
 
template<typename Fn >
std::vector< std::shared_ptr< sarray< flexible_type > > > edge_apply (sgraph &g, std::string column_name, flex_type_enum result_type, Fn fn)
 
template<typename ResultType , typename Reducer , typename Combiner >
std::enable_if<!std::is_convertible< Reducer, std::string >::value, ResultType >::type edge_reduce (sgraph &g, Reducer fn, Combiner combine, ResultType init=ResultType())
 
template<typename ResultType , typename Reducer , typename Combiner >
ResultType edge_reduce (sgraph &g, std::string column_name, Reducer fn, Combiner combine, ResultType init=ResultType())
 
void fast_triple_apply (sgraph &g, fast_triple_apply_fn_type apply_fn, const std::vector< std::string > &edge_fields, const std::vector< std::string > &mutated_edge_fields)
 
template<typename T >
std::vector< std::vector< T > > create_vertex_data (const sgraph &g)
 
void triple_apply (sgraph &g, triple_apply_fn_type apply_fn, const std::vector< std::string > &mutated_vertex_fields, const std::vector< std::string > &mutated_edge_fields={}, bool requires_vertex_id=true)
 
void triple_apply (sgraph &g, batch_triple_apply_fn_type batch_apply_fn, const std::vector< std::string > &mutated_vertex_fields, const std::vector< std::string > &mutated_edge_fields={})
 
void batch_triple_apply_mock (sgraph &g, triple_apply_fn_type apply_fn, const std::vector< std::string > &mutated_vertex_fields, const std::vector< std::string > &mutated_edge_fields={})
 
template<typename Fn , typename T >
std::vector< std::shared_ptr< sarray< flexible_type > > > vertex_apply (sgraph &g, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
 
template<typename Fn >
std::vector< std::shared_ptr< sarray< flexible_type > > > vertex_apply (sgraph &g, flex_type_enum result_type, Fn fn)
 
template<typename Fn , typename T >
std::vector< std::shared_ptr< sarray< flexible_type > > > vertex_apply (sgraph &g, std::string column_name, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
 
template<typename Fn >
std::vector< std::shared_ptr< sarray< flexible_type > > > vertex_apply (sgraph &g, std::string column_name, flex_type_enum result_type, Fn fn)
 
template<typename ResultType , typename Reducer , typename Combiner >
std::enable_if<!std::is_convertible< Reducer, std::string >::value, ResultType >::type vertex_reduce (sgraph &g, Reducer fn, Combiner combine, ResultType init=ResultType())
 
template<typename ResultType , typename Reducer , typename Combiner >
ResultType vertex_reduce (sgraph &g, std::string column_name, Reducer fn, Combiner combine, ResultType init=ResultType())
 

Detailed Description

Graph Computation Functions

Function Documentation

◆ batch_triple_apply_mock()

void turi::sgraph_compute::batch_triple_apply_mock ( sgraph g,
triple_apply_fn_type  apply_fn,
const std::vector< std::string > &  mutated_vertex_fields,
const std::vector< std::string > &  mutated_edge_fields = {} 
)

Mock the single triple apply using batch_triple_apply implementation. Used for testing only.

◆ create_vertex_data()

template<typename T >
std::vector<std::vector<T> > turi::sgraph_compute::create_vertex_data ( const sgraph g)

Utility function

Definition at line 96 of file sgraph_fast_triple_apply.hpp.

◆ edge_apply() [1/4]

template<typename Fn , typename T >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::edge_apply ( sgraph g,
std::vector< std::shared_ptr< sarray< T >>> &  other,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation combining one external array (other) with the graph edge data. other must be the same length as the edge data. Abstractly performs the following computation:

for each edge i:
out[i] = fn(edge[i], other[i])

out must be of the result_type specified.

The function must take as the first argument, a vector<flexible_type> and the second argument, T, and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a change in message value between the existing column, and a new computed column:

const size_t msg_idx = g.get_edge_field_id("message");
// compute the change in message
auto delta = sgraph_compute::edge_apply(
g,
new_message, // a vector<shared_ptr<sarray<flexible_type>>>
flex_type_enum::FLOAT,
[&](const std::vector<flexible_type>& edata, const flexible_type& y) {
return std::abs((double)(edata[msg_idx]) - (double)(y));
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 69 of file sgraph_edge_apply.hpp.

◆ edge_apply() [2/4]

template<typename Fn >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::edge_apply ( sgraph g,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation on graph edge_data. Abstractly performs the following computation:

for each edge i:
out[i] = fn(edge_data[i])

out must be of the result_type specified.

The function must take as the only argument, a vector<flexible_type>. and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a "normalized" sampling vector.

auto normalized = sgraph_compute::edge_apply(
g,
flex_type_enum::LIST,
[&](std::vector<flexible_type> edata) { // taken by value because it is modified below
double sum = 0.0;
for (const auto& v : edata) sum += v;
for (auto& v : edata) v /= sum;
return edata;
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 121 of file sgraph_edge_apply.hpp.

◆ edge_apply() [3/4]

template<typename Fn , typename T >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::edge_apply ( sgraph g,
std::string  column_name,
std::vector< std::shared_ptr< sarray< T >>> &  other,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation combining one external array (other) with one column of the graph edge data. other must be the same length as the edge data. Abstractly performs the following computation:

for each edge i:
out[i] = fn(edge_data[column_name][i], other[i])

out must be of the result_type specified.

The function must take as the first argument, a flexible_type and the second argument, T, and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a change in message value between the existing column, and a new computed column:

// compute the change in message
auto delta = sgraph_compute::edge_apply(
g,
"message",
new_message, // a vector<shared_ptr<sarray<flexible_type>>>
flex_type_enum::FLOAT,
[&](const flexible_type& edata, const flexible_type& y) {
return std::abs((double)(edata) - (double)(y));
});

Definition at line 171 of file sgraph_edge_apply.hpp.

◆ edge_apply() [4/4]

template<typename Fn >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::edge_apply ( sgraph g,
std::string  column_name,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation on one column of the graph edge_data. Abstractly performs the following computation:

for each edge i:
out[i] = fn(edge_data[column_name][i])

out must be of the result_type specified.

The function must take as the only argument, a flexible_type. and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute the log of the message column.

auto log_message = sgraph_compute::edge_apply(
g,
"message",
flex_type_enum::FLOAT,
[&](const flexible_type& y) {
return std::log((double)(y));
});

Definition at line 220 of file sgraph_edge_apply.hpp.

◆ edge_reduce() [1/2]

template<typename ResultType , typename Reducer , typename Combiner >
std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type turi::sgraph_compute::edge_reduce ( sgraph g,
Reducer  fn,
Combiner  combine,
ResultType  init = ResultType() 
)

Performs a reduction over the graph data. If you are only reducing over one column, see the alternative overload.

The edge data is partitioned into small chunks. Within each chunk, the reducer function is called on every element using init as the initial value. This produces a collection of partial reductions. Finally, the combine function merges all the partial reductions into a single result, which is then returned.

Abstractly performs the following computation:

total_reduction = init
for each partition:
partial_reduction[partition] = init
for each edge i in partition:
reducer(edge_data[i], partial_reduction[partition])
combiner(partial_reduction[partition], total_reduction)
return total_reduction

Example. Here we compute the sum of the triangle_counts field of every edge.

const size_t triangle_idx = g.get_edge_field_id("triangle_counts");
total_triangles =
sgraph_compute::edge_reduce<double>(g,
[&](const std::vector<flexible_type>& edata, double& acc) {
// run on each edge's data
acc += (flex_int)edata[triangle_idx];
},
[](const double& v, double& acc) {
// partial combiner.
acc += v;
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 282 of file sgraph_edge_apply.hpp.

◆ edge_reduce() [2/2]

template<typename ResultType , typename Reducer , typename Combiner >
ResultType turi::sgraph_compute::edge_reduce ( sgraph g,
std::string  column_name,
Reducer  fn,
Combiner  combine,
ResultType  init = ResultType() 
)

Performs a reduction over a single column of the graph data.

The edge data is partitioned into small chunks. Within each chunk, the reducer function is called on every element using init as the initial value. This produces a collection of partial reductions. Finally, the combine function merges all the partial reductions into a single result, which is then returned.

Abstractly performs the following computation:

total_reduction = init
for each partition:
partial_reduction[partition] = init
for each edge i in partition:
reducer(edge_data[column_name][i], partial_reduction[partition])
combiner(partial_reduction[partition], total_reduction)
return total_reduction

Example. Here we compute the sum of the triangle field of every edge.

total_triangles =
sgraph_compute::edge_reduce<double>(g,
"triangle",
[](const flexible_type& tr, double& acc) {
// run on each edge's data
acc += (flex_int)tr;
},
[](const double& v, double& acc) {
// partial combiner.
acc += v;
});

Definition at line 344 of file sgraph_edge_apply.hpp.

◆ fast_triple_apply()

void turi::sgraph_compute::fast_triple_apply ( sgraph g,
fast_triple_apply_fn_type  apply_fn,
const std::vector< std::string > &  edge_fields,
const std::vector< std::string > &  mutated_edge_fields 
)

A faster and simplified version of triple_apply.

The speedup relies on the assumption that vertex data can be loaded entirely into memory and accessed by the apply function through addressing.

The interface makes it possible for vertex data to stay in memory across multiple triple applies before being committed to disk.

Main interface differences:

  1. Vertex data are provided as vertex addresses, allowing users to specify their own vertex data storage.
  2. Users explicitly specify which edge fields are to be read and which are to be mutated.
  3. Vertex locking is omitted for simplicity (it can be added later if needed).
Parameters
g: The target graph on which to perform the transformation.
apply_fn: The user-defined function that will be applied to each edge scope.
edge_fields: A subset of edge data columns that the apply_fn will access.
mutated_edge_fields: A subset of columns in edge_fields that the apply_fn will modify.

◆ hilbert_blocked_parallel_for()

void turi::sgraph_compute::hilbert_blocked_parallel_for ( size_t  n,
std::function< void(std::vector< std::pair< size_t, size_t > >) >  preamble,
std::function< void(std::pair< size_t, size_t >)>  fn,
size_t  parallel_limit = SGRAPH_HILBERT_CURVE_PARALLEL_FOR_NUM_THREADS 
)
inline

This performs a parallel sweep over an n*n grid following the Hilbert curve ordering. The parallel sweep is broken into two parts. A "preamble" callback which is called sequentially, which contains a list of all the coordinates to be executed in the next pass, and a function which is executed on every coordinate in the pass.

The function abstractly implements the following:

for i = 0 to n*n step parallel_limit
// collect all the coordinates to be run in this pass
std::vector<pair<size_t, size_t> > coordinates
for j = i to min(i + parallel_limit, n*n)
coordinates.push_back(convert_hilbert_curve_to_coordinates(j))
// run the preamble
preamble(coordinates)
parallel for over coordinate in coordinates:
fn(coordinate)

n must be at least 2 and a power of 2.

Definition at line 54 of file hilbert_parallel_for.hpp.

◆ hilbert_parallel_for()

void turi::sgraph_compute::hilbert_parallel_for ( size_t  n,
std::function< void(std::vector< std::pair< size_t, size_t > >) >  preamble,
std::function< void(std::pair< size_t, size_t >)>  fn 
)
inline

Non-blocked variant of hilbert_blocked_parallel_for(); it takes no parallel_limit argument.

Definition at line 73 of file hilbert_parallel_for.hpp.

◆ triple_apply() [1/2]

void turi::sgraph_compute::triple_apply ( sgraph g,
triple_apply_fn_type  apply_fn,
const std::vector< std::string > &  mutated_vertex_fields,
const std::vector< std::string > &  mutated_edge_fields = {},
bool  requires_vertex_id = true 
)

Applies a transform function to each edge and its associated source and target vertices. Each edge is visited once, and edges are processed in parallel. Modifications to vertex data are protected by locks.

The effect of the function is equivalent to the following pseudo-code:

parallel_for (edge in g) {
lock(edge.source(), edge.target())
apply_fn(edge.source().data(), edge.data(), edge.target().data());
unlock(edge.source(), edge.target())
}
Parameters
g: The target graph on which to perform the transformation.
apply_fn: The user-defined function that will be applied to each edge scope.
mutated_vertex_fields: A subset of vertex data columns that the apply_fn will modify.
mutated_edge_fields: A subset of edge data columns that the apply_fn will modify.
requires_vertex_id: Set to false as an optimization when the vertex id is not required by the triple_apply computation.

The behavior is undefined when mutated_vertex_fields, and mutated_edge_fields are inconsistent with the apply_fn function.

◆ triple_apply() [2/2]

void turi::sgraph_compute::triple_apply ( sgraph g,
batch_triple_apply_fn_type  batch_apply_fn,
const std::vector< std::string > &  mutated_vertex_fields,
const std::vector< std::string > &  mutated_edge_fields = {} 
)

Overload. Take the apply function that processes a batch of edges at once. Used for testing the building block of lambda triple apply.

◆ vertex_apply() [1/4]

template<typename Fn , typename T >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::vertex_apply ( sgraph g,
std::vector< std::shared_ptr< sarray< T >>> &  other,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation combining one external array (other) with the graph vertex data. other must be the same length as the vertex data. Abstractly performs the following computation:

for each vertex i:
out[i] = fn(vertex_data[i], other[i])

out must be of the result_type specified.

The function must take as the first argument, a vector<flexible_type> and the second argument, T, and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a change in pagerank value between the existing pagerank column, and a new computed column:

const size_t pr_idx = g.get_vertex_field_id("pagerank");
// compute the change in pagerank
auto delta = sgraph_compute::vertex_apply(
g,
new_pagerank, // a vector<shared_ptr<sarray<flexible_type>>>
flex_type_enum::FLOAT,
[&](const std::vector<flexible_type>& vdata, const flexible_type& y) {
return std::abs((double)(vdata[pr_idx]) - (double)(y));
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 69 of file sgraph_vertex_apply.hpp.

◆ vertex_apply() [2/4]

template<typename Fn >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::vertex_apply ( sgraph g,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation on graph vertex_data. Abstractly performs the following computation:

for each vertex i:
out[i] = fn(vertex_data[i])

out must be of the result_type specified.

The function must take as the only argument, a vector<flexible_type>. and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a "normalized" pagerank.

double pagerank_sum = [...from reduce function below...]
const size_t pr_idx = g.get_vertex_field_id("pagerank");
auto normalized = sgraph_compute::vertex_apply(
g,
flex_type_enum::FLOAT,
[&](const std::vector<flexible_type>& vdata) {
return vdata[pr_idx] / pagerank_sum;
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 121 of file sgraph_vertex_apply.hpp.

◆ vertex_apply() [3/4]

template<typename Fn , typename T >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::vertex_apply ( sgraph g,
std::string  column_name,
std::vector< std::shared_ptr< sarray< T >>> &  other,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation combining one external array (other) with one column of the graph vertex data. other must be the same length as the vertex data. Abstractly performs the following computation:

for each vertex i:
out[i] = fn(vertex_data[column_name][i], other[i])

out must be of the result_type specified.

The function must take as the first argument, a flexible_type and the second argument, T, and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a change in pagerank value between the existing pagerank column, and a new computed column:

// compute the change in pagerank
auto delta = sgraph_compute::vertex_apply(
g,
"pagerank",
new_pagerank, // a vector<shared_ptr<sarray<flexible_type>>>
flex_type_enum::FLOAT,
[&](const flexible_type& vdata, const flexible_type& y) {
return std::abs((double)(vdata) - (double)(y));
});

Definition at line 170 of file sgraph_vertex_apply.hpp.

◆ vertex_apply() [4/4]

template<typename Fn >
std::vector<std::shared_ptr<sarray<flexible_type> > > turi::sgraph_compute::vertex_apply ( sgraph g,
std::string  column_name,
flex_type_enum  result_type,
Fn  fn 
)

Performs a map operation on one column of the graph vertex_data. Abstractly performs the following computation:

for each vertex i:
out[i] = fn(vertex_data[column_name][i])

out must be of the result_type specified.

The function must take as the only argument, a flexible_type. and must return an object castable to a flexible_type of the result_type specified.

For instance, if I am going to compute a "normalized" pagerank.

double pagerank_sum = [...from reduce function below...]
auto normalized = sgraph_compute::vertex_apply(
g,
"pagerank",
flex_type_enum::FLOAT,
[&](const flexible_type& y) {
return y / pagerank_sum;
});

Definition at line 220 of file sgraph_vertex_apply.hpp.

◆ vertex_reduce() [1/2]

template<typename ResultType , typename Reducer , typename Combiner >
std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type turi::sgraph_compute::vertex_reduce ( sgraph g,
Reducer  fn,
Combiner  combine,
ResultType  init = ResultType() 
)

Performs a reduction over the graph data. If you are only reducing over one column, see the alternative overload.

The vertex data is partitioned into small chunks. Within each chunk, the reducer function is called on every element using init as the initial value. This produces a collection of partial reductions. Finally, the combine function merges all the partial reductions into a single result, which is then returned.

Abstractly performs the following computation:

total_reduction = init
for each partition:
partial_reduction[partition] = init
for each vertex i in partition:
reducer(vertex_data[i], partial_reduction[partition])
combiner(partial_reduction[partition], total_reduction)
return total_reduction

Example. Here we compute the sum of the pagerank field of every vertex.

const size_t pr_idx = g.get_vertex_field_id("pagerank");
total_pagerank =
sgraph_compute::vertex_reduce<double>(g,
[&](const std::vector<flexible_type>& vdata, double& acc) {
// run on each vertex's data
acc += (flex_float)vdata[pr_idx];
},
[](const double& v, double& acc) {
// partial combiner.
acc += v;
});

Note that if the apply is only going to access one column, the alternative overload will be more efficient.

Definition at line 281 of file sgraph_vertex_apply.hpp.

◆ vertex_reduce() [2/2]

template<typename ResultType , typename Reducer , typename Combiner >
ResultType turi::sgraph_compute::vertex_reduce ( sgraph g,
std::string  column_name,
Reducer  fn,
Combiner  combine,
ResultType  init = ResultType() 
)

Performs a reduction over a single column of the graph data.

The vertex data is partitioned into small chunks. Within each chunk, the reducer function is called on every element using init as the initial value. This produces a collection of partial reductions. Finally, the combine function merges all the partial reductions into a single result, which is then returned.

Abstractly performs the following computation:

total_reduction = init
for each partition:
partial_reduction[partition] = init
for each vertex i in partition:
reducer(vertex_data[column_name][i], partial_reduction[partition])
combiner(partial_reduction[partition], total_reduction)
return total_reduction

Example. Here we compute the sum of the pagerank field of every vertex.

total_pagerank =
sgraph_compute::vertex_reduce<double>(g,
"pagerank",
[](const flexible_type& pr, double& acc) {
// run on each vertex's data
acc += (flex_float)pr;
},
[](const double& v, double& acc) {
// partial combiner.
acc += v;
});

Definition at line 342 of file sgraph_vertex_apply.hpp.