Taskflow  3.2.0-Master-Branch
Loading...
Searching...
No Matches
graph.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "../utility/iterator.hpp"
4#include "../utility/object_pool.hpp"
5#include "../utility/traits.hpp"
6#include "../utility/os.hpp"
7#include "../utility/math.hpp"
8#include "../utility/small_vector.hpp"
9#include "error.hpp"
10#include "declarations.hpp"
11#include "semaphore.hpp"
12#include "environment.hpp"
13#include "topology.hpp"
14
20namespace tf {
21
22// ----------------------------------------------------------------------------
23// Class: CustomGraphBase
24// ----------------------------------------------------------------------------
25
// Abstract base class for custom graph objects.
// Derived classes must implement dump(); the virtual destructor allows an
// object to be destroyed through a pointer to this base.
class CustomGraphBase {

  public:

    // Pure virtual hook implemented by every derived graph type.
    // NOTE(review): presumably writes a textual dump to the stream; the
    // meaning of the pointer and string arguments is not visible in this
    // file - confirm against the implementers.
    virtual void dump(
      std::ostream&, const void*, const std::string&
    ) const = 0;

    virtual ~CustomGraphBase() = default;
};
36
37// ----------------------------------------------------------------------------
38// Class: Graph
39// ----------------------------------------------------------------------------
40
56class Graph {
57
58 friend class Node;
59 friend class FlowBuilder;
60 friend class Subflow;
61 friend class Taskflow;
62 friend class Executor;
63
64 public:
65
69 Graph() = default;
70
74 Graph(const Graph&) = delete;
75
79 Graph(Graph&&);
80
84 ~Graph();
85
89 Graph& operator = (const Graph&) = delete;
90
95
99 bool empty() const;
100
104 size_t size() const;
105
109 void clear();
110
111 private:
112
113 std::vector<Node*> _nodes;
114
115 void _clear();
116 void _clear_detached();
117 void _merge(Graph&&);
118 void _erase(Node*);
119
120 template <typename ...ArgsT>
121 Node* _emplace_back(ArgsT&&... args);
122
123 Node* _emplace_back();
124};
125
126// ----------------------------------------------------------------------------
127
150class Runtime {
151
152 friend class Executor;
153
154 public:
155
172
208 void schedule(Task task);
209
213 template <typename C>
214 void run(C&&);
215
216 private:
217
218 explicit Runtime(Executor&, Worker&, Node*);
219
220 Executor& _executor;
221 Worker& _worker;
222 Node* _parent;
223};
224
225// constructor
226inline Runtime::Runtime(Executor& e, Worker& w, Node* p) :
227 _executor{e},
228 _worker {w},
229 _parent {p}{
230}
231
232// Function: executor
234 return _executor;
235}
236
237// ----------------------------------------------------------------------------
238// Node
239// ----------------------------------------------------------------------------
240
244class Node {
245
246 friend class Graph;
247 friend class Task;
248 friend class TaskView;
249 friend class Taskflow;
250 friend class Executor;
251 friend class FlowBuilder;
252 friend class Subflow;
253 friend class Runtime;
254
255 TF_ENABLE_POOLABLE_ON_THIS;
256
257 // state bit flag
258 constexpr static int CONDITIONED = 1;
259 constexpr static int DETACHED = 2;
260 constexpr static int ACQUIRED = 4;
261 constexpr static int READY = 8;
262 constexpr static int DEFERRED = 16;
263
264 // static work handle
265 struct Static {
266
267 template <typename C>
268 Static(C&&);
269
270 std::function<void()> work;
271 };
272
273 // runtime work handle
274 struct Runtime {
275
276 template <typename C>
277 Runtime(C&&);
278
279 std::function<void(tf::Runtime&)> work;
280 };
281
282 // dynamic work handle
283 struct Dynamic {
284
285 template <typename C>
286 Dynamic(C&&);
287
288 std::function<void(Subflow&)> work;
289 Graph subgraph;
290 };
291
292 // condition work handle
293 struct Condition {
294
295 template <typename C>
296 Condition(C&&);
297
298 std::function<int()> work;
299 };
300
301 // multi-condition work handle
302 struct MultiCondition {
303
304 template <typename C>
305 MultiCondition(C&&);
306
308 };
309
310 // module work handle
311 struct Module {
312
313 template <typename T>
314 Module(T&);
315
316 Graph& graph;
317 };
318
319 // Async work
320 struct Async {
321
322 template <typename T>
324
325 std::function<void(bool)> work;
326
328 };
329
330 // Silent async work
331 struct SilentAsync {
332
333 template <typename C>
334 SilentAsync(C&&);
335
336 std::function<void()> work;
337 };
338
339 // cudaFlow work handle
340 struct cudaFlow {
341
342 template <typename C, typename G>
343 cudaFlow(C&& c, G&& g);
344
345 std::function<void(Executor&, Node*)> work;
346
348 };
349
350 // syclFlow work handle
351 struct syclFlow {
352
353 template <typename C, typename G>
354 syclFlow(C&& c, G&& g);
355
356 std::function<void(Executor&, Node*)> work;
357
359 };
360
361 using handle_t = std::variant<
362 std::monostate, // placeholder
363 Static, // static tasking
364 Dynamic, // dynamic tasking
365 Condition, // conditional tasking
366 MultiCondition, // multi-conditional tasking
367 Module, // composable tasking
368 Async, // async tasking
369 SilentAsync, // async tasking (no future)
370 cudaFlow, // cudaFlow
371 syclFlow, // syclFlow
372 Runtime // runtime tasking
373 >;
374
375 struct Semaphores {
376 SmallVector<Semaphore*> to_acquire;
377 SmallVector<Semaphore*> to_release;
378 };
379
380 public:
381
382 // variant index
383 constexpr static auto PLACEHOLDER = get_index_v<std::monostate, handle_t>;
384 constexpr static auto STATIC = get_index_v<Static, handle_t>;
385 constexpr static auto DYNAMIC = get_index_v<Dynamic, handle_t>;
386 constexpr static auto CONDITION = get_index_v<Condition, handle_t>;
387 constexpr static auto MULTI_CONDITION = get_index_v<MultiCondition, handle_t>;
388 constexpr static auto MODULE = get_index_v<Module, handle_t>;
389 constexpr static auto ASYNC = get_index_v<Async, handle_t>;
390 constexpr static auto SILENT_ASYNC = get_index_v<SilentAsync, handle_t>;
391 constexpr static auto CUDAFLOW = get_index_v<cudaFlow, handle_t>;
392 constexpr static auto SYCLFLOW = get_index_v<syclFlow, handle_t>;
393 constexpr static auto RUNTIME = get_index_v<Runtime, handle_t>;
394
395 template <typename... Args>
396 Node(Args&&... args);
397
398 ~Node();
399
400 size_t num_successors() const;
401 size_t num_dependents() const;
402 size_t num_strong_dependents() const;
403 size_t num_weak_dependents() const;
404
405 const std::string& name() const;
406
407 private:
408
409 std::string _name;
410
411 void* _data {nullptr};
412
413 handle_t _handle;
414
415 SmallVector<Node*> _successors;
416 SmallVector<Node*> _dependents;
417
418 Topology* _topology {nullptr};
419
420 Node* _parent {nullptr};
421
422 std::atomic<int> _state {0};
423 std::atomic<size_t> _join_counter {0};
424
425 std::unique_ptr<Semaphores> _semaphores;
426
427 void _precede(Node*);
428 void _set_up_join_counter();
429
430 bool _is_cancelled() const;
431 bool _is_conditioner() const;
432 bool _acquire_all(SmallVector<Node*>&);
433
434 SmallVector<Node*> _release_all();
435};
436
437// ----------------------------------------------------------------------------
438// Node Object Pool
439// ----------------------------------------------------------------------------
440
444inline ObjectPool<Node> node_pool;
445
446// ----------------------------------------------------------------------------
447// Definition for Node::Static
448// ----------------------------------------------------------------------------
449
450// Constructor
451template <typename C>
452Node::Static::Static(C&& c) : work {std::forward<C>(c)} {
453}
454
455// ----------------------------------------------------------------------------
456// Definition for Node::Dynamic
457// ----------------------------------------------------------------------------
458
459// Constructor
460template <typename C>
461Node::Dynamic::Dynamic(C&& c) : work {std::forward<C>(c)} {
462}
463
464// ----------------------------------------------------------------------------
465// Definition for Node::Condition
466// ----------------------------------------------------------------------------
467
468// Constructor
469template <typename C>
470Node::Condition::Condition(C&& c) : work {std::forward<C>(c)} {
471}
472
473// ----------------------------------------------------------------------------
474// Definition for Node::MultiCondition
475// ----------------------------------------------------------------------------
476
477// Constructor
478template <typename C>
479Node::MultiCondition::MultiCondition(C&& c) : work {std::forward<C>(c)} {
480}
481
482// ----------------------------------------------------------------------------
483// Definition for Node::cudaFlow
484// ----------------------------------------------------------------------------
485
486template <typename C, typename G>
487Node::cudaFlow::cudaFlow(C&& c, G&& g) :
488 work {std::forward<C>(c)},
489 graph {std::forward<G>(g)} {
490}
491
492// ----------------------------------------------------------------------------
493// Definition for Node::syclFlow
494// ----------------------------------------------------------------------------
495
496template <typename C, typename G>
497Node::syclFlow::syclFlow(C&& c, G&& g) :
498 work {std::forward<C>(c)},
499 graph {std::forward<G>(g)} {
500}
501
502// ----------------------------------------------------------------------------
503// Definition for Node::Module
504// ----------------------------------------------------------------------------
505
506// Constructor
507template <typename T>
508inline Node::Module::Module(T& obj) : graph{ obj.graph() } {
509}
510
511// ----------------------------------------------------------------------------
512// Definition for Node::Async
513// ----------------------------------------------------------------------------
514
515// Constructor
516template <typename C>
517Node::Async::Async(C&& c, std::shared_ptr<AsyncTopology>tpg) :
518 work {std::forward<C>(c)},
519 topology {std::move(tpg)} {
520}
521
522// ----------------------------------------------------------------------------
523// Definition for Node::SilentAsync
524// ----------------------------------------------------------------------------
525
526// Constructor
527template <typename C>
528Node::SilentAsync::SilentAsync(C&& c) :
529 work {std::forward<C>(c)} {
530}
531
532// ----------------------------------------------------------------------------
533// Definition for Node::Runtime
534// ----------------------------------------------------------------------------
535
536// Constructor
537template <typename C>
538Node::Runtime::Runtime(C&& c) :
539 work {std::forward<C>(c)} {
540}
541
542// ----------------------------------------------------------------------------
543// Definition for Node
544// ----------------------------------------------------------------------------
545
546// Constructor
547template <typename... Args>
548Node::Node(Args&&... args): _handle{std::forward<Args>(args)...} {
549}
550
551// Destructor
552inline Node::~Node() {
553 // this is to avoid stack overflow
554
555 if(_handle.index() == DYNAMIC) {
556 // using std::get_if instead of std::get makes this compatible
557 // with older macOS versions
558 // the result of std::get_if is guaranteed to be non-null
559 // due to the index check above
560 auto& subgraph = std::get_if<Dynamic>(&_handle)->subgraph;
561 std::vector<Node*> nodes;
562 nodes.reserve(subgraph.size());
563
564 std::move(
565 subgraph._nodes.begin(), subgraph._nodes.end(), std::back_inserter(nodes)
566 );
567 subgraph._nodes.clear();
568
569 size_t i = 0;
570
571 while(i < nodes.size()) {
572
573 if(nodes[i]->_handle.index() == DYNAMIC) {
574 auto& sbg = std::get_if<Dynamic>(&(nodes[i]->_handle))->subgraph;
575 std::move(
576 sbg._nodes.begin(), sbg._nodes.end(), std::back_inserter(nodes)
577 );
578 sbg._nodes.clear();
579 }
580
581 ++i;
582 }
583
584 //auto& np = Graph::_node_pool();
585 for(i=0; i<nodes.size(); ++i) {
586 node_pool.recycle(nodes[i]);
587 }
588 }
589}
590
591// Procedure: _precede
592inline void Node::_precede(Node* v) {
593 _successors.push_back(v);
594 v->_dependents.push_back(this);
595}
596
597// Function: num_successors
598inline size_t Node::num_successors() const {
599 return _successors.size();
600}
601
602// Function: dependents
603inline size_t Node::num_dependents() const {
604 return _dependents.size();
605}
606
607// Function: num_weak_dependents
608inline size_t Node::num_weak_dependents() const {
609 size_t n = 0;
610 for(size_t i=0; i<_dependents.size(); i++) {
611 //if(_dependents[i]->_handle.index() == Node::CONDITION) {
612 if(_dependents[i]->_is_conditioner()) {
613 n++;
614 }
615 }
616 return n;
617}
618
619// Function: num_strong_dependents
620inline size_t Node::num_strong_dependents() const {
621 size_t n = 0;
622 for(size_t i=0; i<_dependents.size(); i++) {
623 //if(_dependents[i]->_handle.index() != Node::CONDITION) {
624 if(!_dependents[i]->_is_conditioner()) {
625 n++;
626 }
627 }
628 return n;
629}
630
631// Function: name
632inline const std::string& Node::name() const {
633 return _name;
634}
635
636// Function: _is_conditioner
637inline bool Node::_is_conditioner() const {
638 return _handle.index() == Node::CONDITION ||
639 _handle.index() == Node::MULTI_CONDITION;
640}
641
642// Function: _is_cancelled
643inline bool Node::_is_cancelled() const {
644 if(_handle.index() == Node::ASYNC) {
645 auto h = std::get_if<Node::Async>(&_handle);
646 if(h->topology && h->topology->_is_cancelled.load(std::memory_order_relaxed)) {
647 return true;
648 }
649 // async tasks spawned from subflow does not have topology
650 }
651 return _topology && _topology->_is_cancelled.load(std::memory_order_relaxed);
652}
653
654// Procedure: _set_up_join_counter
655inline void Node::_set_up_join_counter() {
656 size_t c = 0;
657 for(auto p : _dependents) {
658 //if(p->_handle.index() == Node::CONDITION) {
659 if(p->_is_conditioner()) {
660 _state.fetch_or(Node::CONDITIONED, std::memory_order_relaxed);
661 }
662 else {
663 c++;
664 }
665 }
666 _join_counter.store(c, std::memory_order_release);
667}
668
669
670// Function: _acquire_all
671inline bool Node::_acquire_all(SmallVector<Node*>& nodes) {
672
673 auto& to_acquire = _semaphores->to_acquire;
674
675 for(size_t i = 0; i < to_acquire.size(); ++i) {
676 if(!to_acquire[i]->_try_acquire_or_wait(this)) {
677 for(size_t j = 1; j <= i; ++j) {
678 auto r = to_acquire[i-j]->_release();
679 nodes.insert(std::end(nodes), std::begin(r), std::end(r));
680 }
681 return false;
682 }
683 }
684 return true;
685}
686
687// Function: _release_all
688inline SmallVector<Node*> Node::_release_all() {
689
690 auto& to_release = _semaphores->to_release;
691
692 SmallVector<Node*> nodes;
693 for(const auto& sem : to_release) {
694 auto r = sem->_release();
695 nodes.insert(std::end(nodes), std::begin(r), std::end(r));
696 }
697
698 return nodes;
699}
700
701// ----------------------------------------------------------------------------
702// Graph definition
703// ----------------------------------------------------------------------------
704
705// Destructor
707 _clear();
708}
709
710// Move constructor
711inline Graph::Graph(Graph&& other) :
712 _nodes {std::move(other._nodes)} {
713}
714
715// Move assignment
717 _clear();
718 _nodes = std::move(other._nodes);
719 return *this;
720}
721
722// Procedure: clear
723inline void Graph::clear() {
724 _clear();
725}
726
727// Procedure: clear
728inline void Graph::_clear() {
729 for(auto node : _nodes) {
730 node_pool.recycle(node);
731 }
732 _nodes.clear();
733}
734
735// Procedure: clear_detached
736inline void Graph::_clear_detached() {
737
738 auto mid = std::partition(_nodes.begin(), _nodes.end(), [] (Node* node) {
739 return !(node->_state.load(std::memory_order_relaxed) & Node::DETACHED);
740 });
741
742 for(auto itr = mid; itr != _nodes.end(); ++itr) {
743 node_pool.recycle(*itr);
744 }
745 _nodes.resize(std::distance(_nodes.begin(), mid));
746}
747
748// Procedure: merge
749inline void Graph::_merge(Graph&& g) {
750 for(auto n : g._nodes) {
751 _nodes.push_back(n);
752 }
753 g._nodes.clear();
754}
755
756// Function: erase
757inline void Graph::_erase(Node* node) {
758 if(auto I = std::find(_nodes.begin(), _nodes.end(), node); I != _nodes.end()) {
759 _nodes.erase(I);
760 node_pool.recycle(node);
761 }
762}
763
764// Function: size
765inline size_t Graph::size() const {
766 return _nodes.size();
767}
768
769// Function: empty
770inline bool Graph::empty() const {
771 return _nodes.empty();
772}
773
774// Function: emplace_back
775template <typename ...ArgsT>
776Node* Graph::_emplace_back(ArgsT&&... args) {
777 _nodes.push_back(node_pool.animate(std::forward<ArgsT>(args)...));
778 return _nodes.back();
779}
780
781// Function: emplace_back
782inline Node* Graph::_emplace_back() {
783 _nodes.push_back(node_pool.animate());
784 return _nodes.back();
785}
786
787
788} // end of namespace tf. ---------------------------------------------------
T back_inserter(T... args)
T begin(T... args)
class to create an executor for running a taskflow graph
Definition executor.hpp:50
class to build a task dependency graph
Definition flow_builder.hpp:21
class to create a graph object
Definition graph.hpp:56
Graph & operator=(const Graph &)=delete
disabled copy assignment operator
Graph()=default
constructs a graph object
bool empty() const
queries if the graph is empty
Definition graph.hpp:770
~Graph()
destructs the graph object
Definition graph.hpp:706
size_t size() const
queries the number of nodes in the graph
Definition graph.hpp:765
void clear()
clears the graph
Definition graph.hpp:723
Graph(const Graph &)=delete
disabled copy constructor
class to create a runtime object used by a runtime task
Definition graph.hpp:150
Executor & executor()
obtains the running executor
Definition graph.hpp:233
void run(C &&)
runs a task callable synchronously
Definition executor.hpp:1963
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition executor.hpp:1953
class to construct a subflow graph from the execution of a dynamic task
Definition flow_builder.hpp:889
class to access task information from the observer interface
Definition task.hpp:638
class to create a task handle over a node in a taskflow graph
Definition task.hpp:187
class to create a taskflow object
Definition core/taskflow.hpp:73
T distance(T... args)
T end(T... args)
T find(T... args)
T forward(T... args)
T move(T... args)
taskflow namespace
Definition small_vector.hpp:27
@ DYNAMIC
dynamic (subflow) task type
@ MODULE
module task type
@ CUDAFLOW
cudaFlow task type
@ MULTI_CONDITION
multi-condition task type
@ CONDITION
condition task type
@ SYCLFLOW
syclFlow task type
@ ASYNC
asynchronous task type
@ PLACEHOLDER
placeholder task type
@ RUNTIME
runtime task type
@ STATIC
static task type
T partition(T... args)
semaphore include file