Turi Create  4.0
general_fstream_source.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef FILEIO_GENERAL_FSTREAM_SOURCE_HPP
7 #define FILEIO_GENERAL_FSTREAM_SOURCE_HPP
8 #include <memory>
9 #include <fstream>
10 #include <boost/iostreams/stream.hpp>
11 #include <core/storage/fileio/union_fstream.hpp>
12 #include <core/storage/fileio/fileio_constants.hpp>
13 #include <boost/iostreams/filter/gzip.hpp>
14 namespace turi {
15 namespace fileio_impl {
16 
17 /**
18  * \ingroup fileio
19  * \internal
20  * Implements a general file stream source device which wraps the
21  * union_fstream, and provides automatic gzip decompression capabilities.
22  *
23  * The general_fstream_source is NOT thread-safe.
24  */
26 
27  /// The source device must be copyable; thus the shared_ptr.
28  std::shared_ptr<union_fstream> in_file;
29  /// The source device must be copyable; thus the shared_ptr.
30  std::shared_ptr<boost::iostreams::gzip_decompressor> decompressor;
31 
32  /// The underlying stream inside the in_file (std stream or hdfs stream)
33  std::shared_ptr<std::istream> underlying_stream;
34 
35  /// Set by the constructor: whether the file is gzip compressed.
36  bool is_gzip_compressed = false;
37  public:
38  typedef char char_type;
39  struct category: public boost::iostreams::device_tag,
40  boost::iostreams::closable_tag,
41  boost::iostreams::multichar_tag,
42  boost::iostreams::input_seekable,
43  boost::iostreams::optimally_buffered_tag {};
44 
45  /**
46  * Constructs a fstream source which opens a file. This file can be
47  * of any protocol supported by the union_fstream, and may also be
48  * gzip compressed. Gzip compression detection is automatic based on the file
49  * extension, i.e. whether the file name ends in .gz.
50  */
51  explicit general_fstream_source(std::string file);
52 
53  /**
54  * Constructs a fstream source which opens a file. This file can be
55  * of any protocol supported by the union_fstream, and may also be
56  * gzip compressed. Gzip compression detection is not performed, and the
57  * gzip_compressed flag is used to enable/disable gzip decompression.
58  */
59  general_fstream_source(std::string file, bool gzip_compressed);
60 
61  /**
62  * Default copy constructor. copied object shares handles with the original
63  * object. Required because boost streams requires devices to be copyable.
64  * This should really not be used otherwise.
65  */
67 
68  /**
69  * Default move constructor
70  */
72 
73 
74  /**
75  * Destructor. If all copies of this object are closed,
76  * closes the file.
77  */
79 
80  inline std::streamsize optimal_buffer_size() const {
82  }
83 
84  /**
85  * Returns true if the file is opened
86  */
87  bool is_open() const;
88 
89  /**
90  * Attempts to read bufsize bytes into the buffer provided.
91  * Returns the actual number of bytes read. Returns -1 on failure.
92  */
93  std::streamsize read(char* c, std::streamsize bufsize);
94 
95  /**
96  * Closes all file handles
97  */
98  void close();
99 
100 
101  /**
102  * Returns the length of the open file.
103  * Returns (size_t)(-1) if there is no file opened.
104  */
105  size_t file_size() const;
106 
107  /**
108  * Returns the number of physical bytes read so far. This is an estimate,
109  * especially if the file is gzip compressed.
110  * Returns (size_t)(-1) if there is no file opened.
111  */
112  size_t get_bytes_read() const;
113 
114 
115  /**
116  * Seeks to a different location. Will fail on compressed files.
117  */
118  std::streampos seek(std::streamoff off, std::ios_base::seekdir way);
119 
120 
121  /**
122  * Returns the underlying stream object if possible; nullptr otherwise.
123  */
124  std::shared_ptr<std::istream> get_underlying_stream() const;
125 
126  private:
127  /**
128  * The constructors redirect to this function to open the stream.
129  */
130  void open_file(std::string file, bool gzip_compressed);
131 };
132 
133 } // namespace fileio_impl
134 } // namespace turi
135 #endif
std::streamsize read(char *c, std::streamsize bufsize)
std::streampos seek(std::streamoff off, std::ios_base::seekdir way)
std::shared_ptr< std::istream > get_underlying_stream() const
size_t FILEIO_READER_BUFFER_SIZE