14 #include <core/util/std/make_unique.hpp> 15 #include <core/util/Verify.hpp> 19 namespace neural_net {
33 class FuturesSubscriber :
public Subscriber<T> {
48 std::future<std::unique_ptr<T>>
Request() {
49 std::promise<std::unique_ptr<T>> promise;
50 auto future = promise.get_future();
53 promise.set_exception(failure_);
54 }
else if (completed_) {
56 promise.set_value(
nullptr);
59 promises_.push(std::move(promise));
61 subscription_->Request(
Demand(1));
68 if (completed_)
return;
74 subscription_->Cancel();
75 subscription_.reset();
79 while (!promises_.empty()) {
80 promises_.front().set_value(
nullptr);
85 void Receive(std::shared_ptr<Subscription> subscription)
override {
88 VerifyIsTrue(subscription_ ==
nullptr, TuriErrorCode::LogicError);
92 if (subscription_ || completed_) {
93 subscription->Cancel();
97 subscription_ = std::move(subscription);
100 if (!promises_.empty()) {
101 Demand demand(static_cast<int>(promises_.size()));
102 subscription_->Request(demand);
108 if (completed_)
return Demand::None();
111 auto input = std::make_unique<Input>(std::move(element));
112 promises_.front().set_value(std::move(input));
114 return Demand::None();
119 if (!completion.IsFinished()) {
120 failure_ = completion.
failure();
124 while (!promises_.empty()) {
125 auto promise = std::move(promises_.front());
128 promise.set_exception(completion.
failure());
130 promise.set_value(
nullptr);
136 std::shared_ptr<Subscription> subscription_;
138 std::queue<std::promise<std::unique_ptr<T>>> promises_;
140 bool completed_ =
false;
141 std::exception_ptr failure_;
148 template <
typename T>
152 : subscriber_(std::move(subscriber)) {}
156 std::future<std::unique_ptr<T>> Next() {
return subscriber_->Request(); }
159 std::shared_ptr<FuturesSubscriber<T>> subscriber_;
void Receive(Completion completion) override
std::exception_ptr failure() const
Demand Receive(Input element) override
std::future< std::unique_ptr< T > > Request()
void Receive(std::shared_ptr< Subscription > subscription) override