#ifndef TURI_LAMBDA_WORKER_POOL_HPP
#define TURI_LAMBDA_WORKER_POOL_HPP

#include <core/system/lambda/lambda_utils.hpp>
#include <core/logging/assertions.hpp>
#include <core/storage/fileio/temp_files.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <core/parallel/lambda_omp.hpp>
#include <core/parallel/pthread_tools.hpp>
#include <process/process.hpp>
#include <core/system/cppipc/client/comm_client.hpp>
#include <timer/timer.hpp>

extern double LAMBDA_WORKER_CONNECTION_TIMEOUT;
/**
 * Bundles a spawned lambda worker process together with its cppipc client
 * and proxy object.
 */
template<typename ProxyType>
struct worker_process {
  std::unique_ptr<ProxyType> proxy;
  std::unique_ptr<cppipc::comm_client> client;
  std::string address;
  std::unique_ptr<process> process_;
  const int id = get_next_id();  // unique id, used for logging/debugging

  static int get_next_id() {
    static atomic<int> next_id;
    return next_id.inc_ret_last();
  }

  ~worker_process() {
    // Tear down the proxy and connection first, then terminate the child process.
    proxy.reset();
    client.reset();
    if (process_ != nullptr) {
      process_->kill(false);
    }
  }
};
/**
 * Spawns a lambda worker process listening on worker_address and connects a
 * cppipc client to it. Throws on failure.
 */
template<typename ProxyType>
std::unique_ptr<worker_process<ProxyType>> spawn_worker(std::vector<std::string> worker_binary_args,
                                                        std::string worker_address,
                                                        int connection_timeout) {
  namespace fs = boost::filesystem;

  // Sanity check: the first argument must be the worker binary itself.
  ASSERT_MSG(worker_binary_args.size() >= 1, "Unexpected number of arguments.");
  const std::string& worker_binary = worker_binary_args.front();
  auto the_path = fs::path(worker_binary);
  if (!fs::exists(the_path)) {
    throw std::string("Executable: ") + worker_binary + " not found.";
  }

  // Step 1: launch the worker process, passing the address as the last argument.
  logstream(LOG_INFO) << "Start lambda worker at " << worker_address
                      << " using binary: " << worker_binary << std::endl;
  std::unique_ptr<process> new_process(new process());
  std::vector<std::string> args(worker_binary_args.begin() + 1, worker_binary_args.end());
  args.push_back(worker_address);
  if (!new_process->launch(worker_binary, args)) {
    throw("Failed to launch lambda worker.");
  }
  // Step 2: connect a cppipc client to the launched process, retrying until
  // the connection succeeds, the process dies, or the timeout expires.
  size_t retry = 0;
  timer conn_timer;
  conn_timer.start();

  std::unique_ptr<cppipc::comm_client> new_client;

  // Returns false (and logs) if the worker process has already exited.
  auto check_process_exists = [&]() -> bool {
    if (!new_process->exists()) {
      int ret_code = new_process->get_return_code();
      logstream(LOG_ERROR) << "Lambda worker " << new_process->get_pid()
                           << " terminated unexpectedly with code " << ret_code
                           << "; conn attempt time = " << conn_timer.current_time()
                           << "; attempt count = " << retry
                           << std::endl;
      return false;
    }
    return true;
  };
  // Logs a connection error together with the current retry statistics.
  auto log_exception = [&](const std::string& e) {
    logstream(LOG_ERROR)
        << "Error starting CPPIPC connection in connecting to lambda worker at "
        << worker_address
        << " (conn attempt time = " << conn_timer.current_time()
        << "; attempt count = " << retry << "): " << e
        << std::endl;
  };
  while (true) {
    ++retry;

    // Bail out if the worker process has already died.
    if (!check_process_exists()) { break; }

    try {
      // Attempt to establish the cppipc connection. (The constructor arguments
      // are truncated in this excerpt; an empty zookeeper host list plus the
      // worker address is the usual cppipc::comm_client form.)
      std::unique_ptr<cppipc::comm_client> tmp_cli(
          new cppipc::comm_client({}, worker_address));
      cppipc::reply_status status = tmp_cli->start();
      if (status == cppipc::reply_status::OK) {
        logstream(LOG_INFO) << "Connected to worker " << new_process->get_pid()
                            << " at " << worker_address
                            << "; conn attempt time = " << conn_timer.current_time()
                            << "; attempt count = " << retry
                            << std::endl;
        new_client.swap(tmp_cli);
        break;
      } else {
        logstream(LOG_ERROR) << "CPPIPC failure connecting to worker at " << worker_address
                             << ". Status: " << cppipc::reply_status_to_string(status)
                             << "; conn attempt time = " << conn_timer.current_time()
                             << "; attempt count = " << retry
                             << std::endl;
      }
    } catch (std::string error) {
      log_exception(error + " (0)");
      check_process_exists();
    } catch (const char* error) {
      log_exception(std::string(error) + " (1)");
      check_process_exists();
    } catch (const std::exception& e) {
      check_process_exists();
      log_exception(std::string(e.what()) + " (2)");
    } catch (...) {
      check_process_exists();
      log_exception("Unknown Error");
    }
    // Give up if the process died while we were trying to connect.
    if (!check_process_exists()) { break; }

    // Give up if the overall connection timeout has expired.
    if (LAMBDA_WORKER_CONNECTION_TIMEOUT >= 0
        && (conn_timer.current_time() >= LAMBDA_WORKER_CONNECTION_TIMEOUT)) {
      logstream(LOG_ERROR)
          << "Timeout connecting to lambda worker process " << new_process->get_pid()
          << "; conn attempt time = " << conn_timer.current_time()
          << "; timeout = " << LAMBDA_WORKER_CONNECTION_TIMEOUT
          << "; retry count = " << retry
          << std::endl;
      break;
    }
  }  // end of connection retry loop
  if (new_client == nullptr) {
    throw std::string("Failure launching lambda workers; see log for details.");
  }

  // Step 3: wrap the connection in a proxy object.
  std::unique_ptr<ProxyType> new_proxy(new ProxyType(*new_client));

  // Step 4: bundle everything into the worker_process record and return it.
  std::unique_ptr<worker_process<ProxyType>> ret(new worker_process<ProxyType>());
  ret->proxy.swap(new_proxy);
  ret->client.swap(new_client);
  ret->address = worker_address;
  ret->process_.swap(new_process);
  return ret;
}  // end of spawn_worker
/**
 * Same as spawn_worker(), but swallows all exceptions, logs them, and returns
 * nullptr on failure.
 */
template<typename ProxyType>
std::unique_ptr<worker_process<ProxyType>> try_spawn_worker(std::vector<std::string> worker_binary_args,
                                                            std::string worker_address,
                                                            int connection_timeout) noexcept {
  try {
    return spawn_worker<ProxyType>(worker_binary_args, worker_address, connection_timeout);
  } catch (std::string e) {
    logstream(LOG_ERROR) << e << std::endl;
  } catch (const char* e) {
    logstream(LOG_ERROR) << e << std::endl;
  } catch (...) {
    // Re-throw the unknown exception so it can be logged by type.
    std::exception_ptr eptr = std::current_exception();
    try {
      if (eptr) { std::rethrow_exception(eptr); }
    } catch (const std::exception& e) {
      logstream(LOG_ERROR) << e.what() << std::endl;
    } catch (...) {
      logstream(LOG_ERROR) << "Unknown error spawning lambda worker." << std::endl;
    }
  }
  return nullptr;
}
/**
 * Forward declaration: RAII guard that returns a checked-out worker to its pool.
 */
template<typename ProxyType>
class worker_guard;

/**
 * Manages a pool of lambda worker processes and hands them out to callers.
 */
template<typename ProxyType>
class worker_pool {
 public:
  /**
   * Blocks until a worker is available, removes it from the pool, and returns
   * it. Pair with release_worker(), or use get_worker_guard().
   */
  std::unique_ptr<worker_process<ProxyType>> get_worker() {
    std::unique_lock<turi::mutex> lck(m_mutex);
    wait_for_one(lck);
    auto worker = std::move(m_available_workers.front());
    m_available_workers.pop_front();
    return worker;
  }

  /**
   * Returns an RAII guard which gives the worker back to the pool when the
   * guard goes out of scope.
   */
  std::shared_ptr<worker_guard<ProxyType>> get_worker_guard(std::unique_ptr<worker_process<ProxyType>>& worker) {
    return std::make_shared<worker_guard<ProxyType>>(this, worker);
  }
  /**
   * Returns the worker to the pool. If the worker process has died, it is
   * discarded and a replacement is spawned; if that fails, the pool shrinks.
   */
  void release_worker(std::unique_ptr<worker_process<ProxyType>>& worker) {
    std::unique_lock<turi::mutex> lck(m_mutex);
    if (check_alive(worker) == true) {
      m_available_workers.push_back(std::move(worker));
    } else {
      worker.reset(nullptr);
      auto new_worker = try_spawn_worker<ProxyType>(m_worker_binary_and_args,
                                                    new_worker_address(),
                                                    m_connection_timeout);
      if (new_worker != nullptr) {
        m_available_workers.push_back(std::move(new_worker));
      } else {
        --m_num_workers;
        // (Log message head truncated in this excerpt.)
        logstream(LOG_ERROR) << /* ... */ m_num_workers << std::endl;
      }
    }
    lck.unlock();
    cv.notify_one();
  }
  /// Returns the number of workers currently checked into the pool.
  size_t num_available_workers() {
    std::unique_lock<turi::mutex> lck(m_mutex);
    return m_available_workers.size();
  }
  /**
   * Checks out every worker, calls f on each worker's proxy in parallel, and
   * returns the collected results. Blocks until all workers are available.
   */
  template<typename RetType, typename Fn>
  std::vector<RetType> call_all_workers(Fn f) {
    std::unique_lock<turi::mutex> lck(m_mutex);
    wait_for_all(lck);
    // Check out every worker from the pool.
    std::vector<std::unique_ptr<worker_process<ProxyType>>> temp_workers;
    for (size_t i = 0; i < m_num_workers; ++i) {
      temp_workers.push_back(std::move(m_available_workers.front()));
      m_available_workers.pop_front();
    }
    lck.unlock();
    try {
      // Guard the workers so they are returned even if f throws.
      std::vector<std::shared_ptr<worker_guard<ProxyType>>> guards;
      for (auto& worker : temp_workers)
        guards.push_back(get_worker_guard(worker));
      // Invoke f on every proxy in parallel.
      std::vector<RetType> ret(m_num_workers);
      parallel_for(0, m_num_workers, [&](size_t i) {
        try {
          ret[i] = f(temp_workers[i]->proxy);
        } catch (cppipc::ipcexception e) {
          throw reinterpret_comm_failure(e);
        }
      });
      return ret;
    } catch (...) {
      // The guards have already returned the workers; re-raise the error.
      ASSERT_EQ(m_available_workers.size(), m_num_workers);
      std::rethrow_exception(std::current_exception());
    }
  }
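  // Illustrative sketch (not part of the original header) of how
  // call_all_workers might be used. "my_proxy" and its evaluate() method are
  // hypothetical placeholders for a real cppipc proxy type, and the worker
  // binary path is a placeholder as well.
  //
  //   worker_pool<my_proxy> pool(4, {"/path/to/lambda_worker"});
  //   std::vector<size_t> results =
  //       pool.call_all_workers<size_t>([](std::unique_ptr<my_proxy>& proxy) {
  //         return proxy->evaluate();
  //       });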
  /**
   * Constructor: spawns num_workers workers using the given binary and
   * arguments, waiting up to connection_timeout seconds per connection attempt.
   */
  worker_pool(size_t num_workers,
              std::vector<std::string> worker_binary_and_args,
              int connection_timeout = 3) {
    m_connection_timeout = connection_timeout;
    m_worker_binary_and_args = worker_binary_and_args;
    init(num_workers);
  }
  /// Destructor: shuts down all worker processes in parallel.
  ~worker_pool() {
    std::unique_lock<turi::mutex> lck(m_mutex);
    parallel_for(0, m_available_workers.size(), [&](size_t i) {
      m_available_workers[i].reset();
    });
  }
  /// Blocks until every worker has been returned to the pool.
  void wait_for_all(std::unique_lock<turi::mutex>& lck) {
    while (m_available_workers.size() < m_num_workers || m_num_workers == 0) {
      cv.wait(lck);
      if (m_num_workers == 0) {
        throw("Worker pool is empty");
      }
    }
  }
  /// Blocks until at least one worker is available in the pool.
  void wait_for_one(std::unique_lock<turi::mutex>& lck) {
    while (m_available_workers.empty() || m_num_workers == 0) {
      cv.wait(lck);
      if (m_num_workers == 0) {
        throw("Worker pool is empty");
      }
    }
  }
  /// Returns true if the worker's process handle is valid and still running.
  bool check_alive(std::unique_ptr<worker_process<ProxyType>>& worker) {
    return (worker->process_ != nullptr) && worker->process_->exists();
  }
  /// Generates a fresh ipc address for a new worker, backed by a temp file.
  std::string new_worker_address() {
    return "ipc://" + get_temp_name();
  }
  /// Spawns the initial set of workers. Called from the constructor.
  void init(size_t num_workers) {
    m_num_workers = 0;
    parallel_for(0, num_workers, [&](size_t i) {
      auto new_worker = try_spawn_worker<ProxyType>(m_worker_binary_and_args,
                                                    new_worker_address(),
                                                    m_connection_timeout);
      if (new_worker != nullptr) {
        std::unique_lock<turi::mutex> lck(m_mutex);
        m_available_workers.push_back(std::move(new_worker));
        ++m_num_workers;
      }
    });
    if (m_num_workers == 0) {
      log_and_throw("Cannot evaluate lambda. No Lambda workers have been successfully started.");
    } else if (m_num_workers < num_workers) {
      // (The leading text of these messages is truncated in this excerpt.)
      logprogress_stream << /* ... */ "Using only " << m_num_workers << " workers." << std::endl;
      logprogress_stream << /* ... */ "lambda operations will not be able to use all "
                         << "available cores." << std::endl;
      logstream(LOG_ERROR) << /* ... */ "Using only " << m_num_workers << std::endl;
    }
  }
  std::vector<std::string> m_worker_binary_and_args;
  int m_connection_timeout;
  std::deque<std::unique_ptr<worker_process<ProxyType>>> m_available_workers;
  size_t m_num_workers;
  // Synchronization members implied by the methods above.
  turi::mutex m_mutex;
  turi::conditional cv;
};  // end of worker_pool
/**
 * RAII guard which returns a checked-out worker to its pool when destroyed.
 */
template<typename ProxyType>
class worker_guard {
 public:
  worker_guard(worker_pool<ProxyType>* worker_pool_,
               std::unique_ptr<worker_process<ProxyType>>& worker_process_)
    : m_pool(worker_pool_), m_worker(worker_process_) { }

  ~worker_guard() {
    m_pool->release_worker(m_worker);
  }

 private:
  worker_pool<ProxyType>* m_pool;
  std::unique_ptr<worker_process<ProxyType>>& m_worker;
};
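// Illustrative usage sketch (not part of the original header). "my_proxy" and
// its evaluate() call are hypothetical placeholders for a real cppipc proxy
// type, and the worker binary path is a placeholder.
//
//   worker_pool<my_proxy> pool(2, {"/path/to/lambda_worker"});
//   {
//     // Check a worker out of the pool; the guard returns it automatically.
//     auto worker = pool.get_worker();
//     auto guard = pool.get_worker_guard(worker);
//     worker->proxy->evaluate();
//   }  // guard destroyed here -> release_worker() puts the worker back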