MICO Platform
 All Classes Namespaces Functions Variables Friends
HDFSStream.hpp
1 #ifndef HAVE_HDFS_STREAM_H
2 #define HAVE_HDFS_STREAM_H 1
3 
4 #include <istream>
5 #include <ostream>
6 #include <streambuf>
7 #include <hdfs/hdfs.h>
8 
/* Name node host used by the single-argument stream constructors. */
#define HDFS_DEFAULT_ADDRESS "localhost"
/* Name node RPC port used by the single-argument stream constructors. */
#define HDFS_DEFAULT_PORT 8020
/*
 * Default size (in bytes) of the in-memory stream buffer: 128 KiB.
 * The expression is parenthesised so that the macro expands correctly inside
 * larger expressions such as `x / DEFAULT_BUFFER_SIZE` or `DEFAULT_BUFFER_SIZE % n`.
 */
#define DEFAULT_BUFFER_SIZE (128 * 1024)
12 
13 
14 namespace mico
15 {
16  namespace io
17  {
/*
 * Selects whether an HDFS file is opened for reading or for writing.
 */
enum FileMode {
    FILE_MODE_READ,   // open the file for reading
    FILE_MODE_WRITE   // open the file for writing
};
24 
25  /*
26  * This is template class for HDFS input and output streams and not for direct use.
27  * In-/Output is buffered in memory. Depending on the use case, the default buffer size might be suboptimal.
28  *
29  * HDFS access is managed with libhdfs3 developed by Pivotal.
30  */
31  class HDFSStreambuf : public std::streambuf {
32 
33  public:
34  virtual ~HDFSStreambuf();
35 
36  protected:
37  /*
38  * Constructor sets up the buffer and opens the given file on the HDFS server.
39  *
40  * @param path Path including file name.
41  * @param mode If file is opened for reading (FILE_MODE_READ) or writing (FILE_MODE_WRITE).
42  * @param address Address of the HDFS name node.
43  * @param port Port of the HDFS RPC service.
44  */
45  HDFSStreambuf(const char* path, FileMode mode, const char* address, uint16_t port, int bufsize = DEFAULT_BUFFER_SIZE);
46 
47  char* buffer; //stream buffer
48  int buffer_size; //stream buffer size
49 
50  hdfsFS fs = NULL; //HDFS connection handle
51  hdfsFile file = NULL; //HDFS file handle
52 
53  private:
54  /*
55  * Copying not allowed, therefore copy constructor is not implemented.
56  */
57  HDFSStreambuf(const HDFSStreambuf& other);
58 
59  /*
60  * Copying not allowed, therefore copy assignment constructor is not implemented.
61  */
62  HDFSStreambuf& operator=(HDFSStreambuf other);
63 
64  };
65 
66  /*
67  * HDFSIStream provides the specific functionality for the HDFS input stream.
68  */
69  class HDFSIStream : public HDFSStreambuf {
70 
71  public:
72  /*
73  * Calls HDFSStreambuf constructor and sets up all input stream relevant things.
74  *
75  * @param path Path including file name.
76  * @param address Address of the HDFS name node.
77  * @param port Port of the HDFS RPC service.
78  */
79  HDFSIStream(const char* path, const char* address, uint16_t port);
80 
81  private:
82  /*
83  * Copying not allowed, therefore copy constructor is not implemented.
84  */
85  HDFSIStream(const HDFSIStream& other);
86 
87  /*
88  * Copying not allowed, therefore copy assignment constructor is not implemented.
89  */
90  HDFSIStream& operator=(HDFSIStream other);
91 
92  /*
93  * Retrieves data from the HDFS file if the buffer is empty.
94  *
95  * @return Hands back the next byte of the stream, or end of file (traits_type::eof()), or -1 on read error.
96  */
97  std::streambuf::int_type underflow();
98 
99  /*
100  * Implements seeking capabilities.
101  *
102  * @return The new absolute stream position or -1 on error.
103  */
104  std::streampos seekoff(std::streamoff off, std::ios_base::seekdir way, std::ios_base::openmode which = std::ios_base::in);
105 
106  /*
107  * Implements seeking capabilities.
108  *
109  * @return The new absolute stream position or -1 on error.
110  */
111  std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in);
112 
113  /*
114  * The file size is needed to support relative seeking for way is std::ios_base::end.
115  */
116  std::streamsize file_size = -1;
117  };
118 
119  /*
120  * HDFSOStream provides the specific functionality for the HDFS output stream.
121  */
122  class HDFSOStream : public HDFSStreambuf {
123 
124  public:
125  /*
126  * Calls HDFSStreambuf constructor and sets up all output stream relevant things.
127  *
128  * @param path Path including file name.
129  * @param address Address of the HDFS name node.
130  * @param port Port of the HDFS RPC service.
131  */
132  HDFSOStream(const char* path, const char* address, uint16_t port);
133 
134  private:
135  /*
136  * Copying not allowed, therefore copy constructor is not implemented.
137  */
138  HDFSOStream(const HDFSOStream& other);
139 
140  /*
141  * Copying not allowed, therefore copy assignment constructor is not implemented.
142  */
143  HDFSOStream& operator=(HDFSOStream other);
144 
145  /*
146  * Calls the HDFS write function to flush the buffer and pushes c on the buffer.
147  *
148  * @return c on success, traits_type::eof() on error.
149  */
150  std::streambuf::int_type overflow(std::streambuf::int_type c);
151 
152  /*
153  * Flushes the buffer.
154  *
155  * @return 0 on success, -1 on error
156  */
157  int sync();
158 
159  /*
160  * Calles the HDFS write function to flush the buffer and reset buffer pointers. Helper for overflow(...)
161  * and sync().
162  *
163  * @return 0 on success, -1 on failure;
164  */
165  int writeBuffer();
166  };
167 
168 
169 
170  /*
171  * Main type for opening an input stream to a HDFS file for reading.
172  * Use hdfs_istream(path, name node address, RPC port) to open a new stream. Stream supports seeking.
173  */
174  class hdfs_istream : public std::istream {
175  public:
176  hdfs_istream(const char* path) : hdfs_istream(path, HDFS_DEFAULT_ADDRESS, HDFS_DEFAULT_PORT) {};
177  hdfs_istream(std::string path) : hdfs_istream(path.c_str()) {};
178  hdfs_istream(const char* path, const char* address, uint16_t port) : std::istream(new HDFSIStream(path, address, port)) {};
179  hdfs_istream(std::string path, std::string address, uint16_t port) : hdfs_istream(path.c_str(), address.c_str(), port) {};
180  ~hdfs_istream() { delete rdbuf(); };
181  };
182 
183  /*
184  * Main type for opening an output stream to a HDFS file for writing.
185  * Use hdfs_ostream(path, name node address, RPC port) to open a new stream. Stream does not support seeking.
186  */
187  class hdfs_ostream : public std::ostream {
188  public:
189  hdfs_ostream(const char* path) : hdfs_ostream(path, HDFS_DEFAULT_ADDRESS, HDFS_DEFAULT_PORT) {};
190  hdfs_ostream(std::string path) : hdfs_ostream(path.c_str()) {};
191  hdfs_ostream(const char* path, const char* address, uint16_t port) : std::ostream(new HDFSOStream(path, address, port)) {};
192  hdfs_ostream(std::string path, std::string address, uint16_t port) : hdfs_ostream(path.c_str(), address.c_str(), port) {};
193  ~hdfs_ostream() { rdbuf()->pubsync(); delete rdbuf(); };
194  };
195 
/*
 * Removes a file from HDFS.
 *
 * @param path Path including file name.
 * @param address Address of the HDFS name node.
 * @param port Port of the HDFS RPC service.
 * @return Integer status code. NOTE(review): presumably 0 on success and -1 on error, like the
 *         other functions in this header - confirm against the implementation.
 */
int removeHdfsFile(const char* path, const char* address, uint16_t port);

/*
 * Removes a file from HDFS without naming a name node.
 * NOTE(review): presumably uses HDFS_DEFAULT_ADDRESS and HDFS_DEFAULT_PORT - confirm against the implementation.
 *
 * @param path Path including file name.
 * @return Integer status code (see the three-argument overload).
 */
int removeHdfsFile(const char* path);
198  }
199 }
200 
201 #endif
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in c...
Definition: http_client.cpp:23
Definition: HDFSStream.hpp:122
Definition: HDFSStream.hpp:187
Definition: HDFSStream.hpp:69
Definition: HDFSStream.hpp:31
Definition: HDFSStream.hpp:174