Turi Create  4.0
hdfs.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_HDFS_HPP
7 #define TURI_HDFS_HPP
8 
9 // Requires the hdfs library
10 #ifdef HAS_HADOOP
11 extern "C" {
12  #include <hdfs.h>
13 
14 // Define structs not defined in hdfs.h
15 
/**
 * Kind of stream behind an HDFS file handle: the C-side counterpart of
 * Hadoop's org.apache.hadoop.fs.FSData(Input|Output)Stream.
 *
 * The numeric values are spelled out explicitly; hdfs.h does not provide
 * this type, so it is declared here.
 * NOTE(review): presumably the values must match the ones used inside the
 * linked libhdfs build -- confirm before changing them.
 */
enum hdfsStreamType {
  HDFS_STREAMTYPE_UNINITIALIZED = 0,  /* handle not (yet) bound to a stream */
  HDFS_STREAMTYPE_INPUT         = 1,  /* FSDataInputStream  */
  HDFS_STREAMTYPE_OUTPUT        = 2   /* FSDataOutputStream */
};
25 
26  /**
27  * The 'file-handle' to a file in hdfs.
28  */
29  struct hdfsFile_internal {
30  void* file;
31  enum hdfsStreamType type;
32  int flags;
33  };
34 
35 }
36 #endif
37 
38 #include <vector>
39 #include <iostream>
40 #include <boost/iostreams/stream.hpp>
41 #include <core/logging/assertions.hpp>
42 
43 
44 namespace turi {
45 
46 #ifdef HAS_HADOOP
47  /**
48  * \ingroup fileio
49  * \internal
50  * Wrapper around libHDFS
51  */
52  class hdfs {
53  private:
54  /** the primary filesystem object */
55  hdfsFS filesystem;
56  public:
57  /** hdfs file source is used to construct boost iostreams */
58  class hdfs_device {
59  public: // boost iostream concepts
60  typedef char char_type;
61  struct category :
62  public boost::iostreams::device_tag,
63  public boost::iostreams::multichar_tag,
64  public boost::iostreams::closable_tag,
65  public boost::iostreams::bidirectional_seekable { };
66  // while this claims to be bidirectional_seekable, that is not true
67  // it is only read seekable. Will fail when seeking on write
68  private:
69  hdfsFS filesystem;
70 
71  hdfsFile file;
72 
73  size_t m_file_size;
74 
75  public:
76  hdfs_device() : filesystem(NULL), file(NULL) { }
77 
78  hdfs_device(const hdfs& hdfs_fs, const std::string& filename, const bool write = false);
79 
80  // ~hdfs_device() { if(file != NULL) close(); }
81 
82  // Because the device has bidirectional tag, close will be called
83  // twice, one with the std::ios_base::in, followed by out.
84  // Only close the file when the close tag matches the actual file type.
85  void close(std::ios_base::openmode mode = std::ios_base::openmode());
86 
87  /** the optimal buffer size is 0. */
88  inline std::streamsize optimal_buffer_size() const { return 0; }
89 
90  std::streamsize read(char* strm_ptr, std::streamsize n);
91 
92  std::streamsize write(const char* strm_ptr, std::streamsize n);
93 
94  bool good() const { return file != NULL; }
95 
96  /**
97  * Seeks to a different location.
98  */
99  std::streampos seek(std::streamoff off,
100  std::ios_base::seekdir way,
101  std::ios_base::openmode);
102  }; // end of hdfs device
103 
104  /**
105  * The basic file type has constructor matching the hdfs device.
106  */
107  typedef boost::iostreams::stream<hdfs_device> fstream;
108 
109  /**
110  * Open a connection to the filesystem. The default arguments
111  * should be sufficient for most uses
112  */
113  hdfs(const std::string& host = "default", tPort port = 0);
114 
115  bool good() const { return filesystem != NULL; }
116 
117  ~hdfs() {
118  if (good()) {
119  const int error = hdfsDisconnect(filesystem);
120  ASSERT_EQ(error, 0);
121  }
122  } // end of ~hdfs
123 
124  /**
125  * Returns the contents of a directory
126  */
127  std::vector<std::string> list_files(const std::string& path) const;
128 
129  /**
130  * Returns the contents of a directory as well as a boolean for every
131  * file identifying whether the file is a directory or not.
132  */
133  std::vector<std::pair<std::string, bool> > list_files_and_stat(const std::string& path) const;
134 
135  /**
136  * Returns the size of a given file. Returns (size_t)(-1) on failure.
137  */
138  size_t file_size(const std::string& path) const;
139 
140  /**
141  * Returns true if the given path exists
142  */
143  bool path_exists(const std::string& path) const;
144 
145  /**
146  * Returns true if the given path is a directory, false if it
147  * does not exist, or if is a regular file
148  */
149  bool is_directory(const std::string& path) const;
150 
151 
152  /**
153  * Creates a subdirectory and all parent required directories (like mkdir -p)
154  * Returns true on success, false on failure.
155  */
156  bool create_directories(const std::string& path) const;
157 
158  /**
159  * Change the permissions of the file.
160  */
161  bool chmod(const std::string& path, short mode) const;
162 
163 
164  /**
165  * Deletes a single file / empty directory.
166  * Returns true on success, false on failure.
167  */
168  bool delete_file_recursive(const std::string& path) const;
169 
170  inline static bool has_hadoop() { return true; }
171 
172  static hdfs& get_hdfs();
173 
174  static hdfs& get_hdfs(std::string host, size_t port);
175  }; // end of class hdfs
176 #else
177 
178  class hdfs {
179  public:
180  /** hdfs file source is used to construct boost iostreams */
181  class hdfs_device {
182  public: // boost iostream concepts
183  typedef char char_type;
184  typedef boost::iostreams::bidirectional_device_tag category;
185  public:
186  hdfs_device(const hdfs& hdfs_fs, const std::string& filename,
187  const bool write = false) {
188  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
189  << std::endl;
190  }
191  void close() { }
192  std::streamsize read(char* strm_ptr, std::streamsize n) {
193  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
194  << std::endl;
195  return 0;
196  } // end of read
197  std::streamsize write(const char* strm_ptr, std::streamsize n) {
198  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
199  << std::endl;
200  return 0;
201  }
202  bool good() const { return false; }
203 
204  }; // end of hdfs device
205 
206  /**
207  * The basic file type has constructor matching the hdfs device.
208  */
209  typedef boost::iostreams::stream<hdfs_device> fstream;
210 
211  /**
212  * Open a connection to the filesystem. The default arguments
213  * should be sufficient for most uses
214  */
215  hdfs(const std::string& host = "default", int port = 0) {
216  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
217  << std::endl;
218  } // end of constructor
219 
220  inline std::vector<std::string> list_files(const std::string& path) const {
221  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
222  << std::endl;
223  return std::vector<std::string>();;
224  } // end of list_files
225 
226  inline std::vector<std::pair<std::string, bool> > list_files_and_stat(const std::string& path) const {
227  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
228  << std::endl;
229  return std::vector<std::pair<std::string, bool>>();
230  }
231 
232  inline size_t file_size(const std::string& path) const {
233  return (size_t)(-1);
234  }
235 
236  /**
237  * Returns true if the given path exists
238  */
239  inline bool path_exists(const std::string& path) const {
240  return false;
241  }
242 
243  inline bool is_directory(const std::string& path) const {
244  return false;
245  }
246 
247  bool create_directories(const std::string& path) const {
248  return false;
249  }
250 
251  bool delete_file_recursive(const std::string& path) const {
252  return false;
253  }
254 
255  bool good() const { return false; }
256 
257  // No hadoop available
258  inline static bool has_hadoop() { return false; }
259 
260  static hdfs& get_hdfs();
261 
262  static hdfs& get_hdfs(std::string host, size_t port);
263  }; // end of class hdfs
264 
265 
266 #endif
267 
268 }; // end of namespace turi
269 #endif
#define logstream(lvl)
Definition: logger.hpp:276
#define LOG_FATAL
Definition: logger.hpp:96