6 #ifndef TURI_SGRAPH_SGRAPH_EDGE_APPLY_HPP 7 #define TURI_SGRAPH_SGRAPH_EDGE_APPLY_HPP 11 #include <type_traits> 12 #include <core/data/flexible_type/flexible_type.hpp> 14 #include <core/storage/sframe_data/sarray.hpp> 15 #include <core/storage/sgraph_data/sgraph.hpp> 16 #include <core/storage/sgraph_data/hilbert_parallel_for.hpp> 17 #include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp> 18 #include <core/util/cityhash_tc.hpp> 32 namespace sgraph_compute {
67 template <
typename Fn,
typename T>
68 std::vector<std::shared_ptr<sarray<flexible_type>>>
70 std::vector<std::shared_ptr<
sarray<T>>> & other,
74 ASSERT_EQ(len, other.size());
75 std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
77 const std::vector<sframe>& edata = g.
edge_group();
79 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
80 ret_partition->open_for_write(1);
81 ret_partition->set_type(result_type);
83 ret_partition->close();
84 ret[i] = ret_partition;
119 template <
typename Fn>
120 std::vector<std::shared_ptr<sarray<flexible_type>>>
125 std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
127 const std::vector<sframe>& edata = g.
edge_group();
129 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
130 ret_partition->open_for_write(1);
131 ret_partition->set_type(result_type);
133 ret_partition->close();
134 ret[i] = ret_partition;
169 template <
typename Fn,
typename T>
170 std::vector<std::shared_ptr<sarray<flexible_type>>>
172 std::string column_name,
173 std::vector<std::shared_ptr<
sarray<T>>> & other,
177 ASSERT_EQ(other.size(), len);
178 std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
180 const std::vector<sframe>& edata = g.
edge_group();
182 std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
183 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
184 ret_partition->open_for_write(1);
185 ret_partition->set_type(result_type);
187 ret_partition->close();
188 ret[i] = ret_partition;
218 template <
typename Fn>
219 std::vector<std::shared_ptr<sarray<flexible_type>>>
221 std::string column_name,
225 std::vector<std::shared_ptr<sarray<flexible_type>>> ret(len);
227 const std::vector<sframe>& edata = g.
edge_group();
229 std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
230 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
231 ret_partition->open_for_write(1);
232 ret_partition->set_type(result_type);
233 transform(*graph_field, *ret_partition, fn);
234 ret_partition->close();
235 ret[i] = ret_partition;
280 template <
typename ResultType,
typename Reducer,
typename Combiner>
281 typename std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type
285 ResultType init = ResultType()) {
286 const std::vector<sframe>& edata = g.
edge_group();
289 ResultType ret = init;
291 std::vector<ResultType> result =
293 [&](
const std::vector<flexible_type>& left, ResultType& right) {
298 std::unique_lock<mutex> result_lock(lock);
299 for (ResultType& res: result) {
343 template <
typename ResultType,
typename Reducer,
typename Combiner>
345 std::string column_name,
348 ResultType init = ResultType()) {
349 const std::vector<sframe>& edata = g.
edge_group();
352 ResultType ret = init;
354 std::shared_ptr<sarray<flexible_type>> graph_field = edata[i].select_column(column_name);
355 std::vector<ResultType> result =
361 std::unique_lock<mutex> result_lock(lock);
362 for (ResultType& res: result) {
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
std::vector< ResultType > reduce(S &&input, FunctionType f, ResultType init=ResultType())
std::vector< std::shared_ptr< sarray< flexible_type > > > edge_apply(sgraph &g, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
size_t get_num_partitions() const
std::enable_if<!std::is_convertible< Reducer, std::string >::value, ResultType >::type edge_reduce(sgraph &g, Reducer fn, Combiner combine, ResultType init=ResultType())
std::vector< sframe > & edge_group(size_t groupa=0, size_t groupb=0)
void binary_transform(S1 &&input1, S2 &&input2, T &&output, TransformFn transformfn)
void transform(S &&input, T &&output, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >())