Turi Create  4.0
Technical Details: CPPIPC

Communication

The CPPIPC client and server communicate with each other via libfault's ZeroMQ sanity wrappers: specifically, libfault::async_request_socket (client) and libfault::async_reply_socket (server), which implement a reliable asynchronous request-reply pattern (i.e. each request sent must be paired with a reply, and multiple requests can be in flight simultaneously).

Libfault's wrappers were originally designed to watch for state changes on Zookeeper and react accordingly. For instance, a collection of servers could run processes which implement a service called "echo" (which echoes messages). The service is implemented using the async_reply_socket, which registers a key called "echo" on Zookeeper; Zookeeper keeps track of the TCP/IP address of each server process. A client (with an async_request_socket) could then connect to the "echo" service and send request messages to it reliably: when servers go down, they are unregistered on Zookeeper automatically, and the clients pick up those changes and adapt accordingly. The same happens when new servers come up. The restriction to simple request-reply patterns (and pub/sub patterns) allows reliability to be provided easily (as opposed to arbitrarily interesting protocols).

However, for the purposes of the current Unity engine, Zookeeper is quite unnecessary since service migration/faults are not a concern. Instead, we only want the reliable communication patterns. As such, libfault::reply_socket, libfault::request_socket, libfault::async_reply_socket, and libfault::async_request_socket have been modified to operate without the use of Zookeeper.

The Libfault interface is moderately well documented, but the internals are not, since a large part of that code works around ZeroMQ's oddities. We will go into a few relevant details here, but you should consult the ZeroMQ documentation for the specifics.

zmq_msg_vector

To understand the deeper parts of the Comm layer, it is useful to take a quick look at ZeroMQ, and in particular ZeroMQ's message object (zmq_msg*) in http://api.zeromq.org/4-0:_start (for which I have a wrapper in libfault::zmq_msg_vector).

The ZeroMQ message object (zmq_msg_t) is basically a reference-counted character buffer. It has several optimizations: small messages are handled in place without a malloc, and larger messages use a reference-counted buffer on the heap. If you use the zmq_msg_* functions from ZeroMQ, there will be no issues with leaks and such. The only thing you have to be careful of is that a zmq_msg_t should not be copied by assignment, i.e.

// given zmq_msg_t object msg1 and msg2
msg1 = msg2; // This is not safe!

You should always manage zmq_msg_t objects through pointers (and use zmq_msg_copy when a copy is really needed).

The libfault::zmq_msg_vector object safely manages an array of zmq_msg_t buffers, with appropriate iteration and manipulation capabilities (most unusually, pop_front and push_back). The zmq_msg_vector is the key message object that is sent and received.

It is important to note that ZeroMQ provides a message protocol abstraction and not a stream protocol abstraction, i.e. if you send a zmq_msg_vector with 4 parts, the receiver will also receive a zmq_msg_vector with the same 4 parts. This makes the pop_front/push_front functionality extremely useful, since it allows you to easily stack additional headers on the sender and strip them one by one on the receiver. (This is also how the DEALER-ROUTER sockets tag messages with their appropriate destinations. See ZeroMQ Message Envelopes.)
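As a rough illustration of header stacking, the sketch below uses std::deque<std::string> as a stand-in for zmq_msg_vector. This is an assumption made for the example: the real class wraps zmq_msg_t buffers, and its exact method signatures are not reproduced here. The helper names wrap_with_headers and strip_header, and the header strings, are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Stand-in for libfault::zmq_msg_vector: an ordered list of message parts.
// (Assumption: the real class holds zmq_msg_t buffers, not std::string.)
using message_parts = std::deque<std::string>;

// Sender side: each layer pushes its own header in front of the payload.
inline message_parts wrap_with_headers(const std::string& payload) {
  message_parts msg;
  msg.push_back(payload);            // innermost part: the actual payload
  msg.push_front("call-id:7");       // header added by a request layer
  msg.push_front("route:client-A");  // header added by a routing layer
  return msg;
}

// Receiver side: each layer pops exactly the header it understands.
inline std::string strip_header(message_parts& msg) {
  assert(!msg.empty());
  std::string header = msg.front();
  msg.pop_front();
  return header;
}
```

Because parts arrive exactly as they were sent, the receiver can strip the routing header first, then the call header, and be left with the untouched payload.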

REQUEST-REPLY Pattern

The REQUEST-REPLY pattern is one of the simplest ZeroMQ patterns. Basically, the REPLY socket is a socket on which you can only perform alternating receive and send operations. The REQUEST socket is exactly the opposite: you can only perform alternating send and receive operations. Thus the simplest pattern is where a REQUEST socket connects to a REPLY socket:

  • Request socket sends a request message
  • Request socket then waits on reply
  • Reply socket receives the request message
  • Reply socket sends the reply
  • Request socket receives the reply and this can then repeat.

The cool part is that multiple REQUEST sockets can connect to one REPLY socket. The strict sequential recv/send pattern, however, means that the REPLY socket can only process one message at a time.

The REQUEST and REPLY sockets are wrapped in libfault::request_socket and libfault::reply_socket respectively, and the wrappers appropriately handle situations such as message failures, malformed messages, timeouts, etc.
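The strict alternation rule can be pictured with a toy state machine. The class below is purely hypothetical and does no networking; it only models the rule that a REQUEST socket must alternate send and receive, and it fabricates the reply locally.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Toy model of the REQUEST side: send and receive must strictly alternate.
// (Hypothetical class; it models the rule only and does no networking.)
class toy_request_socket {
 public:
  void send(const std::string& request) {
    if (awaiting_reply_) throw std::logic_error("send called twice in a row");
    last_request_ = request;
    awaiting_reply_ = true;
  }
  std::string recv() {
    if (!awaiting_reply_) throw std::logic_error("recv called before send");
    awaiting_reply_ = false;
    return "reply to " + last_request_;  // pretend the server answered
  }

 private:
  bool awaiting_reply_ = false;
  std::string last_request_;
};
```

A REPLY socket would be the mirror image, starting in the "must receive" state instead of the "must send" state.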

(DEALER-ROUTER Pattern) Async Request-Async Reply

The DEALER is a generalized REQUEST socket, and the ROUTER is a generalized REPLY socket. The key difference is that the DEALER and ROUTER sockets can send and receive arbitrarily without restrictions. DEALER can also connect and load balance requests to multiple servers, though we are not using that capability.

The DEALER (async request) and ROUTER (async reply) sockets are wrapped in libfault::async_request_socket and libfault::async_reply_socket respectively, and the wrappers appropriately handle situations such as message failures, malformed messages, timeouts, etc.

The async_request_socket has the following internal architecture:

  • When a "send" is called, the message is dropped into an inproc PULL/PUSH pair which collects messages sent from arbitrary threads to be processed by a single thread. The send then immediately returns a future.
  • The single thread (libfault::async_request_socket::pull_socket_callback) waits on the PULL side of the inproc socket, picks up the message, and forwards it to the appropriate destination. (The reason for funnelling everything through a single thread is that in the "Zookeeper Service" case, this thread may actually need to create new DEALER sockets, and it simplifies the logic substantially if that logic is centralized in one place.)
  • Received messages from the server wake up a second callback (libfault::async_request_socket::remote_message_callback) which finds the matching promise and resolves the original future.
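The promise/future bookkeeping in the last step might look roughly like the following. This is a sketch under stated assumptions: the class name pending_requests, the use of a numeric id to match replies, and the method names are all hypothetical, and the real socket also performs the actual network I/O.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <map>
#include <mutex>
#include <string>

// Toy model of the client-side bookkeeping: each outgoing request gets an id
// and a promise; the reply callback looks the promise up by id.
// (Hypothetical class; matching by numeric id is an assumption.)
class pending_requests {
 public:
  // Called by any thread that "sends": registers a promise, returns its future.
  std::future<std::string> send(std::size_t& id_out) {
    std::lock_guard<std::mutex> guard(lock_);
    id_out = next_id_++;
    return promises_[id_out].get_future();
  }
  // Called by the reply callback; returns false when no request matches.
  bool on_reply(std::size_t id, const std::string& body) {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = promises_.find(id);
    if (it == promises_.end()) return false;
    it->second.set_value(body);  // wakes up whoever holds the future
    promises_.erase(it);
    return true;
  }

 private:
  std::mutex lock_;
  std::size_t next_id_ = 0;
  std::map<std::size_t, std::promise<std::string>> promises_;
};
```

Replies may arrive in any order; each one resolves only the future whose id it carries.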

The async_reply_socket has a very similar internal architecture, but in the opposite direction.

  • Messages received from the ROUTER wake up a callback (libfault::async_reply_socket::wrapped_callback) which drops the message into a queue protected by a mutex/condition variable pair.
  • A collection of handler threads waiting on the queue will wake up to handle the message and produce replies.
  • The replies from the handler threads are dropped into an inproc PULL/PUSH pair to collect messages into a single thread.
  • A callback waiting on the PULL socket (libfault::async_reply_socket::pull_socket_callback) forwards the messages back out through the ROUTER.
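The queue and handler-thread arrangement above can be sketched with standard C++ primitives. Everything here is illustrative: request_queue, serve, the shutdown flag, and the "echo:" reply format are assumptions, and a plain vector stands in for the inproc PULL/PUSH pair that feeds the ROUTER.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Queue guarded by a mutex/condition variable pair.
// (Hypothetical class; names and the shutdown flag are assumptions.)
class request_queue {
 public:
  void push(const std::string& msg) {
    { std::lock_guard<std::mutex> g(lock_); items_.push(msg); }
    cond_.notify_one();
  }
  void close() {
    { std::lock_guard<std::mutex> g(lock_); closed_ = true; }
    cond_.notify_all();
  }
  // Blocks until a message arrives; returns false once closed and drained.
  bool pop(std::string& out) {
    std::unique_lock<std::mutex> g(lock_);
    cond_.wait(g, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return false;
    out = items_.front();
    items_.pop();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable cond_;
  std::queue<std::string> items_;
  bool closed_ = false;
};

// Runs num_handlers threads that turn each request into a reply. The replies
// vector stands in for the inproc PULL/PUSH pair feeding the ROUTER.
inline std::vector<std::string> serve(const std::vector<std::string>& requests,
                                      std::size_t num_handlers) {
  request_queue queue;
  std::vector<std::string> replies;
  std::mutex replies_lock;
  std::vector<std::thread> handlers;
  for (std::size_t i = 0; i < num_handlers; ++i) {
    handlers.emplace_back([&] {
      std::string msg;
      while (queue.pop(msg)) {
        std::lock_guard<std::mutex> g(replies_lock);
        replies.push_back("echo:" + msg);
      }
    });
  }
  for (const std::string& r : requests) queue.push(r);
  queue.close();
  for (std::thread& t : handlers) t.join();
  return replies;
}
```

With more than one handler thread, replies are produced in no particular order, which is why the real design tags and routes each reply individually.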

Object Registry

The CPPIPC Server (cppipc::comm_server) internally manages a list of object types and the knowledge of how to create instances of such objects. So when the client (cppipc::comm_client) asks to create a new object of a particular type, it can be instantiated on the server.

Types are registered by the name of the base class. On the server, cppipc::comm_server::register_type is used to associate the name with a construction function. The actual list is managed by the object_factory (in cppipc/common/), which is itself a CPPIPC shared object (you will see there is a proxy, base, and impl).

Note
This introduces an interesting chicken and egg problem: how do you register the type of the object factory in the first place? This is basically done by some manual construction and registration of the object_factory in both the server and the client. This design allows for a very simple means of proxy construction on the client side: you will observe the implementation of cppipc::comm_client::make_object is only one line of code: directing the call to the object_factory_proxy which simply reuses the regular IPC communication channel to actually construct the object. This design means that the Client and Server do not require any special additional handling for internal coordination, but can simply use the same protocol as everything else.
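A minimal sketch of the registry idea, assuming a simple map from type name to constructor function and from object ID to live instance. The class toy_object_factory and its members are hypothetical; the real object_factory is itself a shared object reached through the normal IPC channel.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical implementation type that a server might export.
struct counter_impl {
  int value = 0;
};

// Toy object factory: type name -> constructor, object id -> live instance.
class toy_object_factory {
 public:
  void register_type(const std::string& type_name,
                     std::function<std::shared_ptr<void>()> constructor) {
    constructors_[type_name] = constructor;
  }
  // Creates an instance and returns the id a client proxy would hold.
  std::size_t make_object(const std::string& type_name) {
    auto it = constructors_.find(type_name);
    if (it == constructors_.end()) throw std::runtime_error("unknown type");
    std::size_t id = next_id_++;
    objects_[id] = it->second();
    return id;
  }
  bool has_object(std::size_t id) const { return objects_.count(id) != 0; }

 private:
  std::size_t next_id_ = 1;
  std::map<std::string, std::function<std::shared_ptr<void>()>> constructors_;
  std::map<std::size_t, std::shared_ptr<void>> objects_;
};
```

The client never sees the instance itself, only the returned ID, which matches the description of the proxy holding nothing but an object ID.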

Function Registry

In addition to a registry of types, we also need a registry of the functions that can be called. This function list is known to both the client and server, and at the moment no interesting "mangling" is performed. The registered name is simply the full name of the function in the form <typename>::<functionname>.

Note
In an earlier version of cppipc, the function name also included the complete C++ name-mangled type signature, thus allowing different overloads of the same function to have completely different registrations. This would be substantially more robust, since any change in the signature results in a completely different function call, making backward compatibility much easier to achieve. However, as it turns out, Clang on Mac has a different name mangling scheme than Linux, so when this is used, Mac clients can no longer communicate with Linux servers, and vice versa.

The server needs the function registry to know how to convert a function name to a function pointer. The client needs the function registry to perform the inverse: conversion from a function pointer to a function name (see The True Proxy Object to see why).

To make sure that both client and server are registered equivalently, the actual function registration function is implemented in the exported base class, as a register function. See the REGISTRATION_BEGIN and REGISTER macros. They basically implement a templated function of the form:

template <typename Registry>
static inline void __register__(Registry& reg) {
  reg.register_function(&basic_counter_base::add, "basic_counter_base::add");
  reg.register_function(&basic_counter_base::add_multiple, "basic_counter_base::add_multiple");
}

Then when the type is registered on the comm_server, it simply calls:

template <typename T>
void register_type(std::function<T*()> constructor_call) {
  T::__register__(*this);
  ...
}

to get all the member functions.

On the client side, the function registration happens on construction of the object_proxy in pretty much the same way.
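To make the "register once, use on both sides" idea concrete, here is a simplified sketch in which one templated registration function fills two different registries: a name-to-pointer table (server view) and a pointer-to-name table (client view). All names are hypothetical, and the sketch assumes every registered function has the same signature, which the real registry does not require.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct counter_base {
  int value = 0;
  void add(int amount) { value += amount; }
  void set(int amount) { value = amount; }
  // Plays the role of the generated registration function. (Named
  // register_members here because identifiers containing double
  // underscores are reserved in C++.)
  template <typename Registry>
  static void register_members(Registry& reg) {
    reg.register_function(&counter_base::add, "counter_base::add");
    reg.register_function(&counter_base::set, "counter_base::set");
  }
};

// Simplifying assumption: one shared signature for all registered functions.
using counter_fn = void (counter_base::*)(int);

// Server-side view: name -> member function pointer.
struct name_to_function {
  std::map<std::string, counter_fn> table;
  void register_function(counter_fn fn, const std::string& name) {
    table[name] = fn;
  }
};

// Client-side view: member function pointer -> name. Member function
// pointers have no ordering, so a linear search with == is used.
struct function_to_name {
  std::vector<std::pair<counter_fn, std::string>> table;
  void register_function(counter_fn fn, const std::string& name) {
    table.push_back(std::make_pair(fn, name));
  }
  std::string find(counter_fn fn) const {
    for (const auto& entry : table) {
      if (entry.first == fn) return entry.second;
    }
    return "";
  }
};
```

Since both tables are filled by the same function, the client and server cannot disagree about which name belongs to which member function.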

The True Proxy Object

The proxy object generated by GENERATE_INTERFACE_AND_PROXY is not the true proxy object, but is really only a light-weight wrapper around cppipc::object_proxy which actually implements the call logic. cppipc::object_proxy is a general purpose proxy which can wrap any interface class. For instance: in the basic_counter example in Implementing a Server and a Client Program, instead of using the basic_counter_proxy in the client example, I could equivalently write:

// Client Example
#include <iostream>
#include <core/system/cppipc/cppipc.hpp>
#include "basic_counter.hpp"
int main(int argc, char** argv) {
  // Connects to the server
  cppipc::comm_client client({}, "ipc:///tmp/cppipc_server_test");
  // Creates a proxy object. This calls the factory on the server to create
  // a basic_counter on the server. The proxy object on the client only
  // maintains an object ID.
  cppipc::object_proxy<basic_counter_base> proxy(client);
  // adds 50 to the counter
  proxy.call(&basic_counter_base::add, 50);
  // adds 12 * 5 to the counter
  proxy.call(&basic_counter_base::add_multiple, 12, 5);
  // prints the counter value
  std::cout << "Counter Value: " << proxy.call(&basic_counter_base::get_val) << "\n";
  // when proxy is destroyed, it destroys the object on the server
}

Now, one might ask why does proxy.call take a function pointer as the first argument and not simply the function name? Why not:

proxy.call("basic_counter_base::add", 50);

in which case the client side would not need a function registry (see Function Registry)? The reason is that by using the function pointer, I can fully type check the argument types at compile time, and automatically cast to the types the server is expecting. This simplifies the process of exporting objects substantially since we do not need a run-time "schema". Essentially the "schema" is defined entirely by the type of the function, and we can rely on C++ typing to enforce the type signature.
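The compile-time casting described here can be illustrated with a small stand-alone sketch. It is not the cppipc implementation: encode_call, arg_writer, and write_value are hypothetical names, and a text stream stands in for the binary archive so that the chosen type is visible in the output.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

struct counter_base {
  void add(std::int64_t) {}
  void scale(double, std::int64_t) {}
};

// Text stand-in for the binary archive: records each value with its type.
inline void write_value(std::ostringstream& out, std::int64_t v) { out << "i64:" << v << ";"; }
inline void write_value(std::ostringstream& out, double v) { out << "f64:" << v << ";"; }

// Walks the declared parameter types and the supplied arguments in lockstep.
template <typename... Params>
struct arg_writer;

template <>
struct arg_writer<> {
  static void write(std::ostringstream&) {}
};

template <typename Param, typename... Params>
struct arg_writer<Param, Params...> {
  template <typename Arg, typename... Args>
  static void write(std::ostringstream& out, const Arg& arg, const Args&... rest) {
    write_value(out, static_cast<Param>(arg));  // cast to the declared type
    arg_writer<Params...>::write(out, rest...);
  }
};

// The parameter types come from the member function pointer, not from the
// caller, so every argument is converted to what the server will read.
template <typename T, typename Ret, typename... Params, typename... Args>
std::string encode_call(Ret (T::*)(Params...), const Args&... args) {
  static_assert(sizeof...(Params) == sizeof...(Args), "wrong number of arguments");
  std::ostringstream out;
  arg_writer<Params...>::write(out, args...);
  return out.str();
}
```

Passing the plain int literals 2 and 3 to scale still produces a double followed by a 64-bit integer on the wire, because the declared signature decides the encoding. A string-based call could not do this without a run-time schema.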

Basic Call Serialization/Deserialization

The basic call serialization/deserialization code is in core/system/cppipc/client/issue.hpp and core/system/cppipc/server/dispatch_impl.hpp respectively. The call/issue (client) side is surprisingly simple.

The actual issue call is a variadic template function which tries to serialize the arguments of the call into the archive. The trick is that to ensure that types match up on the client side, it is important to cast the argument type right now (thus we need to know the Member Function pointer as well).

template <typename MemFn, typename... Args>
MemFn fn,
const Args&... args);

The issue function basically takes the argument types of fn and converts them to a tuple over the argument types (cppipc::function_args_to_tuple). It then calls into cppipc::detail::issue_disect, which extracts the leftmost argument and the leftmost entry of the tuple, casts the argument to the appropriate type, serializes it, and calls cppipc::detail::issue_disect tail recursively until we run out of arguments.

The dispatch side is slightly more complicated since it must deserialize the arguments, call the target function, and serialize the result. But it basically operates on the same principle. cppipc::execute_disect takes a tuple of the argument types and a list of arguments deserialized so far. It then extracts the leftmost entry from the tuple and deserializes it, and tail recurses until it runs out of arguments.
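The dispatch-side recursion can be sketched as follows. This is an illustrative reconstruction of the principle only: dispatcher and dispatch are hypothetical names, a whitespace-separated text stream stands in for the archive, and the sketch assumes the target function returns a printable, non-void value.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>

struct counter {
  int value = 0;
  int add_multiple(int amount, int times) { value += amount * times; return value; }
};

// Peels one declared parameter type per step, reads it from the stream, and
// carries the values read so far along in done...
template <typename... Remaining>
struct dispatcher;

// No parameter types left: every argument has been read, so make the call.
template <>
struct dispatcher<> {
  template <typename T, typename MemFn, typename... Done>
  static std::string run(T& obj, MemFn fn, std::istream&, const Done&... done) {
    std::ostringstream reply;
    reply << (obj.*fn)(done...);  // call the target and serialize the result
    return reply.str();
  }
};

template <typename Next, typename... Remaining>
struct dispatcher<Next, Remaining...> {
  template <typename T, typename MemFn, typename... Done>
  static std::string run(T& obj, MemFn fn, std::istream& in, const Done&... done) {
    Next value{};
    in >> value;  // deserialize the leftmost remaining argument
    return dispatcher<Remaining...>::run(obj, fn, in, done..., value);
  }
};

// Entry point: the parameter types are taken from the member function pointer.
template <typename T, typename Ret, typename... Params>
std::string dispatch(T& obj, Ret (T::*fn)(Params...), const std::string& wire) {
  std::istringstream in(wire);
  return dispatcher<Params...>::run(obj, fn, in);
}
```

For example, dispatching add_multiple against the wire text "12 5" reads two ints, calls the function, and returns the serialized result.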

Serializing Object Pointers

Now, how about the "magic trick" of serializing pointers to shared objects and having them resurrect appropriately as either pointers to proxy objects (on the client) or pointers to implementation objects (on the server)? See Advanced Object Creation.

This is accomplished very simply by hacking the serialization library. It is useful to understand the serialization library technical details first.

The source for the serialization "hack" is in ipc_deserializer.hpp/ipc_deserializer.cpp. We will walk through this slowly.

Firstly, we want to catch every attempt to serialize/deserialize exported objects. To do this we must be able to identify these objects, and the easiest way to do so is to have them inherit from a common base class: cppipc::ipc_object_base, which is simply an empty base class. The GENERATE_INTERFACE_AND_PROXY macro automatically has the interface class inherit from cppipc::ipc_object_base, so all descendants (proxy and implementation) are also descendants of cppipc::ipc_object_base.

Next, we perform a partial specialization of the serialization classes serialize_impl and deserialize_impl (see Technical Details: Serialization for details). Note that we are going to intercept attempts at serializing pointers to the proxy/implementation classes. The enable_if line basically means that the code exists if and only if T* is convertible to cppipc::ipc_object_base*, i.e. T inherits from ipc_object_base. As a result, attempts to serialize other regular pointers (which is an unsafe operation anyway) will not hit the following code.

template <typename OutArcType, typename T>
struct serialize_impl<OutArcType, T*, true> {
  inline static
  typename std::enable_if<std::is_convertible<T*, cppipc::ipc_object_base*>::value>::type
  exec(OutArcType& oarc, const T* value) {
    ...
  }
};
template <typename InArcType, typename T>
struct deserialize_impl<InArcType, T*, true> {
  inline static
  typename std::enable_if<std::is_convertible<T*, cppipc::ipc_object_base*>::value>::type
  exec(InArcType& iarc, T*& value) {
    ...
  }
};
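The interception condition can be tried out in isolation. The sketch below is a variation rather than the code above: it places the enable_if on an extra template parameter instead of the return type of exec, and pointer_kind, counter_proxy, and plain_struct are hypothetical names used only for this example.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

struct ipc_object_base {};                  // empty marker base, as in the text
struct counter_proxy : ipc_object_base {};  // hypothetical exported type
struct plain_struct {};                     // unrelated type

// Primary template: used for every pointer that is not an exported object.
template <typename T, typename Enable = void>
struct pointer_kind {
  static std::string name() { return "regular pointer"; }
};

// Partial specialization: selected only when T* converts to ipc_object_base*.
template <typename T>
struct pointer_kind<T, typename std::enable_if<
    std::is_convertible<T*, ipc_object_base*>::value>::type> {
  static std::string name() { return "exported object pointer"; }
};
```

Only types that publicly derive from the marker base select the specialization; everything else falls through to the primary template.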

Now, the key annoyance is that this code is the same on both the server and the client, so I have to know whether I am on the server side or the client side and act appropriately. To do that, I rely on a pair of functions, which are called by the cppipc::comm_client and cppipc::comm_server immediately before attempting to serialize/deserialize a call.

extern void set_deserializer_to_server(comm_server* server);
extern void set_deserializer_to_client(comm_client* client);

In the implementation of these (in ipc_deserializer.cpp), they basically set a thread-local variable. This allows the functions above to be fully thread safe, and allows both server and client to reside in the same process in different threads if necessary.

The serialize_impl and deserialize_impl structs can then use

extern void get_deserialization_type(comm_server** server, comm_client** client);

to figure out whether it is currently working on the server-side or the client-side.
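One plausible shape for these three functions is shown below. This is a guess at the mechanism, not the contents of ipc_deserializer.cpp: the empty comm_server and comm_client structs are placeholders, the variable names are hypothetical, and clearing the other pointer on each set call is an assumption.

```cpp
#include <cassert>

struct comm_server {};  // placeholders for the real classes
struct comm_client {};

namespace {
// One pair of pointers per thread; at most one of them is non-null.
thread_local comm_server* current_server = nullptr;
thread_local comm_client* current_client = nullptr;
}  // namespace

void set_deserializer_to_server(comm_server* server) {
  current_server = server;
  current_client = nullptr;  // assumption: the two modes are exclusive
}

void set_deserializer_to_client(comm_client* client) {
  current_client = client;
  current_server = nullptr;
}

void get_deserialization_type(comm_server** server, comm_client** client) {
  *server = current_server;
  *client = current_client;
}
```

Because the variables are thread_local, a server thread and a client thread in the same process each see only their own setting.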

Serializing Object Pointers-Serializing

Serialization is simple:

if (server) {
  // server to client message is a pair of "is new object" and "object ID"
  oarc << get_server_object_id(server, value);
} else {
  oarc << (*value);
}

The proxy object has a built-in save/load function that simply serializes the object ID. For an impl object, however, I need to ask the comm_server to find the object ID.

Serializing Object Pointers-Deserializing

Deserializing is slightly more involved. On the server side, I have to search the comm_server object for the object matching the object ID:

if (server) {
  size_t object_id;
  iarc >> object_id;
  void* obj = cppipc::detail::get_server_object_ptr(server, object_id);
  if (obj == NULL) {
    throw std::to_string(object_id) + " Object not found";
  }
  value = reinterpret_cast<T*>(obj);
}
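The two server-side lookups (pointer to ID when serializing, ID to pointer when deserializing) can be pictured as a small two-way table. The class below is hypothetical; the functions named in the text take a comm_server* and their implementations are not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>

// Toy version of a server-side object table with lookups in both directions.
class object_table {
 public:
  std::size_t add(std::shared_ptr<void> object) {
    std::size_t id = next_id_++;
    by_id_[id] = object;
    by_ptr_[object.get()] = id;
    return id;
  }
  // id -> pointer, used when deserializing; nullptr when the id is unknown.
  void* find_ptr(std::size_t id) const {
    auto it = by_id_.find(id);
    return it == by_id_.end() ? nullptr : it->second.get();
  }
  // pointer -> id, used when serializing; returns false when not registered.
  bool find_id(const void* ptr, std::size_t& id_out) const {
    auto it = by_ptr_.find(ptr);
    if (it == by_ptr_.end()) return false;
    id_out = it->second;
    return true;
  }

 private:
  std::size_t next_id_ = 1;
  std::map<std::size_t, std::shared_ptr<void>> by_id_;
  std::map<const void*, std::size_t> by_ptr_;
};
```

An unknown ID yields a null pointer, which corresponds to the "Object not found" branch in the deserialization code.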

On the client side, the proxy object is constructed with the object_id received. (The proxy object's class name is always typedef'd to 'proxy_object_type'.)

size_t object_id;
iarc >> object_id;
value = new typename
    std::remove_pointer<T>::type::proxy_object_type(*client, false,
                                                    object_id);

Limitations

The current serializer/deserializer was designed with performance in mind and is meant to be used between "trusted" and equivalent systems. We do not have "type-aware" or "robust" serialization/deserialization. We rely heavily on the client and server agreeing on the function argument types, and on serialization behaving the same way on both sides. If there are any malformed messages, or if for whatever reason the client/server function types disagree, the serializer will crash and burn.
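To see why agreement on types matters, consider a toy untagged encoding in which a value is written as its raw bytes with no type information. The helpers write_raw and read_raw are hypothetical and are not the library's serializer; unlike the behaviour described above, this toy reader checks the remaining length and reports failure instead of crashing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Raw, untagged encoding: just the bytes of the value, no type information.
template <typename T>
void write_raw(std::string& buffer, const T& value) {
  buffer.append(reinterpret_cast<const char*>(&value), sizeof(T));
}

// Reads sizeof(T) bytes at offset; returns false if the buffer is too short.
template <typename T>
bool read_raw(const std::string& buffer, std::size_t& offset, T& out) {
  if (buffer.size() - offset < sizeof(T)) return false;
  std::memcpy(&out, buffer.data() + offset, sizeof(T));
  offset += sizeof(T);
  return true;
}
```

If the client writes a 32-bit integer and the server expects a 64-bit one, the read runs past the data that was actually sent. Nothing in the byte stream lets the reader detect the mismatch other than running out of bytes.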