8 #ifndef ML_NEURAL_NET_COMBINE_ITERATOR_HPP_ 9 #define ML_NEURAL_NET_COMBINE_ITERATOR_HPP_ 13 #include <type_traits> 18 namespace neural_net {
31 class Iterator :
public std::enable_shared_from_this<Iterator<T>> {
// Pure virtual, const-qualified query that concrete iterators must implement.
// The subscription's Request loop in this file calls Next() only after this
// has returned true.
43 virtual bool HasNext()
const = 0;
// Pure virtual: returns a value of type Output by value. Non-const, unlike
// HasNext(). The subscription's Request loop in this file guards each call
// with HasNext().
46 virtual Output
Next() = 0;
50 return std::make_shared<IteratorPublisher<T>>(this->shared_from_this());
57 template <
typename Callable>
59 :
public Iterator<typename std::result_of<Callable()>::type> {
61 using Output =
typename std::result_of<Callable()>::type;
// Unconditionally returns true: this iterator never reports exhaustion, so
// termination is left to whoever consumes it.
65 bool HasNext()
const override {
return true; }
// Invokes impl_ with no arguments and returns its result as Output (the
// zero-argument result type of Callable, per the alias above).
// NOTE(review): impl_ is declared outside the visible lines; presumably it
// is the wrapped Callable -- confirm against the full class.
67 Output
Next()
override {
return impl_(); }
// Builds a CallableIterator<Callable> with std::make_shared, moving `impl`
// into it. The declared return type is a shared_ptr to an IteratorPublisher
// whose element type is the callable's zero-argument result type.
// NOTE(review): the remainder of the return statement is not visible in
// this listing; presumably it converts the new iterator into a publisher --
// confirm against the full file.
73 template <
typename Callable>
74 std::shared_ptr<IteratorPublisher<typename std::result_of<Callable()>::type>>
75 CreatePublisherFromCallable(Callable impl) {
76 return std::make_shared<CallableIterator<Callable>>(std::move(impl))
92 : iterator_(std::move(iterator)) {}
96 std::make_shared<IteratorSubscription>(subscriber, iterator_);
97 subscriber->Receive(std::move(subscription));
108 : subscriber_(std::move(subscriber)), iterator_(std::move(iterator)) {}
// Returns true while subscriber_ is non-null. Cancel() resets subscriber_,
// after which this returns false.
110 bool IsActive()
const {
return subscriber_ !=
nullptr; }
// Drops this subscription's shared_ptr to the subscriber by resetting
// subscriber_; IsActive() returns false afterwards.
112 void Cancel()
override { subscriber_.reset(); }
// Delivers values from iterator_ to subscriber_ while the subscription is
// active and `demand` is not none.
// NOTE(review): parts of this body (the exception handling around the
// iterator calls and the failure/finished paths) are not visible in this
// listing; the comments below describe only the statements shown.
114 void Request(
Demand demand)
override {
// Stop as soon as IsActive() is false or demand.IsNone() is true.
117 while (IsActive() && !demand.IsNone()) {
// Raw stack storage sized and aligned for a single Output; no Output is
// constructed in it yet.
122 using OutputStorage =
123 typename std::aligned_storage<
sizeof(Output),
124 alignof(Output)>::type;
125 OutputStorage value_storage;
// Remains null unless a value is constructed below.
126 Output* value =
nullptr;
127 std::exception_ptr failure;
// Only pull from the iterator when it reports a value is available.
129 if (iterator_->HasNext()) {
131 value =
reinterpret_cast<Output*
>(&value_storage);
// Placement-new: construct the result of Next() directly in value_storage.
132 new (value) Output(iterator_->Next());
// NOTE(review): presumably inside a catch handler that is not visible here.
138 failure = std::current_exception();
// Hand the value to the subscriber by move; the Demand it returns is added
// to the remaining demand.
153 Demand new_demand = subscriber_->Receive(std::move(*value));
154 demand.
Add(new_demand);
163 std::shared_ptr<Subscriber<Output>> subscriber_;
164 std::shared_ptr<Iterator<Output>> iterator_;
167 std::shared_ptr<Iterator<Output>> iterator_;
173 #endif // ML_NEURAL_NET_COMBINE_ITERATOR_HPP_ virtual bool HasNext() const =0
static Completion Finished()
bool HasNext() const override
void Receive(std::shared_ptr< Subscriber< Output >> subscriber) override
Demand & Add(Demand other)
std::shared_ptr< IteratorPublisher< T > > AsPublisher()
static Completion Failure(std::exception_ptr e)