6 #ifndef TURI_UNITY_GRAPH_OPERATION_DAG_HPP 7 #define TURI_UNITY_GRAPH_OPERATION_DAG_HPP 13 #include <unordered_map> 14 #include <core/util/mutable_queue.hpp> 15 #include <core/storage/lazy_eval/lazy_eval_operation.hpp> 17 #include <core/logging/assertions.hpp> 63 if (!
object) make_eager();
74 if (!
object) make_eager();
99 std::shared_ptr<value_type> object;
103 owner(owner), vertex_idx(vertex_idx) { log_func_entry(); }
170 template <
typename T>
173 typedef T value_type;
178 allocator = [](){
return new value_type;},
179 std::function<void(value_type& dest, value_type& src)>
180 copier = [](value_type& dest, value_type& src){dest = src;})
181 :next_vid(0), allocator(allocator), copier(copier) { log_func_entry(); }
191 return add_value(std::shared_ptr<value_type>(value));
194 future_type* add_value(std::shared_ptr<value_type> value) {
195 vertex* vtx =
new vertex(next_vid);
196 vtx->operation = NULL;
197 vtx->object_cache = value;
199 vertices[next_vid] = vtx;
200 future_type* ret =
new future_type(
this, next_vid);
214 const std::vector<future_type*>& parents) {
218 vertex* vtx =
new vertex(next_vid);
220 vtx->operation = operation;
221 vtx->parents.resize(parents.size());
222 for (
size_t i = 0; i < parents.size(); ++i) {
223 vtx->parents[i] = parents[i]->get_vertex_id();
224 vertices[vtx->parents[i]]->children.push_back(next_vid);
226 vertices[next_vid] = vtx;
227 future_type* ret =
new future_type(
this, next_vid);
239 ASSERT_EQ(vertices.count(vertex_id), 1);
242 vertex* vtx = vertices.at(vertex_id);
244 if (!vtx->object.expired())
return vtx->object.lock();
249 auto ancestors = list_ancestors(vertex_id);
250 for(
auto ancestor : ancestors) {
251 if (ancestor.second.size() > 1) {
255 auto ancestor_object = make_eager(ancestor.first);
256 vertices[ancestor.first]->object_cache = ancestor_object;
257 vertices[ancestor.first]->object = ancestor_object;
262 auto ret = preorder_compute(vertex_id);
273 if (vertices.count(vertex_id) == 0)
return;
274 vertex* vtx = vertices.at(vertex_id);
275 vtx->to_delete =
true;
286 if (vertices.count(vertex_id) == 0)
return;
287 vertex* vtx = vertices.at(vertex_id);
303 void cleanup(
bool avoid_instantiation =
false) {
307 std::vector<size_t> ids_to_delete;
308 for(
auto vertex: vertices) {
309 if (vertex.second->to_delete) ids_to_delete.push_back(vertex.first);
312 std::sort(ids_to_delete.rbegin(), ids_to_delete.rend());
313 for(
size_t id: ids_to_delete) {
314 if (vertices.at(
id)->to_delete) {
315 delete_vertex(
id, avoid_instantiation);
320 void print(std::ostream& out)
const {
321 out <<
"digraph G {\n";
322 for(
auto vertex: vertices) {
323 out <<
"\t\"" << vertex.second->vertex_id <<
"\" ";
325 out <<
"[label=\"" << vertex.second->vertex_id <<
":";
326 if (vertex.second->operation) {
327 out<< vertex.second->operation->name();
331 if (!vertex.second->object.expired()) {
332 value_type* v = vertex.second->object.lock().get();
333 out <<
"\\nptr=" << v;
339 if (!vertex.second->object.expired()) {
340 out <<
",style=bold";
342 if (vertex.second->to_delete) {
347 for (
size_t children : vertex.second->children) {
348 out <<
"\t\"" << vertex.second->vertex_id <<
"\" -> " 349 <<
"\"" << children <<
"\"\n";
359 for(
auto vtx: vertices) {
369 explicit vertex(
size_t vertex_id)
370 : operation(NULL), to_delete(
false), vertex_id(vertex_id) { }
371 ~vertex() {
if (operation)
delete operation; }
373 std::weak_ptr<value_type> object;
379 std::shared_ptr<value_type> object_cache;
383 operation_type* operation;
385 std::vector<size_t> parents;
387 std::vector<size_t> children;
393 bool is_value_vertex()
const {
394 return operation == NULL;
397 if (!is_value_vertex()) {
398 object_cache.reset();
404 std::unordered_map<size_t, vertex*> vertices;
406 std::function<value_type*()> allocator;
408 std::function<void(value_type& dest, value_type& src)> copier;
417 std::unordered_map<size_t, std::vector<size_t> > list_ancestors(
size_t vertex) {
418 std::unordered_map<size_t, std::vector<size_t> > ret;
431 vqueue.
push(vertex, vertex);
432 while(!vqueue.
empty()) {
433 size_t curvtx = vqueue.
pop().first;
434 if (ret.count(curvtx) && ret[curvtx].size() >= 2)
continue;
435 for (
size_t parent: vertices.at(curvtx)->parents) {
436 if (vertices.at(curvtx)->object.expired()) {
437 ret[parent].push_back(curvtx);
438 if (!vqueue.
contains(parent)) vqueue.
push(parent, parent);
451 std::shared_ptr<value_type> preorder_compute(
size_t vertex_id,
452 bool make_copy =
true) {
454 vertex* vtx = vertices.at(vertex_id);
456 if (!vtx->object.expired()) {
460 std::shared_ptr<value_type> ret;
461 if (vtx->is_value_vertex()) {
462 ret.reset(allocator());
463 copier(*ret, *(vtx->object.lock()));
464 }
else if (vtx->object_cache.unique()) {
470 ret = vtx->object_cache;
471 vtx->object_cache.reset();
477 ret.reset(allocator());
478 copier(*ret, *(vtx->object.lock()));
482 return vtx->object.lock();
486 if (vtx->parents.size() == 0) {
489 std::shared_ptr<value_type> ret;
490 ret.reset(allocator());
491 vtx->operation->execute(*ret, std::vector<value_type*>());
493 if (!make_copy) vtx->object = ret;
498 std::vector<std::shared_ptr<value_type> > parents_shared_ptr(vtx->parents.size());
499 std::vector<value_type*> other_parents_raw_ptr;
500 for (
size_t i = 0;i < vtx->parents.size(); ++i) {
503 parents_shared_ptr[i] = preorder_compute(vtx->parents[i], i == 0);
504 if (i > 0) other_parents_raw_ptr.push_back(parents_shared_ptr[i].
get());
507 std::shared_ptr<value_type> ret = parents_shared_ptr[0];
508 vtx->operation->execute(*ret, other_parents_raw_ptr);
510 other_parents_raw_ptr.clear();
511 parents_shared_ptr.clear();
512 if (!make_copy) vtx->object = ret;
529 void delete_vertex(
size_t vertex_id,
bool avoid_instantiation =
false) {
530 if (vertices.count(vertex_id) == 0)
return;
531 vertex* vtx = vertices.at(vertex_id);
532 if (vtx->children.size() == 0) {
535 for (
size_t parentid: vtx->parents) {
536 vertex* parent= vertices.at(parentid);
538 parent->children.erase(std::find(parent->children.begin(),
539 parent->children.end(),
544 vertices.erase(vertex_id);
545 }
else if (vtx->parents.size() == 0 && vtx->children.size() == 1) {
551 size_t deepest_child = vertex_id;
552 vertex* deepest_child_vtx = vtx;
553 while(deepest_child_vtx->to_delete &&
554 deepest_child_vtx->children.size() == 1) {
555 deepest_child = deepest_child_vtx->children[0];
556 deepest_child_vtx= vertices.at(deepest_child);
559 if (avoid_instantiation && !deepest_child_vtx->object_cache)
return;
560 auto deepest_child_value = make_eager(deepest_child);
561 deepest_child_vtx->object_cache = deepest_child_value;
562 deepest_child_vtx->object = deepest_child_value;
565 size_t deletion_cur_id = vertex_id;
566 vertex* deletion_cur_vtx = vtx;
569 size_t next_child = deletion_cur_vtx->children[0];
570 vertex* next_child_vtx = vertices.at(next_child);
572 next_child_vtx->parents.erase(std::find(next_child_vtx->parents.begin(),
573 next_child_vtx->parents.end(),
575 delete deletion_cur_vtx;
576 vertices.erase(deletion_cur_id);
577 deletion_cur_id = next_child;
578 deletion_cur_vtx = next_child_vtx;
579 }
while(deletion_cur_vtx->to_delete && deletion_cur_vtx->children.size() == 1);
586 template <
typename T>
587 std::ostream& operator<<(std::ostream& out,
std::shared_ptr< sframe > sort(std::shared_ptr< planner_node > sframe_planner_node, const std::vector< std::string > column_names, const std::vector< size_t > &sort_column_indices, const std::vector< bool > &sort_orders)
lazy_eval_future()=delete
deleted default constructor
future_type * add_operation(operation_type *operation, const std::vector< future_type *> &parents)
void uncache(size_t vertex_id)
std::shared_ptr< value_type > make_eager(size_t vertex_id)
~lazy_eval_operation_dag()
destructor
void push(T item, Priority priority)
Enqueues a new item in the queue.
future_type * add_value(value_type *value)
value_type & operator()()
lazy_eval_future & operator=(const lazy_eval_future &other)=default
default assignment operator
virtual size_t num_arguments()=0
#define ASSERT_TRUE(cond)
~lazy_eval_future()
Destructor.
std::pair< T, Priority > pop()
std::shared_ptr< value_type > get_ptr()
bool empty() const
Returns true iff the queue is empty.
void cleanup(bool avoid_instantiation=false)
bool contains(const T &item) const
Returns true if the queue contains the given value.
size_t get_vertex_id() const
Gets the vertex ID in the lazy eval DAG datastructure.
void mark_for_deletion(size_t vertex_id)