Turi Create  4.0
s3_filesys.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at
5  * https://opensource.org/licenses/BSD-3-Clause
6  */
7 #ifndef TURI_S3_FILESYS_HPP_
8 #define TURI_S3_FILESYS_HPP_
9 
10 #ifndef TC_DISABLE_REMOTEFS
11 
12 #include <aws/core/Aws.h>
13 #include <aws/s3/S3Client.h>
14 #include <aws/s3/model/CreateMultipartUploadRequest.h>
15 
16 #include <core/storage/fileio/s3_api.hpp>
17 #include <string>
18 #include <vector>
19 
20 namespace turi {
21 namespace fileio {
22 
23 namespace s3 {
24 
/*! \brief kind of entry a path refers to */
enum FileType {
  /*! \brief the entry is a file */
  kFile = 0,
  /*! \brief the entry is a directory */
  kDirectory = 1
};
32 
33 /*! \brief use to store file information */
34 struct FileInfo {
35  /*! \brief full path to the file */
37  /*! \brief the size of the file */
38  size_t size;
39  /*! \brief the type of the file */
40  FileType type;
41  /*! \brief default constructor */
42  FileInfo() : size(0), type(kFile) {}
43 };
44 
45 struct ScopedAwsInitAPI {
46  ScopedAwsInitAPI(const Aws::SDKOptions &options) : options_(options) {
47  Aws::InitAPI(options_);
48  }
49  ~ScopedAwsInitAPI() { Aws::ShutdownAPI(options_); }
50  Aws::SDKOptions options_;
51 };
52 
53 const ScopedAwsInitAPI &turi_global_AWS_SDK_setup(
54  const Aws::SDKOptions &options = Aws::SDKOptions());
55 
/*! \brief abstract stream interface offering read, write and close */
class Stream {
 public:
  /*!
   * \brief reads data from the stream
   * \param ptr pointer to the memory buffer that receives the data
   * \param size block size
   * \return the size of the data read
   */
  virtual size_t Read(void *ptr, size_t size) = 0;

  /*!
   * \brief writes data to the stream
   * \param ptr pointer to the memory buffer holding the data
   * \param size block size
   */
  virtual void Write(const void *ptr, size_t size) = 0;

  /*! \brief closes the stream */
  virtual void Close() = 0;

  /*! \brief virtual destructor */
  virtual ~Stream() = default;
};
80 
81 /*! \brief interface of i/o stream that support seek */
82 class SeekStream : public Stream {
83  public:
84  // virtual destructor
85  virtual ~SeekStream(void) {}
86  /*! \brief seek to certain position of the file */
87  virtual void Seek(size_t pos) = 0;
88 
89  /*! \brief tell the position of the stream */
90  virtual size_t Tell(void) = 0;
91 
92  /*! tell the physical size of the stream */
93  virtual size_t FileSize(void) const = 0;
94 
95  /*! \brief Returns true if at end of stream */
96  virtual bool AtEnd(void) const = 0;
97 };
98 
99 /*!
100  * \brief reader stream that should be used to read from AWS SDK
101  * there's no buffer in this implementation. Every read will fetch
102  * packets through network. So combine this with read_caching_device
103  */
105  public:
106  virtual ~AWSReadStreamBase() {
107  logstream(LOG_DEBUG) << "~AWSReadStream" << std::endl;
108  Close();
109  }
110 
111  virtual void Close() {
112  logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl;
113  Reset(file_size_);
114  }
115 
116  virtual size_t Tell(void) { return curr_bytes_; }
117 
118  virtual size_t FileSize() const { return file_size_; }
119 
120  virtual bool AtEnd(void) const { return curr_bytes_ == file_size_; }
121 
122  virtual void Write(const void *ptr, size_t size) {
123  std_log_and_throw(std::runtime_error,
124  "AWSReadStreamBase is not supposed to write");
125  }
126  // lazy seek function
127  virtual void Seek(size_t pos) {
128  ASSERT_TRUE(pos < file_size_);
129  Reset(pos);
130  }
131 
132  virtual size_t Read(void *ptr, size_t size);
133 
134  protected:
135  AWSReadStreamBase() : file_size_(0), curr_bytes_(0) {}
136 
137  /*!
138  * \brief initialize the ecurl request,
139  * \param begin_bytes the beginning bytes of the stream
140  */
141  virtual void InitRequest(size_t begin_bytes, s3url &url) = 0;
142 
143  protected:
144  // the total size of the file
145  size_t file_size_ = 0;
146  s3url url_;
147 
148  private:
149  /*!
150  * \brief called by child class to initialize read
151  * \param begin_bytes the beginning bytes of the stream
152  */
153  void Reset(size_t begin_bytes);
154 
155  /*!
156  * \brief try to fill the buffer with at least wanted bytes
157  * \param buf the output stream pointer provided by client
158  * \param want_bytes number of bytes we want to fill
159  * \return number of remainning running curl handles
160  */
161 
162  int FillBuffer(char *buf, size_t want_bytes);
163  // current position in the stream
164  size_t curr_bytes_;
165 };
166 
167 /*! \brief reader stream that can be used to read */
169  public:
170  ReadStream(const s3url &url, size_t file_size) {
171  file_size_ = file_size;
172  url_ = url;
173  }
174 
175  virtual ~ReadStream() {}
176 
177  protected:
178  // implement InitRequest
179  virtual void InitRequest(size_t begin_bytes, s3url &url) {
180  Seek(begin_bytes);
181  url_ = url;
182  }
183 };
184 
/*! \brief 5 MB in bytes; lower bound applied to a user-supplied write buffer size */
constexpr size_t S3_MIN_MULTIPART_SIZE = static_cast<size_t>(5) << 20;  // 5MB
186 
187 class WriteStream : public Stream {
188  public:
189  WriteStream(const s3url &url, bool no_exception = false)
190  : url_(url), no_exception_(no_exception) {
191  const char *buz = getenv("TURI_S3_WRITE_BUFFER_MB");
192  if (buz != nullptr) {
193  max_buffer_size_ = std::max(static_cast<size_t>(atol(buz)) << 20UL,
194  S3_MIN_MULTIPART_SIZE);
195  } else {
196  // 64 MB
197  const size_t kDefaultBufferSize = 64 << 20UL;
198  max_buffer_size_ = kDefaultBufferSize;
199  }
200 
201  InitRequest(url_);
202  }
203 
204  virtual size_t Read(void *ptr, size_t size) {
205  if (!no_exception_) {
206  std_log_and_throw(std::runtime_error,
207  "S3.WriteStream cannot be used for read");
208  }
209  return 0;
210  }
211 
212  virtual void Write(const void *ptr, size_t size);
213  // destructor
214  virtual ~WriteStream() {
215  if (!closed_) {
216  no_exception_ = true;
217  Upload(true);
218  Finish();
219  }
220  }
221 
222  virtual void Close() {
223  if (!closed_) {
224  closed_ = true;
225  Upload(true);
226  Finish();
227  }
228  }
229 
230  protected:
231  virtual void InitRequest(s3url &url) { InitMultipart(url); }
232 
233  private:
234  // internal maximum buffer size
235  size_t max_buffer_size_;
236  // path we are reading
237  s3url url_;
238 
239  bool no_exception_ = false;
240  /*
241  * write data buffer
242  * Aws multipart upload api requries each part to be
243  * larger than 5MB, except for the last part.
244  **/
245  std::string buffer_;
246 
247  std::string upload_id_;
248 
249  Aws::S3::S3Client s3_client_;
250 
251  // UploadPartOutcomeCallable is fucture<UploadPartOutcome>
252  std::vector<Aws::S3::Model::UploadPartOutcomeCallable> completed_parts_;
253 
254  bool closed_ = false;
255 
256  void InitMultipart(const s3url &url) {
257  url_ = url;
258  s3_client_ = init_aws_sdk_with_turi_env(url_);
259 
260  Aws::S3::Model::CreateMultipartUploadRequest create_request;
261  create_request.SetBucket(
262  Aws::String(url.bucket.c_str(), url.bucket.length()));
263  create_request.SetKey(
264  Aws::String(url.object_name.c_str(), url.object_name.length()));
265  // create_request.SetContentType("text/plain");
266 
267  auto createMultipartUploadOutcome =
268  s3_client_.CreateMultipartUpload(create_request);
269 
270  if (!createMultipartUploadOutcome.IsSuccess()) {
271  auto error = createMultipartUploadOutcome.GetError();
272  std::stringstream ss;
273  ss << error.GetExceptionName() << ": " << error.GetMessage() << std::endl;
274  logstream(LOG_ERROR) << ss.str() << std::endl;
275  log_and_throw_io_failure(ss.str());
276  }
277 
278  upload_id_ = createMultipartUploadOutcome.GetResult().GetUploadId().c_str();
279  }
280 
281  /*!
282  * \brief upload the buffer to S3, store the etag
283  * clear the buffer.
284  *
285  * ONLY use this when you finish writing. Aws multipart api
286  * only allows the last part to be less than 5MB.
287  */
288  void Upload(bool force_upload = false);
289 
290  /*!
291  * \brief commit the upload and finish the session
292  */
293  void Finish(void);
294 };
295 
296 class S3FileSystem {
297  public:
298  S3FileSystem(const s3url &url) : url_(url) {}
299 
300  virtual ~S3FileSystem() {}
301 
302  /*!
303  * \brief get information about a path
304  * \param path the path to the file
305  * \return the information about the file
306  */
307  virtual FileInfo GetPathInfo(const s3url &path);
308 
309  static void ListObjects(const s3url &path, std::vector<FileInfo> &out_list);
310 
311  /*!
312  * \brief list files in a directory
313  * \param path to the file
314  * \param out_list the output information about the files
315  */
316  virtual void ListDirectory(const s3url &path,
317  std::vector<FileInfo> &out_list);
318 
319  /*!
320  * \brief open a stream, will report error and exit if bad thing happens
321  * NOTE: the Stream can continue to work even when filesystem was destructed
322  * \param path path to file
323  * \param uri the uri of the input
324  * \param flag can be "w", "r", "a"
325  * \return the created stream, can be NULL when allow_null == true and file
326  * do not exist
327  */
328  virtual std::shared_ptr<Stream> Open(const s3url &path,
329  const char *const flag);
330 
331  /*!
332  * \brief open a seekable stream for read
333  * \param path the path to the file
334  * \return the created stream, can be NULL if no_exception is true to
335  * indicate a failure to open the file.
336  */
337  virtual std::shared_ptr<SeekStream> OpenForRead(const s3url &path,
338  bool no_exception = true);
339 
340  protected:
341  s3url url_;
342 
343  private:
344  /*!
345  * \brief try to get information about a path
346  * \param path the path to the file
347  * \param out_info holds the path info
348  * \return return false when path do not exist
349  */
350  bool TryGetPathInfo(const s3url &path, FileInfo &info);
351 };
352 
353 } // namespace s3
354 } // namespace fileio
355 } // namespace turi
356 
357 #endif
358 
359 #endif
#define logstream(lvl)
Definition: logger.hpp:276
s3url path
full path to the file
Definition: s3_filesys.hpp:36
virtual size_t Tell(void)
tell the position of the stream
Definition: s3_filesys.hpp:116
FileInfo()
default constructor
Definition: s3_filesys.hpp:42
reader stream that should be used to read from AWS SDK; there's no buffer in this implementation. Every read will fetch packets through network. So combine this with read_caching_device
Definition: s3_filesys.hpp:104
virtual size_t FileSize() const
Definition: s3_filesys.hpp:118
virtual void Seek(size_t pos)
seek to certain position of the file
Definition: s3_filesys.hpp:127
size_t size
the size of the file
Definition: s3_filesys.hpp:38
#define LOG_DEBUG
Definition: logger.hpp:102
virtual bool AtEnd(void) const
Returns true if at end of stream.
Definition: s3_filesys.hpp:120
#define LOG_ERROR
Definition: logger.hpp:97
reader stream that can be used to read
Definition: s3_filesys.hpp:168
#define ASSERT_TRUE(cond)
Definition: assertions.hpp:309
virtual void InitRequest(size_t begin_bytes, s3url &url)
initialize the ecurl request,
Definition: s3_filesys.hpp:179
interface of i/o stream that support seek
Definition: s3_filesys.hpp:82
Aws::S3::S3Client init_aws_sdk_with_turi_env(s3url &parsed_url)
FileType type
the type of the file
Definition: s3_filesys.hpp:40
use to store file information
Definition: s3_filesys.hpp:34