Concurrency Pitfalls
ServiceTalk sequences events in the data and control paths of processing as if they were executed on the same thread. Since data and control events could otherwise happen in parallel, user code that relies on such parallelism can deadlock when the events are executed in sequence.
CountDownLatch latch = new CountDownLatch(1); (1)
Publisher.from(1, 2, 3, 4)
        .afterOnNext(integer -> {
            latch.countDown(); (2)
        })
        .beforeRequest(requestN -> {
            latch.await(); (3)
        });
1 | Hypothetical synchronization point. In real life, this may be code waiting for an event to happen externally. |
2 | Trigger the external event (the hypothetical synchronization point, CountDownLatch) after receiving the item. |
3 | Wait for the external event to happen (the hypothetical synchronization point, CountDownLatch) before sending requestN to the Publisher. |
As per ReactiveStreams rule 1.1, a request for items MUST happen before the items are delivered. In the above code, we wait for an item to be emitted before sending a request to the Publisher. This results in a deadlock: the source cannot emit an item without receiving a request, and the user code makes sure that the request is not sent before an item is emitted.
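The circular wait can be reproduced without ServiceTalk. The following is a minimal sketch in plain Java, not ServiceTalk code: TinySource and releasedBeforeRequest are hypothetical names introduced here, and the unbounded latch.await() is replaced with a bounded wait so that the sketch terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

final class BeforeRequestDeadlockSketch {

    /** Hypothetical stand-in for a source: it emits items only in response to request(n). */
    static final class TinySource {
        private final int[] items;
        private int next;

        TinySource(int... items) {
            this.items = items;
        }

        void request(long n, IntConsumer onNext) {
            for (long i = 0; i < n && next < items.length; i++) {
                onNext.accept(items[next++]);
            }
        }
    }

    /** Returns true if an item released the latch while the "before request" step was waiting. */
    static boolean releasedBeforeRequest(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        TinySource source = new TinySource(1, 2, 3, 4);

        // Models beforeRequest: this runs before the request reaches the source.
        // Assumption: a bounded wait stands in for latch.await() so the sketch can finish.
        boolean released;
        try {
            released = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            released = false;
        }

        // Only now does the request reach the source; each emitted item counts the latch down
        // (models afterOnNext), which is too late for the wait above.
        source.request(4, item -> latch.countDown());
        return released;
    }

    public static void main(String[] args) {
        System.out.println("released before request: " + releasedBeforeRequest(200));
    }
}
```

The wait always times out because nothing can count the latch down until the request has been delivered. With an unbounded await(), the same ordering would block forever.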
In order to avoid such scenarios, it helps to follow certain best practices when writing blocking code in operators:
- Avoid coordination between two operators on the same source.
- If such coordination is required, try to limit it to either the data path or the control path, so that the two paths do not depend on each other.
- If coordination is required between the data and control paths, be aware of ReactiveStreams semantics and how the two paths interact with each other.
If these rules are followed, the above example can be modified to:
CountDownLatch latch = new CountDownLatch(1);
Publisher.from(1, 2, 3, 4)
        .afterOnNext(integer -> {
            latch.countDown();
        })
        .afterRequest(requestN -> { (1)
            latch.await();
        });
1 | Use afterRequest, which happens after requestN is delivered to the source. |
In this modified example, since we now use afterRequest instead of beforeRequest, we do not block requestN from reaching the source, and this code is safe.
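The safe ordering can also be modeled in plain Java. This is a rough sketch under stated assumptions rather than ServiceTalk code: the emitter thread is a hypothetical stand-in for a source that has already received the request, and releasedAfterRequest is a name introduced here for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AfterRequestSketch {

    /** Returns true if an item released the latch while the "after request" step was waiting. */
    static boolean releasedAfterRequest(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);

        // The request has already reached the source, so it is free to emit.
        // Assumption: emission happens on a separate thread, as an asynchronous source might do.
        Thread emitter = new Thread(() -> {
            for (int item : new int[] {1, 2, 3, 4}) {
                latch.countDown(); // models afterOnNext for each emitted item
            }
        });
        emitter.start();

        // Models afterRequest: waiting here cannot prevent the source from emitting.
        boolean released;
        try {
            released = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            released = false;
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println("released after request: " + releasedAfterRequest(5000));
    }
}
```

Because the wait starts only after the request has been delivered, the first emitted item releases it. The bounded wait is kept only as a safety net for the sketch.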