Blocking safe by default
This document elaborates how asynchronous sources provide ways to offload. General philosophy of writing blocking code while using ServiceTalk is explained here and is a recommended read.
Execution chain
Any ServiceTalk asynchronous source (Publisher
, Single
and Completable
) offers multiple operators that can be
applied to the source. Any instance of such a source and all the operators applied to it before it is subscribed, is
called an execution chain.
Data and control flow in an execution chain
The image below details the flow of data (Subscriber
methods) and control (Publisher
and Subscription
methods)
messages. Invocations in either direction may be executed on an event loop thread and hence needs to be protected. In
case new asynchronous sources are generated/received inside operators, they follow the same model and hence are removed
for brevity.
As shown in the above picture, there are inherently two directions (data and control) of information flow for an execution chain and these signals can be triggered in parallel.
By default, in ServiceTalk, signals are not executed on an event loop thread, but instead executed using an Executor provided by the application in the order they are received.
Implications
The implication of the above approach is the following:
Users can execute blocking code inside an execution chain, provided they are not waiting for another data or control message in the same execution chain.
Look at Pitfalls section for examples elaborating this implication.
Executor Affinity
In ServiceTalk different protocols will provide request processing by means of an asynchronous source, eg: Single
.
In such cases, this processing will be expressed as a chain of operators on the asynchronous source. Configuring limits
on resources such as a thread pool can be challenging if each operator in the chain may run on different threads.
This would mean applying limits to such a thread pool would be sensitive to program’s execution flow and may change over
time. In order to make thread pool configuration easier, ServiceTalk provides executor affinity for all asynchronous
sources.
Affinity for an asynchronous source
Let us assume that following expresses request processing:
client.request() (1)
.map(resp -> {
return doSomeBlockingWorkAndConvertToString(resp); (2)
})
.filter(stringResp -> {
doMoreBlockingWorkToFilterString(stringResp); (3)
});
1 | A hypothetical client which provides a request() method that returns a Single<Response> . |
2 | Some blocking work done inside map() operator by this function, which provides a String as a response. |
3 | Some more blocking work inside filter() operator. |
In the above example user provided functions inside both map()
and filter()
will be invoked using the specified
Executor
.
Affinity across asynchronous sources
Executor
affinity across asynchronous sources is not guaranteed even if they are part of the same processing chain.
Let us assume that following expresses request processing:
client.request() (1)
.map(resp -> {
return doSomeBlockingWorkAndConvertToString(resp); (2)
})
.flatMap(stringResp -> {
return client2.request(stringResp) (3)
.map(resp -> {
doSomeBlockingWorkAgain(); (4)
});
})
.filter(stringResp -> {
doMoreBlockingWorkToFilterString(stringResp); (5)
});
1 | A hypothetical client which provides a request() method that returns a Single<Response> . |
2 | Some blocking work done inside map() operator by this function, which provides a String as a response. |
3 | Call another client2 that provides a new Single which is returned from flatmap . |
4 | Do some blocking work inside map() operator for the nested Single returned by Step (3). |
5 | Some more blocking work inside filter() operator. |
In the above example (2), (3) and (5) will run on one Executor
whereas (4) will run on a different Executor
.
Pitfalls
As defined in Execution chain, ServiceTalk sequences events in data and control path of processing as if they were done on the same thread. Since, data and control events may happen in parallel, there is always a chance for user code to deadlock if they are executed in sequence.
CountDownLatch latch = new CountDownLatch(1); (1)
Publisher.from(1, 2, 3, 4)
.afterOnNext(integer -> {
latch.countDown(); (2)
})
.beforeRequest(requestN -> {
latch.await(); (3)
});
1 | Hypothetical synchronization point. In real life it may be due to the code waiting for an event to happen externally. |
2 | Trigger the external event (hypothetical synchronization point of CountDownLatch ) after receiving the item. |
3 | Wait for the external event to happen (hypothetical synchronization point of CountDownLatch ) before sending
requestN to the Publisher . |
As per ReactiveStreams rule 1.1, request for items
MUST happen before the items are delivered. In the above code, we are waiting for an item to be emitted before
sending a request to the Publisher
. This results in a deadlock as an item can not be emitted by the source without a
request being received and user code making sure that the request is not sent before an item is emitted.
In order to avoid such scenarios, it is handy to follow certain best practices while writing blocking code in operators:
-
Avoid co-ordination between two operators on the same source.
-
If such co-ordination is required, try limiting such coordination in either data or control path but not inter-dependent on each other.
-
If co-ordination is required between data and control path, be aware of ReactiveStreams semantics and how the two paths interact with each other.
If these rules are followed the above example can be modified to:
CountDownLatch latch = new CountDownLatch(1);
Publisher.from(1, 2, 3, 4)
.afterOnNext(integer -> {
latch.countDown();
})
.afterRequest(requestN -> { (1)
latch.await();
});
1 | Use afterRequest which happens after requestN is delivered to the source. |
In this modified example, since we now use afterRequest
, instead of beforeRequest
, we do not block requestN
to
go to the source and this code is safe.
Implementation
In order to use ServiceTalk’s blocking support feature, one does not need to know about implementation details and the above information is sufficient. However, if you are developing some operators in ServiceTalk or are just curious, blocking-implementation.adoc describes the design.