Turi Create  4.0
zmq_msg_vector.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef NANOSOCKETS_ZMQ_MSG_VECTOR_HPP
7 #define NANOSOCKETS_ZMQ_MSG_VECTOR_HPP
8 #include <cassert>
9 #include <string>
10 #include <list>
11 #include <cstring>
12 #include <core/storage/serialization/oarchive.hpp>
13 #include <core/storage/serialization/iarchive.hpp>
14 #include <core/export.hpp>
15 namespace turi {
16 namespace nanosockets {
17 /**
18  * \ingroup nanosockets
19  *
20  * Describes a wrapper around an array of nanomsg messages part.
21  * Writes are performed through the insert_back() function.
22  * The standard pattern is:
23  * \code
24  * // creates a new message part and returning it
25  * nn_msg_t* msg = msgvec.insert_back();
26  * zmq_msg_init_...(msg, ...)
27  * \endcode
28  *
29  * It can also be used for reading. It maintains a read index which
30  * is initialized to zero. Each call to read_next() will return the message
31  * part at the readindex, and increment the read index. If there are no more
32  * parts, the function will return NULL.
33  *
34  * The contents of the vector are automatically freed on destruction.
35  *
36  * This class says zmq. It was originally built as a thin wrapper around
37  * zeromq but is now built using nanomsg but the interface and all names
38  * are maintained for maximal backward compatibility
39  */
40 typedef std::string nn_msg_t;
41 class EXPORT zmq_msg_vector {
42  public:
43  zmq_msg_vector():read_index(0) { }
44  zmq_msg_vector(const zmq_msg_vector& other) {
45  read_index = 0;
46  (*this) = other;
47  }
48  zmq_msg_vector& operator=(const zmq_msg_vector& other) {
49  clone_from(const_cast<zmq_msg_vector&>(other));
50  return *this;
51  }
52 
53  inline ~zmq_msg_vector() {
54  clear();
55  }
56 
57  void clone_from(zmq_msg_vector& other) {
58  clear();
59  msgs = other.msgs;
60  }
61 
62  /// Returns the number of elements in the msg vector
63  inline size_t size() const {
64  return msgs.size();
65  }
66 
67  /// Return the message at index i in the vector. Slow.
68  inline const nn_msg_t* operator[](size_t i) const {
69  auto iter = msgs.begin();
70  while (i > 0) ++iter;
71  return &(*iter);
72  }
73 
74  /// Return the message at index i in the vector. Slow.
75  inline nn_msg_t* operator[](size_t i) {
76  auto iter = msgs.begin();
77  while (i > 0) ++iter;
78  return &(*iter);
79  }
80 
81  /// Returns true if the vector is empty
82  inline bool empty() const {
83  return msgs.empty();
84  }
85 
86  /**
87  * Allocates a new zeromq message part, inserting it into the
88  * end of the vector, and returning the message part.
89  */
90  inline nn_msg_t* insert_back() {
91  msgs.push_back(nn_msg_t());
92  return &msgs.back();
93  }
94 
95  /**
96  * Inserts a predefined nn_msg_t to the back
97  */
98  inline void insert_back(nn_msg_t& msg) {
99  msgs.push_back(msg);
100  }
101 
102  /**
103  * Inserts a message with a given length
104  */
105  inline nn_msg_t* insert_back(const void* c, size_t len) {
106  msgs.push_back(nn_msg_t(reinterpret_cast<const char*>(c), len));
107  return &(msgs.back());
108  }
109 
110  /**
111  * Inserts a message with a given length
112  */
113  inline nn_msg_t* insert_back(const std::string& s) {
114  msgs.push_back(s);
115  return &(msgs.back());
116  }
117 
118 
119  /**
120  * Allocates a new zeromq message part, inserting it into the
121  * front of the vector, and returning the message part.
122  */
123  inline nn_msg_t* insert_front() {
124  msgs.push_front(nn_msg_t());
125  return &(msgs.front());
126  }
127 
128 
129  /**
130  * Inserts a predefined nn_msg_t to the front
131  */
132  inline void insert_front(nn_msg_t& msg) {
133  msgs.push_back(msg);
134  }
135 
136  /**
137  * Inserts a message with a given length
138  */
139  inline nn_msg_t* insert_front(const void* c, size_t len) {
140  msgs.push_front(nn_msg_t(reinterpret_cast<const char*>(c), len));
141  return &(msgs.front());
142  }
143 
144  /**
145  * Inserts a message with a given length
146  */
147  inline nn_msg_t* insert_front(const std::string& s) {
148  msgs.push_front(s);
149  return &(msgs.front());
150  }
151 
152 
153 
154  /**
155  * Returns the next unread message.
156  * Returns NULL if all message have been read
157  */
158  inline nn_msg_t* read_next() {
159  if (read_index >= size()) return NULL;
160  nn_msg_t* ret = (*this)[read_index];
161  ++read_index;
162  return ret;
163  }
164 
165  /// Returns the current read index
166  inline size_t get_read_index() const {
167  return read_index;
168  }
169 
170  /// Returns the number of unread messages; equal to size() - read_index.
171  inline size_t num_unread_msgs() const {
172  return size() - get_read_index();
173  }
174 
175  /// Resets the read index to 0
176  inline void reset_read_index() {
177  read_index = 0;
178  }
179 
180  /// clears the vector and frees all the messages.
181  /// Also resets the read index.
182  inline void clear() {
183  msgs.clear();
184  read_index = 0;
185  }
186 
187  /// removes an element from the front.
188  inline void pop_front() {
189  if (!empty()) {
190  msgs.pop_front();
191  // shift the read index if it not already at the head
192  read_index -= (read_index > 0);
193  }
194  }
195 
196  /// removes an element from the back.
197  inline void pop_back() {
198  if (!empty()) {
199  msgs.pop_back();
200  }
201  }
202 
203  /// removes an element from the back.
204  inline nn_msg_t* back() {
205  if (!empty()) {
206  return &msgs.back();
207  }
208  return NULL;
209  }
210 
211 /// removes an element from the back.
212  inline nn_msg_t* front() {
213  if (!empty()) {
214  return &msgs.front();
215  }
216  return NULL;
217  }
218 
219 
220 
221  /**
222  * Pops front and tries to match it against a fixed string
223  * asertion failure is it does not match
224  */
225  inline void assert_pop_front(const void* c, size_t len) {
226  assert(!empty());
227  assert(msgs.front().length() == len);
228  if (len > 0) {
229  assert(memcmp(msgs.front().data(), c, len) == 0);
230  }
231  pop_front_and_free();
232  }
233 
234  inline void assert_pop_front(const std::string& s) {
235  assert_pop_front(s.c_str(), s.length());
236  }
237 
238 
239  inline std::string extract_front() {
240  assert(!empty());
241  std::string ret = std::move(msgs.front());
242  pop_front_and_free();
243  return ret;
244  }
245 
246  inline void extract_front(void* c, size_t clen) {
247  assert(!empty());
248  assert(clen == msgs.front().length());
249  if (clen > 0) {
250  memcpy(c, msgs.front().data(), clen);
251  }
252  pop_front_and_free();
253  }
254 
255  /// removes an element from the front and deletes it
256  inline void pop_front_and_free() {
257  if (!empty()) {
258  pop_front();
259  // shift the read index if it not already at the head
260  read_index -= (read_index > 0);
261  }
262  }
263 
264  /**
265  * Tries to send this message through a socket with a timeout
266  * Returns 0 on success.
267  * Returns EAGAIN if a timeout is reached.
268  * Otherwise returns a zeromq error code.
269  */
270  int send(int socket, int timeout);
271 
272  /**
273  * Tries to send this message through a ZeroMQ socket.
274  * Returns 0 on success.
275  * Otherwise returns a zeromq error code.
276  */
277  int send(int socket);
278 
279  /**
280  * Tries to receive a multipart tmessage through a ZeroMQ
281  * socket with a timeout
282  * Returns 0 on success.
283  * Returns EAGAIN if a timeout is reached.
284  * Otherwise returns a zeromq error code.
285  */
286  int recv(int socket, int timeout);
287 
288  /**
289  * Tries to receive a multipart tmessage through a ZeroMQ socket.
290  * Returns 0 on success.
291  * Otherwise returns a zeromq error code.
292  */
293  int recv(int socket);
294 
295  void save(turi::oarchive& oarc) const {
296  oarc << msgs;
297  }
298 
299  void load(turi::iarchive& iarc) {
300  msgs.clear();
301  iarc >> msgs;
302  }
303  private:
304  // list so things are not invalidated. A number of
305  // things in this class pass pointers around
306  std::list<nn_msg_t> msgs;
307  size_t read_index;
308 
309 
310  /**
311  * Tries to send this message through a ZeroMQ socket with a timeout
312  * Returns 0 on success.
313  * Returns EAGAIN if a timeout is reached.
314  * Otherwise returns a zeromq error code.
315  * Does not retry on EINTR
316  */
317  int send_impl(int socket, int timeout);
318 
319  /**
320  * Tries to send this message through a ZeroMQ socket.
321  * Returns 0 on success.
322  * Otherwise returns a zeromq error code.
323  * Does not retry on EINTR
324  */
325  int send_impl(int socket);
326 
327  /**
328  * Tries to receive a multipart tmessage through a ZeroMQ
329  * socket with a timeout
330  * Returns 0 on success.
331  * Returns EAGAIN if a timeout is reached.
332  * Otherwise returns a zeromq error code.
333  * Does not retry on EINTR
334  */
335  int recv_impl(int socket, int timeout);
336 
337  /**
338  * Tries to receive a multipart tmessage through a ZeroMQ socket.
339  * Returns 0 on success.
340  * Otherwise returns a zeromq error code.
341  * Does not retry on EINTR
342  */
343  int recv_impl(int socket);
344 
345 
346  };
347 
348 } // namespace nanosockets
349 }
350 #endif
The serialization input archive object which, provided with a reference to an istream, will read from the istream, providing deserialization capabilities.
Definition: iarchive.hpp:60
std::string nn_msg_t
The serialization output archive object which, provided with a reference to an ostream, will write to the ostream, providing serialization capabilities.
Definition: oarchive.hpp:80