3#include "../taskflow.hpp"
44 template <
typename... Ps>
135template <
typename C = std::function<
void(tf::Pipeflow&)>>
138 template <
typename... Ps>
141 template <
typename P>
203 template <
typename U>
311template <
typename... Ps>
314 static_assert(
sizeof...(Ps)>0,
"must have at least one pipe");
374 constexpr size_t num_pipes()
const noexcept;
413 template <
size_t... I>
421template <
typename... Ps>
423 _pipes {
std::make_tuple(
std::forward<Ps>(ps)...)},
424 _meta {PipeMeta{ps.type()}...},
426 _tasks (num_lines + 1),
427 _pipeflows (num_lines) {
430 TF_THROW(
"must have at least one line");
434 TF_THROW(
"first pipe must be serial");
442template <
typename... Ps>
444 _pipes {
std::forward<
std::tuple<Ps...>>(ps)},
446 std::forward<
std::tuple<Ps...>>(ps),
std::make_index_sequence<sizeof...(Ps)>{}
449 _tasks (num_lines + 1),
450 _pipeflows (num_lines) {
453 TF_THROW(
"must have at least one line");
456 if(
std::get<0>(_pipes).type() != PipeType::SERIAL) {
457 TF_THROW(
"first pipe must be serial");
465template <
typename... Ps>
466template <
size_t... I>
472template <
typename... Ps>
474 return _pipeflows.size();
478template <
typename... Ps>
480 return sizeof...(Ps);
484template <
typename... Ps>
490template <
typename... Ps>
496template <
typename... Ps>
501 for(
size_t l = 0; l<num_lines(); l++) {
502 _pipeflows[l]._pipe = 0;
503 _pipeflows[l]._line = l;
508 for(
size_t l=1; l<num_lines(); l++) {
509 for(
size_t f=1; f<num_pipes(); f++) {
510 _lines[l][f].join_counter.store(
516 for(
size_t f=1; f<num_pipes(); f++) {
520 for(
size_t l=1; l<num_lines(); l++) {
521 _lines[l][0].join_counter.store(
528template <
typename... Ps>
530 visit_tuple([&](
auto&& pipe){
532 }, _pipes, pf._pipe);
536template <
typename... Ps>
537void Pipeline<Ps...>::_build() {
539 using namespace std::literals::string_literals;
541 FlowBuilder fb(_graph);
544 _tasks[0] = fb.emplace([
this]() {
545 return static_cast<int>(_num_tokens % num_lines());
549 for(
size_t l = 0; l < num_lines(); l++) {
551 _tasks[l + 1] = fb.emplace([
this, l] (
tf::Runtime& rt)
mutable {
553 auto pf = &_pipeflows[l];
557 _lines[pf->_line][pf->_pipe].join_counter.store(
561 if (pf->_pipe == 0) {
562 pf->_token = _num_tokens;
563 if (pf->_stop =
false, _on_pipe(*pf, rt); pf->_stop ==
true) {
574 size_t c_f = pf->_pipe;
575 size_t n_f = (pf->_pipe + 1) % num_pipes();
576 size_t n_l = (pf->_line + 1) % num_lines();
597 if(_meta[c_f].type == PipeType::SERIAL &&
598 _lines[n_l][c_f].join_counter.fetch_sub(
605 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
618 if (retval[0] == 1) {
619 pf = &_pipeflows[n_l];
626 _tasks[0].precede(_tasks[l+1]);
855 size_t num_lines() const noexcept;
863 size_t num_pipes() const noexcept;
888 void reset(P first, P last);
907 void reset(
size_t num_lines, P first, P last);
915 size_t num_tokens() const noexcept;
929 size_t _num_tokens{0};
939 Line& _line(
size_t,
size_t);
945 _tasks (num_lines + 1),
946 _pipeflows (num_lines) {
949 TF_THROW(
"must have at least one line");
958 _tasks (num_lines + 1),
959 _pipeflows (num_lines) {
962 TF_THROW(
"must have at least one line");
972 _graph {
std::move(rhs._graph)},
973 _num_tokens {rhs._num_tokens},
974 _pipes {
std::move(rhs._pipes)},
975 _tasks {
std::move(rhs._tasks)},
976 _pipeflows {
std::move(rhs._pipeflows)},
977 _lines {
std::move(rhs._lines)} {
986 _num_tokens = rhs._num_tokens;
998 return _pipeflows.size();
1002template <
typename P>
1004 return _pipes.size();
1008template <
typename P>
1014template <
typename P>
1020template <
typename P>
1022 return _lines[l*num_pipes() + p];
1025template <
typename P>
1028 if(num_lines == 0) {
1029 TF_THROW(
"must have at least one line");
1033 _tasks.resize(num_lines + 1);
1034 _pipeflows.resize(num_lines);
1042template <
typename P>
1045 size_t num_pipes =
static_cast<size_t>(
std::distance(first, last));
1047 if(num_pipes == 0) {
1048 TF_THROW(
"pipeline cannot be empty");
1052 TF_THROW(
"first pipe must be serial");
1055 _pipes.resize(num_pipes);
1058 for(
auto itr = first; itr != last; itr++) {
1068template <
typename P>
1073 for(
size_t l = 0; l<num_lines(); l++) {
1074 _pipeflows[l]._pipe = 0;
1075 _pipeflows[l]._line = l;
1080 for(
size_t l=1; l<num_lines(); l++) {
1081 for(
size_t f=1; f<num_pipes(); f++) {
1082 _line(l, f).join_counter.store(
1088 for(
size_t f=1; f<num_pipes(); f++) {
1092 for(
size_t l=1; l<num_lines(); l++) {
1093 _line(l, 0).join_counter.store(
1100template <
typename P>
1102 _pipes[pf._pipe]->_callable(pf);
1106template <
typename P>
1107void ScalablePipeline<P>::_build() {
1109 using namespace std::literals::string_literals;
1111 FlowBuilder fb(_graph);
1114 _tasks[0] = fb.emplace([
this]() {
1115 return static_cast<int>(_num_tokens % num_lines());
1119 for(
size_t l = 0; l < num_lines(); l++) {
1121 _tasks[l + 1] = fb.emplace([
this, l] (
tf::Runtime& rt)
mutable {
1123 auto pf = &_pipeflows[l];
1127 _line(pf->_line, pf->_pipe).join_counter.store(
1131 if (pf->_pipe == 0) {
1132 pf->_token = _num_tokens;
1133 if (pf->_stop =
false, _on_pipe(*pf, rt); pf->_stop ==
true) {
1144 size_t c_f = pf->_pipe;
1145 size_t n_f = (pf->_pipe + 1) % num_pipes();
1146 size_t n_l = (pf->_line + 1) % num_lines();
1168 _line(n_l, c_f).join_counter.fetch_sub(
1175 if(_line(pf->_line, n_f).join_counter.fetch_sub(
1188 if (retval[0] == 1) {
1189 pf = &_pipeflows[n_l];
1196 _tasks[0].precede(_tasks[l+1]);
class to create a graph object
Definition graph.hpp:56
class to create a pipe object for a pipeline stage
Definition pipeline.hpp:136
PipeType type() const
queries the type of the pipe
Definition pipeline.hpp:182
void callable(U &&callable)
assigns a new callable to the pipe
Definition pipeline.hpp:204
C callable_t
alias of the callable type
Definition pipeline.hpp:149
Pipe()=default
default constructor
void type(PipeType type)
assigns a new type to the pipe
Definition pipeline.hpp:191
Pipe(PipeType d, C &&callable)
constructs the pipe object
Definition pipeline.hpp:173
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:42
size_t token() const
queries the token identifier
Definition pipeline.hpp:74
size_t pipe() const
queries the pipe identifier of the present token
Definition pipeline.hpp:67
Pipeflow()=default
default constructor
void stop()
stops the pipeline scheduling
Definition pipeline.hpp:84
size_t line() const
queries the line identifier of the present token
Definition pipeline.hpp:60
class to create a pipeline scheduling framework
Definition pipeline.hpp:312
void reset()
resets the pipeline
Definition pipeline.hpp:497
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:491
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:473
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:485
Pipeline(size_t num_lines, Ps &&... ps)
constructs a pipeline object
Definition pipeline.hpp:422
constexpr size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:479
class to create a runtime object used by a runtime task
Definition graph.hpp:150
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition executor.hpp:1953
class to create a scalable pipeline object
Definition pipeline.hpp:767
ScalablePipeline(const ScalablePipeline &)=delete
disabled copy constructor
ScalablePipeline()=default
default constructor
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:1015
ScalablePipeline & operator=(const ScalablePipeline &)=delete
disabled copy assignment operator
typename std::iterator_traits< P >::value_type pipe_type
pipe type
Definition pipeline.hpp:781
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:997
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:1009
size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:1003
void reset()
resets the pipeline
Definition pipeline.hpp:1069
taskflow namespace
Definition small_vector.hpp:27
PipeType
enumeration of all pipe types
Definition pipeline.hpp:105