Turi Create  4.0
turi::lambda::worker_pool< ProxyType > Class Template Reference

#include <core/system/lambda/worker_pool.hpp>

Public Member Functions

std::unique_ptr< worker_process< ProxyType > > get_worker ()
 
std::shared_ptr< worker_guard< ProxyType > > get_worker_guard (std::unique_ptr< worker_process< ProxyType >> &worker)
 
void release_worker (std::unique_ptr< worker_process< ProxyType >> &worker)
 
size_t num_workers () const
 
size_t num_available_workers ()
 
template<typename RetType , typename Fn >
std::vector< RetType > call_all_workers (Fn f)
 
 worker_pool (size_t num_workers, std::vector< std::string > worker_binary_and_args, int connection_timeout=3)
 constructor
 
 ~worker_pool ()
 destructor
 

Detailed Description

template<typename ProxyType>
class turi::lambda::worker_pool< ProxyType >

Manage a list of worker_processes.

  • Initialize: The pool is initialized with a fixed number of workers. Due to system resource limitations, the actual pool may contain fewer workers than requested.
  • Acquire worker: A user requests a worker process by calling get_worker(), which returns a unique_ptr and transfers ownership of the worker process to the caller.
  • Release worker: After use, the worker_process must be returned to the worker_pool by calling release_worker(). Alternatively, use the RAII pattern by calling get_worker_guard() to guarantee that the worker is returned.

get_worker()/release_worker() are thread safe.

  • Worker crash recovery: On release_worker(), if the worker_process is dead, a new worker_process will be started and released back to the pool. In the worst case where no new process can be started, the pool size will be decreased.
  • Call all workers: You can issue a function call to all workers using call_all_workers(). It blocks until all workers become available, calls the function on all workers, and blocks until all calls finish or an exception is thrown. If a worker_process dies during the call, it is replaced with a new process.
  • Hang: It is extremely important to guarantee the release of a worker after use; otherwise the pool will hang on termination or in a barrier function such as call_all_workers().
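
The acquire/release lifecycle described above can be illustrated with a small stand-in. This is a minimal sketch, not the Turi implementation: toy_worker and toy_pool are hypothetical names, and the blocking behaviour is assumed to be built on a mutex and a condition variable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for worker_process<ProxyType>; not a Turi type.
struct toy_worker {
  int id;
};

// Hypothetical stand-in for worker_pool<ProxyType>.
class toy_pool {
 public:
  explicit toy_pool(std::size_t n) : total_(n) {
    for (std::size_t i = 0; i < n; ++i) {
      available_.push_back(
          std::unique_ptr<toy_worker>(new toy_worker{static_cast<int>(i)}));
    }
  }

  // Blocks until a worker is free, then hands ownership to the caller.
  std::unique_ptr<toy_worker> get_worker() {
    std::unique_lock<std::mutex> lock(mutex_);
    if (total_ == 0) throw std::runtime_error("pool has zero workers");
    cv_.wait(lock, [this] { return !available_.empty(); });
    std::unique_ptr<toy_worker> worker = std::move(available_.front());
    available_.pop_front();
    return worker;
  }

  // Takes ownership back and wakes one waiting caller.
  // The caller's pointer is null afterwards.
  void release_worker(std::unique_ptr<toy_worker>& worker) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      available_.push_back(std::move(worker));
    }
    cv_.notify_one();
  }

  std::size_t num_available_workers() {
    std::lock_guard<std::mutex> lock(mutex_);
    return available_.size();
  }

 private:
  std::size_t total_;
  std::deque<std::unique_ptr<toy_worker>> available_;
  std::mutex mutex_;
  std::condition_variable cv_;
};

// Acquire one worker, use it, and return it; yields the final free count.
std::size_t demo_acquire_release() {
  toy_pool pool(2);
  std::unique_ptr<toy_worker> worker = pool.get_worker();
  assert(pool.num_available_workers() == 1);  // one worker is checked out
  pool.release_worker(worker);
  assert(worker == nullptr);  // ownership went back to the pool
  return pool.num_available_workers();
}
```

If release_worker() were skipped in this sketch, a later get_worker() on an exhausted pool would wait forever, which is the hang the last bullet warns about.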

Definition at line 287 of file worker_pool.hpp.

Member Function Documentation

◆ call_all_workers()

template<typename ProxyType>
template<typename RetType , typename Fn >
std::vector<RetType> turi::lambda::worker_pool< ProxyType >::call_all_workers ( Fn  f)
inline

Call the function on all workers in parallel and return the results. Blocks until all workers are available.
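
The fan-out and join part of this call can be sketched as follows. This is an illustration under stated assumptions, not the library's code: toy_worker and call_all are hypothetical names, std::async stands in for whatever parallel primitive the real pool uses, and the caller is assumed to already hold every worker, so the "wait until all workers are available" step is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <memory>
#include <vector>

// Hypothetical stand-in for worker_process<ProxyType>; not a Turi type.
struct toy_worker {
  int id;
};

// Hypothetical helper: run f on every worker in parallel and collect the
// results in worker order. Assumes the caller already holds all workers.
template <typename RetType, typename Fn>
std::vector<RetType> call_all(
    std::vector<std::unique_ptr<toy_worker>>& workers, Fn f) {
  std::vector<std::future<RetType>> pending;
  pending.reserve(workers.size());
  for (auto& w : workers) {
    toy_worker* raw = w.get();
    pending.push_back(std::async(
        std::launch::async, [f, raw]() mutable -> RetType { return f(*raw); }));
  }

  std::vector<RetType> results;
  results.reserve(pending.size());
  for (auto& fut : pending) {
    results.push_back(fut.get());  // rethrows if the call on a worker threw
  }
  assert(results.size() == workers.size());
  return results;
}

// Three workers, each returning ten times its id.
std::vector<int> demo_call_all() {
  std::vector<std::unique_ptr<toy_worker>> workers;
  for (int i = 0; i < 3; ++i) {
    workers.push_back(std::unique_ptr<toy_worker>(new toy_worker{i}));
  }
  return call_all<int>(workers, [](toy_worker& w) { return w.id * 10; });
}
```

As with call_all_workers(), only the return type is given explicitly at the call site; the callable type is deduced.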

Definition at line 364 of file worker_pool.hpp.

◆ get_worker()

template<typename ProxyType>
std::unique_ptr<worker_process<ProxyType> > turi::lambda::worker_pool< ProxyType >::get_worker ( )
inline

Return the next available worker. Blocks until a worker is available. Throws an error if the worker_pool has zero workers.

Note: This function must be paired with release_worker(). When release_worker() cannot be called reliably, obtain an RAII guard with get_worker_guard() instead. Otherwise, the worker_pool may hang waiting for the worker to be returned.

Definition at line 299 of file worker_pool.hpp.

◆ get_worker_guard()

template<typename ProxyType>
std::shared_ptr<worker_guard<ProxyType> > turi::lambda::worker_pool< ProxyType >::get_worker_guard ( std::unique_ptr< worker_process< ProxyType >> &  worker)
inline

Returns a worker_guard for the given worker. When the worker_guard goes out of scope, the guarded worker is automatically released.
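
The guard pattern can be sketched as follows. This is a hypothetical illustration, not the real worker_guard: toy_worker, toy_pool, and toy_guard are invented names, the pool is deliberately single-threaded, and the guard lives on the stack here rather than behind a shared_ptr.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-ins; not Turi types.
struct toy_worker {
  int id;
};

// Single-threaded toy pool, just enough to demonstrate the guard.
struct toy_pool {
  std::vector<std::unique_ptr<toy_worker>> available;

  std::unique_ptr<toy_worker> get_worker() {
    if (available.empty()) throw std::runtime_error("no worker available");
    std::unique_ptr<toy_worker> w = std::move(available.back());
    available.pop_back();
    return w;
  }

  void release_worker(std::unique_ptr<toy_worker>& w) {
    available.push_back(std::move(w));
  }
};

// Hypothetical guard: returns the worker to the pool in its destructor.
class toy_guard {
 public:
  toy_guard(toy_pool& pool, std::unique_ptr<toy_worker>& worker)
      : pool_(pool), worker_(worker) {}
  ~toy_guard() {
    if (worker_) pool_.release_worker(worker_);
  }
  toy_guard(const toy_guard&) = delete;
  toy_guard& operator=(const toy_guard&) = delete;

 private:
  toy_pool& pool_;
  std::unique_ptr<toy_worker>& worker_;
};

// The worker is returned even though an exception skips the normal path.
std::size_t demo_guard_on_exception() {
  toy_pool pool;
  pool.available.push_back(std::unique_ptr<toy_worker>(new toy_worker{0}));
  try {
    std::unique_ptr<toy_worker> worker = pool.get_worker();
    toy_guard guard(pool, worker);  // declared after worker, destroyed first
    assert(pool.available.empty());
    throw std::runtime_error("simulated failure while using the worker");
  } catch (const std::runtime_error&) {
    // Stack unwinding has already run the guard's destructor.
  }
  return pool.available.size();
}
```

Because the guard in this sketch holds a reference to the caller's pointer, it must be declared after that pointer so that it is destroyed first.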

Definition at line 312 of file worker_pool.hpp.

◆ num_available_workers()

template<typename ProxyType>
size_t turi::lambda::worker_pool< ProxyType >::num_available_workers ( )
inline

Return the number of available workers in the pool.

Definition at line 354 of file worker_pool.hpp.

◆ num_workers()

template<typename ProxyType>
size_t turi::lambda::worker_pool< ProxyType >::num_workers ( ) const
inline

Return the total number of workers in the pool.

Definition at line 349 of file worker_pool.hpp.

◆ release_worker()

template<typename ProxyType>
void turi::lambda::worker_pool< ProxyType >::release_worker ( std::unique_ptr< worker_process< ProxyType >> &  worker)
inline

Put the worker back into the availability queue. If the worker process is dead, try to replace it with a new worker process. If a new worker process cannot be started, decrease the pool size.
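
The replace-or-shrink rule can be sketched as follows. This is a minimal single-threaded illustration, not the library's code: toy_worker, toy_pool, the alive flag, and the injected spawn callback are all hypothetical, with the callback returning a null pointer to simulate a process that cannot be started.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for worker_process<ProxyType>; not a Turi type.
struct toy_worker {
  bool alive;
};

class toy_pool {
 public:
  // Hypothetical factory; returns nullptr when a process cannot be started.
  using spawn_fn = std::function<std::unique_ptr<toy_worker>()>;

  toy_pool(std::size_t requested, spawn_fn spawn) : spawn_(std::move(spawn)) {
    for (std::size_t i = 0; i < requested; ++i) {
      std::unique_ptr<toy_worker> w = spawn_();
      if (w) available_.push_back(std::move(w));
    }
    total_ = available_.size();  // may be smaller than requested
  }

  std::unique_ptr<toy_worker> get_worker() {
    if (available_.empty()) throw std::runtime_error("no worker available");
    std::unique_ptr<toy_worker> w = std::move(available_.back());
    available_.pop_back();
    return w;
  }

  void release_worker(std::unique_ptr<toy_worker>& worker) {
    if (worker->alive) {
      available_.push_back(std::move(worker));
      return;
    }
    worker.reset();  // discard the dead worker
    std::unique_ptr<toy_worker> replacement = spawn_();
    if (replacement) {
      available_.push_back(std::move(replacement));
    } else {
      --total_;  // no replacement could be started: the pool shrinks
    }
  }

  std::size_t num_workers() const { return total_; }

 private:
  spawn_fn spawn_;
  std::vector<std::unique_ptr<toy_worker>> available_;
  std::size_t total_ = 0;
};

// Returns the pool size after a recoverable and an unrecoverable crash.
std::pair<std::size_t, std::size_t> demo_crash_recovery() {
  int budget = 3;  // only three spawns succeed in total
  toy_pool pool(2, [&budget]() -> std::unique_ptr<toy_worker> {
    if (budget == 0) return nullptr;
    --budget;
    return std::unique_ptr<toy_worker>(new toy_worker{true});
  });
  assert(pool.num_workers() == 2);

  std::unique_ptr<toy_worker> w = pool.get_worker();
  w->alive = false;  // simulate a crash; the last spawn replaces it
  pool.release_worker(w);
  std::size_t after_first = pool.num_workers();

  w = pool.get_worker();
  w->alive = false;  // second crash; the spawn budget is exhausted
  pool.release_worker(w);
  return {after_first, pool.num_workers()};
}
```

The same callback also models the initialization caveat: if it fails during construction, the pool simply starts with fewer workers than requested.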

Definition at line 321 of file worker_pool.hpp.


The documentation for this class was generated from the following file: