Turi Create  4.0
sgraph_compute_vertex_block.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SGRAPH_SGRAPH_COMPUTE_VERTEX_BLOCK_HPP
7 #define TURI_SGRAPH_SGRAPH_COMPUTE_VERTEX_BLOCK_HPP
8 #include <map>
9 #include <vector>
10 #include <algorithm>
11 #include <core/parallel/mutex.hpp>
12 #include <core/storage/sframe_data/sframe.hpp>
13 #include <core/data/flexible_type/flexible_type.hpp>
14 namespace turi {
15 
16 
17 /**
18  * \ingroup sgraph_physical
19  * \addtogroup sgraph_compute SGraph Compute
20  * \{
21  */
22 
23 /**
24  * Graph Computation Functions
25  */
26 namespace sgraph_compute {
27 
28 /**
29  * Represents a partition of vertices which is held in memory.
30  */
31 template <typename SIterableType>
32 class vertex_block {
33  public:
34 
35  /**
36  * Loads an SFrame/SArray into memory (accessible directly via m_vertices)
37  * if not already loaded.
38  */
39  void load_if_not_loaded(const SIterableType& sf) {
40  if (!m_loaded) {
41  load_impl(sf);
42  m_loaded = true;
43  }
44  }
45 
46  /**
47  * Loads an SFrame/SArray into memory (accessible directly via m_vertices)
48  * reloading it if it has already been loaded.
49  */
50  void load(const SIterableType& sf) {
51  load_impl(sf);
52  m_loaded = true;
53  }
54 
55  void flush(SIterableType& outputsf) {
56  std::copy(m_vertices.begin(), m_vertices.end(),
57  outputsf.get_output_iterator(0));
58  outputsf.close();
59  }
60 
61  void flush(SIterableType& outputsf, const std::vector<size_t>& mutated_field_index) {
62  auto out = outputsf.get_output_iterator(0);
63  std::vector<flexible_type> temp(mutated_field_index.size());
64  for (const auto& value: m_vertices) {
65  for (size_t i = 0; i < mutated_field_index.size(); ++i) {
66  temp[i] = value[mutated_field_index[i]];
67  }
68  *out = temp;
69  ++out;
70  }
71  outputsf.close();
72  }
73 
74  /**
75  * Unloads the loaded data, releasing all memory used.
76  */
77  void unload() {
78  m_loaded = false;
79  m_vertices.clear();
80  m_vertices.shrink_to_fit();
81  if (is_modified()) {
82  m_reader.reset();
83  }
85  }
86 
87  /**
88  * Returns true if the SFrame is loaded. False otherwise.
89  */
90  bool is_loaded() {
91  return m_loaded;
92  }
93 
94  /**
95  * Returns true if the SFrame is modified. False otherwise.
96  */
97  bool is_modified() {
98  return m_modified;
99  }
100 
101  /**
102  * Sets the modified flag
103  */
105  m_modified = true;
106  }
107 
108  /**
109  * Clears the modified flag
110  */
112  m_modified = false;
113  }
114 
115  typename SIterableType::value_type& operator[](size_t i) {
116  return m_vertices[i];
117  }
118 
119  const typename SIterableType::value_type& operator[](size_t i) const {
120  return m_vertices[i];
121  }
122  /// The loaded data
123  std::vector<typename SIterableType::value_type> m_vertices;
124 
125  private:
126  /// Internal load implementation
127  void load_impl(const SIterableType& sf) {
128  if (m_last_index_file != sf.get_index_file() || !m_reader) {
129  m_last_index_file = sf.get_index_file();
130  m_reader = std::move(sf.get_reader());
131  }
132  m_vertices.reserve(sf.size());
133  m_reader->read_rows(0, m_reader->size(), m_vertices);
134  }
135 
136  /// Flag denoting if the data has been loaded
137  bool m_loaded = false;
138  /// Flag denoting modification
139  bool m_modified = false;
140  // cache the reader
141  std::string m_last_index_file;
142  std::unique_ptr<typename SIterableType::reader_type> m_reader;
143 };
144 
145 
146 } // sgraph_compute
147 
148 /// \}
149 } // turicreate
150 #endif
void load_if_not_loaded(const SIterableType &sf)
void copy(const std::string src, const std::string dest)
std::vector< typename SIterableType::value_type > m_vertices
The loaded data.