Turi Create
4.0
|
#include <core/system/cppipc/client/comm_client.hpp>
Public Member Functions | |
comm_client (std::vector< std::string > zkhosts, std::string name, size_t num_tolerable_ping_failures=(size_t)(-1), std::string alternate_control_address="", std::string alternate_publish_address="", const std::string public_key="", const std::string secret_key="", const std::string server_public_key="", bool ops_interruptible=false) | |
comm_client (std::string name, void *zmq_ctx) | |
void | set_server_alive_watch_pid (int32_t pid) |
void | init (bool ops_interruptible=false) |
reply_status | start () |
~comm_client () | |
void | stop () |
size_t | make_object (std::string object_type_name) |
std::string | ping (std::string) |
void | delete_object (size_t objectid) |
size_t | incr_ref_count (size_t object_id) |
void | status_callback_thread_function () |
void | stop_status_callback_thread () |
void | start_status_callback_thread () |
void | add_status_watch (std::string watch_prefix, std::function< void(std::string)> callback) |
void | remove_status_watch (std::string watch_prefix) |
void | clear_status_watch () |
void | stop_ping_thread () |
int | send_deletion_list (const std::vector< size_t > &object_ids) |
template<typename MemFn > | |
void | register_function (MemFn f, std::string function_string) |
template<typename MemFn , typename... Args> | |
detail::member_function_return_type< MemFn >::type | call (size_t objectid, MemFn f, const Args &... args) |
Public Attributes | |
boost::thread * | status_callback_thread = NULL |
boost::mutex | status_buffer_mutex |
The client side of the IPC communication system.
The comm_client manages the serialization, and the calling of functions on remote machines. The comm_client and the comm_server reaches each other through the use of zookeeper. If both client and server connect to the same zookeeper host, and on construction are provided the same "name", they are connected.
The comm_client provides communication capability for the object_proxy objects. Here, we will go through an example of proxying the file_write class described in the comm_server documentation.
Given a base class file_write_base, which is implemented on the server side, we can construct on the client side, an object_proxy object which creates and binds to an implementation of file_write_base on the server side, and allows function calls across the network.
We first repeat the description for the base class here:
We can create an object_proxy by templating over the base class, and providing the comm_client object in the constructor.
This will create on the remote machine, an instance of the file_write_impl object, and the object_proxy class provides the capability to call functions on the remote object. For instance, to call the "open" function, followed by some "writes" and "close".
The return type of the proxy.call function will match the return type of the member function pointer provided. For instance, &file_write_base::open returns an integer, and the result is forwarded across the network and returned.
On destruction of the proxy object, the remote object is also deleted.
It might be convenient in many ways to wrap the object_proxy in a way to make it easy to use. This is a convenient pattern that is very useful.
For instance, a proxy wrapper for the file_write_base object might look like:
To facilitate the creation of base interfaces, calling REGISTER macros appropriately, and implementing the proxy wrapper, we provide the GENERATE_INTERFACE, GENERATE_PROXY and GENERATE_INTERFACE_AND_PROXY magic macros.
For instance, to generate the proxy and the base class for the above file_write_base object, we can write
This is the recommended way to create proxy and base objects since this allows a collection of interesting functionality to be injected. For instance, this will allow functions to take base pointers as arguments and return base pointers (for instance file_write_base*). On the client side, proxy objects are serialized in a way so that the server side uses the matching implementation instance. When an object is returned, the object is registered on the server side and converted to a new proxy object on the client side. a
Many other details regarding safety when interfaces, or interface argument type modifications are described in the comm_server documentation.
The comm client internally maintains the complete mapping of all member function pointers to strings. The object_proxy class then has the simple task of just maintaining the object_ids: i.e. what remote object does it connect to.
There is a special "root object" which manages all "special" tasks that operate on the comm_server itself. This root object always has object ID 0 and is the object_factory_base. This is implemented on the server side by object_factory_impl, and on the client side as object_factory_proxy. The comm_client exposes the object_factory functionality as member functions in the comm_client itself. These are make_object(), ping() and delete_object()
Definition at line 198 of file comm_client.hpp.
cppipc::comm_client::comm_client | ( | std::vector< std::string > | zkhosts, |
std::string | name, | ||
size_t | num_tolerable_ping_failures = (size_t)(-1) , |
||
std::string | alternate_control_address = "" , |
||
std::string | alternate_publish_address = "" , |
||
const std::string | public_key = "" , |
||
const std::string | secret_key = "" , |
||
const std::string | server_public_key = "" , |
||
bool | ops_interruptible = false |
||
) |
Constructs a comm client which uses remote communication via zookeeper/zeromq. The client may find the remote server via either zookeeper (in which case zkhosts must be a list of zookeeper servers, and name must be a unique key value), or you can provide the address explicitly. Note that if a server is listening via zookeeper, the client MUST connect via zookeeper; providing the server's actual tcp bind address will not work. And similarly if the server is listening on an explicit zeromq endpoint address and not using zookeeper, the client must connect directly also without using zookeeper.
After construction, authentication methods can be added then start() must be called to initiate the connection.
zkhosts | The zookeeper hosts to connect to. May be empty. If empty, the "name" parameter must be a zeromq endpoint address to bind to. |
name | The key name to connect to. This must match the "name" argument on construction of the remote's comm_server. If zkhosts is empty, this must be a valid zeromq endpoint address. |
num_tolerable_ping_failures | The number of allowable consecutive ping failures before the server is considered dead. |
alternate_publish_address | This should match the "alternate_publish_address" argument on construction of the remote's comm_server. If zkhosts is empty, this must be a valid zeromq endpoint address. This can be empty, in which case the client will ask the server for the appropriate address. It is recommended that this is not specified. |
cppipc::comm_client::comm_client | ( | std::string | name, |
void * | zmq_ctx | ||
) |
Constructs an inproc comm_client. The inproc comm_client and comm_server are required to have the same zmq_ctx.
name | The inproc socket address, must start with inproc:// |
cppipc::comm_client::~comm_client | ( | ) |
Destructor. Calls stop if not already called
void cppipc::comm_client::add_status_watch | ( | std::string | watch_prefix, |
std::function< void(std::string)> | callback | ||
) |
Adds a callback for server status messages. The callback will receive all messages matching the specified prefix. For instance:
will match all the following server messages
On the other hand
will only match the first.
Callbacks should be processed relatively quickly and should be thread safe. Multiple callbacks may be processed simultaneously in different threads. Callback function also should not call add_status_watch or remove_status_watch, or a deadlock may result.
If multiple callbacks are registered for exactly the same prefix, only the last callback is recorded.
|
inline |
Calls a remote function returning the result. The return type is the actual return value. May throw an exception of type reply_status on failure.
NOTE: ONLY the main thread can call this. If this becomes untrue, some invariants will be violated (only one thread is allowed to change the currently running command).
Definition at line 586 of file comm_client.hpp.
void cppipc::comm_client::clear_status_watch | ( | ) |
Clears all status callbacks. Note that status callbacks may still be called even after this function returns. To ensure complete removal of the function, stop_status_callback_thread() and start_status_callback_thread() must be called.
void cppipc::comm_client::delete_object | ( | size_t | objectid | ) |
Delete object. Deletes the object with ID objectid on the remote machine.
size_t cppipc::comm_client::incr_ref_count | ( | size_t | object_id | ) |
Functions for manipulating local reference counting data structure
void cppipc::comm_client::init | ( | bool | ops_interruptible = false | ) |
Initialize the comm_client, called right inside the constructor.
size_t cppipc::comm_client::make_object | ( | std::string | object_type_name | ) |
Creates an object of a given type on the remote machine. Returns an object ID. If return value is (-1), this is a failure.
std::string cppipc::comm_client::ping | ( | std::string | ) |
Ping test. Sends a string to the remote system, and replies with the same string.
|
inline |
Registers a member function which then can be used in the call() function
Definition at line 544 of file comm_client.hpp.
void cppipc::comm_client::remove_status_watch | ( | std::string | watch_prefix | ) |
Removes a status callback for a given prefix. Note that the function associated with the prefix may still be called even after this function returns. To ensure complete removal of the function, stop_status_callback_thread() and start_status_callback_thread() must be called.
int cppipc::comm_client::send_deletion_list | ( | const std::vector< size_t > & | object_ids | ) |
Tries to synchronize the list of tracked objects with the server by sending a list of objects to be deleted. Returns 0 on success, -1 on failure.
void cppipc::comm_client::set_server_alive_watch_pid | ( | int32_t | pid | ) |
Sets a pid to watch. If this pid goes away, server is considered dead. This is a more robust way compared to "pings" for local interprocess communication. Set to 0 to disable.
reply_status cppipc::comm_client::start | ( | ) |
Initializes connections with the servers Must be called prior to creation of any client objects. Returns reply_status::OK on success, and an error code failure. Failure could be caused by an inability to connect to the server, or could also be due to authentication errors.
void cppipc::comm_client::start_status_callback_thread | ( | ) |
Starts the status callback thread if not already started.
void cppipc::comm_client::status_callback_thread_function | ( | ) |
The function which implements the thread which issues the messages to the status callback handlers.
void cppipc::comm_client::stop | ( | ) |
Stops the comm client object. Closes all open sockets
void cppipc::comm_client::stop_ping_thread | ( | ) |
Stops the ping thread.
void cppipc::comm_client::stop_status_callback_thread | ( | ) |
Terminates the thread which calls the callback handlers. Unprocessed messages are dropped.
boost::mutex cppipc::comm_client::status_buffer_mutex |
The lock / cv pair around the ping_thread_done value
Definition at line 451 of file comm_client.hpp.
boost::thread* cppipc::comm_client::status_callback_thread = NULL |
This thread is used to serve the status callbacks. This prevents status callback locks from blocking the server
Definition at line 447 of file comm_client.hpp.