375 template<
typename P,
typename C>
406 template<
typename P,
typename C>
508 template <
typename F,
typename... ArgsT>
509 auto async(F&& f, ArgsT&&... args);
541 template <
typename F,
typename... ArgsT>
558 template <
typename F,
typename... ArgsT>
575 template <
typename F,
typename... ArgsT>
595 template <
typename Observer,
typename... ArgsT>
603 template <
typename Observer>
618 size_t _num_topologies {0};
627 TaskQueue<Node*> _wsq;
635 Worker* _this_worker();
637 bool _wait_for_task(Worker&, Node*&);
639 void _observer_prologue(Worker&, Node*);
640 void _observer_epilogue(Worker&, Node*);
642 void _worker_loop(Worker&);
643 void _exploit_task(Worker&, Node*&);
644 void _explore_task(Worker&, Node*&);
645 void _consume_task(Worker&, Node*);
646 void _schedule(Worker&, Node*);
647 void _schedule(Node*);
650 void _set_up_topology(Worker*, Topology*);
651 void _tear_down_topology(Worker&, Topology*);
652 void _tear_down_async(Node*);
653 void _tear_down_invoke(Worker&, Node*);
654 void _cancel_invoke(Worker&, Node*);
655 void _increment_topology();
656 void _decrement_topology();
657 void _decrement_topology_and_notify();
658 void _invoke(Worker&, Node*);
659 void _invoke_static_task(Worker&, Node*);
660 void _invoke_dynamic_task(Worker&, Node*);
661 void _join_dynamic_task_external(Worker&, Node*,
Graph&);
662 void _join_dynamic_task_internal(Worker&, Node*,
Graph&);
663 void _detach_dynamic_task(Worker&, Node*,
Graph&);
666 void _invoke_module_task(Worker&, Node*);
667 void _invoke_async_task(Worker&, Node*);
668 void _invoke_silent_async_task(Worker&, Node*);
669 void _invoke_cudaflow_task(Worker&, Node*);
670 void _invoke_syclflow_task(Worker&, Node*);
671 void _invoke_runtime_task(Worker&, Node*);
673 template <
typename C,
676 void _invoke_cudaflow_task_entry(Node*, C&&);
678 template <
typename C,
typename Q,
681 void _invoke_syclflow_task_entry(Node*, C&&, Q&);
690 TF_THROW(
"no cpu workers to execute taskflows");
696 if(has_env(TF_ENABLE_PROFILER)) {
697 TFProfManager::get()._manage(make_observer<TFProfObserver>());
710 _notifier.notify(
true);
712 for(
auto& t : _threads){
719 return _workers.size();
724 return _num_topologies;
729 return _taskflows.size();
733inline Worker* Executor::_this_worker() {
735 return itr == _wids.end() ? nullptr : &_workers[itr->second];
739template <
typename F,
typename... ArgsT>
742 _increment_topology();
753 auto node = node_pool.animate(
756 (
bool cancel)
mutable {
761 p.object.set_value();
764 p.object.set_value(cancel ?
std::nullopt : std::make_optional(f(args...)));
772 if(
auto w = _this_worker(); w) {
783template <
typename F,
typename... ArgsT>
789template <
typename F,
typename... ArgsT>
794 _increment_topology();
796 Node* node = node_pool.animate(
805 if(
auto w = _this_worker(); w) {
814template <
typename F,
typename... ArgsT>
822 return i == _wids.end() ? -1 :
static_cast<int>(_workers[i->second]._id);
826inline void Executor::_spawn(
size_t N) {
832 for(
size_t id=0;
id<N; ++id) {
834 _workers[id]._id = id;
835 _workers[id]._vtm = id;
836 _workers[id]._executor =
this;
837 _workers[id]._waiter = &_notifier._waiters[id];
839 _threads.emplace_back([
this] (
863 if(_wait_for_task(w, t) ==
false) {
872 cond.wait(lock, [&](){
return n==N; });
876inline void Executor::_consume_task(Worker& w, Node* p) {
880 while(p->_join_counter != 0) {
884 if(
auto t = w._wsq.pop(); t) {
888 size_t num_steals = 0;
890 size_t max_steals = ((_workers.size() + 1) << 1);
894 t = (w._id == w._vtm) ? _wsq.steal() : _workers[w._vtm]._wsq.steal();
900 else if(p->_join_counter != 0){
902 if(num_steals++ > max_steals) {
908 w._vtm = rdvtm(w._rdgen);
919inline void Executor::_explore_task(Worker& w, Node*& t) {
924 size_t num_steals = 0;
925 size_t num_yields = 0;
926 size_t max_steals = ((_workers.size() + 1) << 1);
931 t = (w._id == w._vtm) ? _wsq.steal() : _workers[w._vtm]._wsq.steal();
937 if(num_steals++ > max_steals) {
939 if(num_yields++ > 100) {
944 w._vtm = rdvtm(w._rdgen);
950inline void Executor::_exploit_task(Worker& w, Node*& t) {
954 if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
955 _notifier.notify(
false);
968inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
978 _explore_task(worker, t);
981 if(_num_thieves.fetch_sub(1) == 1) {
982 _notifier.notify(
false);
987 _notifier.prepare_wait(worker._waiter);
992 _notifier.cancel_wait(worker._waiter);
997 if(_num_thieves.fetch_sub(1) == 1) {
998 _notifier.notify(
false);
1003 worker._vtm = worker._id;
1009 _notifier.cancel_wait(worker._waiter);
1010 _notifier.notify(
true);
1015 if(_num_thieves.fetch_sub(1) == 1) {
1017 _notifier.cancel_wait(worker._waiter);
1021 for(
auto& w : _workers) {
1022 if(!w._wsq.empty()) {
1023 worker._vtm = w._id;
1024 _notifier.cancel_wait(worker._waiter);
1031 _notifier.commit_wait(worker._waiter);
1037template<
typename Observer,
typename... ArgsT>
1042 "Observer must be derived from ObserverInterface"
1048 ptr->set_up(_workers.size());
1056template <
typename Observer>
1061 "Observer must be derived from ObserverInterface"
1069 return _observers.size();
1073inline void Executor::_schedule(Worker& worker, Node* node) {
1078 if(worker._executor ==
this) {
1079 worker._wsq.push(node);
1088 _notifier.notify(
false);
1092inline void Executor::_schedule(Node* node) {
1101 _notifier.notify(
false);
1105inline void Executor::_schedule(
1106 Worker& worker,
const SmallVector<Node*>& nodes
1111 const auto num_nodes = nodes.size();
1113 if(num_nodes == 0) {
1118 for(
size_t i=0; i<num_nodes; ++i) {
1122 if(worker._executor ==
this) {
1123 for(
size_t i=0; i<num_nodes; ++i) {
1124 worker._wsq.push(nodes[i]);
1131 for(
size_t k=0; k<num_nodes; ++k) {
1132 _wsq.push(nodes[k]);
1136 _notifier.notify_n(num_nodes);
1140inline void Executor::_schedule(
const SmallVector<Node*>& nodes) {
1143 const auto num_nodes = nodes.size();
1145 if(num_nodes == 0) {
1150 for(
size_t i=0; i<num_nodes; ++i) {
1156 for(
size_t k=0; k<num_nodes; ++k) {
1157 _wsq.push(nodes[k]);
1161 _notifier.notify_n(num_nodes);
1165inline void Executor::_invoke(Worker& worker, Node* node) {
1173 if(node->_is_cancelled()) {
1174 _cancel_invoke(worker, node);
1179 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1180 SmallVector<Node*> nodes;
1181 if(!node->_acquire_all(nodes)) {
1182 _schedule(worker, nodes);
1190 SmallVector<int> conds;
1193 switch(node->_handle.index()) {
1196 _invoke_static_task(worker, node);
1201 case Node::DYNAMIC: {
1202 _invoke_dynamic_task(worker, node);
1207 case Node::CONDITION: {
1208 _invoke_condition_task(worker, node, conds);
1213 case Node::MULTI_CONDITION: {
1214 _invoke_multi_condition_task(worker, node, conds);
1219 case Node::MODULE: {
1220 _invoke_module_task(worker, node);
1226 _invoke_async_task(worker, node);
1227 _tear_down_async(node);
1233 case Node::SILENT_ASYNC: {
1234 _invoke_silent_async_task(worker, node);
1235 _tear_down_async(node);
1241 case Node::CUDAFLOW: {
1242 _invoke_cudaflow_task(worker, node);
1247 case Node::SYCLFLOW: {
1248 _invoke_syclflow_task(worker, node);
1253 case Node::RUNTIME: {
1254 _invoke_runtime_task(worker, node);
1264 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1265 _schedule(worker, node->_release_all());
1272 node->_join_counter = node->num_strong_dependents();
1275 node->_join_counter = node->num_dependents();
1279 auto& j = (node->_parent) ? node->_parent->_join_counter :
1280 node->_topology->_join_counter;
1282 Node* cache {
nullptr};
1286 switch(node->_handle.index()) {
1289 case Node::CONDITION:
1290 case Node::MULTI_CONDITION: {
1291 for(
auto cond : conds) {
1292 if(cond >= 0 &&
static_cast<size_t>(cond) < node->_successors.size()) {
1293 auto s = node->_successors[cond];
1298 _schedule(worker, cache);
1308 for(
size_t i=0; i<node->_successors.size(); ++i) {
1309 if(--(node->_successors[i]->_join_counter) == 0) {
1312 _schedule(worker, cache);
1314 cache = node->_successors[i];
1322 _tear_down_invoke(worker, node);
1334inline void Executor::_tear_down_async(Node* node) {
1336 node->_parent->_join_counter.fetch_sub(1);
1339 _decrement_topology_and_notify();
1341 node_pool.recycle(node);
1345inline void Executor::_tear_down_invoke(Worker& worker, Node* node) {
1348 if(node->_parent ==
nullptr) {
1349 if(node->_topology->_join_counter.fetch_sub(1) == 1) {
1350 _tear_down_topology(worker, node->_topology);
1354 node->_parent->_join_counter.fetch_sub(1);
1359inline void Executor::_cancel_invoke(Worker& worker, Node* node) {
1361 switch(node->_handle.index()) {
1365 _tear_down_async(node);
1369 case Node::SILENT_ASYNC:
1370 _tear_down_async(node);
1375 _tear_down_invoke(worker, node);
1382inline void Executor::_observer_prologue(Worker& worker, Node* node) {
1383 for(
auto& observer : _observers) {
1384 observer->on_entry(WorkerView(worker), TaskView(*node));
1389inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
1390 for(
auto& observer : _observers) {
1391 observer->on_exit(WorkerView(worker), TaskView(*node));
1396inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
1397 _observer_prologue(worker, node);
1399 _observer_epilogue(worker, node);
1403inline void Executor::_invoke_dynamic_task(Worker& w, Node* node) {
1405 _observer_prologue(w, node);
1409 handle->subgraph._clear();
1411 Subflow sf(*
this, w, node, handle->subgraph);
1416 _join_dynamic_task_internal(w, node, handle->subgraph);
1419 _observer_epilogue(w, node);
1423inline void Executor::_detach_dynamic_task(
1424 Worker& w, Node* p, Graph& g
1428 if(g.empty() && p->_join_counter == 0) {
1432 SmallVector<Node*> src;
1434 for(
auto n : g._nodes) {
1437 n->_set_up_join_counter();
1438 n->_topology = p->_topology;
1439 n->_parent =
nullptr;
1441 if(n->num_dependents() == 0) {
1448 p->_topology->_taskflow._graph._merge(
std::move(g));
1451 p->_topology->_join_counter.fetch_add(src.size());
1456inline void Executor::_join_dynamic_task_external(
1457 Worker& w, Node* p, Graph& g
1461 if(g.empty() && p->_join_counter == 0) {
1465 SmallVector<Node*> src;
1467 for(
auto n : g._nodes) {
1470 n->_set_up_join_counter();
1471 n->_topology = p->_topology;
1474 if(n->num_dependents() == 0) {
1478 p->_join_counter.fetch_add(src.size());
1480 _consume_task(w, p);
1484inline void Executor::_join_dynamic_task_internal(
1485 Worker& w, Node* p, Graph& g
1489 if(g.empty() && p->_join_counter == 0) {
1493 SmallVector<Node*> src;
1495 for(
auto n : g._nodes) {
1496 n->_topology = p->_topology;
1498 n->_set_up_join_counter();
1500 if(n->num_dependents() == 0) {
1504 p->_join_counter.fetch_add(src.size());
1506 _consume_task(w, p);
1510inline void Executor::_invoke_condition_task(
1511 Worker& worker, Node* node, SmallVector<int>& conds
1513 _observer_prologue(worker, node);
1515 _observer_epilogue(worker, node);
1519inline void Executor::_invoke_multi_condition_task(
1520 Worker& worker, Node* node, SmallVector<int>& conds
1522 _observer_prologue(worker, node);
1524 _observer_epilogue(worker, node);
1528inline void Executor::_invoke_cudaflow_task(Worker& worker, Node* node) {
1529 _observer_prologue(worker, node);
1531 _observer_epilogue(worker, node);
1535inline void Executor::_invoke_syclflow_task(Worker& worker, Node* node) {
1536 _observer_prologue(worker, node);
1538 _observer_epilogue(worker, node);
1542inline void Executor::_invoke_module_task(Worker& w, Node* node) {
1543 _observer_prologue(w, node);
1544 _join_dynamic_task_internal(
1547 _observer_epilogue(w, node);
1551inline void Executor::_invoke_async_task(Worker& w, Node* node) {
1552 _observer_prologue(w, node);
1554 _observer_epilogue(w, node);
1558inline void Executor::_invoke_silent_async_task(Worker& w, Node* node) {
1559 _observer_prologue(w, node);
1561 _observer_epilogue(w, node);
1565inline void Executor::_invoke_runtime_task(Worker& w, Node* node) {
1566 _observer_prologue(w, node);
1567 Runtime rt(*
this, w, node);
1569 _observer_epilogue(w, node);
1574 return run_n(f, 1, [](){});
1583template <
typename C>
1589template <
typename C>
1596 return run_n(f, repeat, [](){});
1605template <
typename C>
1613template <
typename C>
1633template <
typename P,
typename C>
1636 _increment_topology();
1650 promise.set_value();
1651 _decrement_topology_and_notify();
1664 f._topologies.push(t);
1665 if(f._topologies.size() == 1) {
1666 _set_up_topology(_this_worker(), t.get());
1674template <
typename P,
typename C>
1681 itr = _taskflows.emplace(_taskflows.end(),
std::move(f));
1682 itr->_satellite = itr;
1689inline void Executor::_increment_topology() {
1695inline void Executor::_decrement_topology_and_notify() {
1697 if(--_num_topologies == 0) {
1698 _topology_cv.notify_all();
1703inline void Executor::_decrement_topology() {
1711 _topology_cv.wait(lock, [&](){
return _num_topologies == 0; });
1715inline void Executor::_set_up_topology(Worker* worker, Topology* tpg) {
1719 tpg->_sources.clear();
1720 tpg->_taskflow._graph._clear_detached();
1723 for(
auto node : tpg->_taskflow._graph._nodes) {
1725 node->_topology = tpg;
1728 if(node->num_dependents() == 0) {
1729 tpg->_sources.push_back(node);
1732 node->_set_up_join_counter();
1735 tpg->_join_counter = tpg->_sources.size();
1738 _schedule(*worker, tpg->_sources);
1741 _schedule(tpg->_sources);
1746inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
1748 auto &f = tpg->_taskflow;
1753 if(!tpg->_is_cancelled && !tpg->_pred()) {
1756 tpg->_join_counter = tpg->_sources.size();
1757 _schedule(worker, tpg->_sources);
1764 if(tpg->_call !=
nullptr) {
1773 tpg->_promise.set_value();
1774 f._topologies.pop();
1775 tpg = f._topologies.front().get();
1778 _decrement_topology();
1782 _set_up_topology(&worker, tpg);
1797 auto s {f._satellite};
1800 f._topologies.pop();
1809 _decrement_topology_and_notify();
1816 _taskflows.erase(*s);
1831 TF_THROW(
"subflow not joinable");
1835 _executor._join_dynamic_task_external(_worker, _parent,
_graph);
1844 TF_THROW(
"subflow already joined or detached");
1848 _executor._detach_dynamic_task(_worker, _parent,
_graph);
1853template <
typename F,
typename... ArgsT>
1855 return _named_async(
1861template <
typename F,
typename... ArgsT>
1862auto Subflow::_named_async(
1869 _parent->_join_counter.fetch_add(1);
1880 auto node = node_pool.animate(
1883 (
bool cancel)
mutable {
1888 p.object.set_value();
1898 node->_topology = _parent->_topology;
1899 node->_parent = _parent;
1901 _executor._schedule(w, node);
1907template <
typename F,
typename... ArgsT>
1913template <
typename F,
typename... ArgsT>
1914void Subflow::_named_silent_async(
1915 Worker& w,
const std::string& name, F&& f, ArgsT&&... args
1918 _parent->_join_counter.fetch_add(1);
1920 auto node = node_pool.animate(
1928 node->_topology = _parent->_topology;
1929 node->_parent = _parent;
1931 _executor._schedule(w, node);
1935template <
typename F,
typename... ArgsT>
1937 _named_silent_async(
1943template <
typename F,
typename... ArgsT>
1954 auto node = task._node;
1955 auto& j = node->_parent ? node->_parent->_join_counter :
1956 node->_topology->_join_counter;
1958 _executor._schedule(_worker, node);
1962template <
typename C>
1966 if constexpr(is_dynamic_task_v<C>) {
1968 Subflow sf(_executor, _worker, _parent, graph);
1971 _executor._join_dynamic_task_internal(_worker, _parent, graph);
1975 static_assert(dependent_false_v<C>,
"unsupported task callable to run");
class to create an executor for running a taskflow graph
Definition executor.hpp:50
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
Definition executor.hpp:1622
auto async(F &&f, ArgsT &&... args)
runs a given function asynchronously
Definition executor.hpp:784
void named_silent_async(const std::string &name, F &&f, ArgsT &&... args)
similar to tf::Executor::named_async but does not return a future object
Definition executor.hpp:790
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
Definition executor.hpp:1057
Executor(size_t N=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition executor.hpp:685
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
Definition executor.hpp:1573
auto named_async(const std::string &name, F &&f, ArgsT &&... args)
runs a given function asynchronously and gives a name to this task
Definition executor.hpp:740
~Executor()
destructs the executor
Definition executor.hpp:702
size_t num_taskflows() const
queries the number of running taskflows with moved ownership
Definition executor.hpp:728
int this_worker_id() const
queries the id of the caller thread in this executor
Definition executor.hpp:820
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
Definition executor.hpp:1595
size_t num_topologies() const
queries the number of running topologies at the time of this call
Definition executor.hpp:723
size_t num_workers() const noexcept
queries the number of worker threads
Definition executor.hpp:718
void wait_for_all()
waits for all tasks to complete
Definition executor.hpp:1709
void silent_async(F &&f, ArgsT &&... args)
similar to tf::Executor::async but does not return a future object
Definition executor.hpp:815
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
Definition executor.hpp:1038
size_t num_observers() const noexcept
queries the number of observers
Definition executor.hpp:1068
class to build a task dependency graph
Definition flow_builder.hpp:21
Graph & _graph
associated graph object
Definition flow_builder.hpp:727
class to access the result of an execution
Definition core/taskflow.hpp:571
class to create a graph object
Definition graph.hpp:56
class to create a runtime object used by a runtime task
Definition graph.hpp:150
void run(C &&)
runs a task callable synchronously
Definition executor.hpp:1963
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition executor.hpp:1953
class to define a vector optimized for small arrays
Definition small_vector.hpp:918
class to construct a subflow graph from the execution of a dynamic task
Definition flow_builder.hpp:889
void named_silent_async(const std::string &name, F &&f, ArgsT &&... args)
similar to tf::Subflow::named_async but does not return a future object
Definition executor.hpp:1936
void join()
enables the subflow to join its parent task
Definition executor.hpp:1826
auto async(F &&f, ArgsT &&... args)
runs a given function asynchronously
Definition executor.hpp:1908
void detach()
enables the subflow to detach from its parent task
Definition executor.hpp:1839
auto named_async(const std::string &name, F &&f, ArgsT &&... args)
runs the given function asynchronously and assigns the task a name
Definition executor.hpp:1854
void silent_async(F &&f, ArgsT &&... args)
similar to tf::Subflow::async but does not return a future object
Definition executor.hpp:1944
class to create a task handle over a node in a taskflow graph
Definition task.hpp:187
class to create a taskflow object
Definition core/taskflow.hpp:73
bool empty() const
queries the emptiness of the taskflow
Definition core/taskflow.hpp:328
T hardware_concurrency(T... args)
T make_optional(T... args)
taskflow namespace
Definition small_vector.hpp:27