8 #ifndef ML_NEURAL_NET_COMBINE_BASE_HPP_ 9 #define ML_NEURAL_NET_COMBINE_BASE_HPP_ 21 #include <ml/neural_net/TaskQueue.hpp> 24 namespace neural_net {
29 template <
typename T,
typename Callable>
38 template <
typename T,
typename U>
47 template <
typename T,
typename U>
// Returns true when the stored max_ is negative; a negative max_ is the
// encoding this type uses for an "unlimited" amount.
62 bool IsUnlimited()
const {
return max_ < 0; }
// Returns true when the stored max_ is exactly zero.
63 bool IsNone()
const {
return max_ == 0; }
// Returns the raw max_ value. Callers should check IsUnlimited() first: a
// negative result means "unlimited", not a literal count.
66 int max()
const {
return max_; }
70 if (IsUnlimited() || other.IsUnlimited()) {
// Pure virtual: concrete implementations must override.
// NOTE(review): presumably signals that no further values are wanted from
// this subscription -- confirm against the implementations.
109 virtual void Cancel() = 0;
// Pure virtual: concrete implementations must override. Takes the Demand by
// value.
// NOTE(review): presumably `demand` is how many more values the caller is
// willing to receive -- confirm against the implementations.
121 virtual void Request(
Demand demand) = 0;
// Returns true when no exception is stored (failure_ is null), i.e. this
// Completion does not carry a failure.
139 bool IsFinished()
const {
return failure_ ==
nullptr; }
// Returns a copy of the stored exception pointer. The result is null exactly
// when IsFinished() returns true.
142 std::exception_ptr
failure()
const {
return failure_; }
// Stores `e` as the failure. The default argument (nullptr) lets this also
// serve as the default constructor, producing a Completion for which
// IsFinished() is true. Marked explicit so a std::exception_ptr does not
// implicitly convert to a Completion.
145 explicit Completion(std::exception_ptr e =
nullptr) : failure_(e) {}
// Exception pointer held by this Completion; null means no failure (this is
// what IsFinished() tests).
147 std::exception_ptr failure_;
157 template <
typename T>
// Pure virtual: concrete implementations must override. Receives shared
// ownership of a Subscription.
// NOTE(review): presumably the handle used to Request() values from, or
// Cancel(), the upstream source -- confirm against the implementations.
176 virtual void Receive(std::shared_ptr<Subscription> subscription) = 0;
// Pure virtual: concrete implementations must override. Takes the Completion
// by value; completion.IsFinished() tells a normal finish apart from a
// failure, and completion.failure() yields the stored exception pointer.
// NOTE(review): presumably no further values arrive after this call --
// confirm against the publishers.
193 virtual void Receive(
Completion completion) = 0;
213 template <
typename T>
214 class Publisher :
public std::enable_shared_from_this<Publisher<T>> {
235 Receive(std::move(subscriber));
238 std::shared_ptr<FuturesStream<Output>> AsFutures() {
239 auto subscriber = std::make_shared<FuturesSubscriber<Output>>();
240 Subscribe(subscriber);
241 return std::make_shared<FuturesStream<Output>>(std::move(subscriber));
244 template <
typename TransformType>
245 std::shared_ptr<Publisher<typename TransformType::Output>> Map(
246 std::shared_ptr<TransformType>
transform) {
247 using TransformInput =
typename TransformType::Input;
248 using TransformOutput =
typename TransformType::Output;
249 return std::make_shared<MapPublisher<TransformInput, TransformOutput>>(
250 this->shared_from_this(), std::move(transform));
253 template <
typename Callable>
254 std::shared_ptr<Publisher<typename std::result_of<Callable(Output)>::type>>
257 auto transform = std::make_shared<TransformType>(std::move(fn));
258 return Map(std::move(transform));
261 std::shared_ptr<Publisher<Output>> SubscribeOn(
262 std::shared_ptr<TaskQueue> queue) {
263 return std::make_shared<SubscribeOnQueuePublisher<Output>>(
264 this->shared_from_this(), std::move(queue));
267 std::shared_ptr<Publisher<Output>> ReceiveOn(
268 std::shared_ptr<TaskQueue> queue) {
269 return std::make_shared<ReceiveOnQueuePublisher<Output>>(
270 this->shared_from_this(), std::move(queue));
277 #endif // ML_NEURAL_NET_COMBINE_BASE_HPP_
static Completion Finished()
std::exception_ptr failure() const
Demand & Add(Demand other)
static Completion Failure(std::exception_ptr e)
void transform(S &&input, T &&output, TransformFn transformfn, std::set< size_t > constraint_segments=std::set< size_t >())