Turi Create  4.0
async_request_socket.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef NANOSOCKETS_SOCKETS_ASYNC_REQUEST_SOCKET_HPP
7 #define NANOSOCKETS_SOCKETS_ASYNC_REQUEST_SOCKET_HPP
8 #include <string>
9 #include <vector>
10 #include <core/parallel/mutex.hpp>
11 #include <core/parallel/pthread_tools.hpp>
12 #include <core/system/nanosockets/zmq_msg_vector.hpp>
13 #include <core/export.hpp>
14 namespace turi {
15 
16 namespace nanosockets {
17 
18 
19 
20 /**
21  * \ingroup nanosockets
22  *
23  * Constructs a nanomsg asynchronous request socket.
24  *
25  * The async_request_socket is the requesting endpoint of
26  * \ref async_reply_socket. The \ref async_reply_socket listens and waits for
27  * requests, and the \ref async_request_socket sends requests.
28  * Communication is atomic; either the listener receives all of a message, or
29  * none at all. Communication will be automatically retried as needed.
30  *
31  * \code
32  * async_request_socket sock("ipc:///tmp/echo_service")
33  * zmq_msg_vector ret;
34  * int ret = sock.request_master(msg, ret, 10); // 10 second timeout
35  * \endcode
36  *
37  * This object is multi-threaded. Calls can be made from multiple threads
38  * simultaneously and will be queued accordingly.
39  *
40  * All messaging is done via \ref zmq_msg_vector which is internally, an array
41  * of nn_msg_t objects. Message boundaries are preserved across the wire; i.e.
42  * if a request of 4 messages is sent, we will receive exactly a zmq_msg_vector
43  * of 4 messages.
44  */
45 class EXPORT async_request_socket {
46  public:
47  /**
48  * Constructs a request socket.
49  * The request will be sent to the current owners of the key
50  *
51  * \param target_address Where to connect to
52  * \param num_connections Number of parallel connections
53  */
54  async_request_socket(std::string target_address, size_t num_connections=2);
55 
56 
57  /**
58  * Closes this socket. Once closed, the socket cannot be used again.
59  */
60  void close();
61 
63 
64  /**
65  * Sends a request to the server.
66  * Returns 0 on success, an error number on failure
67  *
68  * \param msgs The message to send
69  * \param ret The returned message will be stored here
70  * \param timeout Number of seconds to wait before timeout. Defaults to 0
71  */
72  int request_master(zmq_msg_vector& msgs,
73  zmq_msg_vector& ret,
74  size_t timeout = 0);
75 
76  /**
77  * When waiting for a response, this function will be polled once per second.
78  * If this function returns false, the receive polling will quit.
79  * This can be used for instance, to quit a receive if we know for certain
80  * the remote is no longer alive.
81  */
82  void set_receive_poller(boost::function<bool()>);
83  private:
84 
85  struct socket_data {
86 
87  inline socket_data() { }
88 
89  // fake copy constructor
90  inline socket_data(const socket_data& other) { }
91  // fake operator=
92  inline void operator=(const socket_data& other) { }
93 
94  // The actual zmq socket
95  int z_socket = -1;
96  };
97  // queue of available sockets
98  mutex global_lock;
99  conditional cvar;
100  std::vector<size_t> available;
101 
102  std::string server;
103  std::vector<socket_data> sockets;
104 
105  boost::function<bool()> receive_poller;
106  // create a socket for socket i
107  // returns 0 on success, errno on failure.
108  int create_socket(size_t i);
109 };
110 
111 
112 }
113 }
114 #endif