AsyncContext

AsyncContext is designed to provide a static API for retaining state across asynchronous boundaries. Motivation for providing support for AsyncContext can be found here.

High Level Usage

At a high level, AsyncContext provides a Map-like API for storing static state. The state is isolated/scoped for each request and response to simulate ThreadLocal storage. For example:

// Main.java - application logic
Single<Response> single = authenticate(client.request(...));
single.map(response -> {
  // Constant-first equals avoids a NullPointerException if no user id was stored.
  if ("admin".equals(AsyncContext.get(USER_ID_KEY))) {
    // do something for admin
  } else {
    // do something for non-admin
  }
  return response;
}) // do something else in the function composition chain

// AuthenticationFilter.java
public static final Key<String> USER_ID_KEY = Key.newKey("userId", String.class);

public static Single<Response> authenticate(Single<Response> responseSingle) {
  // Store the user id so later stages of the same control flow can read it.
  AsyncContext.put(USER_ID_KEY, client.headers().get("userId"));
  return responseSingle;
}

Expectations

Perhaps unfortunately, many existing APIs assume a ThreadLocal type of state (e.g. MDC, OpenTracing). For this reason we would like to provide a similar experience within our asynchronous primitives and the control flow provided by operators. From a practical standpoint, we want to leverage this tool as the foundation for infrastructure-type tasks that require static state or cannot otherwise be explicitly accounted for in the API. This leads us to the following desirable characteristics.

State is shared across synchronous boundaries

ThreadLocal state is preserved and modifiable across synchronous boundaries, and our operators should provide the same functionality. In other words, if state is shared between consecutive method calls written in the imperative style, then the same property should hold for adjacent operators. The following code snippets demonstrate this:

Function composition

Single<String> single = ...
single.map(v -> {
  AsyncContext.put(AUTH, v.length() > 2);
  return v + "1";
}).filter(v -> {
  return isValid(v);
}) // do something else in the function composition chain

boolean isValid(String in) {
  return AsyncContext.get(AUTH); // ignore null
}

Imperative approach

void main() {
  String value = ...;
  value = doMap(value);
  if (isValid(value)) {
    // do something else
  }
}

String doMap(String in) {
  AsyncContext.put(AUTH, in.length() > 2);
  return in + "1";
}

Isolation and defined scope

We want a static API that provides access to state, but we need to define the scope for which the state is valid. A single bag of static state may become difficult to manage and reason about, and may also lead to memory leaks. Instead, we would like the scope in which the state is modifiable to be well defined. For example, ThreadLocal state is modifiable from anywhere within the same thread. Since ServiceTalk is built on an asynchronous framework where the same thread may process multiple requests, and the same request may be processed on multiple threads, this isn’t sufficient. However, what folks typically use ThreadLocal state for is tracking static state per request. The isolation and scope of state must therefore also be able to follow per-request processing through the asynchronous control flow.
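
To make the limitation concrete, here is a minimal sketch that uses only the JDK (no ServiceTalk APIs). The class name, the USER variable, and the single-thread executors standing in for event loop threads are all hypothetical. It shows the two failure modes of a plain ThreadLocal: state leaking between requests that share a thread, and state going missing when one request hops threads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalScopeDemo {
    // Hypothetical per-request state kept in a plain ThreadLocal.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    // The same thread serves two requests: request B observes request A's state.
    static String leakAcrossRequests() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Runnable requestA = () -> USER.set("alice");
            Callable<String> requestB = USER::get;
            eventLoop.submit(requestA).get();
            return eventLoop.submit(requestB).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    // One request hops threads: state set on the first thread is missing on the second.
    static String lostAcrossThreads() {
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            Runnable beforeHop = () -> USER.set("bob");
            Callable<String> afterHop = USER::get;
            first.submit(beforeHop).get();
            return second.submit(afterHop).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(leakAcrossRequests()); // alice
        System.out.println(lostAcrossThreads());  // null
    }
}
```

Neither outcome matches per-request scoping, which is why the state has to follow the control flow rather than the thread.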

Work with offloading

ServiceTalk is an asynchronous framework at its core, but in order to avoid user code blocking EventLoop threads we offload to other threads. This means that every time we invoke user code we may have to jump threads. It is also possible that subsequent calls for the same request use a different thread (although not concurrently). We need to make sure the same static state is carried along through these different threads.

ServiceTalk Approach

In order to meet the Expectations of AsyncContext we need specific behavior from AsyncContext. As described above, we are after static state that is shared across synchronous boundaries, available across asynchronous boundaries, and sufficiently isolated in scope that it can represent request/response control flow. To achieve these requirements the approach is to have AsyncContext backed by a modifiable (concurrent) map that is associated with a Subscriber chain. Since the map is modifiable we will need to define the scope (e.g. how the state is isolated) of this static state. The AsyncContext will follow this rule set:

  1. AsyncContext will be captured and copied at subscribe time.

    1. This provides isolation from other asynchronous operations which typically represent independent processing.

  2. AsyncContext will be saved/restored across asynchronous boundaries.

    1. This is true for asynchronous operators (e.g. flatMap) and also Executor operations.

    2. If isolation is required for a specific control flow, there will be operators (e.g. asyncContextBoundary) and utilities (e.g. Runnable asyncContextBoundary(Runnable)) to create boundaries.

The above approach will provide the isolation required so that the state set inside an offloaded HttpService#handle(..) call is visible when processing the request/response.

Due to the intricacies of control flow this mechanism is directly implemented in our operators.
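
The first rule (capture and copy at subscribe time) can be illustrated with a small JDK-only sketch. This is not how ServiceTalk's operators are implemented. The subscribe(..) helper, the CURRENT variable, and the String keys are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class SubscribeCopyDemo {
    // Hypothetical stand-in for the map that backs the static API.
    static final ThreadLocal<Map<String, Integer>> CURRENT =
            ThreadLocal.withInitial(ConcurrentHashMap::new);

    // Hypothetical source: copies the caller's map at subscribe time and
    // delivers the item with the copy installed.
    static void subscribe(String item, Consumer<String> subscriber) {
        Map<String, Integer> outer = CURRENT.get();
        CURRENT.set(new ConcurrentHashMap<>(outer)); // capture and copy
        try {
            subscriber.accept(item);
        } finally {
            CURRENT.set(outer); // the caller keeps its own map
        }
    }

    // Returns {value seen inside the subscriber, value seen by the caller afterwards}.
    static Integer[] run() {
        CURRENT.get().put("key", 10);
        Integer[] seen = new Integer[2];
        subscribe("item", v -> {
            seen[0] = CURRENT.get().get("key"); // value put before subscribe is visible
            CURRENT.get().put("key", 20);       // only changes the copy
        });
        seen[1] = CURRENT.get().get("key");     // caller's map is unchanged
        return seen;
    }

    public static void main(String[] args) {
        Integer[] seen = run();
        System.out.println(seen[0] + " " + seen[1]); // 10 10
    }
}
```

The copy is what isolates an independent asynchronous operation from its caller, at the cost of one map copy per subscribe.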

Complexity of implementation

Because state is shared across asynchronous boundaries, we need to save the current context before the boundary, save the new thread's original context before executing user code on it, restore the saved current context while executing user code, and then restore the original context. This requires object wrapping/unwrapping and leveraging ThreadLocal (or a ContextMapHolder) to retain the state. We also need to capture the current context at subscribe time, propagate it up the operator chain, and capture it effectively in the source (or in a wrapped Subscriber just outside the source).
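
The save/restore sequence described above can be sketched for a plain Executor as follows. This is an illustration under assumptions, not ServiceTalk's implementation: the wrap(..) helper, the CURRENT variable, and the String-keyed map are hypothetical, and only JDK classes are used.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextWrapDemo {
    // Hypothetical stand-in for the map that backs the static API.
    static final ThreadLocal<Map<String, String>> CURRENT =
            ThreadLocal.withInitial(ConcurrentHashMap::new);

    // Wraps a task so that it runs with the map of the thread that called wrap(..).
    static Runnable wrap(Runnable task) {
        Map<String, String> captured = CURRENT.get();     // saved before the boundary
        return () -> {
            Map<String, String> original = CURRENT.get(); // executing thread's own map
            CURRENT.set(captured);                        // install the captured map
            try {
                task.run();                               // user code
            } finally {
                CURRENT.set(original);                    // put the thread back as it was
            }
        };
    }

    // Returns {value seen by user code on the other thread, value seen by the caller afterwards}.
    static String[] runAcrossBoundary() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CURRENT.get().put("key", "foo");
            String[] seen = new String[2];
            Runnable userCode = () -> {
                seen[0] = CURRENT.get().get("key");
                CURRENT.get().put("key", "bar");
            };
            executor.submit(wrap(userCode)).get(); // wait, so the result is deterministic
            seen[1] = CURRENT.get().get("key");
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runAcrossBoundary();
        System.out.println(seen[0] + " " + seen[1]); // foo bar
    }
}
```

Because the same map instance is shared rather than copied, the write made on the other thread is visible to the caller once the task has finished. Each wrapped task also costs one extra object allocation.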

Understandability

The approach has a few succinct rules as to how AsyncContext propagates and isolation is achieved. It is assumed the more subtle and difficult to understand part will be due to concurrency on the underlying Map, and modifications made “later” in the control flow being visible “earlier” in the control flow. These scenarios are demonstrated in the examples below:

  • Any time a Publisher (aka stream) of data comes in to an operator, there is a possibility for concurrency on the AsyncContext map.

Publisher<String> publisher = ...;
AsyncContext.put(KEY, 10); // (1) put a value into AsyncContext before a .subscribe(..)
publisher.flatMapMergeSingle(v -> {
  Integer contextValue = AsyncContext.get(KEY);
  assert contextValue == 10 || contextValue == 30; // (2) Subscriber chain may see either value.

  // AsyncContext will be copied when Single.subscribe(..) is called. Changes to the AsyncContext map from operators on
  // the inner Single operator chain will therefore not be visible in the outer Publisher operator chain.
  return client.request(/*do something with v*/)
               .map(x -> {
                    AsyncContext.put(KEY, 20); // (3) put a new value for the same key
                    return x;
                });
}).map(v -> {
  Integer contextValue = AsyncContext.get(KEY);
  assert contextValue == 10 || contextValue == 30;

  // `publisher` may emit more items, and if it does then `flatMapMergeSingle` `Function` may be invoked concurrently
  // with this code. This is because `client.request(..)` may complete on a different thread than `publisher` is
  // delivering data on. This code has access to the same map as (2) which may result in concurrent modifications on
  // `AsyncContext`. This is allowed by `AsyncContext` but may not be obvious due to modifications made "later" in the
  // operator chain being visible "earlier" in the operator chain.
  AsyncContext.put(KEY, 30);

  return v;
})

  • Saving/restoring AsyncContext across asynchronous boundaries (e.g. Executor) may lead to modifications being visible outside the asynchronous boundary.

Executor executor = ...

AsyncContext.put(KEY, "foo");
executor.execute(() -> {
  AsyncContext.put(KEY, "bar");
});
String value = AsyncContext.get(KEY);
// value may be "foo" or "bar" due to concurrent modifications

Cost Of Retention

This approach still requires thread-local state in order to preserve state across method calls without explicitly passing it. The ThreadLocal class provides general retention of thread-local state, but is backed by a Map. The frequency with which we need to save/restore the static state has been shown to introduce non-trivial costs. Since we know that all of our threads will require this thread-local state, we can have our threads explicitly hold an AsyncContext member variable (see ContextMapHolder). There is also additional wrapping/unwrapping on the asynchronous boundaries, which means additional object allocation.
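
The idea of a thread that carries the state in a member variable might look roughly like the sketch below. The MapHolder interface and HolderThread class are hypothetical names for this illustration and are not the actual ContextMapHolder API. Threads that do not implement the interface fall back to a regular ThreadLocal.

```java
import java.util.HashMap;
import java.util.Map;

final class HolderThreadDemo {
    // Hypothetical interface for threads that carry the map in a field.
    interface MapHolder {
        Map<String, Object> contextMap();
        void contextMap(Map<String, Object> map);
    }

    // A thread with a plain member variable, so no ThreadLocal lookup is needed.
    static final class HolderThread extends Thread implements MapHolder {
        private Map<String, Object> map = new HashMap<>();

        HolderThread(Runnable task) {
            super(task);
        }

        @Override
        public Map<String, Object> contextMap() {
            return map;
        }

        @Override
        public void contextMap(Map<String, Object> map) {
            this.map = map;
        }
    }

    // Fallback for threads we do not control.
    private static final ThreadLocal<Map<String, Object>> FALLBACK =
            ThreadLocal.withInitial(HashMap::new);

    // Fast path: read the field. Slow path: ThreadLocal lookup.
    static Map<String, Object> current() {
        Thread t = Thread.currentThread();
        return t instanceof MapHolder ? ((MapHolder) t).contextMap() : FALLBACK.get();
    }

    // Runs on a HolderThread and reports whether current() returned the thread's own field.
    static boolean usesThreadField() {
        boolean[] result = new boolean[1];
        Thread thread = new HolderThread(() ->
                result[0] = current() == ((MapHolder) Thread.currentThread()).contextMap());
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(usesThreadField()); // true
    }
}
```

The type check and field read replace a map lookup on every save/restore, which is where the saving would come from when the framework creates its own threads.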

Disable AsyncContext

AsyncContext is enabled by default to allow for easy setup, but it can be disabled via AsyncContext.disable().