Turi Create  4.0
subscribe_socket.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef FAULT_SOCKETS_SUBSCRIBE_SOCKET_HPP
7 #define FAULT_SOCKETS_SUBSCRIBE_SOCKET_HPP
8 #include <string>
9 #include <vector>
10 #include <set>
11 #include <boost/function.hpp>
12 #include <core/parallel/pthread_tools.hpp>
13 #include <core/system/nanosockets/zmq_msg_vector.hpp>
14 #include <core/export.hpp>
15 namespace turi {
16 
17 namespace nanosockets {
18 /**
19  * \ingroup nanosockets
20  *
21  * Constructs a subscribe socket.
22  *
23  * The subscribe socket binds to at least one endpoint bound by a publish socket
24  * (\ref publish_socket). Everything the publish socket publishes will be
25  * received by the callback. You can register interest in prefix matches
26  * of the message.
27  *
28  * \code
29  * void callback(const std::string& message) {
30  * std::cout << message;
31  * }
32  * subscribe_socket subsock(callback);
33  * subsock.connect("ipc:///tmp/publish");
34  * subsock.subscribe("ABC"); // only received messages beginning with ABC
35  * \endcode
36  */
37 class EXPORT subscribe_socket {
38  public:
39 
40  typedef boost::function<void(const std::string& message)> callback_type;
41 
42  /**
43  * Constructs a subscribe socket.
44  * \param callback The function used to process replies.
45  *
46  * keyval can be NULL in which case all "connect/disconnect" calls
47  * must refer to a ZeroMQ endpoints.
48  */
49  subscribe_socket(callback_type callback);
50 
51  /**
52  * Closes the socket. Once closed. It cannot be opened again
53  */
54  void close();
55 
56  /**
57  * the argument must be a Nanomsg endpoint to connect to.
58  */
59  void connect(std::string endpoint);
60 
61  /**
62  * Disconnects from a given endpoint.
63  */
64  void disconnect(std::string endpoint);
65 
66  /**
67  * Subscribes to a topic. A topic is any message prefix. Only messages
68  * with prefix matching the topic will be received.
69  */
70  void subscribe(std::string topic);
71 
72  /**
73  * Unsubscribes from a topic. A topic is any message prefix. See \subscribe
74  */
75  void unsubscribe(std::string topic);
76 
77  bool unsubscribe_all();
78 
80 
81  private:
82  int z_socket = -1;
83  volatile bool shutting_down = false;
84 
85  std::map<std::string, size_t> publishers;
86 
87  callback_type callback;
88  std::set<std::string> topics;
89  mutex lock;
90  thread thr;
91 
92 
93  void thread_function();
94 };
95 
96 } // nanosockets
97 }
98 #endif