Turi Create  4.0
read_caching_device.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_FILEIO_CACHING_DEVICE_HPP
7 #define TURI_FILEIO_CACHING_DEVICE_HPP
9 #include <core/storage/fileio/block_cache.hpp>
10 #include <core/storage/fileio/sanitize_url.hpp>
11 #include <core/parallel/mutex.hpp>
12 #include <core/util/basic_types.hpp>
13 #include <mutex>
14 #include <map>
15 namespace turi {
16 
17 // private namespace
namespace {
// Size of each block fetched from the wrapped device and stored in the
// block cache: 64 MB.
constexpr size_t READ_CACHING_BLOCK_SIZE = size_t{64} * 1024 * 1024;
}  // end private namespace
21 
22 /**
23  * \ingroup fileio
 * Can be wrapped around any boost::iostreams device implementation to provide
 * read caching. This should be used only when the filesystem being accessed
 * is remote (e.g. S3).
 * It uses the \ref block_cache to cache large blocks on the cache:// file
 * system.
28  *
29  * Before:
30  * \code
31  * typedef boost::iostreams::stream<s3_device> s3_fstream;
32  * \endcode
33  *
34  * After:
35  * \code
36  * typedef boost::iostreams::stream<read_caching_device<s3_device> > s3_fstream;
37  * \endcode
38  *
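 * Reads are served in blocks of READ_CACHING_BLOCK_SIZE (64 MB): on a cache
 * miss the whole block is read from the wrapped device and stored in the
 * \ref block_cache. File sizes are cached as well, so reopening a file for
 * reading does not query the remote filesystem again. Closing a device that
 * was opened for writing evicts the file's cached blocks and file size.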
40  */
41 template <typename T>
43  public: // boost iostream concepts
44  typedef typename T::char_type char_type;
45  typedef typename T::category category;
46 
48 
49  read_caching_device(const std::string& filename, const bool write = false) {
50  logstream(LOG_DEBUG) << "read_cachine_device: " << filename << std::endl;
51  m_filename = filename;
52  if (write == false) {
53  // check the filesize cache for the filesize so we don't poke s3 again
54  // even if all the data we care about are in a cache
55  std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
56  auto iter = m_filename_to_filesize_map.find(filename);
57  if (iter != m_filename_to_filesize_map.end()) {
58  m_file_size = iter->second;
59  } else {
60  m_contents = std::make_shared<T>(filename, write);
61  m_file_size = m_contents->file_size();
62  m_filename_to_filesize_map[filename] = m_file_size;
63  }
64  } else {
65  m_contents = std::make_shared<T>(filename, write);
66  }
67 
68  m_writing = write;
69  }
70 
71  // Because the device has bidirectional tag, close will be called
72  // twice, one with the std::ios_base::in, followed by out.
73  // Only close the file when the close tag matches the actual file type.
74  void close(std::ios_base::openmode mode = std::ios_base::openmode()) {
75  if (mode == std::ios_base::out && m_writing) {
76  if (m_contents) m_contents->close(mode);
77  m_contents.reset();
78  // evict all blocks for this key
79  auto& bc = block_cache::get_instance();
80  size_t block_number = 0;
81  while(1) {
82  std::string key = get_key_name(block_number);
83  if (bc.evict_key(key) == false) break;
84  ++ block_number;
85  }
86  // evict the file size cache
87  {
88  std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
89  m_filename_to_filesize_map.erase(m_filename);
90  }
91 
92  } else if (mode == std::ios_base::in && !m_writing) {
93  if (m_contents) m_contents->close(mode);
94  m_contents.reset();
95  }
96  }
97 
98  /** the optimal buffer size is 0. */
99  inline std::streamsize optimal_buffer_size() const { return 0; }
100 
101  std::streamsize read(char* strm_ptr, std::streamsize n) {
102  // there is an upper limit of how many bytes we can read
103  // based on the file size
104  n = std::min<std::streamsize>(n, m_file_size - m_file_pos);
105 
106  // suspicious
107  if (n < 0) {
108  logstream(LOG_DEBUG) << "read size is " << n << "; file size is "
109  << m_file_size << std::endl;
110  }
111 
112  std::streamsize ret = 0;
113 
114  while(n > 0) {
115  // the block number containing the offset.
116  auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE;
117  // the offset inside the block
118  auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE;
119  // number of bytes I can read inside this block before I hit the next block
120  size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
121  n_bytes = std::min<size_t>(n_bytes, n);
122  bool success = fetch_block(strm_ptr + ret,
123  block_number,
124  block_offset,
125  n_bytes);
126  if (success == false) {
127  log_and_throw(std::string("Unable to read ") + m_filename);
128  }
129  n -= n_bytes;
130  ret += n_bytes;
131  // advance the file position
132  m_file_pos += n_bytes;
133  }
134  return ret;
135  }
136 
137  std::streamsize write(const char* strm_ptr, std::streamsize n) {
138  return get_contents()->write(strm_ptr, n);
139  }
140 
141  bool good() const {
142  return get_contents()->good();
143  }
144 
145  /**
146  * Seeks to a different location.
147  */
148  std::streampos seek(std::streamoff off,
149  std::ios_base::seekdir way,
150  std::ios_base::openmode openmode) {
151  if (openmode == std::ios_base::in) {
152  if (way == std::ios_base::beg) {
153  m_file_pos = std::min<std::streamoff>(off, m_file_size);
154  } else if (way == std::ios_base::cur) {
155  m_file_pos = std::min<std::streamoff>(m_file_pos + off, m_file_size);
156  m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
157  } else if (way == std::ios_base::end) {
158  m_file_pos = std::min<std::streamoff>(m_file_size + off - 1, m_file_size);
159  m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
160  }
161  return m_file_pos;
162  } else {
163  return get_contents()->seek(off, way, openmode);
164  }
165  }
166 
167  /**
168  * Returns the file size of the opened file.
169  * Returns (size_t)(-1) if there is no file opened, or if there is an
170  * error obtaining the file size.
171  */
172  size_t file_size() const {
173  return m_file_size;
174  }
175 
176  /// Not supported
177  std::shared_ptr<std::istream> get_underlying_stream() {
178  return nullptr;
179  }
180 
181  private:
182  std::string m_filename;
183  std::shared_ptr<T> m_contents;
184  size_t m_file_size = 0;
185  std::streamoff m_file_pos = 0;
186  bool m_writing = true;
187 
188  static mutex m_filesize_cache_mutex;
189  static std::map<std::string, size_t> m_filename_to_filesize_map;
190 
191  std::shared_ptr<T>& get_contents() {
192  if (!m_contents) {
193  m_contents = std::make_shared<T>(m_filename, m_writing);
194  }
195  return m_contents;
196  }
197  std::string get_key_name(size_t block_number) {
198  // we generate a key name that will never appear in any filename
199  return m_filename + "////:" + std::to_string(block_number);
200  }
201  /**
202  * Fetches the contents of a block.
203  * Returns true on success and false on failure.
204  */
205  bool fetch_block(char* output,
206  size_t block_number,
207  size_t startpos,
208  size_t length) {
209  auto& bc = block_cache::get_instance();
210  std::string key = get_key_name(block_number);
211  int64_t ret = bc.read(key, output, startpos, startpos + length);
212  if (static_cast<size_t>(ret) == length) return true;
213 
214  logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " << block_number << std::endl;
215  // ok. failure... no such block or block is bad. We read it ourselves.
216  // read the whole block
217  auto block_start = block_number * READ_CACHING_BLOCK_SIZE;
218  auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
219  // seek to the block and read the whole block at once
220  auto& contents = get_contents();
221  contents->seek(block_start, std::ios_base::beg, std::ios_base::in);
222  std::string block_contents(block_end - block_start, 0);
223  auto bytes_read = contents->read(&(block_contents[0]),
224  block_end - block_start);
225  // read failed.
226  static_assert(std::is_signed<decltype(bytes_read)>::value, "decltype(bytes_read) signed");
227  static_assert(std::is_integral<decltype(bytes_read)>::value, "decltype(bytes_read) integral");
228  if (bytes_read < truncate_check<int64_t>(block_end - block_start)) {
229  return false;
230  }
231 
232  // write the block
233  bool write_block_ok = bc.write(key, block_contents);
234  if (write_block_ok == false) {
235  logstream(LOG_ERROR) << "Unable to write block " << key << std::endl;
236  // still ok. we can continue. but too many of these are bad.
237  }
238  // since we just read the block, lets fill the output
239  const char* src = block_contents.c_str();
240  memcpy(output, src + startpos, length);
241  return true;
242  }
243 
244 }; // end of read_caching_device
245 
246 template<typename T>
248 
249 template<typename T>
250 std::map<std::string, size_t> read_caching_device<T>::m_filename_to_filesize_map;
251 } // namespace turi
252 #endif
#define logstream(lvl)
Definition: logger.hpp:276
std::shared_ptr< std::istream > get_underlying_stream()
Not supported.
#define LOG_INFO
Definition: logger.hpp:101
#define LOG_DEBUG
Definition: logger.hpp:102
#define LOG_ERROR
Definition: logger.hpp:97
static block_cache & get_instance()
std::streampos seek(std::streamoff off, std::ios_base::seekdir way, std::ios_base::openmode openmode)
std::streamsize optimal_buffer_size() const
std::string sanitize_url(std::string url)