Turi Create  4.0
lambda_master.hpp
/* Copyright © 2017 Apple Inc. All rights reserved.
 *
 * Use of this source code is governed by a BSD-3-clause license that can
 * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
 */
#ifndef TURI_LAMBDA_LAMBDA_MASTER_HPP
#define TURI_LAMBDA_LAMBDA_MASTER_HPP

#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include <core/globals/globals.hpp>
#include <core/system/lambda/lambda_interface.hpp>
#include <core/system/lambda/worker_pool.hpp>

namespace turi {

namespace shmipc {
class client;
}

namespace lambda {

/**
 * \defgroup lambda
 */

/**
 * \ingroup lambda
 *
 * The lambda_master provides functions to evaluate a lambda
 * function on different input types (single value, list, dictionary)
 * in parallel.
 *
 * \ref set_lambda_worker_binary must be called first to specify the
 * location of the lambda worker binary.
 *
 * Internally, it manages a worker pool of lambda_workers.
 *
 * Each evaluation call is allocated to a worker and blocks until the evaluation
 * returns or throws an exception.
 *
 * The evaluation functions can be called in parallel. When this happens,
 * the master allocates each job to the worker with the shortest job queue.
 *
 * \code
 *
 * std::vector<flexible_type> args{0, 1, 2, 3, 4};
 *
 * // Create a master with 10 workers.
 * lambda_master master(10);
 *
 * // Register the lambda; plus_one_lambda is equivalent to lambda x: x + 1.
 * auto lambda_hash = master.make_lambda(plus_one_lambda);
 *
 * // Evaluate one argument at a time.
 * std::vector<flexible_type> out;
 * master.bulk_eval(lambda_hash, {0}, out, false, 0);
 * ASSERT_EQ(out[0], 1);
 * master.bulk_eval(lambda_hash, {1}, out, false, 0);
 * ASSERT_EQ(out[0], 2);
 *
 * // Evaluate in parallel, still using plus_one_lambda.
 * std::vector<std::vector<flexible_type>> out_vec(args.size());
 * parallel_for(0, args.size(), [&](size_t i) {
 *   master.bulk_eval(lambda_hash, {args[i]}, out_vec[i], false, 0);
 * });
 *
 * for (size_t i = 0; i < args.size(); ++i) {
 *   ASSERT_EQ(out_vec[i][0], args[i] + 1);
 * }
 * master.release_lambda(lambda_hash);
 *
 * \endcode
 */
class lambda_master {
 public:

  static lambda_master& get_instance();

  static void shutdown_instance();

  /**
   * Constructor. Do not use directly. Instead, use get_instance()
   */
  lambda_master(size_t nworkers);

  /**
   * Registers lambda_str with all workers and returns an id for the lambda.
   * Throws an exception if registration fails.
   */
  size_t make_lambda(const std::string& lambda_str);

  /**
   * Unregisters the lambda associated with lambda_hash.
   */
  void release_lambda(size_t lambda_hash) noexcept;

  /**
   * Evaluates the lambda identified by lambda_hash on a batch of inputs.
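   *
   * A minimal sketch, assuming master and lambda_hash were obtained as in the
   * class-level example above (plus_one_lambda):
   * \code
   * std::vector<flexible_type> out;
   * // The last two arguments are skip_undefined and the random seed.
   * master.bulk_eval(lambda_hash, {0, 1, 2}, out, false, 0);
   * // out now holds {1, 2, 3}.
   * \endcode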
   */
  void bulk_eval(size_t lambda_hash, const std::vector<flexible_type>& args,
                 std::vector<flexible_type>& out,
                 bool skip_undefined, uint64_t seed);

  /**
   * \overload
   */
  void bulk_eval(size_t lambda_hash,
                 const sframe_rows& args,
                 std::vector<flexible_type>& out,
                 bool skip_undefined, uint64_t seed);

  /**
   * \overload
   * The lambda takes a dictionary argument: each row of args is zipped with
   * keys to form one dictionary.
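   *
   * A minimal sketch, assuming dict_lambda_hash refers to a registered lambda
   * equivalent to lambda d: d['a'] + d['b'], with master obtained as in the
   * class-level example:
   * \code
   * std::vector<std::string> keys{"a", "b"};
   * // Two dictionaries: {"a": 1, "b": 2} and {"a": 3, "b": 4}.
   * std::vector<std::vector<flexible_type>> rows{{1, 2}, {3, 4}};
   * std::vector<flexible_type> out;
   * master.bulk_eval(dict_lambda_hash, keys, rows, out, false, 0);
   * // out now holds {3, 7}.
   * \endcode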
   */
  void bulk_eval(size_t lambda_hash,
                 const std::vector<std::string>& keys,
                 const std::vector<std::vector<flexible_type>>& args,
                 std::vector<flexible_type>& out,
                 bool skip_undefined, uint64_t seed);

  /**
   * \overload
   */
  void bulk_eval(size_t lambda_hash,
                 const std::vector<std::string>& keys,
                 const sframe_rows& args,
                 std::vector<flexible_type>& out,
                 bool skip_undefined, uint64_t seed);

  inline size_t num_workers() { return m_worker_pool->num_workers(); }

  static void set_lambda_worker_binary(const std::vector<std::string>& path) {
    lambda_worker_binary_and_args = path;
    std::ostringstream ss;

    for (size_t i = 0; i < path.size(); ++i) {
      if (i != 0) ss << ' ';
      ss << path[i];
    }

    logstream(LOG_INFO) << "Pylambda worker binary: " << ss.str() << std::endl;
  };

  static void set_lambda_worker_binary(const std::string& path) {
    lambda_worker_binary_and_args = {path};
    logstream(LOG_INFO) << "Pylambda worker binary: " << path << std::endl;
  };

  static const std::vector<std::string>& get_lambda_worker_binary() {
    return lambda_worker_binary_and_args;
  };

 private:

  lambda_master(lambda_master const&) = delete;

  lambda_master& operator=(lambda_master const&) = delete;

 private:
  std::shared_ptr<worker_pool<lambda_evaluator_proxy>> m_worker_pool;
  std::map<void*, std::shared_ptr<shmipc::client>> m_shared_memory_worker_connections;

  std::unordered_map<size_t, size_t> m_lambda_object_counter;
  turi::mutex m_mtx;

  /** The binary and arguments used to launch the lambda workers.
   */
  static std::vector<std::string> lambda_worker_binary_and_args;

};


/**
 * Sets the path to the pylambda_worker binary from environment variables:
 * "__GL_PYTHON_EXECUTABLE__" points to the Python executable, and
 * "__GL_PYLAMBDA_SCRIPT__" points to the lambda worker driver script.
 *
 * The binaries are used to evaluate Python lambdas in parallel in separate
 * processes.
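 *
 * A minimal usage sketch; the paths below are illustrative placeholders, not
 * actual defaults:
 * \code
 * // POSIX setenv; both variables must be set before the call reads them.
 * setenv("__GL_PYTHON_EXECUTABLE__", "/usr/bin/python3", 1);
 * setenv("__GL_PYLAMBDA_SCRIPT__", "/path/to/pylambda_worker.py", 1);
 * set_pylambda_worker_binary_from_environment_variables();
 * \endcode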
 */
void set_pylambda_worker_binary_from_environment_variables();

} // end lambda
} // end turicreate

#endif