Blocking safe by default
This document elaborates on how asynchronous sources provide ways to offload work. The general philosophy of writing blocking code while using ServiceTalk is explained here and is recommended reading.
Execution chain
Any ServiceTalk asynchronous source (Publisher, Single and Completable) offers multiple operators that can be applied to the source. Any instance of such a source, together with all the operators applied to it before it is subscribed, is called an execution chain.
Data and control flow in an execution chain
The image below details the flow of data (Subscriber methods) and control (Publisher and Subscription methods) messages. Invocations in either direction may be executed on an EventLoop thread and hence need to be protected. If new asynchronous sources are generated or received inside operators, they follow the same model and hence are omitted from the picture for brevity.
As shown in the above picture, an execution chain inherently has two directions (data and control) of information flow, and these signals can be triggered in parallel.
By default, ServiceTalk does not execute signals on an EventLoop thread; instead they are executed, in the order they are received, using an Executor provided by the application.
Implications
The implication of the above approach is the following:
Users can execute blocking code inside an execution chain, provided they are not waiting for another data or control message in the same execution chain.
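As an illustration, the following sketch (built only from operators already shown in this document) blocks inside a map operator on something external to the chain, which is safe. The sleep simply stands in for an arbitrary blocking call, such as a call to an external system; blocking inside an operator while waiting for another data or control signal of the same chain would instead risk a deadlock.
Collection<Integer> result = Publisher.range(1, 5)
        .map(element -> {
            try {
                Thread.sleep(10); // stand-in for a blocking call external to this execution chain
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return element * 2;
        })
        .toFuture()
        .get();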
Task Offloading
ServiceTalk uses Netty for network I/O, and Netty uses a fixed number of EventLoop threads for executing I/O operations. The number of EventLoop threads correlates with the number of CPU cores, not with the number of requests, because threads typically sit idle while waiting for I/O to complete. Sharing threads helps minimize resource consumption and improves scalability. However, sharing threads across requests means that the control flow of a single request can impact all other requests that share the same thread. If the control flow holds up the current thread for an extended period (i.e. "blocks the thread", e.g. while waiting on external I/O), this may negatively impact latency and throughput. One approach to ensure I/O thread availability is to carefully limit the scope of work done by I/O threads and, whenever practical, delegate all other necessary tasks that are not related to I/O to some other thread. Moving tasks from I/O threads to other threads is called “offloading” and is a core technique used by ServiceTalk.
Offloading is used for two purposes within ServiceTalk: firstly, to provide the execution needed for handling asynchronous events (aka signals); secondly, while handling asynchronous events, to protect scarce resources from being monopolized by blocking, untrusted or expensive application code. Asynchronous execution requires offloading because the initiating calling thread is not available. Protective offloading, where offloading is used to avoid resource monopolization, is a practical consideration, but it is just as necessary for reliable and predictable operation.
ServiceTalk will, by default, execute most application code on threads other than the Netty I/O threads. For most invocations of application code, if the application developer knows that their code cannot block and always executes quickly in near constant time, they can request that ServiceTalk not offload their code. This will improve application performance by reducing latency and overhead. Requests to not offload will be honored by ServiceTalk if all the other components in the same execution path have also opted out of offloading. See Execution Strategy for more information on how components may specify their offloading requirements. As a last resort, tasks may also be queued to be performed as threads are available.
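As a rough illustration of opting out, an HTTP server whose handler never blocks could declare that via its execution strategy. This is a sketch only; it assumes the HTTP module's HttpServers builder, HttpExecutionStrategies.offloadNone() (named differently, e.g. offloadNever(), in older releases) and a statically imported Single.succeeded, none of which are covered by this document.
HttpServers.forPort(8080)
        // The handler never blocks and completes quickly, so request no offloading.
        .executionStrategy(HttpExecutionStrategies.offloadNone())
        .listenAndAwait((ctx, request, responseFactory) -> succeeded(responseFactory.ok()))
        .awaitShutdown();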
ServiceTalk is designed to be fully asynchronous except where the API provides explicit blocking behavior as a convenience.
ServiceTalk uses a task-based approach for offloading, using an Executor in the standard "fire-and-forget" way to run the offloaded tasks. Often the Executor has a pool of threads, possibly unbounded, and tasks are run using whatever thread is available. In particular, different threads may be used for each task executed, and code running in tasks cannot depend upon a consistent thread being used for invoking program logic. This approach is generally the most scalable because it makes the best utilization of threads. If it is necessary to share state between tasks then `ContextMap`s can be used.
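For example, per-request state that must remain visible across offloaded tasks could be stored as shown below. This is a sketch only; it assumes ServiceTalk's AsyncContext helper and the ContextMap.Key factory, which are not described in this document, so treat the exact names as assumptions.
static final ContextMap.Key<String> TRACE_ID = ContextMap.Key.newKey("trace-id", String.class);

AsyncContext.put(TRACE_ID, "abc-123"); // captured by the execution chain below
Publisher.range(1, 3)
        .map(element -> {
            // Even if this task is offloaded to a different thread, the ContextMap
            // travels with the execution chain.
            System.out.println(Thread.currentThread() + " trace=" + AsyncContext.get(TRACE_ID));
            return element;
        })
        .toFuture()
        .get();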
If the default ServiceTalk offloading has been disabled then user code may still wish to offload execution to a different Executor in specific cases. ServiceTalk provides reactive operators for explicitly offloading both the subscribe call and the Subscription methods via the subscribeOn(executor) operator, as well as the Subscriber methods via the publishOn(executor) operator. These operators will unconditionally offload to the specified Executor. Additionally, conditional offloading operators, subscribeOn(executor, predicate) and publishOn(executor, predicate), are also available.
publishOn() Example
Using the publishOn(executor) operator allows processing of signals related to the source content on a different thread than the one generating the content. It is the most common form of offloading used as it relates to the data path, specifically the Subscriber methods.
Collection<Integer> result = Publisher.range(1, 10) (2) (5)
        .map(element -> element) // non-offloaded NO-OP
        .publishOn(publishExecutor) (4)
        .map(element -> element) // offloaded NO-OP
        .toFuture() (3)
        .get(); (1) (6)
1. toFuture() begins by calling subscribe(Subscriber). Executing on the calling thread, execution flows up the operator chain towards the source: map → publishOn → map → Publisher.range(1, 10).
2. Still executing on the calling thread, Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain: Range → map → publishOn (offloads onto a publishExecutor thread) → map → toFuture.
3. The onSubscribe(Subscription) method of toFuture()'s Subscriber will call Subscription.request(Long.MAX_VALUE). Execution flows up the operator chain towards the source: map → publishOn → map → Range. Range will synchronously publish nine items, the integers “1” through “9”, via onNext(element).
4. Each onNext flows down the operator chain: Range → map → publishOn (offloads onto a publishExecutor thread) → map → toFuture, where the element is collected for the result. For each offloaded item a thread of publishExecutor will be used for executing the second map operator and the final collect operation.
5. After all items, Range sends the terminal onComplete() signal synchronously, which flows down the operator chain: Range → map → publishOn (offloads onto a publishExecutor thread) → map → toFuture, and completes the Future with the integer collection result.
6. The calling thread will wait at get() for the Future result to be asynchronously completed.
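The examples assume that publishExecutor (and, below, subscribeExecutor) already exist. A minimal sketch of creating and closing such an Executor, assuming the Executors factory in io.servicetalk.concurrent.api (not shown in the original examples):
Executor publishExecutor = Executors.newCachedThreadExecutor();
try {
    // ... run the publishOn()/subscribeOn() examples ...
} finally {
    publishExecutor.closeAsync().toFuture().get(); // release the Executor's threads when done
}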
subscribeOn() Example
Using the subscribeOn(executor) operator allows processing of the subscription and demand on a specific thread. Using the subscribeOn(executor) operator generally requires an understanding of the behavior of the source; using a different source may change the need for offloading. subscribeOn(executor) is used less frequently than publishOn(executor) but is useful when it is necessary to offload the control path: the subscribe method or the Subscription methods.
Collection<Integer> result = Publisher.range(1, 10) (2) (4)
        .map(element -> element) // NO-OP
        .subscribeOn(subscribeExecutor)
        .toFuture() (1) (3) (5)
        .get(); (6)
1. toFuture() will call subscribe(Subscriber). This flows up the operator chain toward the source: subscribeOn (offloads onto a subscribeExecutor thread) → map → Range.
2. Still on a thread from subscribeExecutor, Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain: Range → map → subscribeOn → toFuture.
3. Still on the subscribeExecutor thread, the onSubscribe(Subscription) method of toFuture()'s Subscriber calls Subscription.request(Long.MAX_VALUE). This flows up the operator chain: subscribeOn (offloads again onto another subscribeExecutor thread) → map → Range.
4. Still on a thread from the second offload to subscribeExecutor, Range will synchronously publish nine items, the integers “1” through “9”, via onNext(element). Each onNext flows back down the operator chain: Range → map → subscribeOn → toFuture, where the element is collected for the result.
5. Still on a thread from the second offload to subscribeExecutor, after all items, Range will call onComplete(). When the toFuture() Subscriber receives the onComplete() signal it will complete the Future with the integer collection result.
6. The calling thread will wait at get() for the Future result to be asynchronously completed.
publishOn()/subscribeOn() Detailed Example
These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP map implementations to reveal the active thread during their execution. To show the active thread at the other points described in the callouts, the expanded example also adds whenOnSubscribe, whenRequest, liftSync and whenFinally operations in the operator chain. The output of the example shows the thread used for executing each of the operators, while the specialized operators provide examples of how you might use them to debug your own programs.
Collection<?> result = Publisher.range(1, 3)
        .map(element -> {
            System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe starts on " + Thread.currentThread());
        })
        .publishOn(publishExecutor)
        .map(element -> {
            System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
        })
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
            return subscriber;
        })
        .subscribeOn(subscribeExecutor)
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe begins on " + Thread.currentThread());
            return subscriber;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
        })
        .whenFinally(new TerminalSignalConsumer() {
            @Override
            public void onComplete() {
                System.out.println("\ncomplete on " + Thread.currentThread());
            }

            @Override
            public void onError(final Throwable throwable) {
                System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
            }

            @Override
            public void cancel() {
                System.out.println("\ncancel on " + Thread.currentThread());
            }
        })
        .toFuture()
        .get();
Implementation
To use ServiceTalk's blocking support, one does not need to know the implementation details; the above information is sufficient. However, if you are developing operators in ServiceTalk or are just curious, blocking-implementation.adoc describes the design.