Taskflow  3.2.0-Master-Branch
Loading...
Searching...
No Matches
pipeline.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "../taskflow.hpp"
4
10namespace tf {
11
12// ----------------------------------------------------------------------------
13// Class Definition: Pipeflow
14// ----------------------------------------------------------------------------
15
// Class to create a pipeflow object used by the pipe callable.
//
// A Pipeflow identifies the scheduling token a pipe callable is currently
// working on: which line it runs on, which pipe (stage) it is at, and the
// token number. Only Pipeline and ScalablePipeline may write these fields;
// user code reads them through the accessors and may call stop().
class Pipeflow {

  template <typename... Ps>
  friend class Pipeline;

  template <typename P>
  friend class ScalablePipeline;

  public:

  // Default constructor. Every identifier starts at zero and the stop flag
  // is clear, so the accessors are well-defined on a fresh object.
  Pipeflow() = default;

  // Queries the line identifier of the present token.
  size_t line() const {
    return _line;
  }

  // Queries the pipe identifier of the present token.
  size_t pipe() const {
    return _pipe;
  }

  // Queries the token identifier.
  size_t token() const {
    return _token;
  }

  // Stops the pipeline scheduling. The pipelines in this file only inspect
  // the flag right after invoking the first pipe (pipe 0).
  void stop() {
    _stop = true;
  }

  private:

  // Default member initializers: `Pipeflow() = default` would otherwise
  // leave these indeterminate.
  size_t _line {0};
  size_t _pipe {0};
  size_t _token {0};
  bool _stop {false};
};
95
96// ----------------------------------------------------------------------------
97// Enum Definition: PipeType
98// ----------------------------------------------------------------------------
99
// Enumeration of all pipe types.
// The numeric values are load-bearing: the pipelines in this file cast a
// pipe's type to size_t and store it as that stage's join counter.
105enum class PipeType : int {
  // parallel type
107 PARALLEL = 1,
  // serial type
109 SERIAL = 2
110};
111
112// ----------------------------------------------------------------------------
113// Class Definition: Pipe
114// ----------------------------------------------------------------------------
115
// Class to create a pipe object for a pipeline stage.
// A pipe pairs a PipeType with a callable of type C; Pipeline and
// ScalablePipeline invoke the callable with a Pipeflow& argument.
135template <typename C = std::function<void(tf::Pipeflow&)>>
136class Pipe {
137
138 template <typename... Ps>
139 friend class Pipeline;
140
141 template <typename P>
142 friend class ScalablePipeline;
143
144 public:
145
  // alias of the callable type
149 using callable_t = C;
150
  // Default constructor. No initializer is given for _type, so assign a type
  // (and a callable) before handing the pipe to a pipeline.
154 Pipe() = default;
155
  // Constructs the pipe object from a type and a callable.
  // NOTE(review): the listing dropped the constructor header (line 173); the
  // member index of this page gives it as `Pipe(PipeType d, C &&callable)`.
174 _type{d}, _callable{std::forward<C>(callable)} {
175 }
176
  // queries the type of the pipe
182 PipeType type() const {
183 return _type;
184 }
185
  // Assigns a new type to the pipe.
  // NOTE(review): the listing dropped the setter header (line 191); the
  // member index of this page gives it as `void type(PipeType type)`.
192 _type = type;
193 }
194
  // Assigns a new callable to the pipe; U is forwarded into the stored C.
203 template <typename U>
204 void callable(U&& callable) {
205 _callable = std::forward<U>(callable);
206 }
207
208 private:
209
210 PipeType _type;
211
212 C _callable;
213};
214
215// ----------------------------------------------------------------------------
216// Class Definition: Pipeline
217// ----------------------------------------------------------------------------
218
// Class to create a pipeline scheduling framework.
// Ps... are the pipe types, fixed at compile time and stored in a tuple.
// The pipeline owns a Graph whose tasks (one "cond" task plus one task per
// line) are created by _build().
311template <typename... Ps>
312class Pipeline {
313
314 static_assert(sizeof...(Ps)>0, "must have at least one pipe");
315
  // One scheduling cell per (line, pipe); join_counter is decremented by the
  // neighbouring cells and re-armed when the cell runs.
319 struct Line {
320 std::atomic<size_t> join_counter;
321 };
322
  // Type of each pipe, cached in an array for runtime indexing.
326 struct PipeMeta {
327 PipeType type;
328 };
329
330 public:
331
  // Constructs a pipeline object from a line count and the pipes.
343 Pipeline(size_t num_lines, Ps&&... ps);
344
  // NOTE(review): the listing dropped a declaration here (line 356). A second
  // constructor taking the pipes as a tuple is defined further down this file
  // — presumably this is its declaration; confirm against the header.
357
  // queries the number of parallel lines
366 size_t num_lines() const noexcept;
367
  // queries the number of pipes
374 constexpr size_t num_pipes() const noexcept;
375
  // resets the pipeline (token count, per-line pipeflows and join counters)
383 void reset();
384
  // queries the number of generated tokens in the pipeline
391 size_t num_tokens() const noexcept;
392
  // obtains the graph object associated with the pipeline construct
399 Graph& graph();
400
401 private:
402
403 Graph _graph;
404
  // Not initialized here; both constructors call reset(), which zeroes it.
405 size_t _num_tokens;
406
  // Declaration order matters: _pipes is initialized before _meta.
407 std::tuple<Ps...> _pipes;
408 std::array<PipeMeta, sizeof...(Ps)> _meta;
  // _lines[l][f] is the cell of line l at pipe f.
409 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
  // _tasks[0] is the "cond" task; _tasks[l+1] is the task of line l.
410 std::vector<Task> _tasks;
  // One Pipeflow per line; its size defines num_lines().
411 std::vector<Pipeflow> _pipeflows;
412
  // NOTE(review): the listing dropped the declaration that belongs to this
  // template header (line 414). The out-of-class definition of _gen_meta uses
  // the same `template <size_t... I>` header, so it is presumably
  // `auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);`.
413 template <size_t... I>
415
416 void _on_pipe(Pipeflow&, Runtime&);
417 void _build();
418};
419
420// constructor
421template <typename... Ps>
422Pipeline<Ps...>::Pipeline(size_t num_lines, Ps&&... ps) :
423 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
424 _meta {PipeMeta{ps.type()}...},
425 _lines (num_lines),
426 _tasks (num_lines + 1),
427 _pipeflows (num_lines) {
428
429 if(num_lines == 0) {
430 TF_THROW("must have at least one line");
431 }
432
433 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
434 TF_THROW("first pipe must be serial");
435 }
436
437 reset();
438 _build();
439}
440
441// constructor
// Constructs a pipeline from a line count and a tuple of pipes.
// NOTE(review): the listing dropped the constructor header (line 443). From
// the initializers below it takes `num_lines` and an rvalue
// `std::tuple<Ps...>` named `ps` — confirm the exact signature.
442template <typename... Ps>
444 _pipes {std::forward<std::tuple<Ps...>>(ps)},
  // NOTE(review): `ps` is forwarded a second time here, after it was already
  // forwarded into _pipes above; _gen_meta only reads type() from it.
445 _meta {_gen_meta(
446 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
447 )},
448 _lines (num_lines),
449 _tasks (num_lines + 1),
450 _pipeflows (num_lines) {
451
  // Validation runs after the containers above were sized.
452 if(num_lines == 0) {
453 TF_THROW("must have at least one line");
454 }
455
456 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
457 TF_THROW("first pipe must be serial");
458 }
459
  // Seed counters first, then create the tasks.
460 reset();
461 _build();
462}
463
464// Function: _gen_meta
465template <typename... Ps>
466template <size_t... I>
467auto Pipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
468 return std::array{PipeMeta{std::get<I>(ps).type()}...};
469}
470
471// Function: num_lines
472template <typename... Ps>
473size_t Pipeline<Ps...>::num_lines() const noexcept {
474 return _pipeflows.size();
475}
476
477// Function: num_pipes
478template <typename... Ps>
479constexpr size_t Pipeline<Ps...>::num_pipes() const noexcept {
480 return sizeof...(Ps);
481}
482
483// Function: num_tokens
484template <typename... Ps>
485size_t Pipeline<Ps...>::num_tokens() const noexcept {
486 return _num_tokens;
487}
488
489// Function: graph
// Obtains the graph object associated with the pipeline construct.
// NOTE(review): the listing dropped the function header (line 491); the
// class declares it as `Graph& graph();`.
490template <typename... Ps>
492 return _graph;
493}
494
495// Function: reset
// Resets the pipeline: zeroes the token count, rewinds every line to pipe 0
// and re-seeds all join counters.
// NOTE(review): the listing dropped the function header (line 497); the
// class declares it as `void reset();`.
496template <typename... Ps>
498
499 _num_tokens = 0;
500
  // Every line starts at the first pipe and remembers its own index.
501 for(size_t l = 0; l<num_lines(); l++) {
502 _pipeflows[l]._pipe = 0;
503 _pipeflows[l]._line = l;
504 }
505
  // Cell (0, 0) starts with no pending dependency.
506 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
507
  // Interior cells (l >= 1, f >= 1) get the numeric value of the pipe type:
  // 2 for SERIAL, 1 for PARALLEL.
508 for(size_t l=1; l<num_lines(); l++) {
509 for(size_t f=1; f<num_pipes(); f++) {
510 _lines[l][f].join_counter.store(
511 static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
512 );
513 }
514 }
515
  // On line 0 every later pipe starts with a counter of 1.
516 for(size_t f=1; f<num_pipes(); f++) {
517 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
518 }
519
  // First pipe of the remaining lines: type value minus one (the constructors
  // require the first pipe to be SERIAL, so this stores 1).
520 for(size_t l=1; l<num_lines(); l++) {
521 _lines[l][0].join_counter.store(
522 static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
523 );
524 }
525}
526
527// Procedure: _on_pipe
// Invokes the callable of the pipe the pipeflow is currently at.
// NOTE(review): the listing dropped the function header (line 529); the
// class declares `void _on_pipe(Pipeflow&, Runtime&);` and the body names
// the pipeflow `pf`.
528template <typename... Ps>
  // visit_tuple is defined outside this file; presumably it applies the
  // lambda to the tuple element at runtime index pf._pipe — confirm.
530 visit_tuple([&](auto&& pipe){
531 pipe._callable(pf);
532 }, _pipes, pf._pipe);
533}
534
535// Procedure: _build
// Creates the tasks of the pipeline inside _graph: one "cond" task stored in
// _tasks[0] and one runtime task per line stored in _tasks[l+1]. Every task
// lambda captures `this`, so the tasks are tied to this object's address.
536template <typename... Ps>
537void Pipeline<Ps...>::_build() {
538
539 using namespace std::literals::string_literals;
540
541 FlowBuilder fb(_graph);
542
543 // init task
  // Returns the index of the line the next token belongs to.
  // NOTE(review): presumably the returned index selects which successor
  // task runs — confirm against the FlowBuilder condition-task semantics.
544 _tasks[0] = fb.emplace([this]() {
545 return static_cast<int>(_num_tokens % num_lines());
546 }).name("cond");
547
548 // line task
549 for(size_t l = 0; l < num_lines(); l++) {
550
551 _tasks[l + 1] = fb.emplace([this, l] (tf::Runtime& rt) mutable {
552
    // pf is the cell this task is currently executing; it may be re-pointed
    // to the next line's pipeflow at the bottom of the loop.
553 auto pf = &_pipeflows[l];
554
    // Loop head: the goto statements below jump back here instead of
    // returning to the scheduler.
555 pipeline:
556
    // Re-arm the join counter of the current cell with its pipe type value.
557 _lines[pf->_line][pf->_pipe].join_counter.store(
558 static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
559 );
560
    // Pipe 0 generates tokens: stamp the token id, clear the stop flag, run
    // the callable, and end this task if the callable asked to stop.
561 if (pf->_pipe == 0) {
562 pf->_token = _num_tokens;
563 if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
564 // here, the pipeline is not stopped yet because other
565 // lines of tasks may still be running their last stages
566 return;
567 }
568 ++_num_tokens;
569 }
570 else {
571 _on_pipe(*pf, rt);
572 }
573
    // c_f: pipe just executed; n_f: next pipe on this line (wraps to 0);
    // n_l: next line (wraps to 0).
574 size_t c_f = pf->_pipe;
575 size_t n_f = (pf->_pipe + 1) % num_pipes();
576 size_t n_l = (pf->_line + 1) % num_lines();
577
578 pf->_pipe = n_f;
579
580 // ---- scheduling starts here ----
581 // Notice that the shared variable f must not be changed after this
582 // point because it can result in data race due to the following
583 // condition:
584 //
585 // a -> b
586 // | |
587 // v v
588 // c -> d
589 //
590 // d will be spawned by either c or b, so if c changes f but b spawns d
591 // then data race on f will happen
    // NOTE(review): "f" above presumably refers to pf->_pipe, the only
    // shared field written just before this point — confirm.
592
    // retval collects which neighbour cells were found ready: 1 = the cell
    // below (next line, same pipe), 0 = the cell ahead (same line, next pipe).
593 std::array<int, 2> retval;
594 size_t n = 0;
595
596 // downward dependency
    // Only a SERIAL pipe has a dependency on the next line.
    // NOTE(review): the listing dropped line 599 (the fetch_sub arguments
    // and the comparison that closes this condition).
597 if(_meta[c_f].type == PipeType::SERIAL &&
598 _lines[n_l][c_f].join_counter.fetch_sub(
600 ) {
601 retval[n++] = 1;
602 }
603
604 // forward dependency
    // NOTE(review): the listing dropped line 606 (the fetch_sub arguments
    // and the comparison that closes this condition).
605 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
607 ) {
608 retval[n++] = 0;
609 }
610
611 // notice that the task index starts from 1
    // n == 2: hand the next line's task to the runtime and keep going on
    //         this line.
    // n == 1: continue in this task; if the only ready cell is the one
    //         below, switch pf to the next line first.
    // n == 0: nothing is ready; fall out of the switch and end the task.
612 switch(n) {
613 case 2: {
614 rt.schedule(_tasks[n_l+1]);
615 goto pipeline;
616 }
617 case 1: {
618 if (retval[0] == 1) {
619 pf = &_pipeflows[n_l];
620 }
621 goto pipeline;
622 }
623 }
624 }).name("rt-"s + std::to_string(l));
625
    // The cond task precedes every line task.
626 _tasks[0].precede(_tasks[l+1]);
627 }
628}
629
630// ----------------------------------------------------------------------------
631// Class Definition: ScalablePipeline
632// ----------------------------------------------------------------------------
633
// Class to create a scalable pipeline object.
// P is an iterator type over pipes: the pipeline stores the iterators
// themselves (not copies of the pipes), so the range passed to it has to stay
// valid while the pipeline is in use.
// NOTE(review): the listing dropped the class header (line 767); the member
// index of this page records the class definition at that line.
766template <typename P>
768
  // One scheduling cell per (line, pipe), stored in a flat array.
772 struct Line {
773 std::atomic<size_t> join_counter;
774 };
775
776 public:
777
  // NOTE(review): the listing dropped line 781; the member index shows the
  // alias `pipe_type` = `typename std::iterator_traits<P>::value_type` there.
782
  // Default constructor: no lines, no pipes, empty graph.
786 ScalablePipeline() = default;
787
  // Constructs a pipeline with the given number of lines and no pipes yet.
797 ScalablePipeline(size_t num_lines);
798
  // Constructs a pipeline with the given lines over the pipes in [first, last).
815 ScalablePipeline(size_t num_lines, P first, P last);
816
  // NOTE(review): the listing dropped line 820; the member index lists the
  // disabled copy constructor `ScalablePipeline(const ScalablePipeline&) = delete`.
821
  // NOTE(review): the listing dropped line 830; a move constructor is defined
  // further down this file, so this is presumably its declaration.
831
  // disabled copy assignment operator
835 ScalablePipeline& operator = (const ScalablePipeline&) = delete;
836
  // move assignment operator
845 ScalablePipeline& operator = (ScalablePipeline&& rhs);
846
  // queries the number of parallel lines
855 size_t num_lines() const noexcept;
856
  // queries the number of pipes
863 size_t num_pipes() const noexcept;
864
  // resets the token count, per-line pipeflows and join counters
871 void reset();
872
  // replaces the pipes with [first, last), keeping the number of lines
888 void reset(P first, P last);
889
  // replaces both the number of lines and the pipes, and rebuilds the graph
907 void reset(size_t num_lines, P first, P last);
908
  // queries the number of generated tokens in the pipeline
915 size_t num_tokens() const noexcept;
916
  // obtains the graph object associated with the pipeline construct
923 Graph& graph();
924
925 private:
926
927 Graph _graph;
928
929 size_t _num_tokens{0};
930
  // Iterators to the pipes, one per stage.
931 std::vector<P> _pipes;
  // _tasks[0] is the "cond" task; _tasks[l+1] is the task of line l.
932 std::vector<Task> _tasks;
  // One Pipeflow per line; its size defines num_lines().
933 std::vector<Pipeflow> _pipeflows;
  // NOTE(review): the listing dropped line 934. The member `_lines` used
  // throughout this file is assigned from `std::make_unique<Line[]>(...)`,
  // so it is presumably declared here as `std::unique_ptr<Line[]> _lines;`.
935
936 void _on_pipe(Pipeflow&, Runtime&);
937 void _build();
938
  // Returns the cell of line l at pipe p inside the flat _lines array.
939 Line& _line(size_t, size_t);
940};
941
942// constructor
// Constructs a pipeline with `num_lines` lines and no pipes; the graph is
// built right away, the pipes are supplied later through reset(first, last).
// NOTE(review): the listing dropped the constructor header (line 944); the
// class declares it as `ScalablePipeline(size_t num_lines);`.
943template <typename P>
945 _tasks (num_lines + 1),
946 _pipeflows (num_lines) {
947
948 if(num_lines == 0) {
949 TF_THROW("must have at least one line");
950 }
951
  // No reset() here: there are no pipes yet, hence no join counters to seed.
952 _build();
953}
954
955// constructor
956template <typename P>
957ScalablePipeline<P>::ScalablePipeline(size_t num_lines, P first, P last) :
958 _tasks (num_lines + 1),
959 _pipeflows (num_lines) {
960
961 if(num_lines == 0) {
962 TF_THROW("must have at least one line");
963 }
964
965 reset(first, last);
966 _build();
967}
968
969// move constructor
970template <typename P>
972 _graph {std::move(rhs._graph)},
973 _num_tokens {rhs._num_tokens},
974 _pipes {std::move(rhs._pipes)},
975 _tasks {std::move(rhs._tasks)},
976 _pipeflows {std::move(rhs._pipeflows)},
977 _lines {std::move(rhs._lines)} {
978
979 rhs._num_tokens = 0;
980}
981
982// move assignment operator
983template <typename P>
985 _graph = std::move(rhs._graph);
986 _num_tokens = rhs._num_tokens;
987 _pipes = std::move(rhs._pipes);
988 _tasks = std::move(rhs._tasks);
989 _pipeflows = std::move(rhs._pipeflows);
990 _lines = std::move(rhs._lines);
991 rhs._num_tokens = 0;
992 return *this;
993}
994
995// Function: num_lines
996template <typename P>
997size_t ScalablePipeline<P>::num_lines() const noexcept {
998 return _pipeflows.size();
999}
1000
1001// Function: num_pipes
1002template <typename P>
1003size_t ScalablePipeline<P>::num_pipes() const noexcept {
1004 return _pipes.size();
1005}
1006
1007// Function: num_tokens
1008template <typename P>
1009size_t ScalablePipeline<P>::num_tokens() const noexcept {
1010 return _num_tokens;
1011}
1012
1013// Function: graph
// Obtains the graph object associated with the pipeline construct.
// NOTE(review): the listing dropped the function header (line 1015); the
// class declares it as `Graph& graph();`.
1014template <typename P>
1016 return _graph;
1017}
1018
1019// Function: _line
1020template <typename P>
1021typename ScalablePipeline<P>::Line& ScalablePipeline<P>::_line(size_t l, size_t p) {
1022 return _lines[l*num_pipes() + p];
1023}
1024
1025template <typename P>
1026void ScalablePipeline<P>::reset(size_t num_lines, P first, P last) {
1027
1028 if(num_lines == 0) {
1029 TF_THROW("must have at least one line");
1030 }
1031
1032 _graph.clear();
1033 _tasks.resize(num_lines + 1);
1034 _pipeflows.resize(num_lines);
1035
1036 reset(first, last);
1037
1038 _build();
1039}
1040
1041// Function: reset
1042template <typename P>
1043void ScalablePipeline<P>::reset(P first, P last) {
1044
1045 size_t num_pipes = static_cast<size_t>(std::distance(first, last));
1046
1047 if(num_pipes == 0) {
1048 TF_THROW("pipeline cannot be empty");
1049 }
1050
1051 if(first->type() != PipeType::SERIAL) {
1052 TF_THROW("first pipe must be serial");
1053 }
1054
1055 _pipes.resize(num_pipes);
1056
1057 size_t i=0;
1058 for(auto itr = first; itr != last; itr++) {
1059 _pipes[i++] = itr;
1060 }
1061
1062 _lines = std::make_unique<Line[]>(num_lines() * _pipes.size());
1063
1064 reset();
1065}
1066
1067// Function: reset
// Resets the pipeline: zeroes the token count, rewinds every line to pipe 0
// and re-seeds all join counters.
// NOTE(review): the listing dropped the function header (line 1069); the
// class declares it as `void reset();`.
// NOTE(review): cell (0, 0) is written unconditionally, so this needs a
// non-empty _lines array — i.e. pipes must have been installed first.
1068template <typename P>
1070
1071 _num_tokens = 0;
1072
  // Every line starts at the first pipe and remembers its own index.
1073 for(size_t l = 0; l<num_lines(); l++) {
1074 _pipeflows[l]._pipe = 0;
1075 _pipeflows[l]._line = l;
1076 }
1077
  // Cell (0, 0) starts with no pending dependency.
1078 _line(0, 0).join_counter.store(0, std::memory_order_relaxed);
1079
  // Interior cells (l >= 1, f >= 1) get the numeric value of the pipe type:
  // 2 for SERIAL, 1 for PARALLEL.
1080 for(size_t l=1; l<num_lines(); l++) {
1081 for(size_t f=1; f<num_pipes(); f++) {
1082 _line(l, f).join_counter.store(
1083 static_cast<size_t>(_pipes[f]->type()), std::memory_order_relaxed
1084 );
1085 }
1086 }
1087
  // On line 0 every later pipe starts with a counter of 1.
1088 for(size_t f=1; f<num_pipes(); f++) {
1089 _line(0, f).join_counter.store(1, std::memory_order_relaxed);
1090 }
1091
  // First pipe of the remaining lines: type value minus one.
1092 for(size_t l=1; l<num_lines(); l++) {
1093 _line(l, 0).join_counter.store(
1094 static_cast<size_t>(_pipes[0]->type()) - 1, std::memory_order_relaxed
1095 );
1096 }
1097}
1098
1099// Procedure: _on_pipe
// Invokes the callable of the pipe the pipeflow is currently at, through the
// stored iterator.
// NOTE(review): the listing dropped the function header (line 1101); the
// class declares `void _on_pipe(Pipeflow&, Runtime&);` and the body names
// the pipeflow `pf`.
1100template <typename P>
1102 _pipes[pf._pipe]->_callable(pf);
1103}
1104
1105// Procedure: _build
// Creates the tasks of the pipeline inside _graph: one "cond" task stored in
// _tasks[0] and one runtime task per line stored in _tasks[l+1]. Every task
// lambda captures `this`, so the tasks are tied to this object's address.
1106template <typename P>
1107void ScalablePipeline<P>::_build() {
1108
1109 using namespace std::literals::string_literals;
1110
1111 FlowBuilder fb(_graph);
1112
1113 // init task
  // Returns the index of the line the next token belongs to.
  // NOTE(review): presumably the returned index selects which successor
  // task runs — confirm against the FlowBuilder condition-task semantics.
1114 _tasks[0] = fb.emplace([this]() {
1115 return static_cast<int>(_num_tokens % num_lines());
1116 }).name("cond");
1117
1118 // line task
1119 for(size_t l = 0; l < num_lines(); l++) {
1120
1121 _tasks[l + 1] = fb.emplace([this, l] (tf::Runtime& rt) mutable {
1122
    // pf is the cell this task is currently executing; it may be re-pointed
    // to the next line's pipeflow at the bottom of the loop.
1123 auto pf = &_pipeflows[l];
1124
    // Loop head: the goto statements below jump back here instead of
    // returning to the scheduler.
1125 pipeline:
1126
    // Re-arm the join counter of the current cell with its pipe type value.
1127 _line(pf->_line, pf->_pipe).join_counter.store(
1128 static_cast<size_t>(_pipes[pf->_pipe]->type()), std::memory_order_relaxed
1129 );
1130
    // Pipe 0 generates tokens: stamp the token id, clear the stop flag, run
    // the callable, and end this task if the callable asked to stop.
1131 if (pf->_pipe == 0) {
1132 pf->_token = _num_tokens;
1133 if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
1134 // here, the pipeline is not stopped yet because other
1135 // lines of tasks may still be running their last stages
1136 return;
1137 }
1138 ++_num_tokens;
1139 }
1140 else {
1141 _on_pipe(*pf, rt);
1142 }
1143
    // c_f: pipe just executed; n_f: next pipe on this line (wraps to 0);
    // n_l: next line (wraps to 0).
1144 size_t c_f = pf->_pipe;
1145 size_t n_f = (pf->_pipe + 1) % num_pipes();
1146 size_t n_l = (pf->_line + 1) % num_lines();
1147
1148 pf->_pipe = n_f;
1149
1150 // ---- scheduling starts here ----
1151 // Notice that the shared variable f must not be changed after this
1152 // point because it can result in data race due to the following
1153 // condition:
1154 //
1155 // a -> b
1156 // | |
1157 // v v
1158 // c -> d
1159 //
1160 // d will be spawned by either c or b, so if c changes f but b spawns d
1161 // then data race on f will happen
    // NOTE(review): "f" above presumably refers to pf->_pipe, the only
    // shared field written just before this point — confirm.
1162
    // retval collects which neighbour cells were found ready: 1 = the cell
    // below (next line, same pipe), 0 = the cell ahead (same line, next pipe).
1163 std::array<int, 2> retval;
1164 size_t n = 0;
1165
1166 // downward dependency
    // Only a SERIAL pipe has a dependency on the next line.
    // NOTE(review): the listing dropped line 1169 (the fetch_sub arguments
    // and the comparison that closes this condition).
1167 if(_pipes[c_f]->type() == PipeType::SERIAL &&
1168 _line(n_l, c_f).join_counter.fetch_sub(
1170 ) {
1171 retval[n++] = 1;
1172 }
1173
1174 // forward dependency
    // NOTE(review): the listing dropped line 1176 (the fetch_sub arguments
    // and the comparison that closes this condition).
1175 if(_line(pf->_line, n_f).join_counter.fetch_sub(
1177 ) {
1178 retval[n++] = 0;
1179 }
1180
1181 // notice that the task index starts from 1
    // n == 2: hand the next line's task to the runtime and keep going on
    //         this line.
    // n == 1: continue in this task; if the only ready cell is the one
    //         below, switch pf to the next line first.
    // n == 0: nothing is ready; fall out of the switch and end the task.
1182 switch(n) {
1183 case 2: {
1184 rt.schedule(_tasks[n_l+1]);
1185 goto pipeline;
1186 }
1187 case 1: {
1188 if (retval[0] == 1) {
1189 pf = &_pipeflows[n_l];
1190 }
1191 goto pipeline;
1192 }
1193 }
1194 }).name("rt-"s + std::to_string(l));
1195
    // The cond task precedes every line task.
1196 _tasks[0].precede(_tasks[l+1]);
1197 }
1198}
1199
1200} // end of namespace tf -----------------------------------------------------
1201
1202
1203
1204
class to create a graph object
Definition graph.hpp:56
class to create a pipe object for a pipeline stage
Definition pipeline.hpp:136
PipeType type() const
queries the type of the pipe
Definition pipeline.hpp:182
void callable(U &&callable)
assigns a new callable to the pipe
Definition pipeline.hpp:204
C callable_t
alias of the callable type
Definition pipeline.hpp:149
Pipe()=default
default constructor
void type(PipeType type)
assigns a new type to the pipe
Definition pipeline.hpp:191
Pipe(PipeType d, C &&callable)
constructs the pipe object
Definition pipeline.hpp:173
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:42
size_t token() const
queries the token identifier
Definition pipeline.hpp:74
size_t pipe() const
queries the pipe identifier of the present token
Definition pipeline.hpp:67
Pipeflow()=default
default constructor
void stop()
stops the pipeline scheduling
Definition pipeline.hpp:84
size_t line() const
queries the line identifier of the present token
Definition pipeline.hpp:60
class to create a pipeline scheduling framework
Definition pipeline.hpp:312
void reset()
resets the pipeline
Definition pipeline.hpp:497
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:491
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:473
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:485
Pipeline(size_t num_lines, Ps &&... ps)
constructs a pipeline object
Definition pipeline.hpp:422
constexpr size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:479
class to create a runtime object used by a runtime task
Definition graph.hpp:150
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition executor.hpp:1953
class to create a scalable pipeline object
Definition pipeline.hpp:767
ScalablePipeline(const ScalablePipeline &)=delete
disabled copy constructor
ScalablePipeline()=default
default constructor
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:1015
ScalablePipeline & operator=(const ScalablePipeline &)=delete
disabled copy assignment operator
typename std::iterator_traits< P >::value_type pipe_type
pipe type
Definition pipeline.hpp:781
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:997
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:1009
size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:1003
void reset()
resets the pipeline
Definition pipeline.hpp:1069
T distance(T... args)
T forward(T... args)
T move(T... args)
taskflow namespace
Definition small_vector.hpp:27
PipeType
enumeration of all pipe types
Definition pipeline.hpp:105
@ SERIAL
serial type
@ PARALLEL
parallel type
T to_string(T... args)