Blocking safe by default

This document elaborates how asynchronous sources provide ways to offload. General philosophy of writing blocking code while using ServiceTalk is explained here and is a recommended read.

Execution chain

Any ServiceTalk asynchronous source (Publisher, Single and Completable) offers multiple operators that can be applied to the source. Any instance of such a source and all the operators applied to it before it is subscribed, is called an execution chain.

Data and control flow in an execution chain

The image below details the flow of data (Subscriber methods) and control (Publisher and Subscription methods) messages. Invocations in either direction may be executed on an event loop thread and hence needs to be protected. In case new asynchronous sources are generated/received inside operators, they follow the same model and hence are removed for brevity.

Data and control flow in an execution chain

As shown in the above picture, there are inherently two directions (data and control) of information flow for an execution chain and these signals can be triggered in parallel.

By default, in ServiceTalk, signals are not executed on an event loop thread, but instead executed using an Executor provided by the application in the order they are received.

Implications

The implication of the above approach is the following:

Users can execute blocking code inside an execution chain, provided they are not waiting for another data or control message in the same execution chain.

Look at Pitfalls section for examples elaborating this implication.

Executor Affinity

In ServiceTalk different protocols will provide request processing by means of an asynchronous source, eg: Single. In such cases, this processing will be expressed as a chain of operators on the asynchronous source. Configuring limits on resources such as a thread pool can be challenging if each operator in the chain may run on different threads. This would mean applying limits to such a thread pool would be sensitive to program’s execution flow and may change over time. In order to make thread pool configuration easier, ServiceTalk provides executor affinity for all asynchronous sources.

Affinity for an asynchronous source

Let us assume that following expresses request processing:

 client.request() (1)
       .map(resp -> {
            return doSomeBlockingWorkAndConvertToString(resp); (2)
       })
       .filter(stringResp -> {
            doMoreBlockingWorkToFilterString(stringResp); (3)
       });
1 A hypothetical client which provides a request() method that returns a Single<Response>.
2 Some blocking work done inside map() operator by this function, which provides a String as a response.
3 Some more blocking work inside filter() operator.

In the above example user provided functions inside both map() and filter() will be invoked using the specified Executor.

Affinity across asynchronous sources

Executor affinity across asynchronous sources is not guaranteed even if they are part of the same processing chain.

Let us assume that following expresses request processing:

 client.request() (1)
       .map(resp -> {
            return doSomeBlockingWorkAndConvertToString(resp); (2)
       })
       .flatMap(stringResp -> {
            return client2.request(stringResp) (3)
                          .map(resp -> {
                            doSomeBlockingWorkAgain(); (4)
                          });
       })
       .filter(stringResp -> {
            doMoreBlockingWorkToFilterString(stringResp); (5)
       });
1 A hypothetical client which provides a request() method that returns a Single<Response>.
2 Some blocking work done inside map() operator by this function, which provides a String as a response.
3 Call another client2 that provides a new Single which is returned from flatmap.
4 Do some blocking work inside map() operator for the nested Single returned by Step (3).
5 Some more blocking work inside filter() operator.

In the above example (2), (3) and (5) will run on one Executor whereas (4) will run on a different Executor.

Pitfalls

As defined in Execution chain, ServiceTalk sequences events in data and control path of processing as if they were done on the same thread. Since, data and control events may happen in parallel, there is always a chance for user code to deadlock if they are executed in sequence.

    CountDownLatch latch = new CountDownLatch(1); (1)
    Publisher.from(1, 2, 3, 4)
            .afterOnNext(integer -> {
                latch.countDown();  (2)
            })
            .beforeRequest(requestN -> {
                latch.await(); (3)
            });
1 Hypothetical synchronization point. In real life it may be due to the code waiting for an event to happen externally.
2 Trigger the external event (hypothetical synchronization point of CountDownLatch) after receiving the item.
3 Wait for the external event to happen (hypothetical synchronization point of CountDownLatch) before sending requestN to the Publisher.

As per ReactiveStreams rule 1.1, request for items MUST happen before the items are delivered. In the above code, we are waiting for an item to be emitted before sending a request to the Publisher. This results in a deadlock as an item can not be emitted by the source without a request being received and user code making sure that the request is not sent before an item is emitted.

In order to avoid such scenarios, it is handy to follow certain best practices while writing blocking code in operators:

  • Avoid co-ordination between two operators on the same source.

  • If such co-ordination is required, try limiting such coordination in either data or control path but not inter-dependent on each other.

  • If co-ordination is required between data and control path, be aware of ReactiveStreams semantics and how the two paths interact with each other.

If these rules are followed the above example can be modified to:

    CountDownLatch latch = new CountDownLatch(1);
    Publisher.from(1, 2, 3, 4)
            .afterOnNext(integer -> {
                latch.countDown();
            })
            .afterRequest(requestN -> { (1)
                latch.await();
            });
1 Use afterRequest which happens after requestN is delivered to the source.

In this modified example, since we now use afterRequest, instead of beforeRequest, we do not block requestN to go to the source and this code is safe.

Implementation

In order to use ServiceTalk’s blocking support feature, one does not need to know about implementation details and the above information is sufficient. However, if you are developing some operators in ServiceTalk or are just curious, blocking-implementation.adoc describes the design.