Asynchronous Context

Traditional thread-per-request libraries and applications often use a ThreadLocal to provide a static interface for conveying contextual information across function calls. Important use cases include MDC, OpenTelemetry Context, etc. The ThreadLocal model lets libraries pass information across APIs that are oblivious to the extra information being conveyed. However, ServiceTalk is an asynchronous library, and the simple ThreadLocal model does not work because many computation flows may be running on a shared set of threads at any given time. To provide a similar experience, ServiceTalk offers a pair of abstractions: CapturedContext, for capturing and restoring contextual information across async boundaries, and AsyncContext (a ServiceTalk-defined case of CapturedContext), for defining your own contextual information.
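
The failure mode described above can be reproduced with the JDK alone. The following is a minimal sketch, not ServiceTalk code: REQUEST_ID and ThreadLocalLossDemo are hypothetical names standing in for MDC-style state and an application class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalLossDemo {
    // Hypothetical per-request value, standing in for MDC or tracing state.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Sets REQUEST_ID on the caller and returns what a pooled worker thread observes. */
    static String valueSeenByWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set("request-1");
            Callable<String> readOnWorker = REQUEST_ID::get;
            // The task runs on the pool thread, which has its own, still empty, ThreadLocal slot.
            return pool.submit(readOnWorker).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller set request-1, worker saw: " + valueSeenByWorker());
    }
}
```

The worker observes null even though the caller set a value, which is why state has to be captured and restored explicitly whenever work moves to another thread.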

Expectations

State is shared across asynchronous boundaries

ThreadLocal state is preserved and modifiable across synchronous boundaries, and our operators should provide the same functionality. In other words, if state is shared between consecutive method calls in imperative code, the same property should hold for adjacent operators. To demonstrate this, consider the following code snippets:

Function composition

Single<String> single = ...;
single.map(v -> {
  AsyncContext.put(AUTH, v.length() > 2);
  return v + "1";
}).filter(v -> {
  return isValid(v);
}) // do something else in the function composition chain

boolean isValid(String in) {
  return AsyncContext.get(AUTH); // ignore null
}

Imperative approach

void main() {
  String value = ...;
  value = doMap(value);
  if (isValid(value)) {
    // do something else
  }
}

String doMap(String in) {
  AsyncContext.put(AUTH, in.length() > 2);
  return in + "1";
}

In both cases, the result of the program is the same. Similarly, a program can use a CapturedContextProvider to save any state it needs, and that state will be passed along across ServiceTalk async boundaries.

Isolation and defined scope

We want a static API that provides access to state, but we need to define the scope for which the state is valid. A single bag of static state can become difficult to manage and reason about, and can also lead to memory leaks. Instead, we want the scope in which the state is modifiable to be well defined. For example, ThreadLocal state is modifiable from anywhere within the same thread. Because ServiceTalk is built on an asynchronous framework where the same thread may process multiple requests, and the same request may be processed on multiple threads, this isn’t sufficient. However, ThreadLocal state is typically used to track static state per request. The isolation and scope of state must therefore also follow per-request processing through the asynchronous control flow.

Works with offloading

ServiceTalk is an asynchronous framework at its core, but to avoid user code blocking I/O threads we offload to other threads. This means that every time we invoke user code we may have to jump threads. Subsequent calls for the same request may also run on a different thread (although not concurrently). We need to make sure the same static state is carried along through these different threads.

Works with Third-Party Libraries

Many existing APIs assume a ThreadLocal type of state (e.g. MDC, OpenTracing). For this reason we would like to provide compatibility within our asynchronous primitives and control flow provided by operators. ServiceTalk needs to provide a way to capture third-party context and properly restore it within the reactive execution chain.

ServiceTalk Approach

In order to accommodate the Expectations of Asynchronous Context we need specific behavior from ServiceTalk. As described above, we are after static state that is shared across synchronous boundaries, available across asynchronous boundaries, and sufficiently isolated in scope that it can represent request/response control flow.

To achieve these requirements, the approach is to define context during the subscribe process and propagate it along the Subscriber chain.

ServiceTalk follows this set of rules:

  1. Asynchronous context will be captured and copied at subscribe time.

    1. When using AsyncContext, copying provides isolation from other asynchronous operations which typically represent independent processing.

    2. If users want to share context across boundaries, the shareContextOnSubscribe() operator will share the same AsyncContext instance across asynchronous operations. This is necessary when chaining together different async sources, such as with the flatMap, defer, and other related operators.

  2. Asynchronous context will be saved/restored across asynchronous boundaries.

    1. This is true for asynchronous operators (e.g. flatMap) and also Executor operations.

    2. If isolation of AsyncContext is required for a specific control flow, defer(..) operators can be used to create new boundaries.

The above approach will provide the isolation required so that the state set inside an offloaded HttpService#handle(..) call is visible when processing the request/response.

Due to the intricacies of control flow this mechanism is directly implemented in our operators.
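
The difference between copying and sharing context at subscribe time can be illustrated with a toy model. This is a single-threaded sketch under stated assumptions, not the ServiceTalk implementation: the static `current` map, the `subscribe` method, and the `shareContext` flag are all hypothetical stand-ins for the thread's active context, a source's subscribe path, and the effect of shareContextOnSubscribe().

```java
import java.util.HashMap;
import java.util.Map;

final class CopyOnSubscribeSketch {
    // Stand-in for the context currently attached to the calling thread (not thread-safe).
    static Map<String, Object> current = new HashMap<>();

    /** Toy subscribe: runs the logic with either a copy of the caller's context or the same instance. */
    static void subscribe(boolean shareContext, Runnable subscribedLogic) {
        Map<String, Object> saved = current;
        // Default rule: copy at subscribe time, so writes stay inside this subscription.
        current = shareContext ? saved : new HashMap<>(saved);
        try {
            subscribedLogic.run();
        } finally {
            current = saved; // hand the caller its own context back
        }
    }

    /** Returns the value the caller sees after the subscribed logic overwrote the key. */
    static Object valueAfterSubscribe(boolean shareContext) {
        current = new HashMap<>();
        current.put("key", "outer");
        subscribe(shareContext, () -> current.put("key", "inner"));
        return current.get("key");
    }

    public static void main(String[] args) {
        System.out.println("copied: " + valueAfterSubscribe(false));
        System.out.println("shared: " + valueAfterSubscribe(true));
    }
}
```

With the copy, the caller still sees "outer" after the subscription wrote "inner"; with sharing, the write is visible to the caller. That is the trade-off between isolation and chaining async sources that the rules above describe.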

Complexity of implementation

Due to the shared state across the asynchronous boundaries we have a defined process for propagating asynchronous context:

  1. When subscribing to an asynchronous primitive, ServiceTalk will capture the current context.

  2. The captured context will be restored for use during asynchronous operations. This process has its own rules:

    1. Save the pre-existing context that may be present on the executing thread, including empty or null context.

    2. Restore the captured context to the executing thread.

    3. Execute the asynchronous logic which now will see the captured context state.

    4. When the async logic is complete, restore the pre-existing context to the thread, including empty or null values.

This set of rules ensures that asynchronous primitives see the correct state at all times, including during recursive calls where different parts of the reactive chain may have different context information.
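
The four save/restore steps above can be sketched as a wrapper around a Runnable. This is a minimal illustration that assumes the context is a single String held in a ThreadLocal; ACTIVE, wrap, and trace are hypothetical names, and ServiceTalk's operators implement the equivalent logic internally.

```java
import java.util.ArrayList;
import java.util.List;

final class SaveRestoreSketch {
    // Stand-in for the per-thread slot that holds the active context (null means "no context").
    static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    /** Wraps a task so it runs with the context that was active when wrap(..) was called. */
    static Runnable wrap(Runnable task) {
        final String captured = ACTIVE.get();      // capture at "subscribe" time
        return () -> {
            final String previous = ACTIVE.get();  // 1. save what the executing thread had (may be null)
            ACTIVE.set(captured);                  // 2. restore the captured context
            try {
                task.run();                        // 3. the logic now sees the captured context
            } finally {
                ACTIVE.set(previous);              // 4. put the pre-existing context back, even if null
            }
        };
    }

    /** Runs a nested scenario and records which context each step observed. */
    static List<String> trace() {
        List<String> seen = new ArrayList<>();
        ACTIVE.set("A");
        Runnable inner = wrap(() -> seen.add("inner:" + ACTIVE.get()));
        ACTIVE.set("B");
        Runnable outer = wrap(() -> {
            seen.add("outer-before:" + ACTIVE.get());
            inner.run();                           // nested restore of a different context
            seen.add("outer-after:" + ACTIVE.get());
        });
        ACTIVE.remove();                           // the executing thread starts with no context
        outer.run();
        seen.add("caller:" + ACTIVE.get());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // [outer-before:B, inner:A, outer-after:B, caller:null]
    }
}
```

The nested call shows why step 1 matters: the inner task temporarily replaces "B" with "A", and the outer task gets "B" back afterwards instead of losing its context.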

Disable Asynchronous Context

Asynchronous Context is enabled by default for ease of setup, but it can be disabled via AsyncContext.disable(). Note that this disables both AsyncContext and CapturedContext.

AsyncContext Specifics

Understandability

The approach has a few succinct rules for how AsyncContext propagates and how isolation is achieved. The more subtle and difficult parts to understand are expected to be concurrency on the underlying Map, and modifications made “later” in the control flow being visible “earlier” in the control flow. These scenarios are demonstrated in the examples below:

  • Any time a Publisher (aka stream) of data comes in to an operator, there is a possibility for concurrency on the AsyncContext map.

Publisher<String> publisher = ...;
AsyncContext.put(KEY, 10); // (1) put a value into AsyncContext before a .subscribe(..)
publisher.flatMapMergeSingle(v -> {
  Integer contextValue = AsyncContext.get(KEY);
  assert contextValue == 10 || contextValue == 30; // (2) Subscriber chain may see either value.

  // AsyncContext will be copied when Single.subscribe(..) is called. Changes to the AsyncContext map from operators on
  // the inner Single operator chain will therefore not be visible in the outer Publisher operator chain.
  return client.request(/*do something with v*/)
               .map(x -> {
                    AsyncContext.put(KEY, 20); // (3) put a new value for the same key
                    return x;
                });
}).map(v -> {
  Integer contextValue = AsyncContext.get(KEY);
  assert contextValue == 10 || contextValue == 30;

  // `publisher` may emit more items, and if it does then `flatMapMergeSingle` `Function` may be invoked concurrently
  // with this code. This is because `client.request(..)` may complete on a different thread than `publisher` is
  // delivering data on. This code has access to the same map as (2) which may result in concurrent modifications on
  // `AsyncContext`. This is allowed by `AsyncContext` but may not be obvious due to modifications made "later" in the
  // operator chain being visible "earlier" in the operator chain.
  AsyncContext.put(KEY, 30);

  return v;
});

  • Saving/restoring AsyncContext across asynchronous boundaries (e.g. Executor) may lead to modifications being visible outside the asynchronous boundary.

Executor executor = ...;

AsyncContext.put(KEY, "foo");
executor.execute(() -> {
  AsyncContext.put(KEY, "bar");
});
String value = AsyncContext.get(KEY);
// value may be "foo" or "bar" due to concurrent modifications
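
The same effect can be made deterministic with plain JDK types. In this sketch a ConcurrentHashMap stands in for the context map (an assumption for illustration, not the AsyncContext implementation), and a latch removes the race so the write made on the other side of the boundary is always observed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedAcrossBoundarySketch {
    /** Returns the value the caller reads after the executor task has finished. */
    static String valueAfterTask() {
        // Stand-in for the context map; the task receives it by reference, not as a copy.
        Map<String, String> context = new ConcurrentHashMap<>();
        context.put("key", "foo");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            executor.execute(() -> {
                context.put("key", "bar"); // write made on the other side of the boundary
                done.countDown();
            });
            // Without this wait, the read below could observe either "foo" or "bar".
            done.await();
            return context.get("key");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueAfterTask()); // bar
    }
}
```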

Cost Of Retention

This approach still requires thread-local state in order to preserve state across method calls without explicitly passing it. The ThreadLocal class provides general retention of thread-local state, but is backed by a Map. The frequency with which we need to save/restore the static state has been shown to introduce non-trivial costs. Since we know that all of our threads will require this thread-local state, our threads can explicitly hold an AsyncContext member variable (see ContextMapHolder). Additional wrapping/unwrapping is also introduced on the asynchronous boundaries, which means additional object allocation.
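
The idea of holding the context in a thread member variable rather than a ThreadLocal can be sketched as follows. ContextThread, FALLBACK, and current() are hypothetical names chosen for this illustration; the sketch only shows the general technique and makes no claim about how ContextMapHolder is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextHolderSketch {
    /** Hypothetical thread type that carries its context in a plain field. */
    static final class ContextThread extends Thread {
        final Map<String, Object> context = new HashMap<>();

        ContextThread(Runnable task) {
            super(task);
        }
    }

    // Fallback for threads that are not ContextThread instances.
    private static final ThreadLocal<Map<String, Object>> FALLBACK =
            ThreadLocal.withInitial(HashMap::new);

    /** Returns the calling thread's context, preferring the field over the ThreadLocal. */
    static Map<String, Object> current() {
        Thread t = Thread.currentThread();
        // Fast path: a field read instead of a ThreadLocal lookup.
        return t instanceof ContextThread ? ((ContextThread) t).context : FALLBACK.get();
    }

    /** Starts a ContextThread and reports whether current() resolved to that thread's field. */
    static boolean resolvesToFieldOnContextThread() {
        final boolean[] sameInstance = new boolean[1];
        final ContextThread[] holder = new ContextThread[1];
        holder[0] = new ContextThread(() -> sameInstance[0] = current() == holder[0].context);
        holder[0].start();
        try {
            holder[0].join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sameInstance[0];
    }

    public static void main(String[] args) {
        System.out.println("field used on ContextThread: " + resolvesToFieldOnContextThread());
        System.out.println("fallback available elsewhere: " + (current() != null));
    }
}
```

Threads created by the framework take the fast path, while threads the framework does not own still work through the ThreadLocal fallback.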

AsyncContext Examples

AsyncContext is designed to provide a static API to retain state across asynchronous boundaries. Motivation for providing support for AsyncContext can be found here.

High Level Usage

At a high level, AsyncContext provides a Map-like API for storing static state, and is isolated/scoped for each request and response to simulate ThreadLocal storage. The following code illustrates this:

// Main.java - application logic
Single<Response> single = authenticate(client.request(...));
single.map(response -> {
  // Compare from the constant side so a missing value does not throw.
  if ("admin".equals(AsyncContext.get(USER_ID_KEY))) {
    // do something for admin
  } else {
    // do something for non-admin
  }
  return response;
}) // do something else in the function composition chain

// AuthenticationFilter.java
public static final Key<String> USER_ID_KEY = Key.newKey("userId", String.class);

public static Single<Response> authenticate(Single<Response> responseSingle) {
  // Store the user id so later operators in the chain can read it.
  AsyncContext.put(USER_ID_KEY, client.headers().get("userId"));
  return responseSingle;
}

CapturedContext Specifics

In contrast to AsyncContext, the main use case of CapturedContext is to work with third-party context abstractions. Users can define a CapturedContextProvider which will provide a way for ServiceTalk to capture and restore third-party context along the execution chain. An important example of third-party context information is the OpenTelemetry Context. By using the CapturedContextProvider users can correctly propagate OpenTelemetry context information in a non-invasive way which makes it much more likely to work with other third-party libraries.

See the JavaDocs for CapturedContextProvider for an example of how to define the context capture and restore process.
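
The general shape of capturing and restoring a third-party ThreadLocal can be sketched without ServiceTalk on the classpath. In the sketch below, Scope and Captured are simplified, hypothetical stand-ins and are not the ServiceTalk interfaces; TRACE_ID stands in for a third-party library's thread-local state. Refer to the JavaDocs for the actual CapturedContextProvider contract.

```java
final class ThirdPartyCaptureSketch {
    // Stand-in for a third-party ThreadLocal, e.g. a tracing library's current trace id.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Simplified stand-in for a restore scope; closing it undoes the attach. */
    interface Scope extends AutoCloseable {
        @Override
        void close(); // narrowed so callers need not handle checked exceptions
    }

    /** Simplified stand-in for a snapshot that can be attached to the running thread. */
    interface Captured {
        Scope attach();
    }

    /** Captures the third-party value now so it can be attached later, possibly on another thread. */
    static Captured capture() {
        final String captured = TRACE_ID.get();
        return () -> {
            final String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            return () -> TRACE_ID.set(previous);
        };
    }

    /** Returns "valueInsideScope/valueAfterScope" as observed on a fresh worker thread. */
    static String roundTrip() {
        TRACE_ID.set("trace-42");
        Captured snapshot = capture();
        TRACE_ID.remove();
        final String[] seen = new String[2];
        Thread worker = new Thread(() -> {
            try (Scope ignored = snapshot.attach()) {
                seen[0] = TRACE_ID.get();
            }
            seen[1] = TRACE_ID.get();
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0] + "/" + seen[1];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // trace-42/null
    }
}
```

The try-with-resources block keeps the restore step from being skipped when the wrapped logic throws, which is one reason a scope-style API suits this pattern.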

Cost Of Retention

Unlike AsyncContext, which has a well-defined cost model, the cost of saving and restoring arbitrary contextual information cannot be predicted because it depends on the third-party APIs used to access and set it. In general, good candidates for the CapturedContext model are those where capturing and setting context is 'cheap', such as saving and restoring the state of a ThreadLocal. Because the save and restore process runs frequently in an asynchronous computation chain, careful thought should go into the cost of capturing and restoring third-party context.