Class SubscriberUtils
java.lang.Object
    io.servicetalk.concurrent.internal.SubscriberUtils
public final class SubscriberUtils extends java.lang.Object
A set of utilities for common PublisherSource.Subscriber tasks.
Method Summary
static <T> int calculateSourceRequested(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> requestNUpdater, java.util.concurrent.atomic.AtomicLongFieldUpdater<T> sourceRequestedUpdater, java.util.concurrent.atomic.AtomicLongFieldUpdater<T> emittedUpdater, int limit, T owner)
    Attempts to increment sourceRequestedUpdater in order to make it the same as requestNUpdater while not exceeding the limit.
static boolean checkDuplicateSubscription(PublisherSource.Subscription existing, PublisherSource.Subscription next)
    Checks for an already existing PublisherSource.Subscription and, if one is given, calls Cancellable.cancel() on next and returns false.
static void deliverCompleteFromSource(CompletableSource.Subscriber subscriber)
    Deliver a terminal complete to a CompletableSource.Subscriber that has not yet had CompletableSource.Subscriber.onSubscribe(Cancellable) called.
static <T> void deliverCompleteFromSource(PublisherSource.Subscriber<T> subscriber)
    Deliver a terminal complete to a PublisherSource.Subscriber that has not yet had PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) called.
static void deliverErrorFromSource(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
    Deliver a terminal error to a CompletableSource.Subscriber that has not yet had CompletableSource.Subscriber.onSubscribe(Cancellable) called.
static <T> void deliverErrorFromSource(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Deliver a terminal error to a PublisherSource.Subscriber that has not yet had PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) called.
static <T> void deliverErrorFromSource(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Deliver a terminal error to a SingleSource.Subscriber that has not yet had SingleSource.Subscriber.onSubscribe(Cancellable) called.
static <T> void deliverSuccessFromSource(SingleSource.Subscriber<T> subscriber, T value)
    Invokes SingleSource.Subscriber.onSuccess(Object), ignoring any exception that is thrown.
static void handleExceptionFromOnSubscribe(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
    Handle the case when a call to CompletableSource.Subscriber.onSubscribe(Cancellable) throws from a source.
static <T> void handleExceptionFromOnSubscribe(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Handle the case when a call to PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) throws from a source.
static <T> void handleExceptionFromOnSubscribe(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Handle the case when a call to SingleSource.Subscriber.onSubscribe(Cancellable) throws from a source.
static boolean isRequestNValid(long n)
    Returns false if the requested number of elements n is not positive, true otherwise.
static java.lang.IllegalArgumentException newExceptionForInvalidRequestN(long n)
    Creates a new exception for an invalid amount passed to PublisherSource.Subscription.request(long), according to Reactive Streams, Rule 3.9.
static void safeCancel(Cancellable cancellable)
    Invokes Cancellable.cancel(), ignoring any exceptions that are thrown.
static void safeOnComplete(CompletableSource.Subscriber subscriber)
    Invokes CompletableSource.Subscriber.onComplete(), ignoring any exception that is thrown.
static <T> void safeOnComplete(PublisherSource.Subscriber<T> subscriber)
    Invokes PublisherSource.Subscriber.onComplete(), ignoring any exception that is thrown.
static void safeOnError(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
    Invokes CompletableSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
static <T> void safeOnError(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Invokes PublisherSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
static <T> void safeOnError(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
    Invokes SingleSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
static <T> void safeOnSuccess(SingleSource.Subscriber<T> subscriber, T value)
    Invokes SingleSource.Subscriber.onSuccess(Object), ignoring any exception that is thrown.
static <R> boolean trySetTerminal(TerminalNotification toSet, boolean overrideComplete, java.util.concurrent.atomic.AtomicReferenceFieldUpdater<R,TerminalNotification> terminalNotificationUpdater, R flagOwner)
    There are some scenarios where a completion TerminalNotification can be overridden with an error if errors are produced asynchronously.
Method Detail
-
checkDuplicateSubscription
public static boolean checkDuplicateSubscription(@Nullable PublisherSource.Subscription existing, PublisherSource.Subscription next)
Checks for an already existing PublisherSource.Subscription and, if one is given, calls Cancellable.cancel() on next and returns false.
Parameters:
existing - the existing PublisherSource.Subscription, or null if none exists.
next - the next PublisherSource.Subscription to use.
Returns:
true if no PublisherSource.Subscription exists, false otherwise.
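The contract above can be illustrated with a minimal sketch. The Subscription interface and RecordingSubscription class below are hypothetical stand-ins for the ServiceTalk types, and the method body is an assumption that follows only the documented behavior, not the library's source.

```java
final class DuplicateSubscriptionSketch {
    /** Hypothetical stand-in for PublisherSource.Subscription. */
    interface Subscription {
        void request(long n);

        void cancel();
    }

    /** Records whether cancel() was called; for demonstration only. */
    static final class RecordingSubscription implements Subscription {
        boolean cancelled;

        @Override
        public void request(long n) {
            // No demand tracking is needed for this demonstration.
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Follows the documented contract: if a subscription already exists, cancel next and return false. */
    static boolean checkDuplicateSubscription(Subscription existing, Subscription next) {
        if (existing != null) {
            next.cancel();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingSubscription first = new RecordingSubscription();
        RecordingSubscription second = new RecordingSubscription();
        // No existing subscription: first is accepted and left untouched.
        System.out.println(checkDuplicateSubscription(null, first));
        // first already exists: second is rejected and cancelled.
        System.out.println(checkDuplicateSubscription(first, second));
        System.out.println(second.cancelled);
    }
}
```

A subscriber would typically call this at the top of onSubscribe and return early when it yields false.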
-
isRequestNValid
public static boolean isRequestNValid(long n)
Returns false if the requested number of elements n is not positive, true otherwise.
Parameters:
n - the number of elements to request.
Returns:
false if the requested number of elements n is not positive, true otherwise.
-
newExceptionForInvalidRequestN
public static java.lang.IllegalArgumentException newExceptionForInvalidRequestN(long n)
Creates a new exception for an invalid amount passed to PublisherSource.Subscription.request(long), according to Reactive Streams, Rule 3.9.
Parameters:
n - the invalid request count.
Returns:
The exception which clarifies the invalid behavior.
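As a rough illustration of how isRequestNValid and newExceptionForInvalidRequestN might be combined, the sketch below validates demand inside a request method. DemandTracker, its error callback, and the exception message are hypothetical; the two static helpers are local re-creations of the documented behavior, not the library's code.

```java
import java.util.function.Consumer;

final class RequestNValidationSketch {
    /** Re-creation of the documented check: only positive amounts are valid. */
    static boolean isRequestNValid(long n) {
        return n > 0;
    }

    /** Re-creation of the documented factory; the message text is an assumption. */
    static IllegalArgumentException newExceptionForInvalidRequestN(long n) {
        return new IllegalArgumentException("Invalid request(n) amount, expected n > 0 but got: " + n);
    }

    /** Hypothetical, single-threaded demand tracker that reports invalid requests to a callback. */
    static final class DemandTracker {
        private final Consumer<Throwable> onError;
        long demand;

        DemandTracker(Consumer<Throwable> onError) {
            this.onError = onError;
        }

        void request(long n) {
            if (!isRequestNValid(n)) {
                // Rule 3.9: a non-positive request is signalled as an IllegalArgumentException.
                onError.accept(newExceptionForInvalidRequestN(n));
                return;
            }
            demand += n; // Overflow handling is omitted in this sketch.
        }
    }

    public static void main(String[] args) {
        DemandTracker tracker = new DemandTracker(t -> System.out.println("error: " + t.getMessage()));
        tracker.request(3);
        tracker.request(0);
        System.out.println("demand: " + tracker.demand);
    }
}
```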
-
calculateSourceRequested
public static <T> int calculateSourceRequested(java.util.concurrent.atomic.AtomicLongFieldUpdater<T> requestNUpdater, java.util.concurrent.atomic.AtomicLongFieldUpdater<T> sourceRequestedUpdater, java.util.concurrent.atomic.AtomicLongFieldUpdater<T> emittedUpdater, int limit, T owner)
Attempts to increment sourceRequestedUpdater in order to make it the same as requestNUpdater while not exceeding the limit.
Type Parameters:
T - The type of object which owns the atomic updater parameters.
Parameters:
requestNUpdater - The total number which has been requested (typically from PublisherSource.Subscription.request(long)).
sourceRequestedUpdater - The total number which has actually been passed to PublisherSource.Subscription.request(long). The outstanding count sourceRequestedUpdater.get(owner) - emittedUpdater.get(owner) should never exceed limit.
emittedUpdater - The amount of data that has been emitted/delivered by the source.
limit - The maximum outstanding demand from the source at any given time.
owner - The object which all atomic updater parameters are associated with.
Returns:
The amount that sourceRequestedUpdater was increased by. This value is typically used to call PublisherSource.Subscription.request(long).
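The bookkeeping described above can be sketched with three counters and a compare-and-set loop. This is one possible reading of the documented contract, not the library's implementation; the State class and its field names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class SourceRequestedSketch {
    /** Hypothetical owner holding the three counters described in the parameter list. */
    static final class State {
        volatile long requestN;
        volatile long sourceRequested;
        volatile long emitted;
    }

    static final AtomicLongFieldUpdater<State> REQUEST_N =
            AtomicLongFieldUpdater.newUpdater(State.class, "requestN");
    static final AtomicLongFieldUpdater<State> SOURCE_REQUESTED =
            AtomicLongFieldUpdater.newUpdater(State.class, "sourceRequested");
    static final AtomicLongFieldUpdater<State> EMITTED =
            AtomicLongFieldUpdater.newUpdater(State.class, "emitted");

    /** Moves sourceRequested toward requestN while keeping (sourceRequested - emitted) at or below limit. */
    static <T> int calculateSourceRequested(AtomicLongFieldUpdater<T> requestNUpdater,
                                            AtomicLongFieldUpdater<T> sourceRequestedUpdater,
                                            AtomicLongFieldUpdater<T> emittedUpdater,
                                            int limit, T owner) {
        for (;;) {
            long sourceRequested = sourceRequestedUpdater.get(owner);
            long requested = requestNUpdater.get(owner);
            if (requested == sourceRequested) {
                return 0; // The source has already been asked for everything.
            }
            long outstanding = sourceRequested - emittedUpdater.get(owner);
            long toRequest = Math.min(limit - outstanding, requested - sourceRequested);
            if (toRequest <= 0) {
                return 0; // The limit is reached; wait for more items to be emitted.
            }
            if (sourceRequestedUpdater.compareAndSet(owner, sourceRequested, sourceRequested + toRequest)) {
                return (int) toRequest; // Never larger than limit, so the cast is safe.
            }
            // Another thread changed sourceRequested; retry with fresh values.
        }
    }

    public static void main(String[] args) {
        State state = new State();
        REQUEST_N.set(state, 10); // Downstream asked for 10 items.
        System.out.println(calculateSourceRequested(REQUEST_N, SOURCE_REQUESTED, EMITTED, 4, state));
        EMITTED.set(state, 3);    // The source delivered 3 items.
        System.out.println(calculateSourceRequested(REQUEST_N, SOURCE_REQUESTED, EMITTED, 4, state));
    }
}
```

The returned value is what a caller would pass on to the upstream request(long), so a return of 0 means no upstream call is needed.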
-
trySetTerminal
public static <R> boolean trySetTerminal(TerminalNotification toSet, boolean overrideComplete, java.util.concurrent.atomic.AtomicReferenceFieldUpdater<R,TerminalNotification> terminalNotificationUpdater, R flagOwner)
There are some scenarios where a completion TerminalNotification can be overridden with an error if errors are produced asynchronously.
This method helps set TerminalNotification atomically, providing such an override.
Type Parameters:
R - Type of flagOwner.
Parameters:
toSet - TerminalNotification to set.
overrideComplete - Whether an existing TerminalNotification.complete() should be overridden with toSet.
terminalNotificationUpdater - AtomicReferenceFieldUpdater to access the current TerminalNotification.
flagOwner - instance of R that owns the current TerminalNotification field referenced by terminalNotificationUpdater.
Returns:
true if toSet is updated as the current TerminalNotification.
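A minimal sketch of the override behavior follows, assuming a terminal value with a null cause represents completion. Terminal, Owner, and UPDATER are hypothetical stand-ins introduced for illustration; the loop reflects the documented contract rather than the library's source.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class TerminalOverrideSketch {
    /** Hypothetical stand-in for TerminalNotification: a null cause means "complete". */
    static final class Terminal {
        static final Terminal COMPLETE = new Terminal(null);
        final Throwable cause;

        private Terminal(Throwable cause) {
            this.cause = cause;
        }

        static Terminal error(Throwable cause) {
            return new Terminal(cause);
        }
    }

    /** Hypothetical owner of the terminal field. */
    static final class Owner {
        volatile Terminal terminal;
    }

    static final AtomicReferenceFieldUpdater<Owner, Terminal> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(Owner.class, Terminal.class, "terminal");

    /** Sets toSet if nothing is set yet, or if overrideComplete is true and the current value is a completion. */
    static <R> boolean trySetTerminal(Terminal toSet, boolean overrideComplete,
                                      AtomicReferenceFieldUpdater<R, Terminal> updater, R flagOwner) {
        for (;;) {
            Terminal current = updater.get(flagOwner);
            if (current == null) {
                if (updater.compareAndSet(flagOwner, null, toSet)) {
                    return true;
                }
            } else if (overrideComplete && current.cause == null) {
                if (updater.compareAndSet(flagOwner, current, toSet)) {
                    return true;
                }
            } else {
                return false; // An error is already set, or overriding was not requested.
            }
            // A concurrent update won the race; re-read and decide again.
        }
    }

    public static void main(String[] args) {
        Owner owner = new Owner();
        System.out.println(trySetTerminal(Terminal.COMPLETE, false, UPDATER, owner));
        // A late asynchronous error replaces the earlier completion.
        System.out.println(trySetTerminal(Terminal.error(new RuntimeException("late")), true, UPDATER, owner));
        System.out.println(owner.terminal.cause.getMessage());
    }
}
```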
-
deliverCompleteFromSource
public static <T> void deliverCompleteFromSource(PublisherSource.Subscriber<T> subscriber)
Deliver a terminal complete to a PublisherSource.Subscriber that has not yet had PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) called.
Type Parameters:
T - The type of PublisherSource.Subscriber.
Parameters:
subscriber - The PublisherSource.Subscriber to terminate.
-
deliverSuccessFromSource
public static <T> void deliverSuccessFromSource(SingleSource.Subscriber<T> subscriber, @Nullable T value)
Invokes SingleSource.Subscriber.onSuccess(Object), ignoring any exception that is thrown.
Type Parameters:
T - The type of SingleSource.Subscriber.
Parameters:
subscriber - The SingleSource.Subscriber that may throw an exception from SingleSource.Subscriber.onSuccess(Object).
value - The value to pass to SingleSource.Subscriber.onSuccess(Object).
-
deliverCompleteFromSource
public static void deliverCompleteFromSource(CompletableSource.Subscriber subscriber)
Deliver a terminal complete to a CompletableSource.Subscriber that has not yet had CompletableSource.Subscriber.onSubscribe(Cancellable) called.
Parameters:
subscriber - The CompletableSource.Subscriber to terminate.
-
deliverErrorFromSource
public static <T> void deliverErrorFromSource(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Deliver a terminal error to a PublisherSource.Subscriber that has not yet had PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) called.
Type Parameters:
T - The type of PublisherSource.Subscriber.
Parameters:
subscriber - The PublisherSource.Subscriber to terminate.
cause - The terminal event.
-
deliverErrorFromSource
public static <T> void deliverErrorFromSource(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Deliver a terminal error to a SingleSource.Subscriber that has not yet had SingleSource.Subscriber.onSubscribe(Cancellable) called.
Type Parameters:
T - The type of SingleSource.Subscriber.
Parameters:
subscriber - The SingleSource.Subscriber to terminate.
cause - The terminal event.
-
deliverErrorFromSource
public static void deliverErrorFromSource(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
Deliver a terminal error to a CompletableSource.Subscriber that has not yet had CompletableSource.Subscriber.onSubscribe(Cancellable) called.
Parameters:
subscriber - The CompletableSource.Subscriber to terminate.
cause - The terminal event.
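The deliver...FromSource methods target a subscriber that has not yet seen onSubscribe, and Reactive Streams (Rule 1.9) requires onSubscribe to precede any other signal. The sketch below shows one way such a delivery could be ordered for the Completable case. The Cancellable and Subscriber interfaces and the NO_OP constant are hypothetical stand-ins, and the handling of a throwing onSubscribe (report and stop) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliverFromSourceSketch {
    /** Hypothetical stand-in for Cancellable. */
    interface Cancellable {
        void cancel();
    }

    /** Hypothetical stand-in for CompletableSource.Subscriber. */
    interface Subscriber {
        void onSubscribe(Cancellable cancellable);

        void onComplete();

        void onError(Throwable cause);
    }

    /** A Cancellable that does nothing, because there is no work to cancel yet. */
    static final Cancellable NO_OP = () -> { };

    /** Calls onSubscribe first, then the terminal error. */
    static void deliverErrorFromSource(Subscriber subscriber, Throwable cause) {
        try {
            subscriber.onSubscribe(NO_OP);
        } catch (Throwable t) {
            // Assumption: this sketch only reports the failure and sends no further signals.
            System.err.println("onSubscribe threw: " + t);
            return;
        }
        try {
            subscriber.onError(cause);
        } catch (Throwable t) {
            System.err.println("onError threw: " + t);
        }
    }

    /** Records the order of signals; for demonstration only. */
    static final class RecordingSubscriber implements Subscriber {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSubscribe(Cancellable cancellable) {
            events.add("onSubscribe");
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
        }

        @Override
        public void onError(Throwable cause) {
            events.add("onError:" + cause.getMessage());
        }
    }

    public static void main(String[] args) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        deliverErrorFromSource(subscriber, new IllegalStateException("source unavailable"));
        System.out.println(subscriber.events); // prints [onSubscribe, onError:source unavailable]
    }
}
```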
-
handleExceptionFromOnSubscribe
public static <T> void handleExceptionFromOnSubscribe(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Handle the case when a call to PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription) throws from a source.
Type Parameters:
T - The type of PublisherSource.Subscriber.
Parameters:
subscriber - The PublisherSource.Subscriber that threw an exception from PublisherSource.Subscriber.onSubscribe(PublisherSource.Subscription).
cause - The exception thrown by subscriber.
-
handleExceptionFromOnSubscribe
public static <T> void handleExceptionFromOnSubscribe(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Handle the case when a call to SingleSource.Subscriber.onSubscribe(Cancellable) throws from a source.
Type Parameters:
T - The type of SingleSource.Subscriber.
Parameters:
subscriber - The SingleSource.Subscriber that threw an exception from SingleSource.Subscriber.onSubscribe(Cancellable).
cause - The exception thrown by subscriber.
-
handleExceptionFromOnSubscribe
public static void handleExceptionFromOnSubscribe(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
Handle the case when a call to CompletableSource.Subscriber.onSubscribe(Cancellable) throws from a source.
Parameters:
subscriber - The CompletableSource.Subscriber that threw an exception from CompletableSource.Subscriber.onSubscribe(Cancellable).
cause - The exception thrown by subscriber.
-
safeOnError
public static void safeOnError(CompletableSource.Subscriber subscriber, java.lang.Throwable cause)
Invokes CompletableSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
Parameters:
subscriber - The CompletableSource.Subscriber that may throw an exception from CompletableSource.Subscriber.onError(Throwable).
cause - The Throwable to pass to CompletableSource.Subscriber.onError(Throwable).
-
safeOnError
public static <T> void safeOnError(SingleSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Invokes SingleSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
Type Parameters:
T - The type of SingleSource.Subscriber.
Parameters:
subscriber - The SingleSource.Subscriber that may throw an exception from SingleSource.Subscriber.onError(Throwable).
cause - The Throwable to pass to SingleSource.Subscriber.onError(Throwable).
-
safeOnError
public static <T> void safeOnError(PublisherSource.Subscriber<T> subscriber, java.lang.Throwable cause)
Invokes PublisherSource.Subscriber.onError(Throwable), ignoring any exception that is thrown.
Type Parameters:
T - The type of PublisherSource.Subscriber.
Parameters:
subscriber - The PublisherSource.Subscriber that may throw an exception from PublisherSource.Subscriber.onError(Throwable).
cause - The Throwable to pass to PublisherSource.Subscriber.onError(Throwable).
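The safeOn... methods share one pattern: invoke the terminal callback and keep any exception it throws from reaching the caller. A minimal sketch of that pattern for onError is shown here. The Subscriber interface and ThrowingSubscriber class are hypothetical, and printing to System.err is an assumption standing in for whatever reporting the library performs.

```java
final class SafeOnErrorSketch {
    /** Hypothetical stand-in for SingleSource.Subscriber, reduced to its terminal methods. */
    interface Subscriber<T> {
        void onSuccess(T value);

        void onError(Throwable cause);
    }

    /** Invokes onError and swallows anything the subscriber throws. */
    static <T> void safeOnError(Subscriber<T> subscriber, Throwable cause) {
        try {
            subscriber.onError(cause);
        } catch (Throwable t) {
            // Assumption: report and continue, since no further signal may follow a terminal one.
            System.err.println("Ignoring exception from onError: " + t);
        }
    }

    /** A misbehaving subscriber that records the cause and then throws; for demonstration only. */
    static final class ThrowingSubscriber implements Subscriber<String> {
        Throwable received;

        @Override
        public void onSuccess(String value) {
            throw new IllegalStateException("onSuccess failed");
        }

        @Override
        public void onError(Throwable cause) {
            received = cause;
            throw new IllegalStateException("onError failed");
        }
    }

    public static void main(String[] args) {
        ThrowingSubscriber subscriber = new ThrowingSubscriber();
        safeOnError(subscriber, new RuntimeException("upstream failure"));
        // Execution continues even though the subscriber threw.
        System.out.println("received: " + subscriber.received.getMessage());
    }
}
```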
-
safeOnComplete
public static <T> void safeOnComplete(PublisherSource.Subscriber<T> subscriber)
Invokes PublisherSource.Subscriber.onComplete(), ignoring any exception that is thrown.
Type Parameters:
T - The type of PublisherSource.Subscriber.
Parameters:
subscriber - The PublisherSource.Subscriber that may throw an exception from PublisherSource.Subscriber.onComplete().
-
safeOnSuccess
public static <T> void safeOnSuccess(SingleSource.Subscriber<T> subscriber, @Nullable T value)
Invokes SingleSource.Subscriber.onSuccess(Object), ignoring any exception that is thrown.
Type Parameters:
T - The type of SingleSource.Subscriber.
Parameters:
subscriber - The SingleSource.Subscriber that may throw an exception from SingleSource.Subscriber.onSuccess(Object).
value - The value to pass to SingleSource.Subscriber.onSuccess(Object).
-
safeOnComplete
public static void safeOnComplete(CompletableSource.Subscriber subscriber)
Invokes CompletableSource.Subscriber.onComplete(), ignoring any exception that is thrown.
Parameters:
subscriber - The CompletableSource.Subscriber that may throw an exception from CompletableSource.Subscriber.onComplete().
-
safeCancel
public static void safeCancel(Cancellable cancellable)
Invokes Cancellable.cancel(), ignoring any exceptions that are thrown.
Parameters:
cancellable - The Cancellable to cancel.
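One situation where ignoring exceptions from cancel() matters is cleanup of several resources, where a failure in one must not prevent the rest from being cancelled. The sketch below illustrates this under stated assumptions: the Cancellable interface is a hypothetical stand-in, cancelAll is an illustrative helper that is not part of SubscriberUtils, and printing to System.err stands in for the library's reporting.

```java
import java.util.Arrays;
import java.util.List;

final class SafeCancelSketch {
    /** Hypothetical stand-in for Cancellable. */
    interface Cancellable {
        void cancel();
    }

    /** Invokes cancel() and swallows anything it throws. */
    static void safeCancel(Cancellable cancellable) {
        try {
            cancellable.cancel();
        } catch (Throwable t) {
            System.err.println("Ignoring exception from cancel: " + t);
        }
    }

    /** Illustrative helper: every element is cancelled even if an earlier one throws. */
    static void cancelAll(List<Cancellable> cancellables) {
        for (Cancellable cancellable : cancellables) {
            safeCancel(cancellable);
        }
    }

    public static void main(String[] args) {
        List<Cancellable> cancellables = Arrays.asList(
                () -> System.out.println("first cancelled"),
                () -> { throw new IllegalStateException("cancel failed"); },
                () -> System.out.println("third cancelled"));
        cancelAll(cancellables);
    }
}
```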