6 #ifndef TURI_SGRAPH_SGRAPH_HPP 7 #define TURI_SGRAPH_SGRAPH_HPP 9 #include <core/data/flexible_type/flexible_type.hpp> 10 #include <core/storage/sgraph_data/sgraph_constants.hpp> 11 #include <core/storage/sframe_data/sframe.hpp> 12 #include <sparsehash/sparse_hash_map> 77 static const char* DEFAULT_GROUP_NAME;
78 static const char* VID_COLUMN_NAME;
79 static const char* SRC_COLUMN_NAME;
80 static const char* DST_COLUMN_NAME;
85 typedef std::map<std::string, flexible_type> options_map_t;
/**
 * Edge direction selector.
 *
 * The enumerator values are 1, 2 and 3, so ANY_EDGE is numerically equal to
 * (IN_EDGE | OUT_EDGE). NOTE(review): presumably intended as a bitmask;
 * confirm against the callers before relying on that.
 */
enum class edge_direction {
  IN_EDGE = 1,
  OUT_EDGE = 2,
  ANY_EDGE = 3
};
/**
 * Address of a vertex partition, expressed as a (group, partition) index pair.
 *
 * Both indices default to 0. Instances are equality-comparable and have a
 * strict ordering, so they can serve as keys in ordered containers.
 */
struct vertex_partition_address {
  size_t group = 0;
  size_t partition = 0;

  vertex_partition_address() = default;

  /// Builds an address from an explicit group index and partition index.
  vertex_partition_address(size_t group, size_t partition)
      : group(group), partition(partition) { }

  /// Two addresses are equal when both the group and the partition match.
  bool operator==(const vertex_partition_address& other) const {
    return group == other.group && partition == other.partition;
  }

  /// Lexicographic order: compare by group first, then by partition.
  bool operator<(const vertex_partition_address& other) const {
    return group < other.group ||
           (group == other.group && partition < other.partition);
  }
};
107 class edge_partition_address {
109 size_t src_group = 0, dst_group = 0, partition1 = 0, partition2 = 0;
111 edge_partition_address() =
default;
112 edge_partition_address(
size_t src_group,
size_t dst_group,
113 size_t partition1,
size_t partition2) :
114 src_group(src_group), dst_group(dst_group),
115 partition1(partition1), partition2(partition2) { }
117 vertex_partition_address get_src_vertex_partition() {
118 vertex_partition_address ret;
119 ret.group = src_group; ret.partition = partition1;
123 vertex_partition_address get_dst_vertex_partition() {
124 vertex_partition_address ret;
125 ret.group = dst_group; ret.partition = partition2;
140 const options_map_t& field_constraint = options_map_t(),
141 size_t groupid = 0)
const;
153 const std::vector<flexible_type>& target_vids = {},
154 const options_map_t& field_constraint = options_map_t(),
155 size_t groupa = 0,
size_t groupb = 0)
const;
170 std::vector<std::string>
get_edge_fields(
size_t groupa = 0,
size_t groupb = 0)
const;
183 const std::string& id_field_name,
189 const std::string& id_field_name,
198 const std::string& source_field_name,
199 const std::string& target_field_name,
200 size_t groupa = 0,
size_t groupb = 0);
205 const std::string& source_field_name,
206 const std::string& target_field_name,
207 size_t groupa = 0,
size_t groupb = 0);
220 bool copy_edge_field(
const std::string& field,
const std::string& new_field,
221 size_t groupa = 0,
size_t groupb = 0);
244 bool remove_edge_field(
const std::string& field,
size_t groupa = 0,
size_t groupb = 0);
255 bool select_edge_fields(
const std::vector<std::string>& fields,
size_t groupa = 0,
size_t groupb = 0);
275 ASSERT_LT(groupid, m_num_groups);
276 return m_vertex_groups[groupid];
283 inline const std::vector<sframe>&
vertex_group(
size_t groupid = 0)
const {
284 ASSERT_LT(groupid, m_num_groups);
285 return m_vertex_groups[groupid];
303 ASSERT_LT(groupa, m_num_groups);
304 ASSERT_LT(groupb, m_num_groups);
305 return m_edge_groups.at({groupa, groupb});
313 size_t groupb)
const {
314 ASSERT_LT(groupa, m_num_groups);
315 ASSERT_LT(groupb, m_num_groups);
316 return m_edge_groups.at({groupa, groupb});
331 size_t groupid = 0) {
332 ASSERT_LT(partition, m_num_partitions);
342 size_t groupid = 0)
const {
343 ASSERT_LT(partition, m_num_partitions);
385 ASSERT_LT(partition1, m_num_partitions);
386 ASSERT_LT(partition2, m_num_partitions);
387 return edge_group(groupa, groupb)[partition1 * m_num_partitions + partition2];
398 size_t groupb = 0)
const {
399 ASSERT_LT(partition1, m_num_partitions);
400 ASSERT_LT(partition2, m_num_partitions);
401 return edge_group(groupa, groupb)[partition1 * m_num_partitions + partition2];
417 address.src_group, address.dst_group);
427 address.src_group, address.dst_group);
436 ASSERT_LT(idx, m_vertex_group_names.size());
437 return m_vertex_group_names[idx];
445 auto iter = std::find(m_vertex_group_names.begin(), m_vertex_group_names.end(), name);
446 if (iter != m_vertex_group_names.end())
return std::distance(m_vertex_group_names.begin(), iter);
447 else return (
size_t)(-1);
453 inline size_t num_edges(
size_t groupa,
size_t groupb)
const {
456 for (
auto& sf : egroup) {
465 inline size_t num_edges()
const {
return m_num_edges; };
473 for (
auto& sf : vgroup) {
487 inline bool empty()
const {
return (m_num_vertices == 0) && (m_num_edges == 0);}
493 return m_num_partitions;
503 inline flex_type_enum vertex_id_type()
const {
return m_vid_type; }
542 void swap_vertex_fields(
const std::string& field1,
const std::string& field2);
544 void swap_edge_fields(
const std::string& field1,
const std::string& field2);
546 void rename_vertex_fields(
const std::vector<std::string>& oldnames,
547 const std::vector<std::string>& newnames);
549 void rename_edge_fields(
const std::vector<std::string>& oldnames,
550 const std::vector<std::string>& newnames);
562 std::string column_name,
569 template<
typename T,
typename FLEX_TYPE=T>
571 std::string column_name,
579 std::string column_name,
580 size_t groupa = 0,
size_t groupb = 0);
587 std::string column_name,
594 template<
typename T,
typename FLEX_TYPE=T>
595 bool add_vertex_field(std::vector<std::vector<T>>& column,
596 std::string column_name,
605 std::string column_name,
606 size_t groupa = 0,
size_t groupb = 0);
613 std::vector<std::shared_ptr<sarray<flexible_type>>>
622 std::vector<std::vector<flexible_type>>
629 std::vector<std::shared_ptr<sarray<flexible_type>>>
642 size_t get_edge_field_id(std::string column_name,
size_t groupa = 0,
size_t groupb = 0);
650 void init(
size_t num_partitions);
663 void increase_number_of_groups(
size_t num_groups);
671 void commit_edge_buffer(
size_t groupa,
680 void commit_vertex_buffer(
size_t group,
681 std::vector<sframe>& vertex_buffer);
691 inline size_t get_vertex_partition(
const flexible_type& vid) {
return vid.
hash() % m_num_partitions; }
697 return get_vertex_partition(src) * m_num_partitions + get_vertex_partition(dst);
703 inline std::vector<flexible_type> get_vertex_ids(
size_t partition,
size_t group)
const {
704 std::vector<flexible_type> ret;
707 ret.reserve(id_column->size());
708 copy(*id_column, std::inserter(ret, ret.begin()));
/**
 * Tells whether a field name counts as private: it must be longer than two
 * characters and start with two underscores. A bare "__" is therefore not
 * private.
 *
 * \param s Field name to test. Taken by const reference so that the check
 *          does not copy the string.
 */
static inline bool is_private_field(const std::string& s) {
  return s.length() > 2 && s[0] == '_' && s[1] == '_';
}
723 std::vector<std::string> m_vertex_group_names;
728 size_t m_num_partitions = 0;
733 size_t m_num_groups = 1;
738 size_t m_num_vertices = 0;
743 size_t m_num_edges = 0;
754 std::vector<std::vector<sframe> > m_vertex_groups;
763 std::map<std::pair<size_t, size_t>, std::vector<sframe> > m_edge_groups;
777 static bool reorder_and_add_new_columns(
sframe& sf,
778 const std::vector<std::string>& column_names,
779 const std::vector<flex_type_enum>& column_types);
800 static bool union_columns(
sframe& sframe_a,
sframe& sframe_b);
802 typedef google::sparse_hash_map<flexible_type, size_t, std::hash<flexible_type> > vid_hash_map_type;
807 std::shared_ptr<vid_hash_map_type> fetch_vid_hash_map(
size_t partition,
size_t group);
812 static inline void init_empty_sframe(
sframe& sf,
813 std::vector<std::string> column_names = {},
814 std::vector<flex_type_enum> column_types = {}) {
825 void fast_validate_add_vertices(
const sframe& vertices,
832 void fast_validate_add_edges(
const sframe& edges,
833 size_t groupa,
size_t groupb);
837 template<
typename T,
typename FLEX_TYPE>
838 bool sgraph::add_vertex_field(std::vector<std::vector<T>>& column,
839 std::string column_name,
843 if (std::count(vfields.begin(), vfields.end(), column_name) != 0) {
848 if (vgroups.size() != column.size()) {
853 auto sa = std::make_shared<sarray<flexible_type>>();
854 sa->open_for_write(1);
855 sa->set_type(column_type);
856 auto writer = sa->get_output_iterator(0);
857 for (
auto& v: column[i])
858 *writer++ = std::move((FLEX_TYPE)(v));
860 vgroups[i] = vgroups[i].add_column(sa, column_name);
865 template<
typename T,
typename FLEX_TYPE>
867 std::string column_name,
871 if (std::count(vfields.begin(), vfields.end(), column_name) == 0) {
876 if (vgroups.size() != column.size()) {
884 auto sa = std::make_shared<sarray<flexible_type>>();
885 sa->open_for_write(1);
886 sa->set_type(column_type);
887 auto writer = sa->get_output_iterator(0);
888 for (
auto& v: column[i])
889 *writer++ = std::move((FLEX_TYPE)(v));
891 vgroups[i] = vgroups[i].replace_column(sa, column_name);
size_t num_edges(size_t groupa, size_t groupb) const
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
const sframe & vertex_partition(vertex_partition_address part) const
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
sframe group(sframe sframe_in, std::string key_column)
size_t get_num_groups() const
bool init_edge_field(const std::string &field, const flexible_type &init_value, size_t groupa=0, size_t groupb=0)
size_t num_vertices() const
std::string get_vertex_group_name(size_t idx) const
std::vector< sframe > & vertex_group(size_t groupid=0)
bool remove_edge_field(const std::string &field, size_t groupa=0, size_t groupb=0)
bool copy_edge_field(const std::string &field, const std::string &new_field, size_t groupa=0, size_t groupb=0)
const sframe & edge_partition(size_t partition1, size_t partition2, size_t groupa=0, size_t groupb=0) const
size_t get_num_partitions() const
bool copy_vertex_field(const std::string &field, const std::string &new_field, size_t group=0)
bool select_edge_fields(const std::vector< std::string > &fields, size_t groupa=0, size_t groupb=0)
std::shared_ptr< sarray< flexible_type > > select_column(size_t column_id) const
sframe get_edges(const std::vector< flexible_type > &source_vids={}, const std::vector< flexible_type > &target_vids={}, const options_map_t &field_constraint=options_map_t(), size_t groupa=0, size_t groupb=0) const
size_t SGRAPH_DEFAULT_NUM_PARTITIONS
size_t num_vertices(size_t group) const
void open_for_write(const std::vector< std::string > &column_names, const std::vector< flex_type_enum > &column_types, const std::string &frame_sidx_file="", size_t nsegments=SFRAME_DEFAULT_NUM_SEGMENTS, bool fail_on_column_names=true)
size_t get_vertex_field_id(std::string column_name, size_t group=0) const
sframe & edge_partition(size_t partition1, size_t partition2, size_t groupa=0, size_t groupb=0)
bool init_vertex_field(const std::string &field, const flexible_type &init_value, size_t group=0)
const sframe & vertex_partition(size_t partition, size_t groupid=0) const
void load(iarchive &iarc)
const std::vector< sframe > & edge_group(size_t groupa, size_t groupb) const
void save(oarchive &oarc) const
void save_reference(oarchive &oarc) const
sframe & vertex_partition(vertex_partition_address part)
std::vector< std::shared_ptr< sarray< flexible_type > > > fetch_edge_data_field(std::string column_name, size_t groupa=0, size_t groupb=0) const
void copy(Iterator begin, Iterator end, SWriter &&writer)
bool remove_vertex_field(const std::string &field, size_t group=0)
bool select_vertex_fields(const std::vector< std::string > &fields, size_t group=0)
size_t get_vertex_group_id(std::string name) const
bool add_vertices(const dataframe_t &vertices, const std::string &id_field_name, size_t group=0)
const std::vector< sframe > & vertex_group(size_t groupid=0) const
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
bool replace_vertex_field(const std::vector< std::shared_ptr< sarray< flexible_type >>> &column, std::string column_name, size_t group=0)
std::vector< std::vector< flexible_type > > fetch_vertex_data_field_in_memory(std::string column_name, size_t groupid=0) const
std::vector< sframe > & edge_group(size_t groupa=0, size_t groupb=0)
size_t get_edge_field_id(std::string column_name, size_t groupa=0, size_t groupb=0)
sframe get_vertices(const std::vector< flexible_type > &vid_vec={}, const options_map_t &field_constraint=options_map_t(), size_t groupid=0) const
std::vector< std::shared_ptr< sarray< flexible_type > > > fetch_vertex_data_field(std::string column_name, size_t group=0) const
bool replace_edge_field(const std::vector< std::shared_ptr< sarray< flexible_type >>> &column, std::string column_name, size_t groupa=0, size_t groupb=0)
const sframe & edge_partition(edge_partition_address address) const
bool add_edges(const dataframe_t &edges, const std::string &source_field_name, const std::string &target_field_name, size_t groupa=0, size_t groupb=0)
sframe & vertex_partition(size_t partition, size_t groupid=0)
std::vector< std::string > get_vertex_fields(size_t groupid=0) const
sframe & edge_partition(edge_partition_address address)
std::vector< flex_type_enum > get_edge_field_types(size_t groupa=0, size_t groupb=0) const
std::vector< flex_type_enum > get_vertex_field_types(size_t groupid=0) const
std::vector< std::string > get_edge_fields(size_t groupa=0, size_t groupb=0) const