Taskflow  3.2.0-Master-Branch
tf::ScalablePipeline< P > Class Template Reference

class to create a scalable pipeline object

#include <pipeline.hpp>

Public Types

using pipe_type = typename std::iterator_traits< P >::value_type
 pipe type
 

Public Member Functions

 ScalablePipeline ()=default
 default constructor
 
 ScalablePipeline (size_t num_lines)
 constructs an empty scalable pipeline object
 
 ScalablePipeline (size_t num_lines, P first, P last)
 constructs a scalable pipeline object
 
 ScalablePipeline (const ScalablePipeline &)=delete
 disabled copy constructor
 
 ScalablePipeline (ScalablePipeline &&rhs)
 move constructor
 
ScalablePipeline & operator= (const ScalablePipeline &)=delete
 disabled copy assignment operator
 
ScalablePipeline & operator= (ScalablePipeline &&rhs)
 move assignment operator
 
size_t num_lines () const noexcept
 queries the number of parallel lines
 
size_t num_pipes () const noexcept
 queries the number of pipes
 
void reset ()
 resets the pipeline
 
void reset (P first, P last)
 resets the pipeline with a new range of pipes
 
void reset (size_t num_lines, P first, P last)
 resets the pipeline to a new line number and a new range of pipes
 
size_t num_tokens () const noexcept
 queries the number of generated tokens in the pipeline
 
Graph & graph ()
 obtains the graph object associated with the pipeline construct
 

Detailed Description

template<typename P>
class tf::ScalablePipeline< P >

class to create a scalable pipeline object

Template Parameters
P: type of the iterator to a range of pipes
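
To illustrate how the `pipe_type` alias listed under Public Types follows from the template parameter, here is a minimal sketch using only the standard library. `DemoPipe`, `pipe_type_of`, and `iterates_over` are hypothetical names introduced for illustration; `DemoPipe` is a stand-in and is not tf::Pipe.

```cpp
#include <cassert>
#include <functional>
#include <iterator>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for a pipe object; it is NOT tf::Pipe.
struct DemoPipe {
  std::function<void(int)> callable;
};

// Mirrors the documented alias: the pipe type is the value type
// of the iterator type P.
template <typename P>
using pipe_type_of = typename std::iterator_traits<P>::value_type;

// Reports whether iterator type P refers to elements of type Expected.
template <typename P, typename Expected>
constexpr bool iterates_over() {
  return std::is_same<pipe_type_of<P>, Expected>::value;
}

// A vector iterator and a raw pointer both yield DemoPipe as the pipe type.
static_assert(iterates_over<std::vector<DemoPipe>::iterator, DemoPipe>(),
              "vector iterator yields DemoPipe");
static_assert(iterates_over<DemoPipe*, DemoPipe>(),
              "raw pointer yields DemoPipe");
```

Any iterator type for which `std::iterator_traits` is defined can play the role of `P` in this sketch; whether a given container suits a real pipeline also depends on the validity requirements described for the constructors.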

A scalable pipeline is a composable graph object that lets users build a pipeline scheduling framework and run it as a module task in a taskflow. Unlike tf::Pipeline, which instantiates all pipes at construction time, tf::ScalablePipeline accepts a variable number of pipes through a range of iterators. Users can also reset a scalable pipeline to a different range of pipes between runs. The following code creates a scalable pipeline of four parallel lines that schedules tokens through three serial pipes, using a custom storage to pass data between pipes, and then resets the pipeline to a new range of five serial pipes:

tf::Taskflow taskflow("pipeline");
tf::Executor executor;

const size_t num_lines = 4;

// create data storage (one slot per parallel line)
std::array<int, num_lines> buffer;

// define the pipe callable
auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
  switch(pf.pipe()) {
    // first stage generates only 5 scheduling tokens and saves the
    // token number into the buffer.
    case 0: {
      if(pf.token() == 5) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %zu\n", pf.token());
        buffer[pf.line()] = pf.token();
      }
      return;
    }
    break;

    // other stages propagate the previous result to this pipe and
    // increment it by one
    default: {
      printf(
        "stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
      );
      buffer[pf.line()] = buffer[pf.line()] + 1;
    }
    break;
  }
};

// create a vector of three pipes
std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}

// create a pipeline of four parallel lines based on the given vector of pipes
tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// create task dependency
init.precede(task);
task.precede(stop);

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes and start from
// the initial state (i.e., token counts from zero)
for(size_t i=0; i<2; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();

The above example creates a pipeline graph that schedules five tokens over four parallel lines in a circular fashion, first going through three serial pipes and then five serial pipes:

# initial construction of three serial pipes
o -> o -> o
|    |    |
v    v    v
o -> o -> o
|    |    |
v    v    v
o -> o -> o
|    |    |
v    v    v
o -> o -> o

# resetting to a new range of five serial pipes
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o

Each pipe has the same type of tf::Pipe<std::function<void(tf::Pipeflow&)>> and is kept in a vector that is amenable to change. We construct the scalable pipeline using two range iterators pointing to the beginning and the end of the vector. At each pipe stage, the program propagates the result to the next pipe by adding one to the result stored in a custom data storage, buffer. The pipeline scheduler will generate five scheduling tokens and then stop.
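
The data flow of the example can be traced with a small sequential model: the first pipe stores the token number and each later pipe adds one, so a token leaves the pipeline with the value token + (number of pipes - 1). The sketch below is not Taskflow code and runs no threads. The function `simulate` is a hypothetical name, and the mapping of token t to line t % num_lines is an assumption about what "circular fashion" means here.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Sequential model of the example's data flow (no Taskflow, no threads).
// Assumption: token t is placed on line t % num_lines.
// Returns the value each token holds after passing through the last pipe.
std::vector<int> simulate(std::size_t num_lines, std::size_t num_pipes,
                          std::size_t num_tokens) {
  assert(num_lines > 0 && num_pipes > 0);
  std::vector<int> buffer(num_lines, 0);  // one slot per parallel line
  std::vector<int> results;
  for (std::size_t token = 0; token < num_tokens; ++token) {
    const std::size_t line = token % num_lines;
    // Pipe 0 saves the token number into the line's slot.
    buffer[line] = static_cast<int>(token);
    // Every later pipe increments the stored value by one.
    for (std::size_t pipe = 1; pipe < num_pipes; ++pipe) {
      buffer[line] += 1;
    }
    results.push_back(buffer[line]);
  }
  return results;
}
```

Under these assumptions, `simulate(4, 3, 5)` yields 2, 3, 4, 5, 6 for the first run and `simulate(4, 5, 5)` yields 4, 5, 6, 7, 8 after the reset to five pipes.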

A scalable pipeline is move-only.
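
The move-only contract can be checked at compile time with standard type traits. The sketch below does this on `MoveOnlyPipeline`, a hypothetical stand-in that declares the same copy and move members as the member listing of this class. It is not Taskflow code, and its moved-from state (zero lines) is an arbitrary choice made for the model.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <utility>

// Hypothetical stand-in with deleted copy operations and user-provided
// move operations; it is NOT tf::ScalablePipeline.
class MoveOnlyPipeline {
 public:
  explicit MoveOnlyPipeline(std::size_t num_lines) : lines_(num_lines) {}

  MoveOnlyPipeline(const MoveOnlyPipeline&) = delete;
  MoveOnlyPipeline& operator=(const MoveOnlyPipeline&) = delete;

  MoveOnlyPipeline(MoveOnlyPipeline&& rhs) noexcept : lines_(rhs.lines_) {
    rhs.lines_ = 0;  // model choice: leave the source empty
  }
  MoveOnlyPipeline& operator=(MoveOnlyPipeline&& rhs) noexcept {
    if (this != &rhs) {
      lines_ = rhs.lines_;
      rhs.lines_ = 0;
    }
    return *this;
  }

  std::size_t num_lines() const noexcept { return lines_; }

 private:
  std::size_t lines_;
};

static_assert(!std::is_copy_constructible<MoveOnlyPipeline>::value,
              "copy construction is disabled");
static_assert(!std::is_copy_assignable<MoveOnlyPipeline>::value,
              "copy assignment is disabled");
static_assert(std::is_move_constructible<MoveOnlyPipeline>::value,
              "move construction is allowed");
static_assert(std::is_move_assignable<MoveOnlyPipeline>::value,
              "move assignment is allowed");

// Moves a freshly built object and reports the line count of the target.
std::size_t lines_after_move(std::size_t num_lines) {
  MoveOnlyPipeline source(num_lines);
  MoveOnlyPipeline target(std::move(source));
  return target.num_lines();
}
```

In practice this means a pipeline object is transferred with `std::move` rather than copied, for example when storing it in a container or returning it from a factory function.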

Constructor & Destructor Documentation

◆ ScalablePipeline() [1/3]

template<typename P >
tf::ScalablePipeline< P >::ScalablePipeline ( size_t  num_lines)

constructs an empty scalable pipeline object

Parameters
num_lines: the number of parallel lines

An empty scalable pipeline does not have any pipes. The pipeline needs to be reset to a valid range of pipes before running.

◆ ScalablePipeline() [2/3]

template<typename P >
tf::ScalablePipeline< P >::ScalablePipeline ( size_t  num_lines,
P  first,
P  last 
)

constructs a scalable pipeline object

Parameters
num_lines: the number of parallel lines
first: iterator to the beginning of the range
last: iterator to the end of the range

Constructs a pipeline from the given range of pipes specified in [first, last) using num_lines parallel lines. The first pipe must define a serial direction (tf::PipeType::SERIAL) or an exception will be thrown.

Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.
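
Because only iterators are copied, growing the underlying container can leave the pipeline holding stale iterators. This is presumably why the example in the detailed description calls reset with fresh iterators after appending two more pipes. The sketch below demonstrates the underlying std::vector behavior with plain `int` elements standing in for pipes; all function names are hypothetical and no Taskflow API is involved.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Appends `extra` elements and reports whether the vector had to
// reallocate, which invalidates every iterator obtained beforehand.
bool append_reallocates(std::vector<int>& pipes, std::size_t extra) {
  const std::size_t old_capacity = pipes.capacity();
  for (std::size_t i = 0; i < extra; ++i) {
    pipes.push_back(0);
  }
  return pipes.capacity() != old_capacity;
}

// Case 1: append one element more than the spare capacity allows.
// The vector must reallocate, so iterators taken earlier are stale.
bool full_vector_case() {
  std::vector<int> pipes(3);
  const std::size_t spare = pipes.capacity() - pipes.size();
  return append_reallocates(pipes, spare + 1);
}

// Case 2: capacity for all five elements is reserved up front, so
// appending two more elements does not reallocate.
bool reserved_vector_case() {
  std::vector<int> pipes;
  pipes.reserve(5);
  pipes.resize(3);
  return append_reallocates(pipes, 2);
}
```

Even when no reallocation happens, the end iterator of a vector changes after an append, so passing a fresh `begin()`/`end()` pair to reset after any change to the container is the safer habit.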

◆ ScalablePipeline() [3/3]

template<typename P >
tf::ScalablePipeline< P >::ScalablePipeline ( ScalablePipeline< P > &&  rhs)

move constructor

Constructs a pipeline from the given rhs using move semantics (i.e. the data in rhs is moved into this pipeline). After the move, rhs is in a state as if it is just constructed. The behavior is undefined if rhs is running during the move.

Member Function Documentation

◆ graph()

template<typename P >
Graph & tf::ScalablePipeline< P >::graph ( )

obtains the graph object associated with the pipeline construct

This method primarily serves to expose the pipeline as an opaque graph object from which a module task can be created.

◆ num_lines()

template<typename P >
size_t tf::ScalablePipeline< P >::num_lines ( ) const
noexcept

queries the number of parallel lines

The function returns the number of parallel lines given by the user upon the construction of the pipeline. The number of lines represents the maximum parallelism this pipeline can achieve.

◆ num_pipes()

template<typename P >
size_t tf::ScalablePipeline< P >::num_pipes ( ) const
noexcept

queries the number of pipes

The function returns the number of pipes given by the user upon the construction of the pipeline.

◆ num_tokens()

template<typename P >
size_t tf::ScalablePipeline< P >::num_tokens ( ) const
noexcept

queries the number of generated tokens in the pipeline

The number represents the total number of scheduling tokens that have been generated by the pipeline so far.

◆ operator=()

template<typename P >
ScalablePipeline< P > & tf::ScalablePipeline< P >::operator= ( ScalablePipeline< P > &&  rhs)

move assignment operator

Replaces the contents with those of rhs using move semantics (i.e. the data in rhs is moved into this pipeline). After the move, rhs is in a state as if it is just constructed. The behavior is undefined if rhs is running during the move.

◆ reset() [1/3]

template<typename P >
void tf::ScalablePipeline< P >::reset ( )

resets the pipeline

Resets the pipeline to the initial state. After resetting a pipeline, its token identifier will start from zero.

◆ reset() [2/3]

template<typename P >
void tf::ScalablePipeline< P >::reset ( P  first,
P  last 
)

resets the pipeline with a new range of pipes

Parameters
first: iterator to the beginning of the range
last: iterator to the end of the range

The member function assigns the pipeline to a new range of pipes specified in [first, last) and resets the pipeline to the initial state. After resetting a pipeline, its token identifier will start from zero.

Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.

◆ reset() [3/3]

template<typename P >
void tf::ScalablePipeline< P >::reset ( size_t  num_lines,
P  first,
P  last 
)

resets the pipeline to a new line number and a new range of pipes

Parameters
num_lines: the number of parallel lines
first: iterator to the beginning of the range
last: iterator to the end of the range

The member function resets the pipeline to a new number of parallel lines and a new range of pipes specified in [first, last), as if the pipeline is just constructed. After resetting a pipeline, its token identifier will start from zero.

Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.


The documentation for this class was generated from the following file: