Blocking safe by default (Implementation Details)

As described here, ServiceTalk by default allows users to write blocking code when interacting with ServiceTalk. This document describes the details of the implementation and is addressed to audiences who intend to know the internals of how this is achieved.

It is not required to read this document if you just want to use ServiceTalk.

Asynchronous Sources

Everything inside ServiceTalk is somehow connected to one of the three asynchronous sources, viz., Publisher, Single and Completable. Since these sources are the building blocks for program control flow if they provide safety guarantees for blocking code execution these guarantees apply outside the scope of preventing blocking code from executing on event loop thread. This approach is designed to make the task of ensuring we don’t block the event loop threads less error prone, and also allows for certain optimizations around thread context propagation and re-use.

Threads and asynchronous sources

An asynchronous source has to decide at two points about which thread will be used:

  1. Thread which is used to do the actual work related to a source. eg: for an HTTP client, the work is to send an HTTP request and read the HTTP response.

  2. Thread which is used to interact with the Subscriber corresponding to its `Subscription`s.

Part 1. above is not governed by the ReactiveStreams specification and hence sources are free to use any thread. ServiceTalk typically will use Netty’s EventLoop to do the actual work. Part 2. defines all the interactions using the ReactiveStreams specifications, i.e. all methods in Publisher, Subscriber and Subscription.

ServiceTalk concurrency APIs defines which thread will be used by any asynchronous source for Part 2.

Executor and asynchronous sources

ServiceTalk enforces that each asynchronous source MUST be associated with an Executor which is used to interact with its Subscriber. The below diagram illustrates the interaction between an asynchronous source, its Subscriber, its operators, and the Executor.

Executor inference

The above interaction has a few important points:

  • All operators inherit the Executor from the source, unless overridden explicitly by the user.

  • A SignalOffloader is chosen when a Subscriber subscribes to a source decorated by zero or more operators.

  • The chosen SignalOffloader is passed to all operators and then to the source, per Subscriber.

The above interaction provides all operators and the source an opportunity to piggy-back on the same Executor for all interactions with the Subscriber. Any operator in a chain has two options while accepting a Subscriber:

  • Wrap the Subscriber such that all Subscriber and Subscription method calls are offloaded to the chosen Executor. This mode is for operators that process signals from the original Publisher asynchronously.

  • Do not wrap the Subscriber and forward all Subscriber and Subscription methods directly on the calling thread to avoid the overhead of wrapping each and every Subscriber and Subscription in a chain. This mode is for operators that process signals from the original Publisher synchronously.

Taking the same example from here

 client.request() (1)
       .map(resp -> {
            return resp.toString(); (2)
       .flatMap(stringResp -> { (3)
            return client2.request(stringResp);
       .filter(stringResp -> {
            stringResp.equals("Hello World");  (4)
1 A hypothetical client which provides a request() method that returns a Single<Response>.
2 Converting the response to a String.
3 Call another client2 that provides a new Single which is returned from flatMap.
4 Only allow "Hello World" messages to be emitted.

In the above example the operators map and filter will not wrap Subscriber and Subscription since they do not do any asynchronous work. However, flatmap will wrap Subscriber and Subscription to offload the calls to the chosen SignalOffloader.

Why should we wrap Subscriber and Subscription?

There are two places we would wrap Subscriber and Subscription:

  1. Original asynchronous sources.

  2. Asynchronous operators. eg: flatMap

Since every asynchronous source is associated with an Executor, it is required to use the Executor for interacting with Subscriber and Subscription.

If we do not wrap for asynchronous operators then in the above example, operator filter (4) will be invoked in the Executor defined by client2 inside the flatmap. This may lead to inadvertent and not initially obvious blocking of an event loop thread. Consider a scenario where client2 executes user code on an event loop thread, but the original client executes user code on an application thread (which allows blocking). In this scenario it may look like blocking code is OK at point (4) above, but that will actually result in blocking `client2’s event loop thread.