Turi Create  4.0
comm_server.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef CPPIPC_SERVER_COMM_SERVER_HPP
7 #define CPPIPC_SERVER_COMM_SERVER_HPP
8 #include <string>
9 #include <unordered_map>
10 #include <map>
11 #include <unordered_set>
12 #include <functional>
13 #include <memory>
14 #include <algorithm>
15 #include <iterator>
16 #include <atomic>
17 #include <core/parallel/mutex.hpp>
18 #include <boost/thread/mutex.hpp>
19 #include <boost/thread/lock_guard.hpp>
20 #include <core/system/nanosockets/socket_errors.hpp>
21 #include <core/system/nanosockets/async_reply_socket.hpp>
22 #include <core/system/nanosockets/publish_socket.hpp>
23 #include <core/system/cppipc/common/status_types.hpp>
24 #include <core/system/cppipc/server/dispatch.hpp>
25 #include <core/system/cppipc/server/cancel_ops.hpp>
26 
27 
28 namespace cppipc {
29 namespace nanosockets = turi::nanosockets;
30 struct reply_message;
31 struct call_message;
32 
33 // some annoying forward declarations I need to get by some circular references
34 class object_factory_impl;
35 namespace detail {
36  template <typename RetType, typename T, typename MemFn, typename... Args>
38 }
39 
40 /**
41  * \ingroup cppipc
42  * The comm_server manages the server side of the communication interface.
43  *
44  * The comm_server manages the serving of objects. It listens on a bind address
45  * (defaults to an arbitrary TCP port, but an alternate_bind_address can be
46  * provided), and registers its existance in zookeeper. Clients can reach the
47  * server by associating with the same keys in zookeeper.
48  *
49  * The comm_server manages a list of member function pointers and strings they
50  * map to, as well as a complete list of all served objects.
51  *
52  * Basic Utilization
53  * -----------------
54  * To create a object which can be served by remote machines,
55  * first create a base interface class which describes the functions to be
56  * exported using the registration macros \ref REGISTRATION_BEGIN
57  * \ref REGISTRATION_END \ref REGISTER, or the magic macros
58  * \ref GENERATE_INTERFACE and \ref GENERATE_INTERFACE_AND_PROXY.
59  * The actual server side implementation of the object then inherits from
60  * this interface, implementing all the functions.
61  *
62  * For instance, I may have a base interface class called "file_write_base", and
63  * an implementation called "file_write_impl".
64  *
65  * \code
66  * class file_write_base {
67  * public:
68  * virtual int open(std::string s) = 0;
69  * virtual void write(std::string s) = 0;
70  * virtual void close() = 0;
71  * virtual ~file_write_base() {}
72  *
73  * REGISTRATION_BEGIN(file_write_base)
74  * REGISTER(file_write_base::open)
75  * REGISTER(file_write_base::write)
76  * REGISTER(file_write_base::close)
77  * REGISTRATION_END
78  * };
79  *
80  * class file_write_impl: public file_write_base {
81  * file_write_impl(); // regular constructor
82  *
83  * explicit file_write_impl(std::string f) { // open a file on construction
84  * open(f);
85  * }
86  * // ... other implementation details omitted ...
87  * };
88  * \endcode
89  *
90  * To make this class available on the server side, we must tell the server
91  * how to construct an instance of this object by registering a type with the
92  * server, and providing a lambda function returning a pointer to an
93  * implementation.
94  * \code
95  * int main() {
96  * ...
97  * comm_server server(...);
98  * server.register_type<file_write_base>([](){ return new file_write_impl;});
99  * ...
100  * server.start();
101  * ...
102  * }
103  * \endcode
104  * Here we use the trivial constructor, but more generally we can provide
105  * arbitrarily interesting constructors in the lambda. For instance, here
106  * we use the alternate constructor in file_write_impl.
107  * \code
108  * int main() {
109  * ...
110  * comm_server server(...);
111  * server.register_type<file_write_base>([](){ return new file_write_impl("log.txt");});
112  * ...
113  * server.start();
114  * ...
115  * }
116  * \endcode
117  *
118  * Once the server is started, the client will have the ability to create
119  * proxy objects which in turn create matching objects on the server.
120  *
121  * It is important that each base class only describes exactly one
122  * implementation. i.e. register_type<T> should be used only once for any T.
123  *
124  * To see how this code is used, see the comm_client documentation.
125  *
126  * Implementation Details
127  * ----------------------
128  * There is a special "root object" which manages all "special" tasks that
129  * operate on the comm_server itself. This root object always has object ID 0
130  * and is the object_factory_base. This is implemented on the server side by
131  * object_factory_impl, and on the client side as object_factory_proxy.
132  *
133  * The object_factory_impl manages the construction of new object types.
134  *
135  * Interface Modification Safety
136  * -----------------------------
137  * The internal protocol is designed to be robust against changes in interfaces.
138  * i.e. if new functions are added, and the server is recompiled, all previous
139  * client builds will still work. Similarly, if new functions are added and the
140  * client is recompiled, the new client will still work with old servers as long
141  * as the new functions are not called.
142  *
143  */
144 class EXPORT comm_server {
145  private:
146 
147  friend class object_factory_impl;
148 
149  // true if start was called
150  bool started;
151 
152  nanosockets::async_reply_socket* object_socket;
153  nanosockets::async_reply_socket* control_socket;
154  nanosockets::publish_socket* publishsock;
155 
156  /// Internal callback for messages received from zeromq
157  bool callback(nanosockets::zmq_msg_vector& recv, nanosockets::zmq_msg_vector& reply);
158 
159  std::map<std::string, dispatch*> dispatch_map;
160  boost::mutex registered_object_lock;
161  std::map<size_t, std::shared_ptr<void>> registered_objects;
162  std::map<void*, size_t> inv_registered_objects; // A reverse map of the registered objects
163  object_factory_impl* object_factory;
164 
165  /**
166  * Object IDs are generated by using Knuth's LCG
167  * s' = s * 6364136223846793005 + 1442695040888963407 % (2^64)
168  * This guarantees a complete period of 2^64.
169  * We just need a good initialization.
170  */
171  size_t lcg_seed;
172 
173  bool comm_server_debug_mode = false;
174 
175 
176  /**
177  * Registers a mapping from a type name to a constructor call which returns
178  * a pointer to an instance of an object of the specified typename.
179  */
180  void register_constructor(std::string type_name,
181  std::function<std::shared_ptr<void>()> constructor_call);
182 
183  /**
184  * Returns the next freely available object ID
185  */
186  size_t get_next_object_id();
187 
188  /**
189  * Overload of register_object in the event that the object is already
190  * wrapped in a deleting wrapper. This function should not be used directly.
191  * If the object already exists, the existing ID is returned
192  */
193  inline size_t register_object(std::shared_ptr<void> object) {
194  boost::lock_guard<boost::mutex> guard(registered_object_lock);
195  if (inv_registered_objects.count(object.get())) {
196  return inv_registered_objects.at(object.get());
197  }
198  size_t id = get_next_object_id();
199  registered_objects.insert({id, object});
200  inv_registered_objects.insert({object.get(), id});
201  return id;
202  }
203 
204  struct object_map_key_cmp {
205  bool operator()(size_t i,
206  const std::pair<size_t, std::shared_ptr<void>>& map_elem) const {
207  return i < map_elem.first;
208  }
209 
210  bool operator()(const std::pair<size_t,std::shared_ptr<void>>& map_elem,
211  size_t i) const {
212  return map_elem.first < i;
213  }
214  };
215 
216  public:
217 
218  /**
219  * Constructs a comm server which uses remote communication
220  * via zookeeper/zeromq.
221  * \param zkhosts The zookeeper hosts to connect to. May be empty. If empty,
222  * the "alternate_bind_address" parameter must be a zeromq
223  * endpoint address to bind to.
224  * \param name The key name to wait for connections on. All remotes connect
225  * to this server on this name. If zkhosts is empty, this is
226  * ignored.
227  * \param alternate_bind_address The communication defaults to using an
228  * arbitrary TCP port. This can be changed to any URI format supported
229  * by zeroMQ.
230  * \param alternate_publish_address Only valid if zkhosts is empty.
231  * The address to publish server statuses on. If zookeeper
232  * is not used, all remotes should connect to this address
233  * to get server status. If not provided, one is
234  * generated automatically.
235  */
236  comm_server(std::vector<std::string> zkhosts,
237  std::string name,
238  std::string alternate_bind_address="",
239  std::string alternate_control_address="",
240  std::string alternate_publish_address="",
241  std::string secret_key="");
242  /// Destructor. Stops receiving messages, and closes all communication
243  ~comm_server();
244 
245 
246  /** Start receiving messages. Message processing occurs on a seperate
247  * thread, so this function returns immediately..
248  */
249  void start();
250 
251  /**
252  * Stops receiving messages. Has no effect if start() was not called
253  * before this.
254  */
255  void stop();
256 
257  /**
258  * Gets the address we are bound on
259  */
260  std::string get_bound_address();
261 
262  /**
263  * Gets the address where we receive control messages
264  */
265  std::string get_control_address();
266 
267  /**
268  * Gets the address on which you can subscribe to for status updates.
269  */
270  std::string get_status_address();
271 
272  /**
273  * Gets the zeromq context.. Deprecated. Returns NULL.
274  */
275  void* get_zmq_context();
276 
277  /**
278  * Publishes a message of the form "[status_type]: [message]".
279  * Since the client can filter messages, it is important to have a
280  * small set of possible status_type strings. For the purposes of the
281  * comm_server, we define the following:
282  *
283  * \li \b COMM_SERVER_INFO Used for Comm Server informational messages.
284  * \li \b COMM_SERVER_ERROR Used for Comm Server error messages.
285  *
286  * These status strings are defined in \ref core/system/cppipc/common/status_types.hpp
287  */
288  void report_status(std::string status_type, std::string message);
289 
290  /**
291  * Deletes an object of object ID objectid.
292  */
293  inline void delete_object(size_t objectid) {
294  boost::lock_guard<boost::mutex> guard(registered_object_lock);
295  if(registered_objects.count(objectid) != 1) {
296  logstream(LOG_DEBUG) << "Deleting already deleted object " << objectid << std::endl;
297  }
298  inv_registered_objects.erase(registered_objects[objectid].get());
299  logstream(LOG_DEBUG) << "Deleting Object " << objectid << std::endl;
300  registered_objects.erase(objectid);
301  }
302 
303  inline size_t num_registered_objects() {
304  boost::lock_guard<boost::mutex> guard(registered_object_lock);
305  return registered_objects.size();
306  }
307 
308  /**
309  * Registers a type to be managed by the comm_server. After registration of
310  * this type, remote machines will be able to create instances of this object
311  * via the comm_client's make_object function call.
312  */
313  template <typename T>
314  void register_type(std::function<T*()> constructor_call) {
315  T::__register__(*this);
316  register_constructor(T::__get_type_name__(),
317  [=]()->std::shared_ptr<void> {
318  return std::static_pointer_cast<void>(std::shared_ptr<T>(constructor_call()));
319  }
320  );
321  }
322 
323 
324  /**
325  * Registers a type to be managed by the comm_server. After registration of
326  * this type, remote machines will be able to create instances of this object
327  * via the comm_client's make_object function call.
328  */
329  template <typename T>
330  void register_type(std::function<std::shared_ptr<T>()> constructor_call) {
331  T::__register__(*this);
332  register_constructor(T::__get_type_name__(),
333  [=]()->std::shared_ptr<void> {
334  return std::static_pointer_cast<void>(constructor_call());
335  }
336  );
337  }
338 
339 
340 
341  /**
342  * Registers an object to be managed by the comm server, returning the new
343  * object ID. If the object already exists, the existing ID is returned.
344  */
345  template <typename T>
346  size_t register_object(std::shared_ptr<T> object) {
347  boost::lock_guard<boost::mutex> guard(registered_object_lock);
348  if (inv_registered_objects.count(object.get())) {
349  return inv_registered_objects.at(object.get());
350  }
351  size_t id = get_next_object_id();
352  logstream(LOG_DEBUG) << "Registering Object " << id << std::endl;
353  registered_objects.insert({id, std::static_pointer_cast<void>(object)});
354  inv_registered_objects.insert({object.get(), id});
355  return id;
356  }
357 
358  /**
359  * Returns an object ID of the object has been previously registere.d
360  * Returns (size_t)(-1) otherwise.
361  */
362  inline size_t find_object(void* object) {
363  boost::lock_guard<boost::mutex> guard(registered_object_lock);
364  if (inv_registered_objects.count((void*)object)) {
365  return inv_registered_objects.at((void*)object);
366  } else {
367  return (size_t)(-1);
368  }
369  }
370 
371  /**
372  * Returns a pointer to the object with a given object ID.
373  * Returns NULL on failure.
374  */
375  inline std::shared_ptr<void> get_object(size_t objectid) {
376  boost::lock_guard<boost::mutex> guard(registered_object_lock);
377  if (registered_objects.count(objectid) == 1) {
378  return registered_objects[objectid];
379  } else {
380  return nullptr;
381  }
382  }
383 
384  void delete_unused_objects(std::vector<size_t> object_ids,
385  bool active_list);
386 
387  /**
388  * \internal
389  * Registers a member function pointer. Do not use directly. Used by the
390  * REGISTER macros to allow the comm_server to maintain the mapping of
391  * member function pointers to names.
392  */
393  template <typename MemFn>
394  void register_function(MemFn fn, std::string function_name);
395 
396 
397  template <typename RetType, typename T, typename MemFn, typename... Args>
399 };
400 
401 } // cppipc
402 
403 
404 #include <core/system/cppipc/server/dispatch_impl.hpp>
405 
406 namespace cppipc {
407 template <typename MemFn>
408 void comm_server::register_function(MemFn fn, std::string function_name) {
409  if (dispatch_map.count(function_name) == 0) {
410  dispatch_map[function_name] = create_dispatch(fn);
411  logstream(LOG_EMPH) << "Registering function " << function_name << "\n";
412  }
413 }
414 };
415 #endif
#define LOG_EMPH
Definition: logger.hpp:100
#define logstream(lvl)
Definition: logger.hpp:276
void register_type(std::function< std::shared_ptr< T >()> constructor_call)
std::shared_ptr< void > get_object(size_t objectid)
dispatch * create_dispatch(MemFn memfn)
#define LOG_DEBUG
Definition: logger.hpp:102
void delete_object(size_t objectid)
size_t find_object(void *object)
size_t register_object(std::shared_ptr< T > object)
void register_type(std::function< T *()> constructor_call)
void register_function(MemFn fn, std::string function_name)