6 #ifndef TURI_FILEIO_CACHING_DEVICE_HPP 7 #define TURI_FILEIO_CACHING_DEVICE_HPP 9 #include <core/storage/fileio/block_cache.hpp> 10 #include <core/storage/fileio/sanitize_url.hpp> 11 #include <core/parallel/mutex.hpp> 12 #include <core/util/basic_types.hpp> 19 const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024;
44 typedef typename T::char_type char_type;
45 typedef typename T::category category;
51 m_filename = filename;
55 std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
56 auto iter = m_filename_to_filesize_map.find(filename);
57 if (iter != m_filename_to_filesize_map.end()) {
58 m_file_size = iter->second;
60 m_contents = std::make_shared<T>(filename, write);
61 m_file_size = m_contents->file_size();
62 m_filename_to_filesize_map[filename] = m_file_size;
65 m_contents = std::make_shared<T>(filename, write);
// Closes the caching device for the given open mode.
//  - Writer closing with `out`: closes the underlying device (if one exists),
//    then evicts this file's blocks from the block cache, and finally drops
//    this file's entry from the shared file-size cache. Dropping that entry
//    forces the next open to re-query the size instead of reusing a stale one.
//  - Reader closing with `in`: closes the underlying device (if one exists).
// NOTE(review): several lines of this method (the loop header, where `bc`
// comes from, closing braces) are not visible in this view.
74 void close(std::ios_base::openmode mode = std::ios_base::openmode()) {
75 if (mode == std::ios_base::out && m_writing) {
76 if (m_contents) m_contents->close(mode);
// Evict cached blocks for this file, starting at block 0. Presumably the
// block number is incremented each iteration (not visible here). The loop
// stops at the first key whose eviction reports false.
80 size_t block_number = 0;
82 std::string key = get_key_name(block_number);
83 if (bc.evict_key(key) ==
false)
break;
// The static file-size map is shared across instances; guard it.
88 std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
89 m_filename_to_filesize_map.erase(m_filename);
92 }
else if (mode == std::ios_base::in && !m_writing) {
93 if (m_contents) m_contents->close(mode);
// Reads up to `n` bytes from the current position `m_file_pos` into `strm_ptr`.
// Bytes are served one READ_CACHING_BLOCK_SIZE block at a time via
// fetch_block(). The position advances by the bytes copied.
// Throws (via log_and_throw) if a block cannot be fetched.
// NOTE(review): the loop header and the updates to `ret`/`n` (original
// lines 113-115, 123-125, 128-131) are not visible in this view.
101 std::streamsize read(
char* strm_ptr, std::streamsize n) {
// Never read past the end of the file.
104 n = std::min<std::streamsize>(n, m_file_size - m_file_pos);
109 << m_file_size << std::endl;
112 std::streamsize ret = 0;
// Locate the block containing m_file_pos and the offset within it.
// NOTE(review): m_file_pos is signed while READ_CACHING_BLOCK_SIZE is size_t,
// so a negative position would wrap to a huge block number. seek(beg) with a
// negative offset can produce one; confirm callers never do that.
116 auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE;
118 auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE;
// Copy no more than what remains in this block, and no more than requested.
120 size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
121 n_bytes = std::min<size_t>(n_bytes, n);
122 bool success = fetch_block(strm_ptr + ret,
126 if (success ==
false) {
127 log_and_throw(std::string(
"Unable to read ") + m_filename);
// Advance the logical read position past the bytes just copied.
132 m_file_pos += n_bytes;
137 std::streamsize write(
const char* strm_ptr, std::streamsize n) {
138 return get_contents()->write(strm_ptr, n);
142 return get_contents()->good();
// Repositions the device.
// For `in` mode the visible branches only update the cached read position
// `m_file_pos`, clamped above at m_file_size; no device seek happens there.
// The last visible line forwards to the underlying device's seek. Presumably
// this is the non-`in` path; the lines between (original 160-162) are not
// visible here.
148 std::streampos
seek(std::streamoff off,
149 std::ios_base::seekdir way,
150 std::ios_base::openmode openmode) {
151 if (openmode == std::ios_base::in) {
152 if (way == std::ios_base::beg) {
// NOTE(review): unlike the cur/end branches, this is not clamped below at 0.
// A negative `off` leaves m_file_pos negative.
153 m_file_pos = std::min<std::streamoff>(off, m_file_size);
154 }
else if (way == std::ios_base::cur) {
155 m_file_pos = std::min<std::streamoff>(m_file_pos + off, m_file_size);
156 m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
157 }
else if (way == std::ios_base::end) {
// NOTE(review): the `- 1` means seek(0, end) lands on m_file_size - 1 (the
// last byte), not on EOF (m_file_size) as std stream seek semantics would.
// Confirm whether this is intentional.
158 m_file_pos = std::min<std::streamoff>(m_file_size + off - 1, m_file_size);
159 m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
163 return get_contents()->seek(off, way, openmode);
// Path this device was opened with; also the prefix of block-cache keys.
182 std::string m_filename;
// Underlying device, constructed as T(filename, write).
183 std::shared_ptr<T> m_contents;
// File size in bytes, used to clamp reads and seeks.
184 size_t m_file_size = 0;
// Current logical read offset, maintained by seek()/read() for `in` mode.
185 std::streamoff m_file_pos = 0;
// Whether this instance writes; close() branches on it.
186 bool m_writing =
true;
// Guards m_filename_to_filesize_map, which is static and therefore shared by
// every instance of this template instantiation.
188 static mutex m_filesize_cache_mutex;
// Cache of filename -> file size, consulted on open and erased on writer close.
189 static std::map<std::string, size_t> m_filename_to_filesize_map;
// Returns a reference to the underlying device. The visible line constructs
// it from m_filename/m_writing. The guard condition (original line 192) is
// not visible; it presumably checks that m_contents is empty (confirm).
191 std::shared_ptr<T>& get_contents() {
193 m_contents = std::make_shared<T>(m_filename, m_writing);
// Block-cache key for one block of this file: "<filename>////:<block_number>".
197 std::string get_key_name(
size_t block_number) {
199 return m_filename +
"////:" + std::to_string(block_number);
// Copies `length` bytes starting at `startpos` within block `block_number` of
// this file into `output`. Presumably these are the remaining parameters;
// original lines 206-209 are not visible.
// Fast path: read the range from the block cache.
// Slow path: read the whole block from the underlying device, store it in the
// block cache, then copy the requested slice.
205 bool fetch_block(
char* output,
210 std::string key = get_key_name(block_number);
// A cache hit is only trusted if it returned exactly the requested length.
211 int64_t ret = bc.read(key, output, startpos, startpos + length);
212 if (static_cast<size_t>(ret) == length)
return true;
// Cache miss: the block spans [block_start, block_end); the last block is
// truncated at the file size.
217 auto block_start = block_number * READ_CACHING_BLOCK_SIZE;
218 auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
220 auto& contents = get_contents();
221 contents->seek(block_start, std::ios_base::beg, std::ios_base::in);
222 std::string block_contents(block_end - block_start, 0);
223 auto bytes_read = contents->read(&(block_contents[0]),
224 block_end - block_start);
// The short-read comparison below relies on bytes_read being a signed integer.
226 static_assert(std::is_signed<decltype(bytes_read)>::value,
"decltype(bytes_read) signed");
227 static_assert(std::is_integral<decltype(bytes_read)>::value,
"decltype(bytes_read) integral");
// Short read from the device; the handling body is not visible here.
228 if (bytes_read < truncate_check<int64_t>(block_end - block_start)) {
233 bool write_block_ok = bc.write(key, block_contents);
234 if (write_block_ok ==
false) {
// NOTE(review): assumes startpos + length <= block size. read() sizes the
// request so it does not cross a block boundary.
239 const char* src = block_contents.c_str();
240 memcpy(output, src + startpos, length);
std::shared_ptr< std::istream > get_underlying_stream()
Not supported.
static block_cache & get_instance()
std::streampos seek(std::streamoff off, std::ios_base::seekdir way, std::ios_base::openmode openmode)
std::streamsize optimal_buffer_size() const
std::string sanitize_url(std::string url)