Async Context
AsyncContext
is designed to provide a static API to retain state associated across asynchronous boundaries.
Motivation for providing support for AsyncContext
can be found
here.
High Level Usage
At a high level AsyncContext
provides a Map
-like API for storing static state, and is isolated/scoped for each
request and response to simulate ThreadLocal
storage. Here is some code
// Main.java - application logic
Single<Response> single = authenticate(client.request(...));
single.map(response -> {
if (AsyncContext.get(USER_ID_KEY).equals("admin")) {
// do something for admin
} else {
// do something for non-admin
}
})// do something else in the function composition chain
// AuthenticationFilter.java
public static final Key<String> USER_ID_KEY = Key.newKey("userId");
public static Single<Resposne> authenticate(Single<Response> responseSingle) {
AsyncContext.put(USER_ID_KEY, client.headers().get("userId"));
}
Expectations
Perhaps unfortunately, but many existing APIs assume a ThreadLocal
type of state (e.g. MDC
, OpenTracing
). For this
reason we would like to provide a similar experience within our asynchronous primitives and control flow provided by
operators. From a practical stand point we want to leverage this tool as the foundation for infrastructure type tasks
that require static state, or cannot otherwise be explicitly accounted for in the API. This leads us to the following
desirable characteristics.
State is shared across synchronous boundaries
ThreadLocal
state is preserved and modifiable across synchronous boundaries, and our operators should provide the same
functionality. To clarify this means if someone is calling an method in the imperative style and the state is shared
then the same property should hold for adjacent operators. To demonstrate this check out the following code snippets:
Function composition
Single<String> single = ...
single.map(v -> {
AsyncContext.put(AUTH, v.length() > 2);
return v + "1";
}).filter(v -> {
return isValid(v);
}) // do something else in the function composition chain
boolean isValid(String in) {
return AsyncContext.get(AUTH) // ignore null
}
Imperative approach
void main() {
String value = ...;
value = doMap(value);
if (isValid(value)) {
// do something else
}
}
String doMap(String in) {
AsyncContext.put(AUTH, v.length() > 2);
return v + "1";
}
Isolation and defined scope
We want a static API that provides us with access to state, but we need to define the scope for which the state is
valid. A single bag of static state may become difficult to manage and reason about and also lead to memory leaks.
Instead we would like to have the scope in which the state is modifiable in be defined. For example ThreadLocal
is
modifiable from anywhere within the same thread. Since ServiceTalk is built on an asynchronous framework where the same
thread may process multiple requests, and the same request may be processed on multiple threads this isn’t sufficient.
However what folks typically use ThreadLocal
state for is to track static state per-request. The isolation and scope
of state must therefore also be able to follow per-request processing through the asynchronous control flow.
Work with offloading
ServiceTalk is an asynchronous framework at its core, but in order to avoid user code blocking EventLoop threads we offload to other threads. This means that every time we invoke user code we may have to jump threads. It is also possible on subsequent calls for the same request we may use a different thread (although not concurrently). We need to make sure the same static state is carried along through these different threads.
ServiceTalk Approach
In order to accommodate the Expectations of AsyncContext we need specific behavior from AsyncContext
. As described
above we are after static state shared across synchronous boundaries, available across asynchronous boundaries, and is
also sufficiently isolated in scope so that it can represent request/response control flow. To achieve these
requirements the approach is to have AsyncContext
backed by a modifiable (concurrent) map, and is associated with a
Subscriber
chain. Since the map is modifiable we will need to define the scope (e.g. how the state is isolated) of
this static state. The AsyncContext
will follow the following rule set:
-
AsyncContext
will be captured and copied atsubscribe
time.-
This provides isolation from other asynchronous operations which typically represent independent processing.
-
-
AsyncContext
will be saved/restored across asynchronous boundaries.-
This is true for asynchronous operators (e.g.
flatMap
) and alsoExecutor
operations. -
If isolation is required for a specific control flow, there will be operators (e.g.
asyncContextBoundary
) and utilities (e.g.Runnable asyncContextBoundary(Runnable)
) to create boundaries.
-
The above approach will provide the isolation required so that the state set inside an offloaded
HttpService#handle(..)
call is visible when processing the request/response.
Due to the intricacies of control flow this mechanism is directly implemented in our operators.
Complexity of implementation
Due to the shared state across the asynchronous boundaries we need to make sure we save the current
context before the
boundary, save the original
context before executing user code on the new thread, restore the current
context
while executing user code, and then restore the original
context. This requires object wrapping/unwrapping and
leveraging ThreadLocal
(or an
AsyncContextMapHolder
) to retain the state. We also need to capture the current context at subscribe
, propagate it up the operator chain,
and capture it effectively in the source (or in a wrapped Subscriber
just outside the source). These mechanics are
similar to how the
SignalOffloader
is captured and propagated, and we use a similar approach here.
Understandability
The approach has a few succinct rules as to how AsyncContext
propagates and isolation is achieved. It is assumed the
more subtle and difficult to understand part will be due to concurrency on the underlying Map
, and modifications made
“later” in the control flow being visible “earlier” in the control flow. These scenarios are demonstrated in the
examples below:
-
Any time a
Publisher
(aka stream) of data comes in to an operator, there is a possibility for concurrency on theAsyncContext
map.
Publisher<String> publisher = ...;
publisher
.flatMapSingle(v -> {
// (1) AsyncContext will be saved before the async boundary
// AsyncContext will be copied/isolated when this async source is subscribed to
client.request(/*do something with v*/)
)
.map(v -> {
// AsyncContext before the async boundary (1) is restored
// Note that modifications made to AsyncContext here may introduce concurrency
// and be visible before the async boundary above (1).
})
-
Saving/restoring
AsyncContext
across asynchronous boundaries (e.g.Executor
) may lead to modifications being visible outside the asynchronous boundary.
Executor executor = ...
AsyncContext.put(key, "foo")
executor.execute(() -> {
AsyncContext.put(key, "bar")
});
String value = AsyncContext.get(key);
// value maybe "foo" or "bar" due to concurrent modifications
Cost Of Retention
This approach still requires thread local state in order to preserve state across method calls without explicitly
passing it. The ThreadLocal
class provides general retention of thread local state, but is backed by a Map
. The
frequency in which we need to save/restore the static state has been shown to introduce non-trivial costs. Since we know
that all of our threads will require this thread local state we can have our threads explicitly have a AsyncContext
member variable (see
AsyncContextMapHolder
). There is also additional wrapping/unwrapping introduced on the asynchronous boundaries so there is additional object
allocation.
Disable AsyncContext
AsyncContext
is enabled by default to accommodate for easy setup, but it can be disabled via AsyncContext.disable()
.