Blocking safe by default (Implementation Details)
As described here, ServiceTalk by default allows users to write blocking code when interacting with ServiceTalk. This document describes the details of the implementation and is addressed to audiences who intend to know the internals of how this is achieved.
It is not required to read this document if you just want to use ServiceTalk. |
Asynchronous Sources
Everything inside ServiceTalk is somehow connected to one of the three asynchronous sources, viz., Publisher
, Single
and Completable
. Since these sources are the building blocks for program control flow if they provide safety
guarantees for blocking code execution these guarantees apply outside the scope of preventing blocking code from
executing on event loop thread. This approach is designed to make the task of ensuring we don’t block the event loop
threads less error prone, and also allows for certain optimizations around thread context propagation and re-use.
Threads and asynchronous sources
An asynchronous source has to decide at two points about which thread will be used:
-
Thread which is used to do the actual work related to a source. eg: for an HTTP client, the work is to send an HTTP request and read the HTTP response.
-
Thread which is used to interact with the
Subscriber
corresponding to its `Subscription`s.
Part 1. above is not governed by the
ReactiveStreams specification
and hence sources are free to use any thread. ServiceTalk typically will use Netty’s EventLoop
to do the actual work.
Part 2. defines all the interactions using the ReactiveStreams specifications, i.e. all methods in Publisher
,
Subscriber
and Subscription
.
ServiceTalk concurrency APIs defines which thread will be used by any asynchronous source for Part 2.
Executor and asynchronous sources
ServiceTalk enforces that each asynchronous source MUST be associated with an Executor
which is used to interact
with its Subscriber
. The below diagram illustrates the interaction between an asynchronous source, its Subscriber
,
its operators, and the Executor
.
The above interaction has a few important points:
-
All operators inherit the
Executor
from the source, unless overridden explicitly by the user. -
A
SignalOffloader
is chosen when aSubscriber
subscribes to a source decorated by zero or more operators. -
The chosen
SignalOffloader
is passed to all operators and then to the source, perSubscriber
.
The above interaction provides all operators and the source an opportunity to piggy-back on the same Executor
for all
interactions with the Subscriber
. Any operator in a chain has two options while accepting a Subscriber
:
-
Wrap the
Subscriber
such that allSubscriber
andSubscription
method calls are offloaded to the chosenExecutor
. This mode is for operators that process signals from the originalPublisher
asynchronously. -
Do not wrap the
Subscriber
and forward allSubscriber
andSubscription
methods directly on the calling thread to avoid the overhead of wrapping each and everySubscriber
andSubscription
in a chain. This mode is for operators that process signals from the originalPublisher
synchronously.
Taking the same example from here
client.request() (1)
.map(resp -> {
return resp.toString(); (2)
})
.flatMap(stringResp -> { (3)
return client2.request(stringResp);
})
.filter(stringResp -> {
stringResp.equals("Hello World"); (4)
});
1 | A hypothetical client which provides a request() method that returns a Single<Response> . |
2 | Converting the response to a String . |
3 | Call another client2 that provides a new Single which is returned from flatMap . |
4 | Only allow "Hello World" messages to be emitted. |
In the above example the operators map
and filter
will not wrap Subscriber
and Subscription
since they do not do
any asynchronous work. However, flatmap
will wrap Subscriber
and Subscription
to offload the calls to the chosen
SignalOffloader
.
Why should we wrap Subscriber
and Subscription
?
There are two places we would wrap Subscriber
and Subscription
:
-
Original asynchronous sources.
-
Asynchronous operators. eg:
flatMap
Since every asynchronous source is associated with an Executor
, it is required to use the Executor
for interacting
with Subscriber
and Subscription
.
If we do not wrap for asynchronous operators then in the above example, operator filter
(4) will be invoked in the
Executor
defined by client2
inside the flatmap
. This may lead to inadvertent and not initially obvious
blocking of an event loop thread. Consider a scenario where client2
executes user code on an event loop thread, but
the original client
executes user code on an application thread (which allows blocking). In this scenario it may look
like blocking code is OK at point (4) above, but that will actually result in blocking `client2’s event loop thread.