Turi Create  4.0
pylambda.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_LAMBDA_PYLAMBDA_EVALUATOR_HPP
7 #define TURI_LAMBDA_PYLAMBDA_EVALUATOR_HPP
8 #include <core/system/lambda/lambda_interface.hpp>
9 #include <core/data/flexible_type/flexible_type.hpp>
10 #include <core/system/lambda/python_callbacks.hpp>
11 #include <core/parallel/pthread_tools.hpp>
12 #include <string>
13 
14 namespace turi {
15 
16 namespace shmipc {
17 class server;
18 }
19 
20 class sframe_rows;
21 
22 namespace lambda {
23 
24 /** The data used in the common call type.
    *
    * Argument block handed by pointer to
    * pylambda_evaluation_functions::eval_lambda.
25  */
26 struct lambda_call_data {
27  flex_type_enum output_enum_type = flex_type_enum::UNDEFINED;  // NOTE(review): presumably the type the outputs are expected to have -- confirm
28  bool skip_undefined = false;  // NOTE(review): presumably mirrors the skip_undefined argument of bulk_eval -- confirm
29 
30  // It's the responsibility of the calling class to make sure these
31  // are valid. input_values and output_values must point to storage
32  // of at least n_inputs values.
33  const flexible_type* input_values = nullptr;
34  flexible_type* output_values = nullptr;
35  size_t n_inputs = 0;
36 };
37 
38 /** The data used in the call by dict call type.
    *
    * Argument block handed by pointer to
    * pylambda_evaluation_functions::eval_lambda_by_dict.
39  */
40 struct lambda_call_by_dict_data {
41  flex_type_enum output_enum_type = flex_type_enum::UNDEFINED;  // NOTE(review): presumably the type the outputs are expected to have -- confirm
42  bool skip_undefined = false;  // NOTE(review): presumably mirrors the skip_undefined argument of bulk_eval_dict -- confirm
43 
44  // It's the responsibility of the calling class to make sure these
45  // are valid. output_values must point to storage
46  // of at least input_rows->size() values.
47  const std::vector<std::string>* input_keys = nullptr;  // NOTE(review): presumably the keys paired with each row's values to form the dict argument -- confirm
48  const std::vector<std::vector<flexible_type> >* input_rows = nullptr;
49  flexible_type* output_values = nullptr;
50 };
51 
52 /** The data used in the call by sframe rows call type.
    *
    * Argument block handed by pointer to
    * pylambda_evaluation_functions::eval_lambda_by_sframe_rows.
53  */
54 struct lambda_call_by_sframe_rows_data {
55  flex_type_enum output_enum_type = flex_type_enum::UNDEFINED;  // NOTE(review): presumably the type the outputs are expected to have -- confirm
56  bool skip_undefined = false;  // NOTE(review): presumably mirrors the skip_undefined argument of bulk_eval_dict_rows -- confirm
57 
    // All three pointers default to null and must be set by the caller.
    // NOTE(review): presumably output_values needs room for one value per
    // row of input_rows, as in the sibling call-data structs -- confirm.
58  const std::vector<std::string>* input_keys = nullptr;
59  const sframe_rows* input_rows = nullptr;
60  flexible_type* output_values = nullptr;
61 };
62 
63 /** The data used in applying a graph triple apply.
    *
    * Argument block handed by pointer to
    * pylambda_evaluation_functions::eval_graph_triple_apply.
    *
    * Every member is default-initialized (null pointers, zero indices),
    * matching the other call-data structs in this header, so a freshly
    * constructed instance never carries indeterminate values. The caller
    * must still point each member at valid data before use.
64  */
65 struct lambda_graph_triple_apply_data {
66 
67  const std::vector<std::vector<flexible_type> >* all_edge_data = nullptr;
68  std::vector<std::vector<flexible_type> >* out_edge_data = nullptr;
69 
70  std::vector<std::vector<flexible_type> >* source_partition = nullptr;
71  std::vector<std::vector<flexible_type> >* target_partition = nullptr;
72 
73 
74  const std::vector<std::string>* vertex_keys = nullptr;
75  const std::vector<std::string>* edge_keys = nullptr;
76  const std::vector<std::string>* mutated_edge_keys = nullptr;
77  size_t srcid_column = 0, dstid_column = 0;  // NOTE(review): presumably column indices of the source/target ids within an edge row -- confirm
78 };
79 
80 
// Table of plain function pointers for the lambda evaluation entry points.
// An instance is passed to set_pylambda_evaluation_functions(), and a global
// instance named evaluation_functions is declared further down in this header.
// Member order and signatures are the interface; do not reorder.
81 struct pylambda_evaluation_functions {
82  void (*set_random_seed)(size_t seed);
83  size_t (*init_lambda)(const std::string&);  // NOTE(review): presumably pickled lambda string -> lambda hash (cf. make_lambda) -- confirm
84  void (*release_lambda)(size_t);  // NOTE(review): presumably takes the value returned by init_lambda -- confirm
    // For the four evaluators below, the size_t is presumably the lambda hash
    // (TODO confirm); the second argument is the matching call-data struct.
85  void (*eval_lambda)(size_t, lambda_call_data*);
86  void (*eval_lambda_by_dict)(size_t, lambda_call_by_dict_data*);
87  void (*eval_lambda_by_sframe_rows)(size_t, lambda_call_by_sframe_rows_data*);
88  void (*eval_graph_triple_apply)(size_t, lambda_graph_triple_apply_data*);
89 };
90 
91 /** This is called through the cython functions to set up the
92  * evaluation function interface.
    *
    * \param eval_function_struct pointer to the function table to install.
    *        NOTE(review): presumably copied into the global
    *        evaluation_functions declared below -- confirm in the
    *        implementation file.
93  */
94 void set_pylambda_evaluation_functions(pylambda_evaluation_functions* eval_function_struct);
95 
   // Global function table; only declared here (extern), defined elsewhere.
96 extern pylambda_evaluation_functions evaluation_functions;
97 
98 /**
99  * Creates a lambda from a pickled lambda string.
100  *
101  * Throws an exception if the construction failed.
     *
     * \param pylambda_str the pickled lambda string.
     * \return a size_t identifying the lambda. NOTE(review): presumably the
     *         lambda_hash value accepted by release_lambda() -- confirm.
102  */
103 size_t make_lambda(const std::string& pylambda_str);
104 
105 /**
106  * Release the cached lambda object.
     *
     * \param lambda_hash identifies the lambda to release.
     *        NOTE(review): presumably a value returned by make_lambda() --
     *        confirm.
107  */
108 void release_lambda(size_t lambda_hash);
109 
110 
111 /**
112  * \ingroup lambda
113  *
114  * A functor class wrapping a pickled python lambda string.
115  *
116  * The lambda type is assumed to be either S -> T or List -> T,
117  * where all types should be compatible with flexible_type.
118  *
119  * \note: currently only supports basic flexible_types: flex_string, flex_int, flex_float
120  *
121  * \internal
122  * All public member functions, including the constructors, are guarded by the
123  * global mutex, preventing simultaneous access to Python's GIL.
124  *
125  * Internally, the class stores a Python lambda object which is created from the
126  * pickled lambda string upon construction. The lambda object is equivalent
127  * to a Python lambda object (with proper reference counting), and therefore the class is copyable.
128  */
129 class pylambda_evaluator : public lambda_evaluator_interface {
130 
131  public:
132  /**
133  * Construct an empty evaluator.
     *
     * Stores \p shared_memory_server (nullptr by default) in
     * m_shared_memory_server; nothing else is done here.
134  */
135  inline pylambda_evaluator(turi::shmipc::server* shared_memory_server = nullptr) {
136  m_shared_memory_server = shared_memory_server;
137  };
138 
     // NOTE(review): listing line 139 is absent from this dump; a declaration
     // may have been dropped here -- confirm against the original header.
140 
141 
142  /**
143  * Initializes shared memory communication via SHMIPC.
144  * Returns the shared memory address to connect to.
145  */
146  std::string initialize_shared_memory_comm();
147 
148  private:
149 
     // NOTE(review): everything below is declared private in this class;
     // presumably callers reach these methods through the
     // lambda_evaluator_interface base -- confirm.

150  // Set the lambda object for the next evaluation.
151  void set_lambda(size_t lambda_hash);
152 
153  /**
154  * Creates a lambda from a pickled lambda string.
155  *
156  * Throws an exception if the construction failed.
     *
     * Has the same name and signature as the namespace-level
     * make_lambda() declared earlier in this header.
157  */
158  size_t make_lambda(const std::string& pylambda_str);
159 
160  /**
161  * Release the cached lambda object.
     *
     * Has the same name and signature as the namespace-level
     * release_lambda() declared earlier in this header.
162  */
163  void release_lambda(size_t lambda_hash);
164 
165 
166  /**
167  * Apply as a function: flexible_type -> flexible_type.
168  *
169  * \note: this function does not perform type checking, and an exception may be
170  * thrown while applying the function. As a subroutine, this function does not
171  * try to acquire the GIL and assumes it has already been acquired.
172  */
173  flexible_type eval(size_t lambda_hash, const flexible_type& arg);
174 
175  /**
176  * Evaluate the lambda function on each argument separately in the args list.
     *
     * NOTE(review): the semantics of skip_undefined and seed are not visible
     * in this header -- confirm in the implementation file.
177  */
178  std::vector<flexible_type> bulk_eval(size_t lambda_hash, const std::vector<flexible_type>& args,
179  bool skip_undefined, uint64_t seed);
180 
181  /**
182  * \overload
183  *
     * Same parameters as bulk_eval, except that the values arrive as
     * sframe_rows.
     *
184  * We have to use a different function name because
185  * the cppipc interface doesn't support true overloads.
186  */
187  std::vector<flexible_type> bulk_eval_rows(size_t lambda_hash,
188  const sframe_rows& values, bool skip_undefined, uint64_t seed);
189 
190  /**
191  * Evaluate the lambda function on each element separately in the values.
192  * The value element is combined with the keys to form a dictionary argument.
193  */
194  std::vector<flexible_type> bulk_eval_dict(size_t lambda_hash,
195  const std::vector<std::string>& keys,
196  const std::vector<std::vector<flexible_type>>& values,
197  bool skip_undefined, uint64_t seed);
198 
199  /**
     * Same parameters as bulk_eval_dict, except that the values arrive as
     * sframe_rows.
     *
200  * We have to use a different function name because
201  * the cppipc interface doesn't support true overloads.
202  */
203  std::vector<flexible_type> bulk_eval_dict_rows(size_t lambda_hash,
204  const std::vector<std::string>& keys,
205  const sframe_rows& values,
206  bool skip_undefined, uint64_t seed);
207 
208 
209  /**
210  * Redirects to either bulk_eval_rows or bulk_eval_dict_rows.
211  * First byte in the string is a bulk_eval_serialized_tag byte to denote
212  * whether this call is going to bulk_eval_rows or bulk_eval_dict_rows.
213  *
214  * Deserializes the remaining parameters from the string
215  * and calls the function accordingly.
     *
     * \param ptr start of the serialized buffer (tag byte first).
     * \param len NOTE(review): presumably the byte length of the buffer
     *        at ptr -- confirm.
216  */
217  std::vector<flexible_type> bulk_eval_rows_serialized(const char* ptr, size_t len);
218 
     // Set from the constructor argument; may be nullptr (the default).
     // NOTE(review): ownership is not visible in this header -- confirm.
219  turi::shmipc::server* m_shared_memory_server;
220  turi::thread m_shared_memory_listener;
     // NOTE(review): volatile is not a synchronization primitive in C++. If
     // this flag is written and read by different threads (presumably the
     // listener thread above), std::atomic<bool> would be the usual choice.
     // Confirm against the implementation before changing; it would also
     // make the class non-copyable.
221  volatile bool m_shared_memory_thread_terminating = false;
222 };
223 } // end of lambda namespace
224 } // end of turi namespace
225 
226 
227 
228 
229 #endif
std::set< Key > keys(const std::map< Key, T > &map)
Definition: stl_util.hpp:358
std::set< T > values(const std::map< Key, T > &map)
Definition: stl_util.hpp:386
pylambda_evaluator(turi::shmipc::server *shared_memory_server=nullptr)
Definition: pylambda.hpp:135