Asynchronous Context
Traditional thread-per-request libraries and applications often leverage the concept of a ThreadLocal
to provide a
static interface for conveying contextual information across function calls. Important use case for this include MDC
,
OpenTelemetry Context
, etc. This ThreadLocal
model allows libraries to pass information across APIs that can be
oblivious to the extra information being conveyed. However, ServiceTalk is an asynchronous library and the simple
ThreadLocal
model does not work because many computation flows may be happening on a shared set of threads at any
given time. To provide a similar experience, ServiceTalk offers a pair of abstractions, CapturedContext
for capturing
and restoring contextual information for use across async boundaries, and AsyncContext
(which is just a ServiceTalk
defined case of CapturedContext
) for defining your own contextual information.
Expectations
State is shared across asynchronous boundaries
ThreadLocal
state is preserved and modifiable across synchronous boundaries, and our operators should provide the same
functionality. To clarify this means if someone is calling a method in the imperative style and the state is shared
then the same property should hold for adjacent operators. To demonstrate this, consider the following code snippets:
Function composition
Single<String> single = ...
single.map(v -> {
AsyncContext.put(AUTH, v.length() > 2);
return v + "1";
}).filter(v -> {
return isValid(v);
}) // do something else in the function composition chain
boolean isValid(String in) {
return AsyncContext.get(AUTH) // ignore null
}
Imperative approach
void main() {
String value = ...;
value = doMap(value);
if (isValid(value)) {
// do something else
}
}
String doMap(String in) {
AsyncContext.put(AUTH, v.length() > 2);
return v + "1";
}
In both cases, the result of the program is the same. Similarly, a program can use a CapturedContextProvider to save any state they desire and it will be passed along the ServiceTalk async boundaries.
Isolation and defined scope
We want a static API that provides us with access to state, but we need to define the scope for which the state is
valid. A single bag of static state may become difficult to manage and reason about and also lead to memory leaks.
Instead, we would like to have the scope in which the state is modifiable in be defined. For example ThreadLocal
is
modifiable from anywhere within the same thread. Since ServiceTalk is built on an asynchronous framework where the same
thread may process multiple requests, and the same request may be processed on multiple threads this isn’t sufficient.
However, what folks typically use ThreadLocal
state for is to track static state per-request. The isolation and scope
of state must therefore also be able to follow per-request processing through the asynchronous control flow.
Works with offloading
ServiceTalk is an asynchronous framework at its core, but in order to avoid user code blocking I/O threads we offload to other threads. This means that every time we invoke user code we may have to jump threads. It is also possible on subsequent calls for the same request we may use a different thread (although not concurrently). We need to make sure the same static state is carried along through these different threads.
Works with Third-Party Libraries
Many existing APIs assume a ThreadLocal
type of state (e.g. MDC, OpenTracing). For this reason we would like to
provide compatibility within our asynchronous primitives and control flow provided by operators. ServiceTalk needs to
provide a way to capture third-party context and properly restore it within the reactive execution chain.
ServiceTalk Approach
In order to accommodate the Expectations of Asynchronous Context we need specific behavior from ServiceTalk. As described above, we are after a static state shared across synchronous boundaries, available across asynchronous boundaries, and is also sufficiently isolated in scope so that it can represent request/response control flow.
To achieve these
requirements the approach is to define context during the subscribe process and propagate it along Subscriber
chain.
ServiceTalk follows this set of rule set:
-
Asynchronous context will be captured and copied at
subscribe
time.-
When using
AsyncContext
, copying provides isolation from other asynchronous operations which typically represent independent processing. -
If users want to share context across boundaries the
shareContextOnSubscribe()
operator will share the sameAsyncContext
instance across asynchronous operations. This necessary when chaining together different async-sources such as with theflatMap
,defer
, and other related operators.
-
-
Asynchronous context will be saved/restored across asynchronous boundaries.
-
This is true for asynchronous operators (e.g.
flatMap
) and alsoExecutor
operations. -
If isolation of
AsyncContext
is required for a specific control flow,defer(..)
operators can be used to create new boundaries.
-
The above approach will provide the isolation required so that the state set inside an offloaded
HttpService#handle(..)
call is visible when processing the request/response.
Due to the intricacies of control flow this mechanism is directly implemented in our operators.
Complexity of implementation
Due to the shared state across the asynchronous boundaries we have a defined process for propagating asynchronous context:
-
When subscribing to an asynchronous primitive ServiceTalk will capture the
captured
context. -
The
captured
context will be restored for use during asynchronous operations. This process has it’s own rules:-
Save the
pre-existing
context that may be present on the executing thread, including empty or null context. -
Restore the
captured
context to the executing thread. -
Execute the asynchronous logic which now will see the
captured
context state. -
When the async logic is complete, restore the
pre-existing
context to the thread, including empty or null values.
-
This set of rules ensures that asynchronous primitives see the correct state at all times, including during recursive calls where different parts of the reactive chain may have different context information.
Disable Asynchronous Context
Asynchronous Context is enabled by default to accommodate for easy setup, but it can be disabled via
AsyncContext.disable()
. Note that this disables both the AsyncContext
and the CapturedContext
.
AsyncContext
Specifics
Understandability
The approach has a few succinct rules as to how AsyncContext
propagates and isolation is achieved. It is assumed the
more subtle and difficult to understand part will be due to concurrency on the underlying Map
, and modifications made
“later” in the control flow being visible “earlier” in the control flow. These scenarios are demonstrated in the
examples below:
-
Any time a
Publisher
(aka stream) of data comes in to an operator, there is a possibility for concurrency on theAsyncContext
map.
Publisher<String> publisher = ...;
AsyncContext.put(KEY, 10); // (1) put a value into AsyncContext before a .subscribe(..)
publisher.flatMapMergeSingle(v -> {
Integer contextValue = AsyncContext.get(KEY);
assert contextValue == 10 || contextValue == 30; // (2) Subscriber chain may see either value.
// AsyncContext will be copied when Single.subscribe(..) is called. Changes to the AsyncContext map from operators on
// the inner Single operator chain will therefore not be visible in the outer Publisher operator chain.
return client.request(/*do something with v*/)
.map(x -> {
AsyncContext.put(KEY, 20); // (3) put a new value for the same key
return x;
});
}).map(v -> {
Integer contextValue = AsyncContext.get(KEY);
assert contextValue == 10 || contextValue == 30;
// `publisher` may emit more items, and if it does then `flatMapMergeSingle` `Function` may be invoked concurrently
// with this code. This is because `client.request(..)` may complete on a different thread than `publisher` is
// delivering data on. This code has access to the same map as (2) which may result in concurrent modifications on
// `AsyncContext`. This is allowed by `AsyncContext` but may not be obvious due to modifications made "later" in the
// operator chain being visible "earlier" in the operator chain.
AsyncContext.put(KEY, 30);
return v;
})
-
Saving/restoring
AsyncContext
across asynchronous boundaries (e.g.Executor
) may lead to modifications being visible outside the asynchronous boundary.
Executor executor = ...
AsyncContext.put(KEY, "foo");
executor.execute(() -> {
AsyncContext.put(KEY, "bar");
});
String value = AsyncContext.get(KEY);
// value maybe "foo" or "bar" due to concurrent modifications
Cost Of Retention
This approach still requires thread local state in order to preserve state across method calls without explicitly
passing it. The ThreadLocal
class provides general retention of thread local state, but is backed by a Map
. The
frequency in which we need to save/restore the static state has been shown to introduce non-trivial costs. Since we know
that all of our threads will require this thread local state we can have our threads explicitly have a AsyncContext
member variable (see
ContextMapHolder
). There is also additional wrapping/unwrapping introduced on the asynchronous boundaries so there is additional object
allocation.
AsyncContext
Examples
AsyncContext
is designed to provide a static API to retain state associated across asynchronous boundaries.
Motivation for providing support for AsyncContext
can be found
here.
High Level Usage
At a high level AsyncContext
provides a Map
-like API for storing static state, and is isolated/scoped for each
request and response to simulate ThreadLocal
storage. Here is some code
// Main.java - application logic
Single<Response> single = authenticate(client.request(...));
single.map(response -> {
if (AsyncContext.get(USER_ID_KEY).equals("admin")) {
// do something for admin
} else {
// do something for non-admin
}
})// do something else in the function composition chain
// AuthenticationFilter.java
public static final Key<String> USER_ID_KEY = Key.newKey("userId", String.class);
public static Single<Resposne> authenticate(Single<Response> responseSingle) {
AsyncContext.put(USER_ID_KEY, client.headers().get("userId"));
}
CapturedContext
Specifics
In contrast to AsyncContext
, the main use case of CapturedContext
is to work with third-party context abstractions.
Users can define a
CapturedContextProvider
which will provide a way for ServiceTalk to capture and restore third-party context along the execution chain. An
important example of third-party context information is the OpenTelemetry Context. By using the
CapturedContextProvider
users can correctly propagate OpenTelemetry context information in a non-invasive way which
makes it much more likely to work with other third-party libraries.
See the JavaDocs for CapturedContextProvider for an example of how to define the context capture and restore process.
Cost Of Retention
Unlike AsyncContext
which has a well-defined cost model, the cost of saving and restoring arbitrary contextual
information is not possible to predict because it is based on the third-party APIs used to access and set it. In
general, good candidates for the CapturedContext
model are those where capturing and setting context is 'cheap',
such as saving and restoring the state of a ThreadLocal
. Because of the frequency of the save and restore process
in an asynchronous computation chain careful through should go into the cost of capturing and restoring third-party
context.