// Turi Create 4.0
// lazy_eval_operation_dag.hpp
1 /* Copyright © 2017 Apple Inc. All rights reserved.
2  *
3  * Use of this source code is governed by a BSD-3-clause license that can
4  * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
5  */
6 #ifndef TURI_UNITY_GRAPH_OPERATION_DAG_HPP
7 #define TURI_UNITY_GRAPH_OPERATION_DAG_HPP
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <queue>
#include <unordered_map>
#include <vector>
#include <core/util/mutable_queue.hpp>
#include <core/storage/lazy_eval/lazy_eval_operation.hpp>
#include <core/logging/logger.hpp>
#include <core/logging/assertions.hpp>
18 namespace turi {
19 
// Forward declaration: lazy_eval_future (below) stores a pointer to its
// owning DAG and befriends it.
template <typename T>
class lazy_eval_operation_dag;

23 /**
24  * \defgroup lazy_eval Lazy Evaluation DAG
25  */
26 
27 /**
28  * \ingroup lazy_eval
29  * The lazy_eval_future<T> is the key object managed by the
30  * lazy_eval_operation_dag. All operations issued to the dag returns a
31  * future object. Evaluating the future (using operator()) will evaluate
32  * the DAG allowing the object to be accessed efficiently.
33  */
34 template <typename T>
36  public:
37  typedef T value_type;
40 
41  /// deleted default constructor
42  lazy_eval_future() = delete;
43  /// default copy constructor
44  lazy_eval_future(const lazy_eval_future& other) = default;
45  /// default assignment operator
46  lazy_eval_future& operator=(const lazy_eval_future& other) = default;
47 
48  /**
49  * Clears the contents of the future. All shared pointers obtained from
50  * the object are still valid.
51  */
52  void reset() {
53  log_func_entry();
54  object.reset();
55  }
56 
57  /**
58  * Evaluates the dependencies of this object, caches the result, and returns a
59  * reference to the computed value. If the value has already been cached,
60  * no further computation is performed.
61  */
62  value_type& operator()() {
63  if (!object) make_eager();
64  return (*object);
65  }
66 
67 
68  /**
69  * Evaluates the dependencies of this object, caches the result, and returns a
70  * a shared pointer to the computed value. If the value has already been
71  * cached, no further computation is performed.
72  */
73  std::shared_ptr<value_type> get_ptr() {
74  if (!object) make_eager();
75  return object;
76  }
77 
78  /// Destructor
80  log_func_entry();
81  owner->mark_for_deletion(vertex_idx);
82  }
83 
84  /// Gets the vertex ID in the lazy eval DAG datastructure
85  size_t get_vertex_id() const {
86  return vertex_idx;
87  }
88 
89  /**
90  * Returns true if the object has been computed.
91  */
92  bool is_available() {
93  return object;
94  }
95 
96  private:
97  dag_type* owner;
98  size_t vertex_idx;
99  std::shared_ptr<value_type> object;
100 
101  /// private constructor
102  lazy_eval_future(dag_type* owner, size_t vertex_idx):
103  owner(owner), vertex_idx(vertex_idx) { log_func_entry(); }
104 
105  /// Forces the object to be fully instantiated
106  void make_eager() {
107  log_func_entry();
108  object = owner->make_eager(vertex_idx);
109  }
110 
111  friend class lazy_eval_operation_dag<value_type>;
112 };
113 
114 /**
115  * \ingroup lazy_eval
116  * The Lazy Evaluation Operation DAG is a directed acyclic graph
117  * connecting immutable objects of type T with operations, and also provide
118  * lazy evaluation primitives to provide object computation at any point
119  * in the tree.
120  *
121  * Using the lazy_eval_operation_dag system simply requires the user to
122  * implement a collection of operations, each inheriting from
123  * lazy_eval_operation_base<T>. For instance, we can define the following
124  * multiply, increment, and set_val lazy operations on integers.
125  *
126  * \code
127  * struct multiplier: lazy_eval_operation_base<int> {
128  * virtual size_t num_arguments() { return 2; }
129  * virtual void execute(int& output,
130  * const std::vector<int*>& parents) {
131  * output *= *(parents[0]);
132  * }
133  * };
134  *
135  * struct increment: lazy_eval_operation_base<int> {
136  * virtual size_t num_arguments() { return 1; }
137  * virtual void execute(int& output,
138  * const std::vector<int*>& parents) {
139  * output++;
140  * }
141  * };
142  *
143  * struct set_val: lazy_eval_operation_base<int> {
144  * set_val(int i): val(i) { }
145  * size_t val;
146  * virtual size_t num_arguments() { return 0; }
147  * virtual void execute(int& output,
148  * const std::vector<int*>& parents) {
149  * output = val;
150  * }
151  * };
152  * \endcode
153  *
154  * To create a sequence of lazy operations simply involves the use of the
155  * add_operation() function. Each call to add_operation is lazy, and
156  * returns a future object.
157  * \code
158  * lazy_eval_operation_dag<int> dag;
159  * lazy_eval_future<int>* five = dag.add_operation(new set_val(5), {});
160  * lazy_eval_future<int>* two = dag.add_operation(new set_val(2), {});
161  * lazy_eval_future<int>* seven = dag.add_operation(new adder, {five, two});
162  * lazy_eval_future<int>* nine = dag.add_operation(new adder, {seven, two});
163  * \endcode
164  *
165  * We can then evaluate values with the operator() to compute the DAG.
166  * \code
167  * int val = (*nine)();
168  * \endcode
169  */
170 template <typename T>
172  public:
173  typedef T value_type;
175  typedef lazy_eval_future<value_type> future_type;
176 
177  lazy_eval_operation_dag(std::function<value_type*()>
178  allocator = [](){return new value_type;},
179  std::function<void(value_type& dest, value_type& src)>
180  copier = [](value_type& dest, value_type& src){dest = src;})
181  :next_vid(0), allocator(allocator), copier(copier) { log_func_entry(); }
182 
183 
184  /**
185  * Adds a fixed value to the DAG.
186  * The returned future pointer will always be available efficiently,
187  * Deleting the returned future pointer will mark the corresponding entry
188  * in the DAG for deletion.
189  */
190  future_type* add_value(value_type* value) {
191  return add_value(std::shared_ptr<value_type>(value));
192  }
193 
194  future_type* add_value(std::shared_ptr<value_type> value) {
195  vertex* vtx = new vertex(next_vid);
196  vtx->operation = NULL;
197  vtx->object_cache = value;
198  vtx->object = value;
199  vertices[next_vid] = vtx;
200  future_type* ret = new future_type(this, next_vid);
201  ++next_vid;
202  // create and return a future
203  return ret;
204  }
205 
206  /**
207  * Creates a new entry in the dependency tree by creating a future which
208  * corresponds to calling the provided operation on the parents.
209  *
210  * Deleting the returned future pointer will mark the corresponding entry
211  * in the DAG for deletion.
212  */
213  future_type* add_operation(operation_type* operation,
214  const std::vector<future_type*>& parents) {
215  ASSERT_TRUE(operation != NULL);
216  ASSERT_EQ(operation->num_arguments(), parents.size());
217  // create a new vertex
218  vertex* vtx = new vertex(next_vid);
219  // no object yet. do not create one
220  vtx->operation = operation;
221  vtx->parents.resize(parents.size());
222  for (size_t i = 0; i < parents.size(); ++i) {
223  vtx->parents[i] = parents[i]->get_vertex_id();
224  vertices[vtx->parents[i]]->children.push_back(next_vid);
225  }
226  vertices[next_vid] = vtx;
227  future_type* ret = new future_type(this, next_vid);
228  ++next_vid;
229  // create and return a future
230  return ret;
231  }
232 
233  /**
234  * Computes and caches a particular entry in the graph
235  */
236  std::shared_ptr<value_type> make_eager(size_t vertex_id) {
237  log_func_entry();
238  // check that we do have the vertex ID in question
239  ASSERT_EQ(vertices.count(vertex_id), 1);
240  // this is now the main evaluation logic
241  // first check if we have a pointer to it
242  vertex* vtx = vertices.at(vertex_id);
243  // ok, the object is around! lets return that.
244  if (!vtx->object.expired()) return vtx->object.lock();
245 
246  // first we need to backtrack the tree and find all ancestors which I depend
247  // on in two or more paths and force construct them.
248  // Lets keep those cached too.
249  auto ancestors = list_ancestors(vertex_id);
250  for(auto ancestor : ancestors) {
251  if (ancestor.second.size() > 1) {
252  // we only need to keep a shared to the objects to keep them alive.
253  // The weak pointers in the vertex object will do the rest of the
254  // work of tracking the existance of the objects.
255  auto ancestor_object = make_eager(ancestor.first);
256  vertices[ancestor.first]->object_cache = ancestor_object;
257  vertices[ancestor.first]->object = ancestor_object;
258  }
259  }
260  // ok. now we can do a recursive preorder traversal of the ancestor graph
261  // to perform the evaluation
262  auto ret = preorder_compute(vertex_id);
263  vtx->object = ret;
264  return ret;
265  }
266 
267  /**
268  * Marks the vertex for deletion. Deletion will only occur on a call to
269  * cleanup()
270  */
271  void mark_for_deletion(size_t vertex_id) {
272  log_func_entry();
273  if (vertices.count(vertex_id) == 0) return;
274  vertex* vtx = vertices.at(vertex_id);
275  vtx->to_delete = true;
276  // do some safe cleanup now
277  cleanup(true);
278  }
279 
280 
281  /**
282  * If the vertex value is cached, this will force it to be uncached.
283  */
284  void uncache(size_t vertex_id) {
285  log_func_entry();
286  if (vertices.count(vertex_id) == 0) return;
287  vertex* vtx = vertices.at(vertex_id);
288  // if operation is NULL, this vertex was created using add_value
289  vtx->uncache();
290  }
291 
292  /**
293  * Attempts to delete all vertices that were marked for deletion
294  * (see mark_for_deletion()). Note that not all marked vertices may be deleted
295  * as some vertices (for instance, in the middle of a chain of operations)
296  * cannot be deleted safely. This may result in the making eager of
297  * certain vertices to ensure that referenced vertices can always be
298  * constructed.
299  *
300  * \param avoid_instantiation Cancel the deletion if it involves instantiating
301  * an as yet, uninstantiated vertex.
302  */
303  void cleanup(bool avoid_instantiation = false) {
304  log_func_entry();
305  // delete vertices from bottom up
306  // sort the range of keys
307  std::vector<size_t> ids_to_delete;
308  for(auto vertex: vertices) {
309  if (vertex.second->to_delete) ids_to_delete.push_back(vertex.first);
310  }
311  // reverse sort
312  std::sort(ids_to_delete.rbegin(), ids_to_delete.rend());
313  for(size_t id: ids_to_delete) {
314  if (vertices.at(id)->to_delete) {
315  delete_vertex(id, avoid_instantiation);
316  }
317  }
318  }
319 
320  void print(std::ostream& out) const {
321  out << "digraph G {\n";
322  for(auto vertex: vertices) {
323  out << "\t\"" << vertex.second->vertex_id << "\" ";
324  // output the label
325  out << "[label=\"" << vertex.second->vertex_id << ":";
326  if (vertex.second->operation) {
327  out<< vertex.second->operation->name();
328  } else {
329  out << "NULL";
330  }
331  if (!vertex.second->object.expired()) {
332  value_type* v = vertex.second->object.lock().get();
333  out << "\\nptr=" << v;
334  }
335  out << "\"";
336 
337  // print allocated objects in bold border
338  // print deleted objects in red
339  if (!vertex.second->object.expired()) {
340  out << ",style=bold";
341  }
342  if (vertex.second->to_delete) {
343  out << ",color=red";
344  }
345  out << "]\n";
346 
347  for (size_t children : vertex.second->children) {
348  out << "\t\"" << vertex.second->vertex_id << "\" -> "
349  << "\"" << children << "\"\n";
350  }
351  }
352  out << "}\n";
353  }
354 
355 
356  /// destructor
358  log_func_entry();
359  for(auto vtx: vertices) {
360  delete vtx.second;
361  }
362  }
363  private:
364  /// ID to assign to next vertex in the DAG
365  size_t next_vid;
366 
367  /// A vertex in the DAG
368  struct vertex {
369  explicit vertex(size_t vertex_id)
370  : operation(NULL), to_delete(false), vertex_id(vertex_id) { }
371  ~vertex() { if (operation) delete operation; }
372  /// The value of the vertex
373  std::weak_ptr<value_type> object;
374  /**
375  * This is used to the store the value of the vertex when it is truly
376  * necessary to do so. i.e. sometimes when vertices are deleted, their
377  * children must be evaluated to keep the tree evaluatable.
378  */
379  std::shared_ptr<value_type> object_cache;
380  /** The operation to evaluate on this vertex
381  * If this is NULL, this is a value vertex
382  */
383  operation_type* operation;
384  /// parent vertices
385  std::vector<size_t> parents;
386  /// child vertices
387  std::vector<size_t> children;
388  /// Marked for deletion
389  bool to_delete;
390  /// vertex ID
391  size_t vertex_id;
392 
393  bool is_value_vertex() const {
394  return operation == NULL;
395  }
396  void uncache() {
397  if (!is_value_vertex()) {
398  object_cache.reset();
399  }
400  }
401  };
402 
403  /// A map from vertex ID to the vertex
404  std::unordered_map<size_t, vertex*> vertices;
405  /// Used to allocate values
406  std::function<value_type*()> allocator;
407  /// Used to copy values
408  std::function<void(value_type& dest, value_type& src)> copier;
409 
410  /**
411  * Returns a map of all ancestor vertices that have to be computed for this
412  * vertex to be computed, as well as their children.
413  * - halts at non-expired nodes
414  * - halts at vertices with two or more children which the current vertex
415  * depends on
416  */
417  std::unordered_map<size_t, std::vector<size_t> > list_ancestors(size_t vertex) {
418  std::unordered_map<size_t, std::vector<size_t> > ret;
420  // BFS implementation which does the following
421  // - halts at non-expired nodes
422  // - halts at vertices with two or more children which the current vertex
423  // depends on
424  //
425  // To accomplish the former is simply a bog standard BFS
426  //
427  // To accomplish the latter requires a simple trick: the vertex ordering
428  // is a valid topological sort, and thus by always backtracking in the
429  // reverse vertex ordering, when I evalute a vertex, I am guaranteed to
430  // have all of its children listed
431  vqueue.push(vertex, vertex);
432  while(!vqueue.empty()) {
433  size_t curvtx = vqueue.pop().first;
434  if (ret.count(curvtx) && ret[curvtx].size() >= 2) continue;
435  for (size_t parent: vertices.at(curvtx)->parents) {
436  if (vertices.at(curvtx)->object.expired()) {
437  ret[parent].push_back(curvtx);
438  if (!vqueue.contains(parent)) vqueue.push(parent, parent);
439  }
440  }
441  }
442  return ret;
443  }
444 
445 
446  /**
447  * Computes the value of the vertex, assuming certain preconditions
448  * are satisfied: i.e. All dependent ancestors in the DAG with multiple
449  * children are fully evaluated.
450  */
451  std::shared_ptr<value_type> preorder_compute(size_t vertex_id,
452  bool make_copy = true) {
453  // do a recursive backtrack through the vertices in ancestor_forward_edges
454  vertex* vtx = vertices.at(vertex_id);
455 
456  if (!vtx->object.expired()) {
457  // we hit a fully instantiated object. Return a copy of it.
458  if (make_copy) {
459  // if it is a value _vertex, we must always copy
460  std::shared_ptr<value_type> ret;
461  if (vtx->is_value_vertex()) {
462  ret.reset(allocator());
463  copier(*ret, *(vtx->object.lock()));
464  } else if (vtx->object_cache.unique()) {
465  // it is unique! i.e. we are the only people having a cache to it
466  // there are no other external references.
467  // lets take over it. Some care is needed here, since we mutating
468  // what really is supposed to be immutable.
469  // we need to make sure to reset the value pointer as well.
470  ret = vtx->object_cache;
471  vtx->object_cache.reset();
472  vtx->object.reset();
473  return ret;
474  } else {
475  // regular case. There is a still a reference to it, we need to
476  // copy it.
477  ret.reset(allocator());
478  copier(*ret, *(vtx->object.lock()));
479  }
480  return ret;
481  } else {
482  return vtx->object.lock();
483  }
484  }
485 
486  if (vtx->parents.size() == 0) {
487  // no parents.
488  // Create a new object, pass it through the operation and return it.
489  std::shared_ptr<value_type> ret;
490  ret.reset(allocator());
491  vtx->operation->execute(*ret, std::vector<value_type*>());
492  // we are not making a copy, so it is safe to remember the weak pointer
493  if (!make_copy) vtx->object = ret;
494  return ret;
495  } else {
496  // compute all parents
497  // also extract raw pointers from all parents except the first
498  std::vector<std::shared_ptr<value_type> > parents_shared_ptr(vtx->parents.size());
499  std::vector<value_type*> other_parents_raw_ptr;
500  for (size_t i = 0;i < vtx->parents.size(); ++i) {
501  // compute parent, we need to make a copy of only the left side of the
502  // tree. This can be optimized.
503  parents_shared_ptr[i] = preorder_compute(vtx->parents[i], i == 0);
504  if (i > 0) other_parents_raw_ptr.push_back(parents_shared_ptr[i].get());
505  }
506  // set up the call to the operation
507  std::shared_ptr<value_type> ret = parents_shared_ptr[0];
508  vtx->operation->execute(*ret, other_parents_raw_ptr);
509  // memory cleanup
510  other_parents_raw_ptr.clear();
511  parents_shared_ptr.clear();
512  if (!make_copy) vtx->object = ret;
513  return ret;
514  }
515  }
516 
517 
518 
519 
520  /**
521  * Tries to delete a given vertex. This may result in the making eager of
522  * certain vertices to ensure that referenced vertices can always
523  * be constructed. A vertex is only uncached if it can be deleted.
524  *
525  * \param vertex_id Vertex to delete
526  * \param avoid_instantiation Cancel the deletion if it involves instantiating
527  * an as yet, uninstantiated vertex.
528  */
529  void delete_vertex(size_t vertex_id, bool avoid_instantiation = false) {
530  if (vertices.count(vertex_id) == 0) return;
531  vertex* vtx = vertices.at(vertex_id);
532  if (vtx->children.size() == 0) {
533  // no children! no issue deleting
534  // remove myself from parent's children listing
535  for (size_t parentid: vtx->parents) {
536  vertex* parent= vertices.at(parentid);
537  // delete myself from the parent
538  parent->children.erase(std::find(parent->children.begin(),
539  parent->children.end(),
540  vertex_id));
541  }
542  // now we can clear the current object
543  delete vtx;
544  vertices.erase(vertex_id);
545  } else if (vtx->parents.size() == 0 && vtx->children.size() == 1) {
546  // ok. we can actually delete this now.
547  // But we would preferably like to avoid "splits" where I have to
548  // instantiate many children.
549  // keep going downwards until I find a split, or a not deleted vertex
550  // nd make that eager.
551  size_t deepest_child = vertex_id;
552  vertex* deepest_child_vtx = vtx;
553  while(deepest_child_vtx->to_delete &&
554  deepest_child_vtx->children.size() == 1) {
555  deepest_child = deepest_child_vtx->children[0];
556  deepest_child_vtx= vertices.at(deepest_child);
557  }
558  // make eager that element
559  if (avoid_instantiation && !deepest_child_vtx->object_cache) return;
560  auto deepest_child_value = make_eager(deepest_child);
561  deepest_child_vtx->object_cache = deepest_child_value;
562  deepest_child_vtx->object = deepest_child_value;
563 
564  // now we can delete every vertex up to the deepest child
565  size_t deletion_cur_id = vertex_id;
566  vertex* deletion_cur_vtx = vtx;
567  do {
568  // detach this vertex
569  size_t next_child = deletion_cur_vtx->children[0];
570  vertex* next_child_vtx = vertices.at(next_child);
571 
572  next_child_vtx->parents.erase(std::find(next_child_vtx->parents.begin(),
573  next_child_vtx->parents.end(),
574  deletion_cur_id));
575  delete deletion_cur_vtx;
576  vertices.erase(deletion_cur_id);
577  deletion_cur_id = next_child;
578  deletion_cur_vtx = next_child_vtx;
579  }while(deletion_cur_vtx->to_delete && deletion_cur_vtx->children.size() == 1);
580  }
581  }
582 };
583 
584 } // turicreate
585 
586 template <typename T>
587 std::ostream& operator<<(std::ostream& out,
589  dag.print(out);
590  return out;
591 }
592 
593 #endif
/*
 * Symbol summary captured from the generated documentation page
 * (not part of the header's code):
 *
 * std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
 * lazy_eval_future()=delete
 *   deleted default constructor
 * future_type * add_operation(operation_type *operation, const std::vector< future_type *> &parents)
 * std::shared_ptr< value_type > make_eager(size_t vertex_id)
 * void push(T item, Priority priority)
 *   Enqueues a new item in the queue.
 * future_type * add_value(value_type *value)
 * lazy_eval_future & operator=(const lazy_eval_future &other)=default
 *   default assignment operator
 * virtual size_t num_arguments()=0
 * #define ASSERT_TRUE(cond)
 *   Definition: assertions.hpp:309
 * std::pair< T, Priority > pop()
 * std::shared_ptr< value_type > get_ptr()
 * bool empty() const
 *   Returns true iff the queue is empty.
 * void cleanup(bool avoid_instantiation=false)
 * bool contains(const T &item) const
 *   Returns true if the queue contains the given value.
 * size_t get_vertex_id() const
 *   Gets the vertex ID in the lazy eval DAG datastructure.
 */