turicreate.SGraph.triple_apply

SGraph.triple_apply(self, triple_apply_fn, mutated_fields, input_fields=None)

Apply a transform function to each edge and its associated source and target vertices in parallel. Each edge is visited once and in parallel. Modification to vertex data is protected by lock. The effect on the returned SGraph is equivalent to the following pseudocode:

>>> PARALLEL FOR (source, edge, target) AS triple in G:
...     LOCK (triple.source, triple.target)
...     (source, edge, target) = triple_apply_fn(triple)
...     UNLOCK (triple.source, triple.target)
... END PARALLEL FOR
Parameters:
triple_apply_fn : function

The function to apply to each triple of (source_vertex, edge, target_vertex). This function must take as input a tuple of (source_data, edge_data, target_data) and return a tuple of (new_source_data, new_edge_data, new_target_data). All variables in the both tuples must be of dict type. This can also be a toolkit extension function which is compiled as a native shared library using SDK.

mutated_fields : list[str] | str

Fields that triple_apply_fn will mutate. Note: columns that are actually mutated by the triple apply function but not specified in mutated_fields will have undetermined effects.

input_fields : list[str] | str, optional

Fields that triple_apply_fn will have access to. The default is None, which grants access to all fields. mutated_fields will always be included in input_fields.

Returns:
out : SGraph

A new SGraph with updated vertex and edge data. Only fields specified in the mutated_fields parameter are updated.

Notes

  • triple_apply does not currently support creating new fields in the lambda function.

Examples

Import turicreate and set up the graph.

>>> edges = turicreate.SFrame({'source': range(9), 'dest': range(1, 10)})
>>> g = turicreate.SGraph()
>>> g = g.add_edges(edges, src_field='source', dst_field='dest')
>>> g.vertices['degree'] = 0

Define the function to apply to each (source_node, edge, target_node) triple.

>>> def degree_count_fn (src, edge, dst):
        src['degree'] += 1
        dst['degree'] += 1
        return (src, edge, dst)

Apply the function to the SGraph.

>>> g = g.triple_apply(degree_count_fn, mutated_fields=['degree'])

Using native toolkit extension function:

#include <model_server/lib/toolkit_function_macros.hpp>
#include <vector>

using namespace turi;
std::vector<variant_type> connected_components_parameterized(
  std::map<std::string, flexible_type>& src,
  std::map<std::string, flexible_type>& edge,
  std::map<std::string, flexible_type>& dst,
  std::string column) {
    if (src[column] < dst[column]) dst[column] = src[column];
    else src[column] = dst[column];
    return {to_variant(src), to_variant(edge), to_variant(dst)};
}

BEGIN_FUNCTION_REGISTRATION
REGISTER_FUNCTION(connected_components_parameterized, "src", "edge", "dst", "column");
END_FUNCTION_REGISTRATION

compiled into example.so

>>> from example import connected_components_parameterized as cc
>>> e = tc.SFrame({'__src_id':[1,2,3,4,5], '__dst_id':[3,1,2,5,4]})
>>> g = tc.SGraph().add_edges(e)
>>> g.vertices['cid'] = g.vertices['__id']
>>> for i in range(2):
...     g = g.triple_apply(lambda src, edge, dst: cc(src, edge, dst, 'cid'), ['cid'], ['cid'])
>>> g.vertices['cid']
dtype: int
Rows: 5
[4, 1, 1, 1, 4]