7 #ifndef TURI_S3_FILESYS_HPP_ 8 #define TURI_S3_FILESYS_HPP_ 10 #ifndef TC_DISABLE_REMOTEFS 12 #include <aws/core/Aws.h> 13 #include <aws/s3/S3Client.h> 14 #include <aws/s3/model/CreateMultipartUploadRequest.h> 16 #include <core/storage/fileio/s3_api.hpp> 45 struct ScopedAwsInitAPI {
46 ScopedAwsInitAPI(
const Aws::SDKOptions &options) : options_(options) {
47 Aws::InitAPI(options_);
49 ~ScopedAwsInitAPI() { Aws::ShutdownAPI(options_); }
50 Aws::SDKOptions options_;
53 const ScopedAwsInitAPI &turi_global_AWS_SDK_setup(
54 const Aws::SDKOptions &options = Aws::SDKOptions());
64 virtual size_t Read(
void *ptr,
size_t size) = 0;
70 virtual void Write(
const void *ptr,
size_t size) = 0;
75 virtual void Close() = 0;
78 virtual ~Stream(
void) {}
87 virtual void Seek(
size_t pos) = 0;
90 virtual size_t Tell(
void) = 0;
93 virtual size_t FileSize(
void)
const = 0;
96 virtual bool AtEnd(
void)
const = 0;
111 virtual void Close() {
116 virtual size_t Tell(
void) {
return curr_bytes_; }
118 virtual size_t FileSize()
const {
return file_size_; }
120 virtual bool AtEnd(
void)
const {
return curr_bytes_ == file_size_; }
122 virtual void Write(
const void *ptr,
size_t size) {
123 std_log_and_throw(std::runtime_error,
124 "AWSReadStreamBase is not supposed to write");
127 virtual void Seek(
size_t pos) {
132 virtual size_t Read(
void *ptr,
size_t size);
141 virtual void InitRequest(
size_t begin_bytes,
s3url &url) = 0;
/*! \brief file size reported by FileSize() and compared in AtEnd(); starts at 0 */
size_t file_size_{0};
/*!
 * \brief Reset helper; defined out of line.
 * \param begin_bytes position to restart from (presumably a byte offset --
 *                    confirm against the definition)
 */
void Reset(size_t begin_bytes);
/*!
 * \brief Buffer-filling helper; defined out of line.
 * \param buf        destination buffer
 * \param want_bytes amount requested
 * \return an int (NOTE(review): meaning of the value is not visible here --
 *         confirm against the definition)
 */
int FillBuffer(char* buf, size_t want_bytes);
171 file_size_ = file_size;
/*!
 * \brief Lower bound, in bytes, applied to the write buffer size: 5 MiB
 *        (5 * 1024 * 1024).
 */
constexpr size_t S3_MIN_MULTIPART_SIZE = 5 * 1024 * 1024;
187 class WriteStream :
public Stream {
// Builds a write stream for `url`, remembering `no_exception`, and chooses the
// buffer limit max_buffer_size_.
189 WriteStream(
const s3url &url,
bool no_exception =
false)
190 : url_(url), no_exception_(no_exception) {
// Optional override of the buffer size, read from the environment.
191 const char *buz = getenv(
"TURI_S3_WRITE_BUFFER_MB");
192 if (buz !=
nullptr) {
// The value is parsed with atol, shifted left by 20 bits (MB -> bytes) and
// raised to at least S3_MIN_MULTIPART_SIZE.
// NOTE(review): atol yields 0 for non-numeric text, and a negative result
// becomes a very large size_t after the cast -- consider validating.
193 max_buffer_size_ = std::max(static_cast<size_t>(atol(buz)) << 20UL,
194 S3_MIN_MULTIPART_SIZE);
// NOTE(review): listing lines 195-196 are not visible here; presumably the
// lines below are the fallback used when the variable is unset -- confirm.
// Default limit: 64 << 20 bytes (64 MiB).
197 const size_t kDefaultBufferSize = 64 << 20UL;
198 max_buffer_size_ = kDefaultBufferSize;
204 virtual size_t Read(
void *ptr,
size_t size) {
205 if (!no_exception_) {
206 std_log_and_throw(std::runtime_error,
207 "S3.WriteStream cannot be used for read");
212 virtual void Write(
const void *ptr,
size_t size);
214 virtual ~WriteStream() {
216 no_exception_ =
true;
222 virtual void Close() {
231 virtual void InitRequest(
s3url &url) { InitMultipart(url); }
235 size_t max_buffer_size_;
239 bool no_exception_ =
false;
247 std::string upload_id_;
249 Aws::S3::S3Client s3_client_;
252 std::vector<Aws::S3::Model::UploadPartOutcomeCallable> completed_parts_;
254 bool closed_ =
false;
// Issues a CreateMultipartUpload request for the bucket/key named by `url`
// and records the returned upload id in upload_id_. A failed outcome is
// reported through log_and_throw_io_failure.
// NOTE(review): listing lines 257-259 are not visible here; any statements
// they hold are not covered by these comments.
256 void InitMultipart(
const s3url &url) {
// Build the request: bucket and key are copied into Aws::String from the
// url's bucket / object_name fields.
260 Aws::S3::Model::CreateMultipartUploadRequest create_request;
261 create_request.SetBucket(
262 Aws::String(url.bucket.c_str(), url.bucket.length()));
263 create_request.SetKey(
264 Aws::String(url.object_name.c_str(), url.object_name.length()));
// Send the request through s3_client_.
267 auto createMultipartUploadOutcome =
268 s3_client_.CreateMultipartUpload(create_request);
// On failure, format "<exception name>: <message>" followed by std::endl and
// pass it to log_and_throw_io_failure.
270 if (!createMultipartUploadOutcome.IsSuccess()) {
271 auto error = createMultipartUploadOutcome.GetError();
272 std::stringstream ss;
273 ss << error.GetExceptionName() <<
": " << error.GetMessage() << std::endl;
275 log_and_throw_io_failure(ss.str());
// Keep the upload id from the result in upload_id_.
278 upload_id_ = createMultipartUploadOutcome.GetResult().GetUploadId().c_str();
/*!
 * \brief Upload helper; defined out of line.
 * \param force_upload defaults to false (NOTE(review): presumably forces an
 *                     upload of the current buffer -- confirm against the
 *                     definition)
 */
void Upload(bool force_upload = false);
298 S3FileSystem(
const s3url &url) : url_(url) {}
300 virtual ~S3FileSystem() {}
309 static void ListObjects(
const s3url &path, std::vector<FileInfo> &out_list);
316 virtual void ListDirectory(
const s3url &path,
317 std::vector<FileInfo> &out_list);
328 virtual std::shared_ptr<Stream> Open(
const s3url &path,
329 const char *
const flag);
337 virtual std::shared_ptr<SeekStream> OpenForRead(
const s3url &path,
338 bool no_exception =
true);
s3url path
full path to the file
virtual size_t Tell(void)
tell the position of the stream
FileInfo()
default constructor
Reader stream that should be used to read from the AWS SDK. There is no buffer in this implementation: every read fetches packets over the network, so combine it with read_caching_device.
virtual size_t FileSize() const
virtual void Seek(size_t pos)
seek to a certain position in the file
size_t size
the size of the file
virtual bool AtEnd(void) const
Returns true if at end of stream.
reader stream that can be used to read
#define ASSERT_TRUE(cond)
virtual void InitRequest(size_t begin_bytes, s3url &url)
initialize the ecurl request,
interface of an I/O stream that supports seek
Aws::S3::S3Client init_aws_sdk_with_turi_env(s3url &parsed_url)
FileType type
the type of the file
used to store file information