Turi Create  4.0
worker_pool.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_LAMBDA_WORKER_POOL_HPP
7 #define TURI_LAMBDA_WORKER_POOL_HPP
8 
9 #include<core/system/lambda/lambda_utils.hpp>
10 #include<core/logging/assertions.hpp>
11 #include<core/storage/fileio/temp_files.hpp>
12 #include<boost/filesystem/operations.hpp>
13 #include<boost/filesystem/path.hpp>
14 #include<core/parallel/lambda_omp.hpp>
15 #include<core/parallel/pthread_tools.hpp>
16 #include<process/process.hpp>
17 #include<core/system/cppipc/client/comm_client.hpp>
18 #include<timer/timer.hpp>
19 
20 namespace turi {
21 
22 extern double LAMBDA_WORKER_CONNECTION_TIMEOUT;
23 
24 namespace lambda {
25 
26 /**
27  * This class manages the all the resources of a single lambda worker.
28  */
29 template<typename ProxyType>
31  // worker_id. starts from 0 and increments by one
32  size_t id;
33  // cppipc proxy object
34  std::unique_ptr<ProxyType> proxy;
35  // cppipc comm_client
36  std::unique_ptr<cppipc::comm_client> client;
37  // cppipc address
38  std::string address;
39  // process object
40  std::unique_ptr<process> process_;
41 
42  // next avaiable worker id
43  static int get_next_id() {
44  static atomic<int> next_id;
45  return next_id.inc_ret_last();
46  }
47 
48  // constructor
49  worker_process() {
50  id = get_next_id();
51  }
52 
53  // destructor
54  ~worker_process() noexcept {
55  logstream(LOG_INFO) << "Destroying worker_process " << id << std::endl;
56  try {
57  proxy.reset();
58  client->stop();
59  client.reset();
60  process_->kill(false);
61  process_.reset();
62  } catch (...) {
63  logstream(LOG_ERROR) << "Exception in destroying worker_process " << id << std::endl;
64  }
65  }
66 }; // end of worker_process
67 
68 /**
69  * Create a worker process using given binary path and the worker_address.
70  * May throw exception on error.
71  */
72 template<typename ProxyType>
73 std::unique_ptr<worker_process<ProxyType>> spawn_worker(std::vector<std::string> worker_binary_args,
74  std::string worker_address,
75  int connection_timeout) {
76  namespace fs = boost::filesystem;
77 
78  // Sanity check arguments
79  ASSERT_MSG(worker_binary_args.size() >= 1, "Unexpected number of arguments.");
80  const std::string& worker_binary = worker_binary_args.front();
81  auto the_path = fs::path(worker_binary);
82  if (!fs::exists(the_path)) { throw std::string("Executable: ") + worker_binary + " not found."; }
83 
84  // Step 1: start a new process
85  logstream(LOG_INFO) << "Start lambda worker at " << worker_address
86  << " using binary: " << worker_binary << std::endl;
87  std::unique_ptr<process> new_process(new process());
88  std::vector<std::string> args(worker_binary_args.begin() + 1, worker_binary_args.end());
89  args.push_back(worker_address);
90  if(new_process->launch(worker_binary, args) == false) {
91  throw("Fail launching lambda worker.");
92  }
93 
94  // Step 2: create cppipc client and connect it to the launched process
95  size_t retry = 0;
96 
97  timer conn_timer;
98  conn_timer.start();
99 
100  std::unique_ptr<cppipc::comm_client> new_client;
101 
102  auto check_process_exists = [&]() -> bool {
103  // Make sure the server process is still around.
104  if (!new_process->exists()) {
105  int ret_code = new_process->get_return_code();
106 
107  logstream(LOG_ERROR) << "Lambda worker process " << new_process->get_pid()
108  << " terminated unexpectedly with code " << ret_code
109  << "; conn attempt time = " << conn_timer.current_time()
110  << "; attempt count = " << retry
111  << std::endl;
112  return false;
113  } else {
114  return true;
115  }
116  };
117 
118  auto log_exception = [&](const std::string& e) {
120  << "Error starting CPPIPC connection in connecting to lambda worker at "
121  << worker_address
122  << " (conn_time = " << conn_timer.current_time()
123  << "; attempt count = " << retry << "): " << e
124  << std::endl;
125  };
126 
127  while (true) {
128 
129  // Do initial check to make sure the process exists.
130  if(!check_process_exists()) { break; }
131 
132  // Try connecting to the server
133  retry++;
134  try {
135  std::unique_ptr<cppipc::comm_client> tmp_cli(
136  new cppipc::comm_client({}, worker_address, connection_timeout));
137 
138  cppipc::reply_status status = tmp_cli->start();
139 
140  if (status == cppipc::reply_status::OK) {
141  // Success
143  << "Connected to worker " << new_process->get_pid()
144  << " at " << worker_address
145  << "; conn_time = " << conn_timer.current_time()
146  << "; attempt count = " << retry
147  << std::endl;
148 
149  new_client.swap(tmp_cli);
150  break;
151  } else {
152  // Connecting to server failed
154  << "CPPIPC failure connecting to worker at " << worker_address
155  << ". status = " << cppipc::reply_status_to_string(status)
156  << "; conn_time = " << conn_timer.current_time()
157  << "; attempt count = " << retry
158  << std::endl;
159  }
160  // Exception happened during comm_client construction/starting
161  } catch (std::string error) {
162  log_exception(error + " (0)");
163  check_process_exists();
164  break;
165  } catch (const char* error) {
166  log_exception(std::string(error) + " (1)");
167  check_process_exists();
168  break;
169  } catch (const std::exception& e) {
170  check_process_exists();
171  log_exception(std::string(e.what()) + " (2)");
172  } catch (...) {
173  check_process_exists();
174  log_exception("Unknown Error");
175  break;
176  }
177  // Check again if the process exists. Thus we are less likely to
178  // error out on timeout if the process actually crashed.
179  if(!check_process_exists()) { break; }
180 
181  // Exit if we are out of time.
182  if(LAMBDA_WORKER_CONNECTION_TIMEOUT >= 0
183  && (conn_timer.current_time() >= LAMBDA_WORKER_CONNECTION_TIMEOUT)) {
184 
186  << "Timeout connecting to lambda worker process" << new_process->get_pid()
187  << "; conn attempt time = " << conn_timer.current_time()
188  << "; timeout = " << LAMBDA_WORKER_CONNECTION_TIMEOUT
189  << "; retry count = " << retry
190  << std::endl;
191  break;
192  }
193  } // end of while
194 
195  if (new_client == nullptr) {
196  throw std::string("Failure launching lambda workers; see log for details. ");
197  }
198 
199  // Step 3. Create proxy object
200  std::unique_ptr<ProxyType> new_proxy(new ProxyType(*new_client));
201 
202  // Step 4. Return the worker process
203  std::unique_ptr<worker_process<ProxyType>> ret(new worker_process<ProxyType>());
204  ret->proxy.swap(new_proxy);
205  ret->client.swap(new_client);
206  ret->address = worker_address;
207  ret->process_.swap(new_process);
208 
209  // Done!
210  logstream(LOG_INFO) << "Successfully launched lambda worker " << ret->id
211  << std::endl;
212  return ret;
213 } // end spawn worker
214 
215 
216 /**
217  * Exception free wrapper of spawn_worker().
218  * Return nullptr on exception, and log the error.
219  */
220 template<typename ProxyType>
221 std::unique_ptr<worker_process<ProxyType>> try_spawn_worker(std::vector<std::string> worker_binary_args,
222  std::string worker_address,
223  int connection_timeout) noexcept {
224  try {
225  return spawn_worker<ProxyType>(worker_binary_args, worker_address, connection_timeout);
226  } catch(std::string e) {
227  logstream(LOG_ERROR) << e << std::endl;
228  } catch(const char* e) {
229  logstream(LOG_ERROR) << e << std::endl;
230  } catch (...) {
231  std::exception_ptr eptr = std::current_exception();
232  logstream(LOG_ERROR) << "Fail spawning worker. Unknown error." << std::endl;
233  try {
234  if(eptr) {
235  std::rethrow_exception(eptr);
236  }
237  } catch (const std::exception& e) {
238  logstream(LOG_ERROR) << "Caught exception \"" << e.what() << "\"\n";
239  }
240  }
241  return nullptr;
242 }
243 
// Forward declaration: worker_pool refers to worker_guard before the
// class is defined further down in this file.
template<typename ProxyType>
class worker_guard;
247 
248 
249 
250 /**
251  * Manage a list of worker_processes.
252  *
253  * - Initialize:
254  * The pool is initialized with a fixed number of workers. Due to system resource
255  * limitation, the actual pool may contain less workers than intended.
256  *
257  * - Acquire worker:
258  * User request a worker process by calling get_worker(),
259  * which returns a unique_ptr and transfers the ownership of the worker process.
260  *
261  * - Release worker:
262  * After the use of the worker, The requested worker_process must be returned
263  * to the worker_pool by calling release_worker().
264  * Alternatively, use the RAII pattern by calling get_worker_guard() to
265  * guarantee return of the resource.
266  *
267  * get_worker()/release_worker() are thread safe.
268  *
269  * - Worker crash recovery:
270  * On release_worker(), if the worker_process is dead, a new worker_process
271  * will be started and released back to the pool. In the worst case where
272  * no new process can be started, the pool size will be decreased.
273  *
274  * - Call all workers:
275  * You can issue a function call to all workers using the call_all_workers()
276  * function. It will block until all workers become avaialble, call the function
277  * on all workers, and block until all the call finishes or exception is thrown.
278  * During the call if the worker_process is dead, replace the dead worker with
279  * new process.
280  *
281  * - Hang
282  * It is extremely important to guarantee the release of a worker after use,
283  * or the pool will hang on termination or barrier function such as
284  * call_all_workers().
285  */
286 template<typename ProxyType>
287 class worker_pool {
288 public:
289  /**
290  * Return the next available worker.
291  * Block until any worker is available.
292  * Throws error if worker_pool has zero workers.
293  *
294  * \note: This function must be used in pair with release_worker(), or
295  * get the RAII protector using get_worker_guard() when release_worker()
296  * cannot be called reliably. Otherwise, the worker_pool may hang
297  * waiting for worker to return.
298  */
299  std::unique_ptr<worker_process<ProxyType>> get_worker() {
300  std::unique_lock<turi::mutex> lck(m_mutex);
301  wait_for_one(lck);
302  auto worker = std::move(m_available_workers.front());
303  m_available_workers.pop_front();
304  return worker;
305  }
306 
307  /**
308  * Returns a worker_guard for the given worker.
309  * When the worker_guard goes out of the scope, the guarded
310  * worker will be automatically released.
311  */
312  std::shared_ptr<worker_guard<ProxyType>> get_worker_guard(std::unique_ptr<worker_process<ProxyType>>& worker) {
313  return std::make_shared<worker_guard<ProxyType>>(this, worker);
314  }
315 
316  /**
317  * Put the worker back to the availablity queue.
318  * If the worker process is dead, try replace with a new worker process.
319  * If a new worker process cannot be started, decrease the pool size.
320  */
321  void release_worker(std::unique_ptr<worker_process<ProxyType>>& worker) {
322  logstream(LOG_DEBUG) << "Release worker " << worker->id << std::endl;
323  std::unique_lock<turi::mutex> lck(m_mutex);
324  if (check_alive(worker) == true) {
325  // put the worker back to queue
326  m_available_workers.push_back(std::move(worker));
327  } else {
328  logstream(LOG_WARNING) << "Replacing dead worker " << worker->id << std::endl;
329  // clear dead worker
330  worker.reset(nullptr);
331  // start new worker
332  auto new_worker = try_spawn_worker<ProxyType>(m_worker_binary_and_args, new_worker_address(), m_connection_timeout);
333  if (new_worker != nullptr) {
334  // put new worker back to queue
335  m_available_workers.push_back(std::move(new_worker));
336  } else {
337  --m_num_workers;
338  logstream(LOG_WARNING) << "Decrease number of workers to "
339  << m_num_workers << std::endl;
340  }
341  }
342  lck.unlock();
343  cv.notify_one();
344  }
345 
346  /**
347  * Return number of total workers in the pool.
348  */
349  size_t num_workers() const { return m_num_workers; };
350 
351  /**
352  * Return number of avaiable workers in the pool.
353  */
355  std::unique_lock<turi::mutex> lck(m_mutex);
356  return m_available_workers.size();
357  }
358 
359  /**
360  * Call the function on all worker in parallel and return the results.
361  * Block until all workers are available.
362  */
363  template<typename RetType, typename Fn>
364  std::vector<RetType> call_all_workers(Fn f) {
365  std::unique_lock<turi::mutex> lck(m_mutex);
366  wait_for_all(lck);
367  // take out all workers from m_avaiable_workers
368  // equivalent to call get_worker() in batch with lck acquired.
369  std::vector<std::unique_ptr<worker_process<ProxyType>>> temp_workers;
370  for (size_t i = 0; i < m_num_workers; ++i) {
371  temp_workers.push_back(std::move(m_available_workers.front()));
372  m_available_workers.pop_front();
373  }
374 
375  // The following code calls release_worker() for crash recovery,
376  // and release_worker() calls lock inside.
377  // Because no workers is avaialable externally, we can safely release the lock.
378  lck.unlock();
379 
380  try {
381  std::vector<std::shared_ptr<worker_guard<ProxyType>>> guards;
382  for (auto& worker: temp_workers)
383  guards.push_back(get_worker_guard(worker));
384  std::vector<RetType> ret(m_num_workers);
385  parallel_for(0, m_num_workers, [&](size_t i) {
386  try {
387  ret[i] = f(temp_workers[i]->proxy);
388  } catch (cppipc::ipcexception e) {
389  throw reinterpret_comm_failure(e);
390  }
391  });
392  return ret;
393  } catch (...) {
394  ASSERT_EQ(m_available_workers.size(), m_num_workers);
395  std::rethrow_exception(std::current_exception());
396  }
397  }
398 
399  /// constructor
400  worker_pool(size_t num_workers,
401  std::vector<std::string> worker_binary_and_args,
402  int connection_timeout = 3) {
403  m_connection_timeout = connection_timeout;
404  m_worker_binary_and_args = worker_binary_and_args;
405  m_num_workers = 0;
406  init(num_workers);
407  }
408 
409  /// destructor
411  std::unique_lock<turi::mutex> lck(m_mutex);
412  try {
413  wait_for_all(lck);
414  } catch (...) { }
415  parallel_for(0, m_available_workers.size(), [&](size_t i) {
416  m_available_workers[i].reset();
417  });
418  }
419 
420 private:
421  /**
422  * Wait until all workers are returned. Throw error if pool size become 0.
423  */
424  void wait_for_all(std::unique_lock<turi::mutex>& lck) {
425  while (m_available_workers.size() < m_num_workers || m_num_workers == 0) {
426  cv.wait(lck);
427  }
428  if (m_num_workers == 0) {
429  throw("Worker pool is empty");
430  }
431  }
432 
433  /**
434  * Wait until a single worker is aviailable. Throw error if pool size become 0.
435  */
436  void wait_for_one(std::unique_lock<turi::mutex>& lck) {
437  while (m_available_workers.empty() || m_num_workers == 0) {
438  cv.wait(lck);
439  }
440  if (m_num_workers == 0) {
441  throw("Worker pool is empty");
442  }
443  }
444 
445  /**
446  * Return true if the worker process is alive.
447  */
448  bool check_alive(std::unique_ptr<worker_process<ProxyType>>& worker) {
449  return (worker->process_ != nullptr) && worker->process_->exists();
450  }
451 
452  /**
453  * Return a unique ipc socket file address.
454  */
455  std::string new_worker_address() {
456  return "ipc://" + get_temp_name();
457  }
458 
459  /**
460  * Initialize the pool with N workers.
461  */
462  void init(size_t num_workers) {
463  parallel_for(0, num_workers, [&](size_t i) {
464  auto new_worker = try_spawn_worker<ProxyType>(m_worker_binary_and_args,
465  new_worker_address(),
466  m_connection_timeout);
467  if (new_worker != nullptr) {
468  std::unique_lock<turi::mutex> lck(m_mutex);
469  m_available_workers.push_back(std::move(new_worker));
470  ++m_num_workers;
471  }
472  });
473 
474  if (m_num_workers == 0) {
475  log_and_throw("Cannot evaluate lambda. No Lambda workers have been successfully started.");
476  } else if (m_num_workers < num_workers) {
477  logprogress_stream << "Less than " << num_workers << " successfully started. "
478  << "Using only " << m_num_workers << " workers." << std::endl;
479  logprogress_stream << "All operations will proceed as normal, but "
480  << "lambda operations will not be able to use all "
481  << "available cores." << std::endl;
482  logstream(LOG_ERROR) << "Less than " << num_workers << " successfully started."
483  << "Using only " << m_num_workers << std::endl;
484  }
485  }
486 
487 private:
488  std::vector<std::string> m_worker_binary_and_args;
489  int m_connection_timeout;
490  std::deque<std::unique_ptr<worker_process<ProxyType>>> m_available_workers;
491  size_t m_num_workers;
493  turi::mutex m_mutex;
494 }; // end of worker_pool
495 
496 
497 /**
498  * RAII for allocating worker proxy.
499  * When the worker_guard is destoryed, the guarded worker
500  * be released by the worker_pool.
501  */
502 template<typename ProxyType>
503 class worker_guard {
504  public:
505  worker_guard(worker_pool<ProxyType>* worker_pool_,
506  std::unique_ptr<worker_process<ProxyType>>& worker_process_)
507  : m_pool(worker_pool_), m_worker(worker_process_) { }
508 
509  ~worker_guard() {
510  m_pool->release_worker(m_worker);
511  }
512 
513  private:
514  worker_pool<ProxyType>* m_pool;
515  std::unique_ptr<worker_process<ProxyType>>& m_worker;
516 };
517 
518 } // end of lambda
519 } // end of turicreate
520 #endif
void start()
Reset the timer.
Definition: timer.hpp:75
#define logstream(lvl)
Definition: logger.hpp:276
void parallel_for(size_t begin, size_t end, const FunctionType &fn)
Definition: lambda_omp.hpp:93
worker_pool(size_t num_workers, std::vector< std::string > worker_binary_and_args, int connection_timeout=3)
constructor
size_t num_workers() const
#define LOG_INFO
Definition: logger.hpp:101
std::string get_temp_name(const std::string &prefix="", bool _prefer_hdfs=false)
std::shared_ptr< worker_guard< ProxyType > > get_worker_guard(std::unique_ptr< worker_process< ProxyType >> &worker)
#define LOG_DEBUG
Definition: logger.hpp:102
Call was successful.
#define LOG_WARNING
Definition: logger.hpp:98
std::vector< RetType > call_all_workers(Fn f)
double current_time() const
Returns the elapsed time in seconds since turi::timer::start was last called.
Definition: timer.hpp:83
#define logprogress_stream
Definition: logger.hpp:325
void release_worker(std::unique_ptr< worker_process< ProxyType >> &worker)
#define LOG_ERROR
Definition: logger.hpp:97
std::string reply_status_to_string(reply_status)
reply_status start()
std::unique_ptr< worker_process< ProxyType > > get_worker()
A simple class that can be used for benchmarking/timing up to microsecond resolution.
Definition: timer.hpp:59