Blocking safe by default

This document elaborates how asynchronous sources provide ways to offload. General philosophy of writing blocking code while using ServiceTalk is explained here and is a recommended read.

Execution chain

Any ServiceTalk asynchronous source (Publisher, Single and Completable) offers multiple operators that can be applied to the source. Any instance of such a source and all the operators applied to it before it is subscribed, is called an execution chain.

Data and control flow in an execution chain

The image below details the flow of data (Subscriber methods) and control (Publisher and Subscription methods) messages. Invocations in either direction may be executed on an EventLoop thread and hence needs to be protected. In case new asynchronous sources are generated/received inside operators, they follow the same model and hence are removed for brevity.

Data and control flow in an execution chain

As shown in the above picture, there are inherently two directions (data and control) of information flow for an execution chain and these signals can be triggered in parallel.

By default, in ServiceTalk, signals are not executed on an EventLoop thread, but instead executed using an Executor provided by the application in the order they are received.

Implications

The implication of the above approach is the following:

Users can execute blocking code inside an execution chain, provided they are not waiting for another data or control message in the same execution chain.

Task Offloading

ServiceTalk uses Netty for network I/O and Netty uses a fixed number of EventLoop threads for executing I/O operations. The number of EventLoop threads correlates to the number of CPU cores and not to the number of requests. This is because in many cases threads typically sit idle while waiting for I/O to complete. Sharing threads helps minimize resource consumption and improves scalability. However, sharing threads for different requests means all control flow for a single request will impact all other requests that share the same thread. If the control flow blocks the current thread for a longer time period (aka "blocks the thread") (e.g. external I/O) this may negatively impact latency and throughput. One approach to ensure I/O thread availability is to carefully limit the scope of work done by I/O threads and, whenever practical, delegate all other necessary tasks that are not related to I/O to some other thread. Moving tasks from I/O threads to other threads is called “offloading” and is a core technique used by ServiceTalk.

Offloading is used for two purposes within ServiceTalk; firstly for the execution needed for the handling of asynchronous events, aka Signals, and secondly when handling asynchronous events, protecting scarce resources from being monopolized by blocking, untrusted or expensive application code. Asynchronous execution requires offloading – the initiating calling thread is not available. The use of protective offloading, where offloading is used for avoid resource monopolization, is a practical consideration, but just as necessary for reliable and predictable operation.

ServiceTalk will, by default, execute most application code on threads other than the Netty I/O threads. For most invocations of application code, if the application developer knows that their code cannot block and always executes quickly in near constant time they can request that ServiceTalk not offload their code. This will improve application performance by reducing latency and overhead. Requests to not offload will be honored by ServiceTalk if all the other components in the same execution path have also opted out of offloading. See Execution Strategy for more information on how components may specify their offloading requirements. As a last resort, tasks may also be queued to be performed as threads are available.

ServiceTalk is designed to be fully asynchronous except where the API provides explicit blocking behavior as a convenience.

ServiceTalk uses a task based approach for offloading, using Executor in the standard "fire-and-forget" way to run the offloaded tasks. Often the Executor has a pool of threads, possibly unbounded, and tasks are run using whatever thread is available. In particular, different threads may be used for each task executed and code running in tasks cannot depend upon a consistent thread being used for invoking program logic. This approach is generally the most scalable because it makes the best utilization of threads. If it is necessary to share state between tasks then `ContextMap`s can be used.

If the default ServiceTalk offloading has been disabled then user code may still wish to offload execution to a different Executor in specific cases. ServiceTalk provides reactive operators for explicitly offloading both subscribe and Subscriber via the subscribeOn(executor) operator as well as the Subscription methods via the publishOn(executor) operator. These operators will unconditionally offload to the specified Executor. Additional, conditional offloading operators; subscribeOn(executor, predicate) and publishOn(executor, predicate) are also available.

publishOn() Example

Using the publishOn(executor) operator allows processing of signals related to the source content on a different thread than is generating the content. It is the most common form of offloading used as it relates to the data path, specifically the Subscription methods.

Collection<Integer> result = Publisher.range(1, 10) (2) (5)
        .map(element -> element)  // non-offloaded NO-OP
        .publishOn(publishExecutor)  (4)
        .map(element -> element)  // offloaded NO-OP
        .toFuture()  (3)
        .get();  (1) (6)
1 toFuture() begins by calling subscribe(Subscriber). Executing on the calling thread, execution flows up the operator chain towards the source; mappublishOnmapPublisher.range(1, 10).
2 Still executing on the calling thread, Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain, RangemappublishOn (offloads on to publishExecutor thread) → maptoFuture.
3 The onSubscribe(Subscription) method of toFuture()'s Subscriber will call Subscription.request(Long.MAX_VALUE). Execution flows up the operator chain towards the source; mappublishOnmapRange. Range will publish synchronously via onNext(element) nine items, the integers “1” through “9”.
4 Each onNext flows down the operator chain, RangemappublishOn (offloads on to publishExecutor thread) → maptoFuture where the element is collected for the result. For each offloaded item a thread of publishExecutor will be used for executing the second map operator and final collect operation.
5 After all items, Range sends the terminal onComplete() signal synchronously which flows down the operator chain, Rangemap → `publishOn (offloads on to publishExecutor thread) → maptoFuture and will complete the Future with the integer collection result.
6 The calling thread will wait at get() for the Future result to be asynchronously completed.

subscribeOn() Example

Using the subscribeOn(executor) operator allows processing of the subscription and demand on a specific thread. Using the subscribeOn(executor) operator is necessary if the thread doing the subscribe or interacting with the Subscription is sensitive to blocking, for example, an EventLoop thread.

Collection<Integer> result = Publisher.range(1, 10) (2) (4)
        .map(element -> element)  // NO-OP
        .subscribeOn(subscribeExecutor)
        .toFuture() (1) (3) (5)
        .get(); (6)
1 toFuture() will do a subscribe(Subscriber). This flows up the operator chain toward the source; subscribeOn (offload onto subscribeExecutor thread) → mapRange.
2 Still on a thread from subscribeExecutor Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain, RangemapsubscribeOntoFuture.
3 Still on the`subscribeExecutor` thread, toFuture()’s `onSubscribe(Subscription) call Subscription.request(Long.MAX_VALUE). This flows up the operator chain, subscribeOn (offloads again onto another subscribeExecutor thread) → mapRange.
4 Still on thread from the second offload to subscribeExecutor, Range ` will publish synchronously via onNext(element) nine items, the integers “1” through “9”. Each onNext flows back down the operator chain, RangemapsubscribeOntoFuture where the element is collected for the result.
5 Still on thread from the second offload to subscribeExecutor, after all items, Range will call onComplete. When the toFuture() Subscriber receives the onComplete() signal it will complete the Future with the integer collection result.
6 The calling thread will wait at get() for the Future result to be asynchronously completed.

publishOn()/subscribeOn() Detailed Example

These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP map implementations to reveal the active thread during their execution. To show the active thread at the other points described in the callouts the expanded example also adds whenOnSubscribe, whenRequest, liftSync and whenFinally operations in the operator chain. The output of the example shows the thread used for executing each of the operators, while the specialized operators provide examples of how you might use them to debug your own programs.

Collection<?> result = Publisher.range(1, 3)
        .map(element -> {
            System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe starts on " + Thread.currentThread());
        })
        .publishOn(publishExecutor)
        .map(element -> {
            System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
        })
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
            return subscriber;
        })
        .subscribeOn(subscribeExecutor)
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe begins on " + Thread.currentThread());
            return subscriber;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
        })
        .whenFinally(new TerminalSignalConsumer() {
            @Override
            public void onComplete() {
                System.out.println("\ncomplete on " + Thread.currentThread());
            }

            @Override
            public void onError(final Throwable throwable) {
                System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
            }

            @Override
            public void cancel() {
                System.out.println("\ncancel on " + Thread.currentThread());
            }
        })
        .toFuture()
        .get();

If the default ServiceTalk offloading has been disabled then user code may still wish to offload execution to a different Executor in specific cases. ServiceTalk provides reactive operators for explicitly offloading both subscribe and Subscriber via the subscribeOn(executor) operator as well as the Subscription methods via the publishOn(executor) operator. These operators will unconditionally offload to the specified Executor. Additional, conditional offloading operators; subscribeOn(executor, predicate) and publishOn(executor, predicate) are also available.

publishOn() Example

Using the publishOn(executor) operator allows processing of signals related to the source content on a different thread than is generating the content. It is the most common form of offloading used as it relates to the data path, specifically the Subscription methods.

Collection<Integer> result = Publisher.range(1, 10) (2) (5)
        .map(element -> element)  // non-offloaded NO-OP
        .publishOn(publishExecutor)  (4)
        .map(element -> element)  // offloaded NO-OP
        .toFuture()  (3)
        .get();  (1) (6)
1 toFuture() begins by calling subscribe(Subscriber). Executing on the calling thread, execution flows up the operator chain towards the source; mappublishOnmapPublisher.range(1, 10).
2 Still executing on the calling thread, Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain, RangemappublishOn (offloads on to publishExecutor thread) → maptoFuture.
3 The onSubscribe(Subscription) method of toFuture()'s Subscriber will call Subscription.request(Long.MAX_VALUE). Execution flows up the operator chain towards the source; mappublishOnmapRange. Range will publish synchronously via onNext(element) nine items, the integers “1” through “9”.
4 Each onNext flows down the operator chain, RangemappublishOn (offloads on to publishExecutor thread) → maptoFuture where the element is collected for the result. For each offloaded item a thread of publishExecutor will be used for executing the second map operator and final collect operation.
5 After all items, Range sends the terminal onComplete() signal synchronously which flows down the operator chain, Rangemap → `publishOn (offloads on to publishExecutor thread) → maptoFuture and will complete the Future with the integer collection result.
6 The calling thread will wait at get() for the Future result to be asynchronously completed.

subscribeOn() Example

Using the subscribeOn(executor) operator allows processing of the subscription and demand on a specific thread. Using the subscribeOn(executor) operator generally requires an understanding of the behavior of the source; using a different source may change the need for offloading. subscribeOn(executor) is used less frequently than publishOn(executor) but is useful when it is necessary to offload the control path; the subscribe method or Subscription methods.

Collection<Integer> result = Publisher.range(1, 10) (2) (4)
        .map(element -> element)  // NO-OP
        .subscribeOn(subscribeExecutor)
        .toFuture() (1) (3) (5)
        .get(); (6)
1 toFuture() will do a subscribe(Subscriber). This flows up the operator chain toward the source; subscribeOn (offload onto subscribeExecutor thread) → mapRange.
2 Still on a thread from subscribeExecutor Range will call Subscriber.onSubscribe(Subscription) on the Subscriber. This flows back down the operator chain, RangemapsubscribeOntoFuture.
3 Still on the`subscribeExecutor` thread, toFuture()’s `onSubscribe(Subscription) call Subscription.request(Long.MAX_VALUE). This flows up the operator chain, subscribeOn (offloads again onto another subscribeExecutor thread) → mapRange.
4 Still on thread from the second offload to subscribeExecutor, Range ` will publish synchronously via onNext(element) nine items, the integers “1” through “9”. Each onNext flows back down the operator chain, RangemapsubscribeOntoFuture where the element is collected for the result.
5 Still on thread from the second offload to subscribeExecutor, after all items, Range will call onComplete. When the toFuture() Subscriber receives the onComplete() signal it will complete the Future with the integer collection result.
6 The calling thread will wait at get() for the Future result to be asynchronously completed.

publishOn()/subscribeOn() Detailed Example

These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP map implementations to reveal the active thread during their execution. To show the active thread at the other points described in the callouts the expanded example also adds whenOnSubscribe, whenRequest, liftSync and whenFinally operations in the operator chain. The output of the example shows the thread used for executing each of the operators, while the specialized operators provide examples of how you might use them to debug your own programs.

Collection<?> result = Publisher.range(1, 3)
        .map(element -> {
            System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe starts on " + Thread.currentThread());
        })
        .publishOn(publishExecutor)
        .map(element -> {
            System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
            return element;
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
        })
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
            return subscriber;
        })
        .subscribeOn(subscribeExecutor)
        .liftSync(subscriber -> {
            System.out.println("\nSubscribe begins on " + Thread.currentThread());
            return subscriber;
        })
        .whenOnSubscribe(subscription -> {
            System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
        })
        .whenRequest(request -> {
            System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
        })
        .whenFinally(new TerminalSignalConsumer() {
            @Override
            public void onComplete() {
                System.out.println("\ncomplete on " + Thread.currentThread());
            }

            @Override
            public void onError(final Throwable throwable) {
                System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
            }

            @Override
            public void cancel() {
                System.out.println("\ncancel on " + Thread.currentThread());
            }
        })
        .toFuture()
        .get();

Implementation

In order to use ServiceTalk’s blocking support feature, one does not need to know about implementation details and the above information is sufficient. However, if you are developing some operators in ServiceTalk or are just curious, blocking-implementation.adoc describes the design.