6 #ifndef TURI_SGRAPH_SGRAPH_VERTEX_APPLY_HPP 7 #define TURI_SGRAPH_SGRAPH_VERTEX_APPLY_HPP 11 #include <type_traits> 12 #include <core/data/flexible_type/flexible_type.hpp> 14 #include <core/storage/sframe_data/sarray.hpp> 15 #include <core/storage/sgraph_data/sgraph.hpp> 16 #include <core/storage/sgraph_data/hilbert_parallel_for.hpp> 17 #include <core/storage/sgraph_data/sgraph_compute_vertex_block.hpp> 18 #include <core/util/cityhash_tc.hpp> 32 namespace sgraph_compute {
67 template <
typename Fn,
typename T>
68 std::vector<std::shared_ptr<sarray<flexible_type>>>
70 std::vector<std::shared_ptr<
sarray<T>>> & other,
78 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
79 ret_partition->open_for_write(1);
80 ret_partition->set_type(result_type);
82 ret_partition->close();
83 ret[i] = ret_partition;
119 template <
typename Fn>
120 std::vector<std::shared_ptr<sarray<flexible_type>>>
128 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
129 ret_partition->open_for_write(1);
130 ret_partition->set_type(result_type);
132 ret_partition->close();
133 ret[i] = ret_partition;
168 template <
typename Fn,
typename T>
169 std::vector<std::shared_ptr<sarray<flexible_type>>>
171 std::string column_name,
172 std::vector<std::shared_ptr<
sarray<T>>> & other,
180 std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
181 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
182 ret_partition->open_for_write(1);
183 ret_partition->set_type(result_type);
185 ret_partition->close();
186 ret[i] = ret_partition;
218 template <
typename Fn>
219 std::vector<std::shared_ptr<sarray<flexible_type>>>
221 std::string column_name,
228 std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
229 std::shared_ptr<sarray<flexible_type>> ret_partition = std::make_shared<sarray<flexible_type>>();
230 ret_partition->open_for_write(1);
231 ret_partition->set_type(result_type);
232 transform(*graph_field, *ret_partition, fn);
233 ret_partition->close();
234 ret[i] = ret_partition;
279 template <
typename ResultType,
typename Reducer,
typename Combiner>
280 typename std::enable_if<!std::is_convertible<Reducer, std::string>::value, ResultType>::type
284 ResultType init = ResultType()) {
287 ResultType ret = init;
289 std::vector<ResultType> result =
291 [&](
const std::vector<flexible_type>& left, ResultType& right) {
296 std::unique_lock<mutex> result_lock(lock);
297 for (ResultType& res: result) {
341 template <
typename ResultType,
typename Reducer,
typename Combiner>
343 std::string column_name,
346 ResultType init = ResultType()) {
349 ResultType ret = init;
351 std::shared_ptr<sarray<flexible_type>> graph_field = vdata[i].select_column(column_name);
352 std::vector<ResultType> result =
358 std::unique_lock<mutex> result_lock(lock);
359 for (ResultType& res: result) {
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
std::enable_if<!std::is_convertible< Reducer, std::string >::value, ResultType >::type vertex_reduce(sgraph &g, Reducer fn, Combiner combine, ResultType init=ResultType())
std::vector< ResultType > reduce(S &&input, FunctionType f, ResultType init=ResultType())
std::vector< sframe > & vertex_group(size_t groupid=0)
size_t get_num_partitions() const
std::vector< std::shared_ptr< sarray< flexible_type > > > vertex_apply(sgraph &g, std::vector< std::shared_ptr< sarray< T >>> &other, flex_type_enum result_type, Fn fn)
void binary_transform(S1 &&input1, S2 &&input2, T &&output, TransformFn transformfn)
void transform(S &&input, T &&output, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >())