Turi Create  4.0
cppipc::comm_client Class Reference

#include <core/system/cppipc/client/comm_client.hpp>

Public Member Functions

 comm_client (std::vector< std::string > zkhosts, std::string name, size_t num_tolerable_ping_failures=(size_t)(-1), std::string alternate_control_address="", std::string alternate_publish_address="", const std::string public_key="", const std::string secret_key="", const std::string server_public_key="", bool ops_interruptible=false)
 
 comm_client (std::string name, void *zmq_ctx)
 
void set_server_alive_watch_pid (int32_t pid)
 
void init (bool ops_interruptible=false)
 
reply_status start ()
 
 ~comm_client ()
 
void stop ()
 
size_t make_object (std::string object_type_name)
 
std::string ping (std::string)
 
void delete_object (size_t objectid)
 
size_t incr_ref_count (size_t object_id)
 
void status_callback_thread_function ()
 
void stop_status_callback_thread ()
 
void start_status_callback_thread ()
 
void add_status_watch (std::string watch_prefix, std::function< void(std::string)> callback)
 
void remove_status_watch (std::string watch_prefix)
 
void clear_status_watch ()
 
void stop_ping_thread ()
 
int send_deletion_list (const std::vector< size_t > &object_ids)
 
template<typename MemFn >
void register_function (MemFn f, std::string function_string)
 
template<typename MemFn , typename... Args>
detail::member_function_return_type< MemFn >::type call (size_t objectid, MemFn f, const Args &... args)
 

Public Attributes

boost::thread * status_callback_thread = NULL
 
boost::mutex status_buffer_mutex
 

Detailed Description

The client side of the IPC communication system.

The comm_client manages serialization and the calling of functions on remote machines. The comm_client and the comm_server reach each other through zookeeper. If both client and server connect to the same zookeeper host and are given the same "name" on construction, they are connected.

The comm_client provides communication capability for the object_proxy objects. Here, we will go through an example of proxying the file_write class described in the comm_server documentation.

Basic Utilization

Given a base class file_write_base that is implemented on the server side, we can construct an object_proxy on the client side. The object_proxy creates and binds to an implementation of file_write_base on the server side and allows function calls across the network.

We first repeat the description for the base class here:

class file_write_base {
 public:
  virtual int open(std::string s) = 0;
  virtual void write(std::string s) = 0;
  virtual void close() = 0;
  virtual ~file_write_base() {}
  REGISTRATION_BEGIN(file_write_base)
  REGISTER(file_write_base::open)
  REGISTER(file_write_base::write)
  REGISTER(file_write_base::close)
  REGISTRATION_END
};

We can create an object_proxy by templating over the base class, and providing the comm_client object in the constructor.

object_proxy<file_write_base> proxy(client);

This creates an instance of the file_write_impl object on the remote machine, and the object_proxy class provides the capability to call functions on the remote object. For instance, to call the "open" function, followed by some "write" calls and a "close":

int ret = proxy.call(&file_write_base::open, "log.txt");
proxy.call(&file_write_base::write, "hello");
proxy.call(&file_write_base::write, "world");
proxy.call(&file_write_base::close);

The return type of the proxy.call function will match the return type of the member function pointer provided. For instance, &file_write_base::open returns an integer, and the result is forwarded across the network and returned.

On destruction of the proxy object, the remote object is also deleted.
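The return-type forwarding described above can be illustrated with a small local sketch. Everything here is hypothetical and purely illustrative: the trait is a stand-in for detail::member_function_return_type, and local_call and file_write_local are invented names that invoke the member function directly instead of going across the network.

```cpp
#include <string>

// Hypothetical stand-in for detail::member_function_return_type:
// extracts R from a pointer to member function of type R (T::*)(Args...).
template <typename MemFn>
struct member_function_return_type;

template <typename R, typename T, typename... Args>
struct member_function_return_type<R (T::*)(Args...)> {
  typedef R type;
};

// A toy local class standing in for the remote implementation (assumption).
struct file_write_local {
  int open(std::string s) { return s.empty() ? -1 : 0; }
  void close() {}
};

// Hypothetical local "call": invokes the member function directly rather
// than serializing the arguments and sending them to a server.
template <typename T, typename MemFn, typename... Args>
typename member_function_return_type<MemFn>::type
local_call(T& object, MemFn f, const Args&... args) {
  return (object.*f)(args...);
}
```

With this sketch, local_call(obj, &file_write_local::open, "log.txt") has type int, while the same call with &file_write_local::close has type void, mirroring how proxy.call takes its return type from the member function pointer.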

Wrapping the Proxy Object

It is often convenient to wrap the object_proxy in a class that implements the base interface, so that the remote object can be used like a local one.

For instance, a proxy wrapper for the file_write_base object might look like:

class file_write_proxy: public file_write_base {
  object_proxy<file_write_base> proxy;
 public:
  file_write_proxy(comm_client& comm): proxy(comm) { }
  int open(std::string s) {
    return proxy.call(&file_write_base::open, s);
  }
  void write(std::string s) {
    return proxy.call(&file_write_base::write, s);
  }
  void close() {
    return proxy.call(&file_write_base::close);
  }
};

Preprocessor Magic

To facilitate the creation of base interfaces, calling REGISTER macros appropriately, and implementing the proxy wrapper, we provide the GENERATE_INTERFACE, GENERATE_PROXY and GENERATE_INTERFACE_AND_PROXY magic macros.

For instance, to generate the proxy and the base class for the above file_write_base object, we can write

GENERATE_INTERFACE_AND_PROXY(file_write_base, file_write_proxy,
(int, open, (std::string))
(void, write, (std::string))
(void, close, )
)

This is the recommended way to create proxy and base objects, since it allows additional functionality to be injected. For instance, it allows functions to take base pointers as arguments and to return base pointers (for instance file_write_base*). On the client side, proxy objects are serialized so that the server side uses the matching implementation instance. When an object is returned, the object is registered on the server side and converted to a new proxy object on the client side.

Implementation Details

Many other details, such as safety when interfaces or interface argument types are modified, are described in the comm_server documentation.

The comm client internally maintains the complete mapping of all member function pointers to strings. The object_proxy class then has the simple task of just maintaining the object_ids: i.e. what remote object does it connect to.
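As a rough illustration of a mapping from member function pointers to strings, the sketch below keys a map on the raw bytes of the pointer. This is an assumption about one possible approach, not a description of how comm_client stores its table; memfn_registry, name_of and file_write_demo are hypothetical names.

```cpp
#include <cstring>
#include <map>
#include <string>

// Hypothetical registry: maps the raw bytes of a member function pointer
// to the string name under which it was registered.
class memfn_registry {
 public:
  template <typename MemFn>
  void register_function(MemFn f, std::string function_string) {
    names_[key_of(f)] = function_string;
  }

  // Returns the registered name, or an empty string if f is unknown.
  template <typename MemFn>
  std::string name_of(MemFn f) const {
    std::map<std::string, std::string>::const_iterator it =
        names_.find(key_of(f));
    return it == names_.end() ? std::string() : it->second;
  }

 private:
  // Member function pointers cannot be cast to void*, so copy their object
  // representation into a byte string and use that as the key. This assumes
  // the representation has no padding bytes, which holds on common ABIs for
  // simple classes but is not guaranteed by the standard.
  template <typename MemFn>
  static std::string key_of(MemFn f) {
    std::string key(sizeof(MemFn), '\0');
    std::memcpy(&key[0], &f, sizeof(MemFn));
    return key;
  }

  std::map<std::string, std::string> names_;
};

// Toy class used only to have member functions to register (assumption).
struct file_write_demo {
  int open(std::string) { return 0; }
  void close() {}
};
```

A caller would register &file_write_demo::open under the name "file_write_demo::open" and later look up that name from the pointer alone, which is the information a proxy needs to tell the server which function to run.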

There is a special "root object" which manages all "special" tasks that operate on the comm_server itself. This root object always has object ID 0 and is the object_factory_base. It is implemented on the server side by object_factory_impl, and on the client side as object_factory_proxy. The comm_client exposes the object_factory functionality as member functions of the comm_client itself. These are make_object(), ping() and delete_object().
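To make the role of the object factory concrete, here is a toy, in-process sketch of the three operations. It is not the object_factory_impl: toy_object_factory, register_type and num_objects are hypothetical, and starting IDs at 1 (because ID 0 is reserved for the root object) is an assumption made for the illustration.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>

// Toy stand-in for the object factory. Hypothetical, for illustration only.
class toy_object_factory {
 public:
  typedef std::function<std::shared_ptr<void>()> constructor_type;

  toy_object_factory() : next_id_(1) {}  // assumption: ID 0 is the root object

  // Associates a type name with a function that constructs an instance.
  void register_type(const std::string& name, constructor_type ctor) {
    constructors_[name] = ctor;
  }

  // Returns a new object ID, or (size_t)(-1) if the type name is unknown.
  size_t make_object(const std::string& object_type_name) {
    std::map<std::string, constructor_type>::const_iterator it =
        constructors_.find(object_type_name);
    if (it == constructors_.end()) return (size_t)(-1);
    size_t id = next_id_++;
    objects_[id] = it->second();
    return id;
  }

  // Echoes the string back, as a ping test would.
  std::string ping(const std::string& s) const { return s; }

  // Destroys the object with the given ID; unknown IDs are ignored.
  void delete_object(size_t objectid) { objects_.erase(objectid); }

  size_t num_objects() const { return objects_.size(); }

 private:
  size_t next_id_;
  std::map<std::string, constructor_type> constructors_;
  std::map<size_t, std::shared_ptr<void> > objects_;
};
```

In the real system these calls travel through the object_factory_proxy to the server; the sketch only shows the bookkeeping implied by the description, including the (-1) failure value of make_object().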

Definition at line 198 of file comm_client.hpp.

Constructor & Destructor Documentation

◆ comm_client() [1/2]

cppipc::comm_client::comm_client ( std::vector< std::string >  zkhosts,
std::string  name,
size_t  num_tolerable_ping_failures = (size_t)(-1),
std::string  alternate_control_address = "",
std::string  alternate_publish_address = "",
const std::string  public_key = "",
const std::string  secret_key = "",
const std::string  server_public_key = "",
bool  ops_interruptible = false 
)

Constructs a comm client which uses remote communication via zookeeper/zeromq. The client may find the remote server via zookeeper (in which case zkhosts must be a list of zookeeper servers, and name must be a unique key value), or you can provide the address explicitly. Note that if a server is listening via zookeeper, the client MUST connect via zookeeper; providing the server's actual tcp bind address will not work. Similarly, if the server is listening on an explicit zeromq endpoint address and not using zookeeper, the client must also connect directly, without using zookeeper.

After construction, authentication methods can be added, and then start() must be called to initiate the connection.

Parameters
zkhosts: The zookeeper hosts to connect to. May be empty. If empty, the "name" parameter must be a zeromq endpoint address to bind to.
name: The key name to connect to. This must match the "name" argument on construction of the remote's comm_server. If zkhosts is empty, this must be a valid zeromq endpoint address.
num_tolerable_ping_failures: The number of allowable consecutive ping failures before the server is considered dead.
alternate_publish_address: This should match the "alternate_publish_address" argument on construction of the remote's comm_server. If zkhosts is empty, this must be a valid zeromq endpoint address. This can be empty, in which case the client will ask the server for the appropriate address. It is recommended that this is not specified.
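The meaning of num_tolerable_ping_failures can be sketched as a counter of consecutive failures. The class below is hypothetical, and it assumes the server is considered dead once the number of consecutive failures exceeds the tolerable count; whether the real client uses "exceeds" or "reaches" is not stated here.

```cpp
#include <cstddef>

// Hypothetical tracker for consecutive ping failures (illustration only).
class ping_failure_tracker {
 public:
  explicit ping_failure_tracker(
      size_t num_tolerable_ping_failures = (size_t)(-1))
      : tolerable_(num_tolerable_ping_failures), consecutive_failures_(0) {}

  // Records the outcome of one ping. A success resets the failure streak.
  void record(bool ping_succeeded) {
    if (ping_succeeded) {
      consecutive_failures_ = 0;
    } else if (consecutive_failures_ != (size_t)(-1)) {
      ++consecutive_failures_;  // guarded so the counter never wraps around
    }
  }

  // Assumption: dead once the streak exceeds the tolerable count.
  bool server_considered_dead() const {
    return consecutive_failures_ > tolerable_;
  }

 private:
  size_t tolerable_;
  size_t consecutive_failures_;
};
```

Under this reading, the default value (size_t)(-1) is the largest size_t, so the streak can never exceed it and ping failures alone never mark the server as dead.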

◆ comm_client() [2/2]

cppipc::comm_client::comm_client ( std::string  name,
void *  zmq_ctx 
)

Constructs an inproc comm_client. The inproc comm_client and comm_server are required to have the same zmq_ctx.

Parameters
name: The inproc socket address; must start with inproc://

◆ ~comm_client()

cppipc::comm_client::~comm_client ( )

Destructor. Calls stop() if it has not already been called.

Member Function Documentation

◆ add_status_watch()

void cppipc::comm_client::add_status_watch ( std::string  watch_prefix,
std::function< void(std::string)>  callback 
)

Adds a callback for server status messages. The callback will receive all messages matching the specified prefix. For instance:

client.add_status_watch("A", callback);

will match all the following server messages

server.report_status("A", "hello world"); // emits A: hello world
server.report_status("ABC", "hello again"); // emits ABC: hello again

On the other hand

client.add_status_watch("A:", callback);

will only match the first.

Callbacks should be processed relatively quickly and should be thread safe. Multiple callbacks may be processed simultaneously in different threads. Callback functions should also not call add_status_watch or remove_status_watch, or a deadlock may result.

If multiple callbacks are registered for exactly the same prefix, only the last callback is recorded.

Note
The current prefix checking implementation is not fast, and is simply linear in the number of callbacks registered.
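The matching rules above (prefix match, last registration wins, linear scan) can be sketched as follows. status_watch_table and dispatch are hypothetical names, and the sketch is single-threaded, so it does not model the status callback thread or its locking.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of the status watch table.
class status_watch_table {
 public:
  typedef std::function<void(std::string)> callback_type;

  // Registering exactly the same prefix again replaces the earlier callback.
  void add_status_watch(const std::string& prefix, callback_type cb) {
    for (size_t i = 0; i < watches_.size(); ++i) {
      if (watches_[i].first == prefix) {
        watches_[i].second = cb;
        return;
      }
    }
    watches_.push_back(std::make_pair(prefix, cb));
  }

  void remove_status_watch(const std::string& prefix) {
    for (size_t i = 0; i < watches_.size(); ++i) {
      if (watches_[i].first == prefix) {
        watches_.erase(watches_.begin() + i);
        return;
      }
    }
  }

  // Linear scan: invokes every callback whose prefix starts the message.
  // Returns the number of callbacks invoked.
  size_t dispatch(const std::string& message) const {
    size_t count = 0;
    for (size_t i = 0; i < watches_.size(); ++i) {
      const std::string& prefix = watches_[i].first;
      if (message.compare(0, prefix.size(), prefix) == 0) {
        watches_[i].second(message);
        ++count;
      }
    }
    return count;
  }

 private:
  std::vector<std::pair<std::string, callback_type> > watches_;
};
```

With a watch on "A", both "A: hello world" and "ABC: hello again" are delivered; with a watch on "A:", only the first is, which matches the behaviour described above.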

◆ call()

template<typename MemFn , typename... Args>
detail::member_function_return_type<MemFn>::type cppipc::comm_client::call ( size_t  objectid,
MemFn  f,
const Args &...  args 
)
inline

Calls a remote function returning the result. The return type is the actual return value. May throw an exception of type reply_status on failure.

NOTE: ONLY the main thread can call this. If this becomes untrue, some invariants will be violated (only one thread is allowed to change the currently running command).

Definition at line 586 of file comm_client.hpp.

◆ clear_status_watch()

void cppipc::comm_client::clear_status_watch ( )

Clears all status callbacks. Note that status callbacks may still be called even after this function returns. To ensure complete removal of the function, stop_status_callback_thread() and start_status_callback_thread() must be called.

◆ delete_object()

void cppipc::comm_client::delete_object ( size_t  objectid)

Delete object. Deletes the object with ID objectid on the remote machine.

Note
This call redirects to the object_factory_proxy

◆ incr_ref_count()

size_t cppipc::comm_client::incr_ref_count ( size_t  object_id)

Functions for manipulating the local reference counting data structure.

◆ init()

void cppipc::comm_client::init ( bool  ops_interruptible = false)

Initializes the comm_client. Called from inside the constructor.

◆ make_object()

size_t cppipc::comm_client::make_object ( std::string  object_type_name)

Creates an object of a given type on the remote machine. Returns an object ID. A return value of (-1) indicates failure.

Note
This call redirects to the object_factory_proxy

◆ ping()

std::string cppipc::comm_client::ping ( std::string  )

Ping test. Sends a string to the remote system, which replies with the same string.

Note
This call redirects to the object_factory_proxy

◆ register_function()

template<typename MemFn >
void cppipc::comm_client::register_function ( MemFn  f,
std::string  function_string 
)
inline

Registers a member function, which can then be used in the call() function.

Definition at line 544 of file comm_client.hpp.

◆ remove_status_watch()

void cppipc::comm_client::remove_status_watch ( std::string  watch_prefix)

Removes a status callback for a given prefix. Note that the function associated with the prefix may still be called even after this function returns. To ensure complete removal of the function, stop_status_callback_thread() and start_status_callback_thread() must be called.

◆ send_deletion_list()

int cppipc::comm_client::send_deletion_list ( const std::vector< size_t > &  object_ids)

Tries to synchronize the list of tracked objects with the server by sending a list of objects to be deleted. Returns 0 on success, -1 on failure.

◆ set_server_alive_watch_pid()

void cppipc::comm_client::set_server_alive_watch_pid ( int32_t  pid)

Sets a pid to watch. If this pid goes away, the server is considered dead. This is more robust than "pings" for local interprocess communication. Set to 0 to disable.
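One common way to check whether a process still exists on POSIX systems is kill() with signal 0, which performs the existence and permission checks without delivering a signal. The sketch below uses that technique under the assumption of a POSIX platform; it is not a statement about how comm_client implements the watch, and both function names are hypothetical.

```cpp
#include <cerrno>
#include <cstdint>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>  // pid_t, getpid()

// Hypothetical helper: true if a process with this pid currently exists.
inline bool process_is_alive(int32_t pid) {
  if (pid <= 0) return false;
  // Signal 0 sends nothing; kill() only checks existence and permissions.
  if (kill(static_cast<pid_t>(pid), 0) == 0) return true;
  // EPERM means the process exists but we are not allowed to signal it.
  return errno == EPERM;
}

// Hypothetical check mirroring the described semantics: a watch pid of 0
// disables the watch, so the server is never declared dead by this path.
inline bool server_considered_dead(int32_t watch_pid) {
  if (watch_pid == 0) return false;
  return !process_is_alive(watch_pid);
}
```

For example, server_considered_dead(getpid()) is false while the calling process is running, and server_considered_dead(0) is always false because the watch is disabled.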

◆ start()

reply_status cppipc::comm_client::start ( )

Initializes connections with the servers. Must be called prior to the creation of any client objects. Returns reply_status::OK on success, and an error code on failure. Failure could be caused by an inability to connect to the server, or by authentication errors.

◆ start_status_callback_thread()

void cppipc::comm_client::start_status_callback_thread ( )

Starts the status callback thread if not already started.

◆ status_callback_thread_function()

void cppipc::comm_client::status_callback_thread_function ( )

The function which implements the thread which issues the messages to the status callback handlers.

◆ stop()

void cppipc::comm_client::stop ( )

Stops the comm client object. Closes all open sockets.

◆ stop_ping_thread()

void cppipc::comm_client::stop_ping_thread ( )

Stops the ping thread.

◆ stop_status_callback_thread()

void cppipc::comm_client::stop_status_callback_thread ( )

Terminates the thread which calls the callback handlers. Unprocessed messages are dropped.

Member Data Documentation

◆ status_buffer_mutex

boost::mutex cppipc::comm_client::status_buffer_mutex

The lock / cv pair around the ping_thread_done value

Definition at line 451 of file comm_client.hpp.

◆ status_callback_thread

boost::thread* cppipc::comm_client::status_callback_thread = NULL

This thread is used to serve the status callbacks. This prevents status callback locks from blocking the server.

Definition at line 447 of file comm_client.hpp.


The documentation for this class was generated from the following file: