Turi Create  4.0
shmipc.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_SHMIPC_HPP
7 #define TURI_SHMIPC_HPP
8 #include <string>
9 #include <cstddef>
10 #include <memory>
11 #include <core/storage/fileio/fs_utils.hpp>
12 #include <boost/interprocess/interprocess_fwd.hpp>
13 #include <core/logging/logger.hpp>
14 namespace turi {
15 namespace shmipc {
16 
17 struct shared_memory_buffer;
18 struct raii_deleter;
19 /**
20  * \defgroup shmipc Shared Memory Interprocess Communication
21  */
22 
23 /**
24  * \ingroup shmipc
25  * The SHM IPC server/client defines a simple unsynchronized single server /
26  * single client communication system over interprocess shared memory.
27  *
28  * The communication is mostly unsynchronized between server and client, so
29  * users of the server/client implementations have to be careful about who
30  * is sending and who is receiving.
31  *
32  * The class uses Posix Shared Memory segments. Essentially you define a
33  * "name" (on Linux this name shows up in /dev/shm, on Mac this is unfortunately
34  * not enumerable).
35  *
36  * Within the shared memory segment is essentially a buffer, and a pair of
37  * condition variables used to wake a client receiver, or a server receiver.
38  *
39  * The server creates a name, and a size and waits for a client to connect to
40  * it. Once a client connects, the shared memory segment is deleted (unlink).
41  * This means that once both server and client terminate (or crash), the shared
42  * memory segment is released. (otherwise the named segment will hang around
43  * until a reboot).
44  *
45  * However, this does mean that program crash prior to connection can result in
46  * leaked segments. And that is bad. Hence we need a "garbage collection"
47  * mechanism.
48  */
49 class server {
50  public:
51  server() = default;
52  ~server();
53  /**
54  * Binds a server to an ipc name. This SHM name will show up in
55  * /dev/shm on linux machines. Every server must bind to a different file.
56  * If ipcfile is an empty string, a file is automatically constructed and
57  * \ref get_shared_memory_name() can be used to get the shared memory name.
58  */
59  bool bind(const std::string& ipcfile = "",
60  size_t buffer_size = 1024*1024);
61 
62  /**
63  * Returns the maximum amount of data that can be sent or received.
64  */
65  size_t buffer_size() const;
66 
67  /**
68  * Returns the shared memory object name.
69  */
70  std::string get_shared_memory_name() const;
71 
72  /**
73  * Waits up to timeout seconds for a connection.
74  */
75  bool wait_for_connect(size_t timeout = 10);
76 
77  /**
78  * Sends a bunch of bytes.
79  */
80  bool send(const char* c, size_t len);
81 
82  /**
83  * Receives a bunch of bytes into (*c) and (*clen). (*c) and (*clen)
84  * may be resized as required to fit the data. if c == nullptr and
85  * clen == nullptr, the return data is discarded.
86  */
87  bool receive(char** c, size_t* clen, size_t& receivelen, size_t timeout);
88 
89  /**
90  * receives a direct buffer to the data. It is up to the caller to make sure
91  * that no other sends/receives happen while the caller accesses the data.
92  */
93  bool receive_direct(char**c, size_t* len, size_t& receivelen, size_t timeout);
94 
95  /**
96  * shutsdown the server
97  */
98  void shutdown();
99 
100  private:
101  std::shared_ptr<raii_deleter> m_ipcfile_deleter;
102  std::shared_ptr<boost::interprocess::shared_memory_object> m_shared_object;
103  std::shared_ptr<boost::interprocess::mapped_region> m_mapped_region;
104 
105  std::string m_shmname;
106  shared_memory_buffer* m_buffer = nullptr;
107 };
108 
109 
110 /**
111  * \ingroup shmipc
112  * The corresponding client object to the \ref server.
113  *
114  * The communication is mostly unsynchronized between server and client, so
115  * users of the server/client implementations have to be careful about who
116  * is sending and who is receiving.
117  *
118  * More design details in \ref server
119  */
120 class client{
121  public:
122  client() = default;
123  ~client() = default;
124  /**
125  * Connects to a server via the ipc file.
126  */
127  bool connect(std::string ipcfile, size_t timeout = 10);
128 
129  /**
130  * Returns the maximum amount of data that can be sent or received.
131  */
132  size_t buffer_size() const;
133 
134  /**
135  * Sends a bunch of bytes.
136  */
137  bool send(const char* c, size_t len);
138 
139  /**
140  * Receives a bunch of bytes into (*c) and (*clen). (*c) and (*clen)
141  * may be resized as required to fit the data. if c == nullptr and
142  * clen == nullptr, the return data is discarded.
143  */
144  bool receive(char**c, size_t* len, size_t& receivelen, size_t timeout);
145 
146  /**
147  * receives a direct buffer to the data. It is up to the caller to make sure
148  * that no other sends/receives happen while the caller accesses the data.
149  */
150  bool receive_direct(char**c, size_t* len, size_t& receivelen, size_t timeout);
151  private:
152  std::shared_ptr<boost::interprocess::shared_memory_object> m_shared_object;
153  std::shared_ptr<boost::interprocess::mapped_region> m_mapped_region;
154  shared_memory_buffer* m_buffer = nullptr;
155 };
156 
/**
 * \ingroup shmipc
 * Sends an arbitrarily large amount of data through an SHMIPC channel.
 * T can be either a server or a client. The receiver must use the matching
 * large_receive function.
 *
 * Protocol: the payload is split into messages of at most
 * shm.buffer_size() bytes. A full-sized message tells the receiver that more
 * data follows; the first message shorter than a full buffer ends the
 * transfer. After every full message the sender waits for an (empty)
 * acknowledgement from the receiver before sending the next one. If the
 * payload length is an exact multiple of the buffer size, an extra empty
 * message is sent as the terminator.
 *
 * \param shm Channel to send through (server or client).
 * \param c   Pointer to the first byte of the payload; exactly \p len bytes
 *            are read from it.
 * \param len Payload length in bytes.
 * \return true if every send and acknowledgement succeeded; false if the
 *         channel reports a zero buffer size or any send/receive fails.
 */
template <typename T>
bool large_send(T& shm, const char *c, size_t len) {
  const size_t buffer_size = shm.buffer_size();
  if (buffer_size == 0) return false;

  // Anything strictly shorter than a full buffer fits in one message, and the
  // receiver treats a non-full message as the end of the transfer.
  if (len < buffer_size) return shm.send(c, len);

  // Length of the acknowledgement message; its content is discarded.
  size_t ack_len = 0;
  const size_t no_timeout = static_cast<size_t>(-1);

  // First full buffer goes out without waiting for anything.
  if (!shm.send(c, buffer_size)) return false;
  size_t sent = buffer_size;

  // Every following message is gated on the receiver's acknowledgement of the
  // previous (full) one.
  while (sent < len) {
    if (!shm.receive_direct(nullptr, nullptr, ack_len, no_timeout)) return false;
    const size_t send_length = std::min(len - sent, buffer_size);
    if (!shm.send(c + sent, send_length)) return false;
    sent += send_length;
  }

  // The last message was a full buffer, so the receiver is still expecting
  // more: finish with an empty terminator.
  if (len % buffer_size == 0) {
    if (!shm.receive_direct(nullptr, nullptr, ack_len, no_timeout)) return false;
    if (!shm.send(nullptr, 0)) return false;
  }
  return true;
}
201 
202 
203 /**
204  * \ingroup shmipc
205  * Receives an arbitrarily large amount of data
206  * through an SHMIPC channel. T can be either a server or a client.
207  * Receiver must use the matching large_receive function.
208  */
209 template <typename T>
210 bool large_receive(T& shm, char **c, size_t* clen,
211  size_t& receivelen, size_t timeout) {
212  /*
213  * Essentially, we keep receiving as long as we are getting full buffers.
214  * timeout only applies to the first receive.
215  */
216  size_t buffer_size = shm.buffer_size();
217  if (buffer_size == 0) log_and_throw("Invalid shared memory object");
218 
219  receivelen = 0;
220  size_t last_receivelen = 0;
221  size_t cur_timeout = timeout;
222  while(1) {
223  char* recv_buffer;
224  size_t recvlen;
225  bool ret = shm.receive_direct(&recv_buffer, &recvlen,
226  last_receivelen, cur_timeout);
227  if (ret == false) return false;
228  // make sure we have room to receive an entire full buffer
229  if (receivelen + last_receivelen > (*clen)) {
230  // at least double c or fit.
231  size_t realloc_size = std::max<size_t>((*clen) * 2, receivelen + last_receivelen);
232  (*c) = (char*)realloc(*c, realloc_size);
233  (*clen) = realloc_size;
234  }
235  memcpy((*c) + receivelen, recv_buffer, last_receivelen);
236  cur_timeout = (size_t)(-1); // timeout only applies to the first receive
237 
238  // increment the receive count
239  receivelen += last_receivelen;
240  // non-full buffer. we are done
241  if (last_receivelen < buffer_size) {
242  // logstream(LOG_WARNING) << "Receiving: " << last_receivelen << std::endl;
243  return true;
244  } else {
245  bool ret = shm.send(nullptr, 0); // send an empty message (continue with next)
246  if (ret == false) return false;
247  }
248  }
249 }
250 
251 } // shmipc
252 } // namespace turi
253 #endif
bool send(const char *c, size_t len)
bool receive_direct(char **c, size_t *len, size_t &receivelen, size_t timeout)
bool wait_for_connect(size_t timeout=10)
bool bind(const std::string &ipcfile="", size_t buffer_size=1024 *1024)
size_t buffer_size() const
bool receive(char **c, size_t *clen, size_t &receivelen, size_t timeout)
bool large_send(T &shm, const char *c, size_t len)
Definition: shmipc.hpp:164
std::string get_shared_memory_name() const
bool large_receive(T &shm, char **c, size_t *clen, size_t &receivelen, size_t timeout)
Definition: shmipc.hpp:210