Turi Create  4.0
async_reply_socket.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef NANOSOCKETS_SOCKETS_ASYNC_REPLY_SOCKET_HPP
7 #define NANOSOCKETS_SOCKETS_ASYNC_REPLY_SOCKET_HPP
8 #include <string>
9 #include <vector>
10 #include <set>
11 #include <queue>
12 #include <boost/function.hpp>
13 #include <core/parallel/pthread_tools.hpp>
14 #include <core/system/nanosockets/zmq_msg_vector.hpp>
15 #include <core/export.hpp>
16 namespace turi {
17 
18 namespace nanosockets {
19 /**
20  * \ingroup nanosockets
21  *
22  * A nanomsg asynchronous reply socket.
23  *
24  * The asynchronous reply socket is the target endpoint of the
25  * asynchronous request socket (\ref async_request_socket). The reply socket
26  * listens on an endpoint, and the request socket sends requests to an endpoint.
27  * Endpoints are standard Zeromq style endpoint addresses, for instance,
28  * tcp://[ip]:[port], or ipc://[filename] (interprocess socket) or
29  * inproc://[handlename] (inprocess socket).
30  * Ipc sockets are emulated on windows using TCP.
31  *
32  * The asynchronous reply socket is constructed with a callback which is
33  * called whenever a request is received. The callback may be called in
34  * parallel; up to the value of nthreads. For instance, a simple echo service
35  * can be built with:
36  *
37  * \code
38  * bool echo(zmq_msg_vector& recv, zmq_msg_vector& reply) {
39  * reply = recv; return true; // true: there are contents to reply
40  * }
41  * async_reply_socket sock(echo, 4, "ipc:///tmp/echo_service");
42  * sock.start_polling();
43  * \endcode
44  *
45  * All messaging is done via \ref zmq_msg_vector, which is internally an array
46  * of nn_msg_t objects. Message boundaries are preserved across the wire; i.e.
47  * if a request of 4 messages is sent, we will receive exactly a zmq_msg_vector
48  * of 4 messages.
49  *
50  * Also, \see async_request_socket
51  */
52 class EXPORT async_reply_socket {
53  public:
54 
55  /**
56  * Request handler type: it is given the received request in \p recv and an
57  * output vector \p reply, and returns true if there are contents to reply
58  * (false otherwise). If the reply socket is connected to a request socket,
59  * this must always return true.
60  * \note There is no provided way to figure out if a reply is necessary.
61  * This must be managed on a higher protocol layer.
62  */
63  typedef boost::function<bool (zmq_msg_vector& recv,
64  zmq_msg_vector& reply)> callback_type;
65 
66  /**
67  * Constructs a reply socket.
68  * \param callback The function used to process requests and produce replies.
69  * Multiple threads may call the callback simultaneously.
70  * \param nthreads The maximum number of threads to use (defaults to 4).
71  * \param bind_address If set (the default is the empty string), this will be
72  * the address to bind to. Otherwise, binds to a free tcp address.
73  */
74  async_reply_socket(callback_type callback,
75  size_t nthreads = 4,
76  std::string bind_address = "");
77 
78  void start_polling(); // NOTE(review): presumably starts servicing incoming requests (the class example calls it right after construction) -- confirm in the .cpp
79  void stop_polling(); // NOTE(review): presumably the counterpart of start_polling() -- confirm in the .cpp
80 
81  /**
82  * Closes the socket. Once closed, it cannot be opened again.
83  */
84  void close();
85 
86  /**
87  * Returns the address the socket is bound to.
88  */
89  std::string get_bound_address();
90 
92 
93  private:
94  struct job { // element type of jobqueue; also the argument of process_job()
95  char* data = nullptr; // NOTE(review): presumably the raw bytes of one received request -- confirm
96  size_t datalen = 0; // NOTE(review): presumably the length of data in bytes -- confirm
97  void* control = nullptr; // NOTE(review): meaning not visible in this header -- confirm in the .cpp
98  };
99  mutex socketlock; // NOTE(review): presumably serializes access to z_socket -- confirm
100  int z_socket = -1; // integer socket handle; initialized to -1 (presumably "no socket" -- confirm)
101  std::string local_address; // NOTE(review): presumably the value returned by get_bound_address() -- confirm
102  callback_type callback; // request handler (presumably the one passed to the constructor -- confirm)
103 
104  std::queue<job> jobqueue; // FIFO of job entries
105  mutex queuelock; // NOTE(review): presumably guards jobqueue and queue_terminate -- confirm
106  conditional queuecond; // NOTE(review): presumably signalled when jobqueue or queue_terminate changes -- confirm
107  bool queue_terminate = false; // false initially. If true, all threads die.
108 
109  void thread_function(); // NOTE(review): presumably the routine run by the threads in `threads` -- confirm
110  void poll_function(); // NOTE(review): presumably the routine run by `poll_thread` -- confirm
111 
112  void process_job(job j); // handles a single job, received by value
113  thread_group threads; // NOTE(review): presumably the callback worker threads (see nthreads) -- confirm
114  thread_group poll_thread; // NOTE(review): presumably the thread that runs poll_function() -- confirm
115 };
116 
117 } // nanosockets
118 }
119 #endif
boost::function< bool(zmq_msg_vector &recv, zmq_msg_vector &reply)> callback_type