Taskflow  3.2.0-Master-Branch
Limit the Maximum Concurrency

This chapter discusses how to limit concurrency, i.e., the maximum number of workers, in subgraphs of a taskflow.

Define a Semaphore

Taskflow provides a mechanism, tf::Semaphore, for you to limit the maximum concurrency in a section of tasks. You can let a task acquire/release one or multiple semaphores before/after executing its work. A task can acquire and release a semaphore, or just acquire or just release it. A tf::Semaphore object starts with an initial count. As long as that count is above 0, tasks can acquire the semaphore and do their work. If the count is 0 or less, a task trying to acquire the semaphore will not run but is placed in the waiting list of that semaphore. When the semaphore is released by another task, all tasks on that waiting list are rescheduled.

tf::Executor executor(8); // create an executor of 8 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(1); // create a semaphore with initial count 1
std::vector<tf::Task> tasks {
taskflow.emplace([](){ std::cout << "A" << std::endl; }),
taskflow.emplace([](){ std::cout << "B" << std::endl; }),
taskflow.emplace([](){ std::cout << "C" << std::endl; }),
taskflow.emplace([](){ std::cout << "D" << std::endl; }),
taskflow.emplace([](){ std::cout << "E" << std::endl; })
};
for(auto & task : tasks) { // each task acquires and releases the semaphore
task.acquire(semaphore);
task.release(semaphore);
}
executor.run(taskflow).wait();
dot_semaphore1.png

The above example creates five tasks with no dependencies between them. Under normal circumstances, the five tasks would be executed concurrently. However, this example has a semaphore with initial count 1, and all tasks need to acquire that semaphore before running and release that semaphore after they are done. This organization limits the number of concurrently running tasks to only one. One possible output is shown below:

# the output is a sequential chain of five tasks
A
B
E
D
C
Attention
It is your responsibility to ensure the semaphore stays alive during the execution of tasks that acquire and release it. The executor and taskflow do not manage the lifetime of any semaphores.

For the same example, we can give the semaphore an initial count other than 1, say 3, which allows at most three of the five tasks, A, B, C, D, and E, to run concurrently.

tf::Executor executor(8); // create an executor of 8 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(3); // create a semaphore with initial count 3
std::vector<tf::Task> tasks {
taskflow.emplace([](){ std::cout << "A" << std::endl; }),
taskflow.emplace([](){ std::cout << "B" << std::endl; }),
taskflow.emplace([](){ std::cout << "C" << std::endl; }),
taskflow.emplace([](){ std::cout << "D" << std::endl; }),
taskflow.emplace([](){ std::cout << "E" << std::endl; })
};
for(auto & task : tasks) { // each task acquires and releases the semaphore
task.acquire(semaphore);
task.release(semaphore);
}
executor.run(taskflow).wait();
# One possible output: A, B, and C run concurrently, D and E run concurrently
ABC
ED

Semaphores are powerful for limiting the maximum concurrency of not only a section of tasks but also different sections of tasks. Specifically, you can have one task acquire a semaphore and have another task release that semaphore to impose concurrency on subgraphs of tasks. The following example serializes the execution of five pairs of tasks using a semaphore rather than explicit dependencies.

tf::Executor executor(4); // creates an executor of 4 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);
int N = 5;
int counter = 0; // non-atomic integer counter
for(int i=0; i<N; i++) {
tf::Task f = taskflow.emplace([&](){ counter++; })
.name("from-" + std::to_string(i));
tf::Task t = taskflow.emplace([&](){ counter++; })
.name("to-" + std::to_string(i));
f.precede(t);
f.acquire(semaphore);
t.release(semaphore);
}
executor.run(taskflow).wait();
assert(counter == 2*N);
dot_semaphore2.png

Without semaphores, each pair of tasks, e.g., from-0 -> to-0, would run independently and concurrently. However, the program forces each from task to acquire the semaphore before running its work and not to release it until its paired to task is done. This constraint serializes the execution of the pairs, while the order in which the pairs run is up to the scheduler.

Define a Critical Section

tf::CriticalSection is a wrapper over tf::Semaphore, specialized for limiting the maximum concurrency over a section of tasks. A critical section starts with an initial count representing that concurrency limit. The method tf::CriticalSection::add automatically calls tf::Task::acquire and tf::Task::release on the internal semaphore for each task added to the critical section. The following example creates a critical section that allows two workers to run five tasks.

tf::Executor executor(8); // create an executor of 8 workers
tf::Taskflow taskflow;
// create a critical section of two workers
tf::CriticalSection critical_section(2);
tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });
tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });
tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });
critical_section.add(A, B, C, D, E);
executor.run(taskflow).wait();

Define a Conflict Graph

One important application of tf::Semaphore is conflict-aware scheduling using a conflict graph. A conflict graph is an undirected graph in which each vertex represents a task and each edge represents a conflict between a pair of tasks: two tasks that conflict cannot run at the same time. In the conflict graph below, task A conflicts with task B and task C (and vice versa), meaning that A cannot run together with B or C, whereas B and C can run together.

dot_semaphore3.png

We can create one semaphore with an initial count of 1 for each edge in the conflict graph and let the two tasks of that edge acquire and release the semaphore. This prevents the two tasks from running concurrently.

tf::Executor executor;
tf::Taskflow taskflow;
tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);
tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });
// describe the conflict between A and B
A.acquire(conflict_AB).release(conflict_AB);
B.acquire(conflict_AB).release(conflict_AB);
// describe the conflict between A and C
A.acquire(conflict_AC).release(conflict_AC);
C.acquire(conflict_AC).release(conflict_AC);
executor.run(taskflow).wait();
# One possible output: B and C run concurrently after A
A
BC
Note
A task can acquire and release multiple semaphores. When the executor is running a task, it will first try to acquire all semaphores of that task. When the executor finishes a task, it will release all acquired semaphores of that task.

The above code can be rewritten with tf::CriticalSection for simplicity, as shown below:

tf::Executor executor;
tf::Taskflow taskflow;
tf::CriticalSection critical_section_AB(1);
tf::CriticalSection critical_section_AC(1);
tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });
// describe the conflict graph
critical_section_AB.add(A, B);
critical_section_AC.add(A, C);
executor.run(taskflow).wait();