Taskflow 3.2.0-Master-Branch
executor.hpp
1#pragma once
2
3#include "observer.hpp"
4#include "taskflow.hpp"
5
11namespace tf {
12
13// ----------------------------------------------------------------------------
14// Executor Definition
15// ----------------------------------------------------------------------------
16
50class Executor {
51
52 friend class FlowBuilder;
53 friend class Subflow;
54 friend class Runtime;
55
56 public:
57
67 explicit Executor(size_t N = std::thread::hardware_concurrency());
68
76 ~Executor();
77
100 tf::Future<void> run(Taskflow& taskflow);
101
121 tf::Future<void> run(Taskflow&& taskflow);
122
148 template<typename C>
149 tf::Future<void> run(Taskflow& taskflow, C&& callable);
150
175 template<typename C>
176 tf::Future<void> run(Taskflow&& taskflow, C&& callable);
177
201 tf::Future<void> run_n(Taskflow& taskflow, size_t N);
202
225 tf::Future<void> run_n(Taskflow&& taskflow, size_t N);
226
256 template<typename C>
257 tf::Future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
258
284 template<typename C>
285 tf::Future<void> run_n(Taskflow&& taskflow, size_t N, C&& callable);
286
314 template<typename P>
315 tf::Future<void> run_until(Taskflow& taskflow, P&& pred);
316
342 template<typename P>
343 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred);
344
375 template<typename P, typename C>
376 tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
377
406 template<typename P, typename C>
407 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred, C&& callable);
408
422 void wait_for_all();
423
435 size_t num_workers() const noexcept;
436
450 size_t num_topologies() const;
451
460 size_t num_taskflows() const;
461
479 int this_worker_id() const;
480
508 template <typename F, typename... ArgsT>
509 auto async(F&& f, ArgsT&&... args);
510
541 template <typename F, typename... ArgsT>
542 auto named_async(const std::string& name, F&& f, ArgsT&&... args);
543
558 template <typename F, typename... ArgsT>
559 void silent_async(F&& f, ArgsT&&... args);
560
575 template <typename F, typename... ArgsT>
576 void named_silent_async(const std::string& name, F&& f, ArgsT&&... args);
577
595 template <typename Observer, typename... ArgsT>
596 std::shared_ptr<Observer> make_observer(ArgsT&&... args);
597
603 template <typename Observer>
604 void remove_observer(std::shared_ptr<Observer> observer);
605
609 size_t num_observers() const noexcept;
610
611 private:
612
613 std::condition_variable _topology_cv;
614 std::mutex _taskflow_mutex;
615 std::mutex _topology_mutex;
616 std::mutex _wsq_mutex;
617
618 size_t _num_topologies {0};
619
620 std::unordered_map<std::thread::id, size_t> _wids;
621 std::vector<Worker> _workers;
622 std::vector<std::thread> _threads;
623 std::list<Taskflow> _taskflows;
624
625 Notifier _notifier;
626
627 TaskQueue<Node*> _wsq;
628
629 std::atomic<size_t> _num_actives {0};
630 std::atomic<size_t> _num_thieves {0};
631 std::atomic<bool> _done {0};
632
632
633 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
634
635 Worker* _this_worker();
636
637 bool _wait_for_task(Worker&, Node*&);
638
639 void _observer_prologue(Worker&, Node*);
640 void _observer_epilogue(Worker&, Node*);
641 void _spawn(size_t);
642 void _worker_loop(Worker&);
643 void _exploit_task(Worker&, Node*&);
644 void _explore_task(Worker&, Node*&);
645 void _consume_task(Worker&, Node*);
646 void _schedule(Worker&, Node*);
647 void _schedule(Node*);
648 void _schedule(Worker&, const SmallVector<Node*>&);
649 void _schedule(const SmallVector<Node*>&);
650 void _set_up_topology(Worker*, Topology*);
651 void _tear_down_topology(Worker&, Topology*);
652 void _tear_down_async(Node*);
653 void _tear_down_invoke(Worker&, Node*);
654 void _cancel_invoke(Worker&, Node*);
655 void _increment_topology();
656 void _decrement_topology();
657 void _decrement_topology_and_notify();
658 void _invoke(Worker&, Node*);
659 void _invoke_static_task(Worker&, Node*);
660 void _invoke_dynamic_task(Worker&, Node*);
661 void _join_dynamic_task_external(Worker&, Node*, Graph&);
662 void _join_dynamic_task_internal(Worker&, Node*, Graph&);
663 void _detach_dynamic_task(Worker&, Node*, Graph&);
664 void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
665 void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
666 void _invoke_module_task(Worker&, Node*);
667 void _invoke_async_task(Worker&, Node*);
668 void _invoke_silent_async_task(Worker&, Node*);
669 void _invoke_cudaflow_task(Worker&, Node*);
670 void _invoke_syclflow_task(Worker&, Node*);
671 void _invoke_runtime_task(Worker&, Node*);
672
673 template <typename C,
674 std::enable_if_t<is_cudaflow_task_v<C>, void>* = nullptr
675 >
676 void _invoke_cudaflow_task_entry(Node*, C&&);
677
678 template <typename C, typename Q,
679 std::enable_if_t<is_syclflow_task_v<C, Q>, void>* = nullptr
680 >
681 void _invoke_syclflow_task_entry(Node*, C&&, Q&);
682};
683
684// Constructor
685inline Executor::Executor(size_t N) :
686 _workers {N},
687 _notifier {N} {
688
689 if(N == 0) {
690 TF_THROW("no cpu workers to execute taskflows");
691 }
692
693 _spawn(N);
694
695 // instantiate the default observer if requested
696 if(has_env(TF_ENABLE_PROFILER)) {
697 TFProfManager::get()._manage(make_observer<TFProfObserver>());
698 }
699}
700
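// Example (editor's illustrative sketch, not part of the original source):
// creating an executor and running a taskflow once; tf::Future<void>::wait()
// blocks until the run finishes.
//
//   tf::Executor executor;       // defaults to std::thread::hardware_concurrency()
//   tf::Taskflow taskflow;
//   taskflow.emplace([](){ /* task body */ });
//   executor.run(taskflow).wait();
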
701// Destructor
702inline Executor::~Executor() {
703
704 // wait for all topologies to complete
705 wait_for_all();
706
707 // shut down the scheduler
708 _done = true;
709
710 _notifier.notify(true);
711
712 for(auto& t : _threads){
713 t.join();
714 }
715}
716
717// Function: num_workers
718inline size_t Executor::num_workers() const noexcept {
719 return _workers.size();
720}
721
722// Function: num_topologies
723inline size_t Executor::num_topologies() const {
724 return _num_topologies;
725}
726
727// Function: num_taskflows
728inline size_t Executor::num_taskflows() const {
729 return _taskflows.size();
730}
731
732// Function: _this_worker
733inline Worker* Executor::_this_worker() {
734 auto itr = _wids.find(std::this_thread::get_id());
735 return itr == _wids.end() ? nullptr : &_workers[itr->second];
736}
737
738// Function: named_async
739template <typename F, typename... ArgsT>
740auto Executor::named_async(const std::string& name, F&& f, ArgsT&&... args) {
741
742 _increment_topology();
743
744 using T = std::invoke_result_t<F, ArgsT...>;
745 using R = std::conditional_t<std::is_same_v<T, void>, void, std::optional<T>>;
746
747 std::promise<R> p;
748
749 auto tpg = std::make_shared<AsyncTopology>();
750
751 Future<R> fu(p.get_future(), tpg);
752
753 auto node = node_pool.animate(
754 std::in_place_type_t<Node::Async>{},
755 [p=make_moc(std::move(p)), f=std::forward<F>(f), args...]
756 (bool cancel) mutable {
757 if constexpr(std::is_same_v<R, void>) {
758 if(!cancel) {
759 f(args...);
760 }
761 p.object.set_value();
762 }
763 else {
764 p.object.set_value(cancel ? std::nullopt : std::make_optional(f(args...)));
765 }
766 },
767 std::move(tpg)
768 );
769
770 node->_name = name;
771
772 if(auto w = _this_worker(); w) {
773 _schedule(*w, node);
774 }
775 else{
776 _schedule(node);
777 }
778
779 return fu;
780}
781
782// Function: async
783template <typename F, typename... ArgsT>
784auto Executor::async(F&& f, ArgsT&&... args) {
785 return named_async("", std::forward<F>(f), std::forward<ArgsT>(args)...);
786}
787
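// Example (editor's illustrative sketch): for a callable returning a non-void
// T, the future produced by async/named_async carries std::optional<T>, as the
// lambda in named_async above shows; std::nullopt means the task was cancelled.
//
//   auto fu = executor.async([](){ return 42; });
//   std::optional<int> v = fu.get();   // contains 42 unless cancelled
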
788// Function: named_silent_async
789template <typename F, typename... ArgsT>
790void Executor::named_silent_async(
791 const std::string& name, F&& f, ArgsT&&... args
792) {
793
794 _increment_topology();
795
796 Node* node = node_pool.animate(
797 std::in_place_type_t<Node::SilentAsync>{},
798 [f=std::forward<F>(f), args...] () mutable {
799 f(args...);
800 }
801 );
802
803 node->_name = name;
804
805 if(auto w = _this_worker(); w) {
806 _schedule(*w, node);
807 }
808 else {
809 _schedule(node);
810 }
811}
812
813// Function: silent_async
814template <typename F, typename... ArgsT>
815void Executor::silent_async(F&& f, ArgsT&&... args) {
816 named_silent_async("", std::forward<F>(f), std::forward<ArgsT>(args)...);
817}
818
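// Example (editor's illustrative sketch): silent_async returns no future, but
// every async task increments the topology count (_increment_topology above),
// so wait_for_all() can be used to synchronize with fire-and-forget tasks.
//
//   executor.silent_async([](){ /* fire-and-forget work */ });
//   executor.wait_for_all();
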
819// Function: this_worker_id
820inline int Executor::this_worker_id() const {
821 auto i = _wids.find(std::this_thread::get_id());
822 return i == _wids.end() ? -1 : static_cast<int>(_workers[i->second]._id);
823}
824
825// Procedure: _spawn
826inline void Executor::_spawn(size_t N) {
827
828 std::mutex mutex;
829 std::condition_variable cond;
830 size_t n=0;
831
832 for(size_t id=0; id<N; ++id) {
833
834 _workers[id]._id = id;
835 _workers[id]._vtm = id;
836 _workers[id]._executor = this;
837 _workers[id]._waiter = &_notifier._waiters[id];
838
839 _threads.emplace_back([this] (
840 Worker& w, std::mutex& mutex, std::condition_variable& cond, size_t& n
841 ) -> void {
842
843 // enables the mapping
844 {
845 std::scoped_lock lock(mutex);
846 _wids[std::this_thread::get_id()] = w._id;
847 if(n++; n == num_workers()) {
848 cond.notify_one();
849 }
850 }
851
852 //this_worker().worker = &w;
853
854 Node* t = nullptr;
855
856 // must use 1 as condition instead of !done
857 while(1) {
858
859 // execute the tasks.
860 _exploit_task(w, t);
861
862 // wait for tasks
863 if(_wait_for_task(w, t) == false) {
864 break;
865 }
866 }
867
868 }, std::ref(_workers[id]), std::ref(mutex), std::ref(cond), std::ref(n));
869 }
870
871 std::unique_lock<std::mutex> lock(mutex);
872 cond.wait(lock, [&](){ return n==N; });
873}
874
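// Note (editor's comment): the mutex/condition-variable/counter trio in _spawn
// is a one-shot startup handshake: each worker registers its thread id in
// _wids under the lock, and the spawning thread blocks in cond.wait until all
// N workers have checked in, so _this_worker() and this_worker_id() behave
// consistently once the constructor returns.
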
875// Function: _consume_task
876inline void Executor::_consume_task(Worker& w, Node* p) {
877
878 std::uniform_int_distribution<size_t> rdvtm(0, _workers.size()-1);
879
880 while(p->_join_counter != 0) {
881
882 exploit:
883
884 if(auto t = w._wsq.pop(); t) {
885 _invoke(w, t);
886 }
887 else {
888 size_t num_steals = 0;
889 //size_t num_pauses = 0;
890 size_t max_steals = ((_workers.size() + 1) << 1);
891
892 explore:
893
894 t = (w._id == w._vtm) ? _wsq.steal() : _workers[w._vtm]._wsq.steal();
895
896 if(t) {
897 _invoke(w, t);
898 goto exploit;
899 }
900 else if(p->_join_counter != 0){
901
902 if(num_steals++ > max_steals) {
903 //(num_pauses++ < 100) ? relax_cpu() : std::this_thread::yield();
904 std::this_thread::yield();
905 }
906
907 //std::this_thread::yield();
908 w._vtm = rdvtm(w._rdgen);
909 goto explore;
910 }
911 else {
912 break;
913 }
914 }
915 }
916}
917
918// Function: _explore_task
919inline void Executor::_explore_task(Worker& w, Node*& t) {
920
921 //assert(_workers[w].wsq.empty());
922 //assert(!t);
923
924 size_t num_steals = 0;
925 size_t num_yields = 0;
926 size_t max_steals = ((_workers.size() + 1) << 1);
927
928 std::uniform_int_distribution<size_t> rdvtm(0, _workers.size()-1);
929
930 do {
931 t = (w._id == w._vtm) ? _wsq.steal() : _workers[w._vtm]._wsq.steal();
932
933 if(t) {
934 break;
935 }
936
937 if(num_steals++ > max_steals) {
938 std::this_thread::yield();
939 if(num_yields++ > 100) {
940 break;
941 }
942 }
943
944 w._vtm = rdvtm(w._rdgen);
945 } while(!_done);
946
947}
948
949// Procedure: _exploit_task
950inline void Executor::_exploit_task(Worker& w, Node*& t) {
951
952 if(t) {
953
954 if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
955 _notifier.notify(false);
956 }
957
958 while(t) {
959 _invoke(w, t);
960 t = w._wsq.pop();
961 }
962
963 --_num_actives;
964 }
965}
966
967// Function: _wait_for_task
968inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
969
970 wait_for_task:
971
972 //assert(!t);
973
974 ++_num_thieves;
975
976 explore_task:
977
978 _explore_task(worker, t);
979
980 if(t) {
981 if(_num_thieves.fetch_sub(1) == 1) {
982 _notifier.notify(false);
983 }
984 return true;
985 }
986
987 _notifier.prepare_wait(worker._waiter);
988
989 //if(auto vtm = _find_vtm(me); vtm != _workers.size()) {
990 if(!_wsq.empty()) {
991
992 _notifier.cancel_wait(worker._waiter);
993 //t = (vtm == me) ? _wsq.steal() : _workers[vtm].wsq.steal();
994
995 t = _wsq.steal(); // must steal here
996 if(t) {
997 if(_num_thieves.fetch_sub(1) == 1) {
998 _notifier.notify(false);
999 }
1000 return true;
1001 }
1002 else {
1003 worker._vtm = worker._id;
1004 goto explore_task;
1005 }
1006 }
1007
1008 if(_done) {
1009 _notifier.cancel_wait(worker._waiter);
1010 _notifier.notify(true);
1011 --_num_thieves;
1012 return false;
1013 }
1014
1015 if(_num_thieves.fetch_sub(1) == 1) {
1016 if(_num_actives) {
1017 _notifier.cancel_wait(worker._waiter);
1018 goto wait_for_task;
1019 }
1020 // check all queues again
1021 for(auto& w : _workers) {
1022 if(!w._wsq.empty()) {
1023 worker._vtm = w._id;
1024 _notifier.cancel_wait(worker._waiter);
1025 goto wait_for_task;
1026 }
1027 }
1028 }
1029
1030 // Now I really need to relinquish myself to others
1031 _notifier.commit_wait(worker._waiter);
1032
1033 return true;
1034}
1035
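// Note (editor's comment): _exploit_task, _explore_task, and _wait_for_task
// together form the adaptive work-stealing loop: drain the local queue first,
// then steal from a randomly chosen victim (or the shared _wsq) with a bounded
// number of attempts, and finally park on the notifier. The prepare_wait/
// cancel_wait/commit_wait sequence is a two-phase wait that rechecks the
// shared queue, the _done flag, and the peer queues before sleeping, which
// prevents lost wakeups.
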
1036// Function: make_observer
1037template<typename Observer, typename... ArgsT>
1038std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {
1039
1040 static_assert(
1042 "Observer must be derived from ObserverInterface"
1043 );
1044
1045 // use a local variable to mimic the constructor
1046 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1047
1048 ptr->set_up(_workers.size());
1049
1050 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1051
1052 return ptr;
1053}
1054
1055// Procedure: remove_observer
1056template <typename Observer>
1057void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
1058
1059 static_assert(
1061 "Observer must be derived from ObserverInterface"
1062 );
1063
1064 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1065}
1066
1067// Function: num_observers
1068inline size_t Executor::num_observers() const noexcept {
1069 return _observers.size();
1070}
1071
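// Example (editor's illustrative sketch): a custom observer derives from
// tf::ObserverInterface and receives the set_up/on_entry/on_exit callbacks
// issued by _observer_prologue and _observer_epilogue below.
//
//   struct MyObserver : tf::ObserverInterface {
//     void set_up(size_t num_workers) override { /* ... */ }
//     void on_entry(tf::WorkerView w, tf::TaskView t) override { /* ... */ }
//     void on_exit(tf::WorkerView w, tf::TaskView t) override { /* ... */ }
//   };
//
//   auto obs = executor.make_observer<MyObserver>();
//   // ... run taskflows ...
//   executor.remove_observer(obs);
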
1072// Procedure: _schedule
1073inline void Executor::_schedule(Worker& worker, Node* node) {
1074
1075 node->_state.fetch_or(Node::READY, std::memory_order_release);
1076
1077 // caller is a worker to this pool
1078 if(worker._executor == this) {
1079 worker._wsq.push(node);
1080 return;
1081 }
1082
1083 {
1084 std::lock_guard<std::mutex> lock(_wsq_mutex);
1085 _wsq.push(node);
1086 }
1087
1088 _notifier.notify(false);
1089}
1090
1091// Procedure: _schedule
1092inline void Executor::_schedule(Node* node) {
1093
1094 node->_state.fetch_or(Node::READY, std::memory_order_release);
1095
1096 {
1097 std::lock_guard<std::mutex> lock(_wsq_mutex);
1098 _wsq.push(node);
1099 }
1100
1101 _notifier.notify(false);
1102}
1103
1104// Procedure: _schedule
1105inline void Executor::_schedule(
1106 Worker& worker, const SmallVector<Node*>& nodes
1107) {
1108
1109 // We need to cache the node count to avoid accessing the nodes
1110 // vector while the parent topology is removed!
1111 const auto num_nodes = nodes.size();
1112
1113 if(num_nodes == 0) {
1114 return;
1115 }
1116
1117 // make the node ready
1118 for(size_t i=0; i<num_nodes; ++i) {
1119 nodes[i]->_state.fetch_or(Node::READY, std::memory_order_release);
1120 }
1121
1122 if(worker._executor == this) {
1123 for(size_t i=0; i<num_nodes; ++i) {
1124 worker._wsq.push(nodes[i]);
1125 }
1126 return;
1127 }
1128
1129 {
1130 std::lock_guard<std::mutex> lock(_wsq_mutex);
1131 for(size_t k=0; k<num_nodes; ++k) {
1132 _wsq.push(nodes[k]);
1133 }
1134 }
1135
1136 _notifier.notify_n(num_nodes);
1137}
1138
1139// Procedure: _schedule
1140inline void Executor::_schedule(const SmallVector<Node*>& nodes) {
1141
1142 // parent topology may be removed!
1143 const auto num_nodes = nodes.size();
1144
1145 if(num_nodes == 0) {
1146 return;
1147 }
1148
1149 // make the node ready
1150 for(size_t i=0; i<num_nodes; ++i) {
1151 nodes[i]->_state.fetch_or(Node::READY, std::memory_order_release);
1152 }
1153
1154 {
1155 std::lock_guard<std::mutex> lock(_wsq_mutex);
1156 for(size_t k=0; k<num_nodes; ++k) {
1157 _wsq.push(nodes[k]);
1158 }
1159 }
1160
1161 _notifier.notify_n(num_nodes);
1162}
1163
1164// Procedure: _invoke
1165inline void Executor::_invoke(Worker& worker, Node* node) {
1166
1167 // synchronize all outstanding memory operations caused by reordering
1168 while(!(node->_state.load(std::memory_order_acquire) & Node::READY));
1169
1170 begin_invoke:
1171
1172 // no need to do other things if the topology is cancelled
1173 if(node->_is_cancelled()) {
1174 _cancel_invoke(worker, node);
1175 return;
1176 }
1177
1178 // if acquiring semaphore(s) exists, acquire them first
1179 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1180 SmallVector<Node*> nodes;
1181 if(!node->_acquire_all(nodes)) {
1182 _schedule(worker, nodes);
1183 return;
1184 }
1185 node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
1186 }
1187
1188 // condition task
1189 //int cond = -1;
1190 SmallVector<int> conds;
1191
1192 // switch is faster than nested if-else due to jump table
1193 switch(node->_handle.index()) {
1194 // static task
1195 case Node::STATIC:{
1196 _invoke_static_task(worker, node);
1197 }
1198 break;
1199
1200 // dynamic task
1201 case Node::DYNAMIC: {
1202 _invoke_dynamic_task(worker, node);
1203 }
1204 break;
1205
1206 // condition task
1207 case Node::CONDITION: {
1208 _invoke_condition_task(worker, node, conds);
1209 }
1210 break;
1211
1212 // multi-condition task
1213 case Node::MULTI_CONDITION: {
1214 _invoke_multi_condition_task(worker, node, conds);
1215 }
1216 break;
1217
1218 // module task
1219 case Node::MODULE: {
1220 _invoke_module_task(worker, node);
1221 }
1222 break;
1223
1224 // async task
1225 case Node::ASYNC: {
1226 _invoke_async_task(worker, node);
1227 _tear_down_async(node);
1228 return ;
1229 }
1230 break;
1231
1232 // silent async task
1233 case Node::SILENT_ASYNC: {
1234 _invoke_silent_async_task(worker, node);
1235 _tear_down_async(node);
1236 return ;
1237 }
1238 break;
1239
1240 // cudaflow task
1241 case Node::CUDAFLOW: {
1242 _invoke_cudaflow_task(worker, node);
1243 }
1244 break;
1245
1246 // syclflow task
1247 case Node::SYCLFLOW: {
1248 _invoke_syclflow_task(worker, node);
1249 }
1250 break;
1251
1252 // runtime task
1253 case Node::RUNTIME: {
1254 _invoke_runtime_task(worker, node);
1255 }
1256 break;
1257
1258 // monostate (placeholder)
1259 default:
1260 break;
1261 }
1262
1263 // if releasing semaphores exist, release them
1264 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1265 _schedule(worker, node->_release_all());
1266 }
1267
1268 // We MUST recover the dependency since the graph may have cycles.
1269 // This must be done before scheduling the successors; otherwise it can
1270 // cause a race condition on _dependents
1271 if((node->_state.load(std::memory_order_relaxed) & Node::CONDITIONED)) {
1272 node->_join_counter = node->num_strong_dependents();
1273 }
1274 else {
1275 node->_join_counter = node->num_dependents();
1276 }
1277
1278 // acquire the parent flow counter
1279 auto& j = (node->_parent) ? node->_parent->_join_counter :
1280 node->_topology->_join_counter;
1281
1282 Node* cache {nullptr};
1283
1284 // At this point, the node storage might be destructed (to be verified)
1285 // case 1: non-condition task
1286 switch(node->_handle.index()) {
1287
1288 // condition and multi-condition tasks
1289 case Node::CONDITION:
1290 case Node::MULTI_CONDITION: {
1291 for(auto cond : conds) {
1292 if(cond >= 0 && static_cast<size_t>(cond) < node->_successors.size()) {
1293 auto s = node->_successors[cond];
1294 // zeroing the join counter for invariant
1295 s->_join_counter.store(0, std::memory_order_relaxed);
1296 j.fetch_add(1);
1297 if(cache) {
1298 _schedule(worker, cache);
1299 }
1300 cache = s;
1301 }
1302 }
1303 }
1304 break;
1305
1306 // non-condition task
1307 default: {
1308 for(size_t i=0; i<node->_successors.size(); ++i) {
1309 if(--(node->_successors[i]->_join_counter) == 0) {
1310 j.fetch_add(1);
1311 if(cache) {
1312 _schedule(worker, cache);
1313 }
1314 cache = node->_successors[i];
1315 }
1316 }
1317 }
1318 break;
1319 }
1320
1321 // tear_down the invoke
1322 _tear_down_invoke(worker, node);
1323
1324 // perform tail recursion elimination for the right-most child to reduce
1325 // the number of expensive pop/push operations through the task queue
1326 if(cache) {
1327 node = cache;
1328 //node->_state.fetch_or(Node::READY, std::memory_order_release);
1329 goto begin_invoke;
1330 }
1331}
1332
1333// Procedure: _tear_down_async
1334inline void Executor::_tear_down_async(Node* node) {
1335 if(node->_parent) {
1336 node->_parent->_join_counter.fetch_sub(1);
1337 }
1338 else {
1339 _decrement_topology_and_notify();
1340 }
1341 node_pool.recycle(node);
1342}
1343
1344// Procedure: _tear_down_invoke
1345inline void Executor::_tear_down_invoke(Worker& worker, Node* node) {
1346 // we must check the parent first before subtracting the join counter,
1347 // or it can introduce a data race
1348 if(node->_parent == nullptr) {
1349 if(node->_topology->_join_counter.fetch_sub(1) == 1) {
1350 _tear_down_topology(worker, node->_topology);
1351 }
1352 }
1353 else { // joined subflow
1354 node->_parent->_join_counter.fetch_sub(1);
1355 }
1356}
1357
1358// Procedure: _cancel_invoke
1359inline void Executor::_cancel_invoke(Worker& worker, Node* node) {
1360
1361 switch(node->_handle.index()) {
1362 // async task needs to carry out the promise
1363 case Node::ASYNC:
1364 std::get_if<Node::Async>(&(node->_handle))->work(true);
1365 _tear_down_async(node);
1366 break;
1367
1368 // silent async doesn't need to carry out the promise
1369 case Node::SILENT_ASYNC:
1370 _tear_down_async(node);
1371 break;
1372
1373 // tear down topology if the node is the last leaf
1374 default: {
1375 _tear_down_invoke(worker, node);
1376 }
1377 break;
1378 }
1379}
1380
1381// Procedure: _observer_prologue
1382inline void Executor::_observer_prologue(Worker& worker, Node* node) {
1383 for(auto& observer : _observers) {
1384 observer->on_entry(WorkerView(worker), TaskView(*node));
1385 }
1386}
1387
1388// Procedure: _observer_epilogue
1389inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
1390 for(auto& observer : _observers) {
1391 observer->on_exit(WorkerView(worker), TaskView(*node));
1392 }
1393}
1394
1395// Procedure: _invoke_static_task
1396inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
1397 _observer_prologue(worker, node);
1398 std::get_if<Node::Static>(&node->_handle)->work();
1399 _observer_epilogue(worker, node);
1400}
1401
1402// Procedure: _invoke_dynamic_task
1403inline void Executor::_invoke_dynamic_task(Worker& w, Node* node) {
1404
1405 _observer_prologue(w, node);
1406
1407 auto handle = std::get_if<Node::Dynamic>(&node->_handle);
1408
1409 handle->subgraph._clear();
1410
1411 Subflow sf(*this, w, node, handle->subgraph);
1412
1413 handle->work(sf);
1414
1415 if(sf._joinable) {
1416 _join_dynamic_task_internal(w, node, handle->subgraph);
1417 }
1418
1419 _observer_epilogue(w, node);
1420}
1421
1422// Procedure: _detach_dynamic_task
1423inline void Executor::_detach_dynamic_task(
1424 Worker& w, Node* p, Graph& g
1425) {
1426
1427 // graph is empty and has no async tasks
1428 if(g.empty() && p->_join_counter == 0) {
1429 return;
1430 }
1431
1432 SmallVector<Node*> src;
1433
1434 for(auto n : g._nodes) {
1435
1436 n->_state.store(Node::DETACHED, std::memory_order_relaxed);
1437 n->_set_up_join_counter();
1438 n->_topology = p->_topology;
1439 n->_parent = nullptr;
1440
1441 if(n->num_dependents() == 0) {
1442 src.push_back(n);
1443 }
1444 }
1445
1446 {
1447 std::lock_guard<std::mutex> lock(p->_topology->_taskflow._mutex);
1448 p->_topology->_taskflow._graph._merge(std::move(g));
1449 }
1450
1451 p->_topology->_join_counter.fetch_add(src.size());
1452 _schedule(w, src);
1453}
1454
1455// Procedure: _join_dynamic_task_external
1456inline void Executor::_join_dynamic_task_external(
1457 Worker& w, Node* p, Graph& g
1458) {
1459
1460 // graph is empty and has no async tasks
1461 if(g.empty() && p->_join_counter == 0) {
1462 return;
1463 }
1464
1465 SmallVector<Node*> src;
1466
1467 for(auto n : g._nodes) {
1468
1469 n->_state.store(0, std::memory_order_relaxed);
1470 n->_set_up_join_counter();
1471 n->_topology = p->_topology;
1472 n->_parent = p;
1473
1474 if(n->num_dependents() == 0) {
1475 src.push_back(n);
1476 }
1477 }
1478 p->_join_counter.fetch_add(src.size());
1479 _schedule(w, src);
1480 _consume_task(w, p);
1481}
1482
1483// Procedure: _join_dynamic_task_internal
1484inline void Executor::_join_dynamic_task_internal(
1485 Worker& w, Node* p, Graph& g
1486) {
1487
1488 // graph is empty and has no async tasks
1489 if(g.empty() && p->_join_counter == 0) {
1490 return;
1491 }
1492
1493 SmallVector<Node*> src;
1494
1495 for(auto n : g._nodes) {
1496 n->_topology = p->_topology;
1497 n->_state.store(0, std::memory_order_relaxed);
1498 n->_set_up_join_counter();
1499 n->_parent = p;
1500 if(n->num_dependents() == 0) {
1501 src.push_back(n);
1502 }
1503 }
1504 p->_join_counter.fetch_add(src.size());
1505 _schedule(w, src);
1506 _consume_task(w, p);
1507}
1508
1509// Procedure: _invoke_condition_task
1510inline void Executor::_invoke_condition_task(
1511 Worker& worker, Node* node, SmallVector<int>& conds
1512) {
1513 _observer_prologue(worker, node);
1514 conds = { std::get_if<Node::Condition>(&node->_handle)->work() };
1515 _observer_epilogue(worker, node);
1516}
1517
1518// Procedure: _invoke_multi_condition_task
1519inline void Executor::_invoke_multi_condition_task(
1520 Worker& worker, Node* node, SmallVector<int>& conds
1521) {
1522 _observer_prologue(worker, node);
1523 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
1524 _observer_epilogue(worker, node);
1525}
1526
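// Example (editor's illustrative sketch): a condition task returns the index
// of the successor to execute next (see the CONDITION branch of _invoke
// above), which is how Taskflow expresses branches and loops; is_done() below
// is a hypothetical predicate.
//
//   tf::Task init = taskflow.emplace([](){ /* ... */ });
//   tf::Task cond = taskflow.emplace([&](){ return is_done() ? 1 : 0; });
//   tf::Task body = taskflow.emplace([](){ /* ... */ });
//   tf::Task stop = taskflow.emplace([](){ /* ... */ });
//   init.precede(cond);
//   cond.precede(body, stop);   // return 0 -> body, return 1 -> stop
//   body.precede(cond);         // loop back to the condition
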
1527// Procedure: _invoke_cudaflow_task
1528inline void Executor::_invoke_cudaflow_task(Worker& worker, Node* node) {
1529 _observer_prologue(worker, node);
1530 std::get_if<Node::cudaFlow>(&node->_handle)->work(*this, node);
1531 _observer_epilogue(worker, node);
1532}
1533
1534// Procedure: _invoke_syclflow_task
1535inline void Executor::_invoke_syclflow_task(Worker& worker, Node* node) {
1536 _observer_prologue(worker, node);
1537 std::get_if<Node::syclFlow>(&node->_handle)->work(*this, node);
1538 _observer_epilogue(worker, node);
1539}
1540
1541// Procedure: _invoke_module_task
1542inline void Executor::_invoke_module_task(Worker& w, Node* node) {
1543 _observer_prologue(w, node);
1544 _join_dynamic_task_internal(
1545 w, node, std::get_if<Node::Module>(&node->_handle)->graph
1546 );
1547 _observer_epilogue(w, node);
1548}
1549
1550// Procedure: _invoke_async_task
1551inline void Executor::_invoke_async_task(Worker& w, Node* node) {
1552 _observer_prologue(w, node);
1553 std::get_if<Node::Async>(&node->_handle)->work(false);
1554 _observer_epilogue(w, node);
1555}
1556
1557// Procedure: _invoke_silent_async_task
1558inline void Executor::_invoke_silent_async_task(Worker& w, Node* node) {
1559 _observer_prologue(w, node);
1560 std::get_if<Node::SilentAsync>(&node->_handle)->work();
1561 _observer_epilogue(w, node);
1562}
1563
1564// Procedure: _invoke_runtime_task
1565inline void Executor::_invoke_runtime_task(Worker& w, Node* node) {
1566 _observer_prologue(w, node);
1567 Runtime rt(*this, w, node);
1568 std::get_if<Node::Runtime>(&node->_handle)->work(rt);
1569 _observer_epilogue(w, node);
1570}
1571
1572// Function: run
1573inline tf::Future<void> Executor::run(Taskflow& f) {
1574 return run_n(f, 1, [](){});
1575}
1576
1577// Function: run
1578inline tf::Future<void> Executor::run(Taskflow&& f) {
1579 return run_n(std::move(f), 1, [](){});
1580}
1581
1582// Function: run
1583template <typename C>
1584tf::Future<void> Executor::run(Taskflow& f, C&& c) {
1585 return run_n(f, 1, std::forward<C>(c));
1586}
1587
1588// Function: run
1589template <typename C>
1590tf::Future<void> Executor::run(Taskflow&& f, C&& c) {
1591 return run_n(std::move(f), 1, std::forward<C>(c));
1592}
1593
1594// Function: run_n
1595inline tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat) {
1596 return run_n(f, repeat, [](){});
1597}
1598
1599// Function: run_n
1600inline tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat) {
1601 return run_n(std::move(f), repeat, [](){});
1602}
1603
1604// Function: run_n
1605template <typename C>
1606tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
1607 return run_until(
1608 f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
1609 );
1610}
1611
1612// Function: run_n
1613template <typename C>
1614tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat, C&& c) {
1615 return run_until(
1616 std::move(f), [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
1617 );
1618}
1619
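// Example (editor's illustrative sketch): run a taskflow four times and invoke
// a callback after the last run completes.
//
//   executor.run_n(taskflow, 4, [](){ /* called once after the 4th run */ }).wait();
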
1620// Function: run_until
1621template<typename P>
1622tf::Future<void> Executor::run_until(Taskflow& f, P&& pred) {
1623 return run_until(f, std::forward<P>(pred), [](){});
1624}
1625
1626// Function: run_until
1627template<typename P>
1628tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred) {
1629 return run_until(std::move(f), std::forward<P>(pred), [](){});
1630}
1631
1632// Function: run_until
1633template <typename P, typename C>
1634tf::Future<void> Executor::run_until(Taskflow& f, P&& p, C&& c) {
1635
1636 _increment_topology();
1637
1638 // Need to check emptiness under the lock since a dynamic task may
1639 // define detached blocks that modify the taskflow at the same time
1640 bool empty;
1641 {
1642 std::lock_guard<std::mutex> lock(f._mutex);
1643 empty = f.empty();
1644 }
1645
1646 // No need to create a real topology; just return a dummy future
1647 if(empty || p()) {
1648 c();
1649 std::promise<void> promise;
1650 promise.set_value();
1651 _decrement_topology_and_notify();
1652 return tf::Future<void>(promise.get_future(), std::monostate{});
1653 }
1654
1655 // create a topology for this run
1656 auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
1657
1658 // need to create the future before the topology gets torn down quickly
1659 tf::Future<void> future(t->_promise.get_future(), t);
1660
1661 // modifying topology needs to be protected under the lock
1662 {
1663 std::lock_guard<std::mutex> lock(f._mutex);
1664 f._topologies.push(t);
1665 if(f._topologies.size() == 1) {
1666 _set_up_topology(_this_worker(), t.get());
1667 }
1668 }
1669
1670 return future;
1671}
1672
1673// Function: run_until
1674template <typename P, typename C>
1675tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred, C&& c) {
1676
1677 std::list<Taskflow>::iterator itr;
1678
1679 {
1680 std::scoped_lock<std::mutex> lock(_taskflow_mutex);
1681 itr = _taskflows.emplace(_taskflows.end(), std::move(f));
1682 itr->_satellite = itr;
1683 }
1684
1685 return run_until(*itr, std::forward<P>(pred), std::forward<C>(c));
1686}
1687
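// Example (editor's illustrative sketch): the predicate is re-evaluated after
// each run (see _tear_down_topology), so the taskflow keeps repeating until
// the predicate returns true.
//
//   int count = 0;
//   executor.run_until(taskflow, [&](){ return ++count == 5; }).wait();
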
1688// Procedure: _increment_topology
1689inline void Executor::_increment_topology() {
1690 std::lock_guard<std::mutex> lock(_topology_mutex);
1691 ++_num_topologies;
1692}
1693
1694// Procedure: _decrement_topology_and_notify
1695inline void Executor::_decrement_topology_and_notify() {
1696 std::lock_guard<std::mutex> lock(_topology_mutex);
1697 if(--_num_topologies == 0) {
1698 _topology_cv.notify_all();
1699 }
1700}
1701
1702// Procedure: _decrement_topology
1703inline void Executor::_decrement_topology() {
1704 std::lock_guard<std::mutex> lock(_topology_mutex);
1705 --_num_topologies;
1706}
1707
1708// Procedure: wait_for_all
1709inline void Executor::wait_for_all() {
1710 std::unique_lock<std::mutex> lock(_topology_mutex);
1711 _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1712}
1713
1714// Function: _set_up_topology
1715inline void Executor::_set_up_topology(Worker* worker, Topology* tpg) {
1716
1717 // ---- under taskflow lock ----
1718
1719 tpg->_sources.clear();
1720 tpg->_taskflow._graph._clear_detached();
1721
1722 // scan each node in the graph and build up the links
1723 for(auto node : tpg->_taskflow._graph._nodes) {
1724
1725 node->_topology = tpg;
1726 node->_state.store(0, std::memory_order_relaxed);
1727
1728 if(node->num_dependents() == 0) {
1729 tpg->_sources.push_back(node);
1730 }
1731
1732 node->_set_up_join_counter();
1733 }
1734
1735 tpg->_join_counter = tpg->_sources.size();
1736
1737 if(worker) {
1738 _schedule(*worker, tpg->_sources);
1739 }
1740 else {
1741 _schedule(tpg->_sources);
1742 }
1743}
1744
1745// Function: _tear_down_topology
1746inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
1747
1748 auto &f = tpg->_taskflow;
1749
1750 //assert(&tpg == &(f._topologies.front()));
1751
1752 // case 1: we still need to run the topology again
1753 if(!tpg->_is_cancelled && !tpg->_pred()) {
1754 //assert(tpg->_join_counter == 0);
1755 std::lock_guard<std::mutex> lock(f._mutex);
1756 tpg->_join_counter = tpg->_sources.size();
1757 _schedule(worker, tpg->_sources);
1758 }
1759 // case 2: the final run of this topology
1760 else {
1761
1762 // TODO: if the topology is cancelled, need to release all semaphores
1763
1764 if(tpg->_call != nullptr) {
1765 tpg->_call();
1766 }
1767
1768 // If there is another run (interleave between lock)
1769 if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {
1770 //assert(tpg->_join_counter == 0);
1771
1772 // Set the promise
1773 tpg->_promise.set_value();
1774 f._topologies.pop();
1775 tpg = f._topologies.front().get();
1776
1777 // decrement the topology but since this is not the last we don't notify
1778 _decrement_topology();
1779
1780 // set up topology needs to be under the lock or it can
1781 // introduce memory order error with pop
1782 _set_up_topology(&worker, tpg);
1783 }
1784 else {
1785 //assert(f._topologies.size() == 1);
1786
1787 // Need to back up the promise first here because the taskflow might be
1788 // destroyed soon after calling get
1789 auto p {std::move(tpg->_promise)};
1790
1791 // Back up the lambda capture in case it holds the topology pointer,
1792 // to avoid releasing it on pop_front ahead of _mutex.unlock and
1793 // _promise.set_value. It is released safely when leaving this scope.
1794 auto c {std::move(tpg->_call)};
1795
1796 // Get the satellite if any
1797 auto s {f._satellite};
1798
1799 // Now we remove the topology from this taskflow
1800 f._topologies.pop();
1801
1802 //f._mutex.unlock();
1803 lock.unlock();
1804
1805 // We set the promise in the end in case taskflow leaves the scope.
1806 // After set_value, the caller will return from wait
1807 p.set_value();
1808
1809 _decrement_topology_and_notify();
1810
1811 // remove the taskflow if it is managed by the executor
1812 // TODO: in the future, we may need to synchronize on wait
1813 // (which means the following code should be moved before set_value)
1814 if(s) {
1815 std::scoped_lock<std::mutex> lock(_taskflow_mutex);
1816 _taskflows.erase(*s);
1817 }
1818 }
1819 }
1820}
1821
1822// ############################################################################
1823// Forward Declaration: Subflow
1824// ############################################################################
1825
1826inline void Subflow::join() {
1827
1828 // assert(this_worker().worker == &_worker);
1829
1830 if(!_joinable) {
1831 TF_THROW("subflow not joinable");
1832 }
1833
1834 // only the parent worker can join the subflow
1835 _executor._join_dynamic_task_external(_worker, _parent, _graph);
1836 _joinable = false;
1837}
1838
1839inline void Subflow::detach() {
1840
1841 // assert(this_worker().worker == &_worker);
1842
1843 if(!_joinable) {
1844 TF_THROW("subflow already joined or detached");
1845 }
1846
1847 // only the parent worker can detach the subflow
1848 _executor._detach_dynamic_task(_worker, _parent, _graph);
1849 _joinable = false;
1850}
1851
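// Example (editor's illustrative sketch): a dynamic task spawns a subflow;
// join() makes the parent task wait for the spawned graph, while detach()
// lets it run independently under the parent topology.
//
//   taskflow.emplace([](tf::Subflow& sf){
//     tf::Task a = sf.emplace([](){ /* ... */ });
//     tf::Task b = sf.emplace([](){ /* ... */ });
//     a.precede(b);
//     sf.join();    // or sf.detach();
//   });
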
1852// Function: named_async
1853template <typename F, typename... ArgsT>
1854auto Subflow::named_async(const std::string& name, F&& f, ArgsT&&... args) {
1855 return _named_async(
1856 *_executor._this_worker(), name, std::forward<F>(f), std::forward<ArgsT>(args)...
1857 );
1858}
1859
1860// Function: _named_async
1861template <typename F, typename... ArgsT>
1862auto Subflow::_named_async(
1863 Worker& w,
1864 const std::string& name,
1865 F&& f,
1866 ArgsT&&... args
1867) {
1868
1869 _parent->_join_counter.fetch_add(1);
1870
1871 using T = std::invoke_result_t<F, ArgsT...>;
1872 using R = std::conditional_t<std::is_same_v<T, void>, void, std::optional<T>>;
1873
1874 std::promise<R> p;
1875
1876 auto tpg = std::make_shared<AsyncTopology>();
1877
1878 Future<R> fu(p.get_future(), tpg);
1879
1880 auto node = node_pool.animate(
1881 std::in_place_type_t<Node::Async>{},
1882 [p=make_moc(std::move(p)), f=std::forward<F>(f), args...]
1883 (bool cancel) mutable {
1884 if constexpr(std::is_same_v<R, void>) {
1885 if(!cancel) {
1886 f(args...);
1887 }
1888 p.object.set_value();
1889 }
1890 else {
1891 p.object.set_value(cancel ? std::nullopt : std::make_optional(f(args...)));
1892 }
1893 },
1894 std::move(tpg)
1895 );
1896
1897 node->_name = name;
1898 node->_topology = _parent->_topology;
1899 node->_parent = _parent;
1900
1901 _executor._schedule(w, node);
1902
1903 return fu;
1904}
1905
1906// Function: async
1907template <typename F, typename... ArgsT>
1908auto Subflow::async(F&& f, ArgsT&&... args) {
1909 return named_async("", std::forward<F>(f), std::forward<ArgsT>(args)...);
1910}
1911
1912// Function: _named_silent_async
1913template <typename F, typename... ArgsT>
1914void Subflow::_named_silent_async(
1915 Worker& w, const std::string& name, F&& f, ArgsT&&... args
1916) {
1917
1918 _parent->_join_counter.fetch_add(1);
1919
1920 auto node = node_pool.animate(
1921 std::in_place_type_t<Node::SilentAsync>{},
1922 [f=std::forward<F>(f), args...] () mutable {
1923 f(args...);
1924 }
1925 );
1926
1927 node->_name = name;
1928 node->_topology = _parent->_topology;
1929 node->_parent = _parent;
1930
1931 _executor._schedule(w, node);
1932}
1933
1934// Function: named_silent_async
1935template <typename F, typename... ArgsT>
1936void Subflow::named_silent_async(const std::string& name, F&& f, ArgsT&&... args) {
1937 _named_silent_async(
1938 *_executor._this_worker(), name, std::forward<F>(f), std::forward<ArgsT>(args)...
1939 );
1940}
1941
1942// Function: silent_async
1943template <typename F, typename... ArgsT>
1944void Subflow::silent_async(F&& f, ArgsT&&... args) {
1945 _named_silent_async(*_executor._this_worker(), "", std::forward<F>(f), std::forward<ArgsT>(args)...);
1946}
1947
1948// ############################################################################
1949// Forward Declaration: Runtime
1950// ############################################################################
1951
1952// Procedure: schedule
1953inline void Runtime::schedule(Task task) {
1954 auto node = task._node;
1955 auto& j = node->_parent ? node->_parent->_join_counter :
1956 node->_topology->_join_counter;
1957 j.fetch_add(1);
1958 _executor._schedule(_worker, node);
1959}
1960
1961// Procedure: run
1962template <typename C>
1963void Runtime::run(C&& callable) {
1964
1965 // dynamic task (subflow)
1966 if constexpr(is_dynamic_task_v<C>) {
1967 Graph graph;
1968 Subflow sf(_executor, _worker, _parent, graph);
1969 callable(sf);
1970 if(sf._joinable) {
1971 _executor._join_dynamic_task_internal(_worker, _parent, graph);
1972 }
1973 }
1974 else {
1975 static_assert(dependent_false_v<C>, "unsupported task callable to run");
1976 }
1977}
1978
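// Example (editor's illustrative sketch): a runtime task receives a
// tf::Runtime and can synchronously run a dynamic task (subflow) through it,
// per the is_dynamic_task_v branch in Runtime::run above.
//
//   taskflow.emplace([](tf::Runtime& rt){
//     rt.run([](tf::Subflow& sf){
//       sf.emplace([](){ /* ... */ });
//     });
//   });
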
1979} // end of namespace tf -----------------------------------------------------
1980
1981
1982
1983
1984
1985
1986
1987
tf::Executor
class to create an executor for running a taskflow graph
Definition executor.hpp:50
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
Definition executor.hpp:1622
auto async(F &&f, ArgsT &&... args)
runs a given function asynchronously
Definition executor.hpp:784
void named_silent_async(const std::string &name, F &&f, ArgsT &&... args)
similar to tf::Executor::named_async but does not return a future object
Definition executor.hpp:790
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
Definition executor.hpp:1057
Executor(size_t N=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition executor.hpp:685
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
Definition executor.hpp:1573
auto named_async(const std::string &name, F &&f, ArgsT &&... args)
runs a given function asynchronously and gives a name to this task
Definition executor.hpp:740
~Executor()
destructs the executor
Definition executor.hpp:702
size_t num_taskflows() const
queries the number of running taskflows with moved ownership
Definition executor.hpp:728
int this_worker_id() const
queries the id of the caller thread in this executor
Definition executor.hpp:820
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
Definition executor.hpp:1595
size_t num_topologies() const
queries the number of running topologies at the time of this call
Definition executor.hpp:723
size_t num_workers() const noexcept
queries the number of worker threads
Definition executor.hpp:718
void wait_for_all()
wait for all tasks to complete
Definition executor.hpp:1709
void silent_async(F &&f, ArgsT &&... args)
similar to tf::Executor::async but does not return a future object
Definition executor.hpp:815
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
Definition executor.hpp:1038
size_t num_observers() const noexcept
queries the number of observers
Definition executor.hpp:1068
tf::FlowBuilder
class to build a task dependency graph
Definition flow_builder.hpp:21
Graph & _graph
associated graph object
Definition flow_builder.hpp:727
tf::Future
class to access the result of an execution
Definition core/taskflow.hpp:571
tf::Graph
class to create a graph object
Definition graph.hpp:56
tf::Runtime
class to create a runtime object used by a runtime task
Definition graph.hpp:150
void run(C &&)
runs a task callable synchronously
Definition executor.hpp:1963
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition executor.hpp:1953
tf::SmallVector
class to define a vector optimized for small arrays
Definition small_vector.hpp:918
tf::Subflow
class to construct a subflow graph from the execution of a dynamic task
Definition flow_builder.hpp:889
void named_silent_async(const std::string &name, F &&f, ArgsT &&... args)
similar to tf::Subflow::named_async but does not return a future object
Definition executor.hpp:1936
void join()
enables the subflow to join its parent task
Definition executor.hpp:1826
auto async(F &&f, ArgsT &&... args)
runs a given function asynchronously
Definition executor.hpp:1908
void detach()
enables the subflow to detach from its parent task
Definition executor.hpp:1839
auto named_async(const std::string &name, F &&f, ArgsT &&... args)
runs the given function asynchronously and assigns the task a name
Definition executor.hpp:1854
void silent_async(F &&f, ArgsT &&... args)
similar to tf::Subflow::async but does not return a future object
Definition executor.hpp:1944
tf::Task
class to create a task handle over a node in a taskflow graph
Definition task.hpp:187
tf::Taskflow
class to create a taskflow object
Definition core/taskflow.hpp:73
bool empty() const
queries the emptiness of the taskflow
Definition core/taskflow.hpp:328
tf
taskflow namespace
Definition small_vector.hpp:27