6 #ifndef TURI_SHMIPC_HPP 7 #define TURI_SHMIPC_HPP 11 #include <core/storage/fileio/fs_utils.hpp> 12 #include <boost/interprocess/interprocess_fwd.hpp> 17 struct shared_memory_buffer;
59 bool bind(
const std::string& ipcfile =
"",
80 bool send(
const char* c,
size_t len);
87 bool receive(
char** c,
size_t* clen,
size_t& receivelen,
size_t timeout);
93 bool receive_direct(
char**c,
size_t* len,
size_t& receivelen,
size_t timeout);
101 std::shared_ptr<raii_deleter> m_ipcfile_deleter;
102 std::shared_ptr<boost::interprocess::shared_memory_object> m_shared_object;
103 std::shared_ptr<boost::interprocess::mapped_region> m_mapped_region;
105 std::string m_shmname;
106 shared_memory_buffer* m_buffer =
nullptr;
127 bool connect(std::string ipcfile,
size_t timeout = 10);
137 bool send(
const char* c,
size_t len);
144 bool receive(
char**c,
size_t* len,
size_t& receivelen,
size_t timeout);
150 bool receive_direct(
char**c,
size_t* len,
size_t& receivelen,
size_t timeout);
152 std::shared_ptr<boost::interprocess::shared_memory_object> m_shared_object;
153 std::shared_ptr<boost::interprocess::mapped_region> m_mapped_region;
154 shared_memory_buffer* m_buffer =
nullptr;
163 template <
typename T>
171 size_t receivelen = 0;
172 if (buffer_size == 0)
return false;
173 if (len < shm.buffer_size() - 1) {
181 int ret = shm.send(c, shm.buffer_size());
182 if (ret ==
false)
return false;
183 sent = shm.buffer_size();
185 bool ret = shm.receive_direct(
nullptr,
nullptr, receivelen, (
size_t)(-1));
186 if (ret ==
false)
return false;
187 size_t send_length = std::min(len - sent, buffer_size);
188 ret = shm.send(c + sent, send_length);
189 if (ret ==
false)
return false;
192 if (len % buffer_size == 0) {
193 bool ret = shm.receive_direct(
nullptr,
nullptr, receivelen, (
size_t)(-1));
194 if (ret ==
false)
return false;
195 ret = shm.send(
nullptr, 0);
196 if (ret ==
false)
return false;
209 template <
typename T>
211 size_t& receivelen,
size_t timeout) {
217 if (buffer_size == 0) log_and_throw(
"Invalid shared memory object");
220 size_t last_receivelen = 0;
221 size_t cur_timeout = timeout;
225 bool ret = shm.receive_direct(&recv_buffer, &recvlen,
226 last_receivelen, cur_timeout);
227 if (ret ==
false)
return false;
229 if (receivelen + last_receivelen > (*clen)) {
231 size_t realloc_size = std::max<size_t>((*clen) * 2, receivelen + last_receivelen);
232 (*c) = (
char*)realloc(*c, realloc_size);
233 (*clen) = realloc_size;
235 memcpy((*c) + receivelen, recv_buffer, last_receivelen);
236 cur_timeout = (size_t)(-1);
239 receivelen += last_receivelen;
241 if (last_receivelen < buffer_size) {
245 bool ret = shm.send(
nullptr, 0);
246 if (ret ==
false)
return false;
bool send(const char *c, size_t len)
bool receive_direct(char **c, size_t *len, size_t &receivelen, size_t timeout)
bool wait_for_connect(size_t timeout=10)
bool bind(const std::string &ipcfile="", size_t buffer_size=1024 *1024)
size_t buffer_size() const
bool receive(char **c, size_t *clen, size_t &receivelen, size_t timeout)
bool large_send(T &shm, const char *c, size_t len)
std::string get_shared_memory_name() const
bool large_receive(T &shm, char **c, size_t *clen, size_t &receivelen, size_t timeout)