
Implementing A Thread Pool In C++

May 21, 2017

A thread pool is one of the easiest ways to achieve parallelism. It decouples the logical concurrent execution of functions from the physical parallelism of the hardware.

The concept is simple: you create N threads once, and then schedule functions to be executed on these threads at arbitrary moments.

Let's implement this in C++.

The interface

In essence, a thread pool is nothing more than a pool of threads, a queue of pending work, and some functions to manage the scheduling.
We will implement an indirection layer to make our pool more powerful: the notions of tasks and jobs.

A job is a function enqueued into the thread pool. If a thread is available when we enqueue our job, then it is woken up and the job is processed. If all threads are busy, the job stays in the queue, and will be picked up when a worker thread becomes available again.

A task, on the other hand, is a higher level concept. It is simply a collection of jobs. Every job belongs to a task, and a task can have many jobs, or even no job at all. It is possible for the master thread to wait for a task to be completed, that is, to wait until every job attached to that task returns. That's how we will achieve synchronization.

Here is a pseudocode example of a classical usage pattern:

task := ThreadPool.create_task

10 times
  ThreadPool.perform(task, function ...)

ThreadPool.wait_complete(task)

# Now every job has completed

We will also add a method to check whether a task has finished or not, without having to wait (so the master thread can report progress to the user without blocking, for example).

In C++ terms, the interface now looks like this:

#include <functional>

class ThreadPool
{
public:
    using Job = std::function<void(void)>;

    ThreadPool();
    ~ThreadPool();

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    int  task_create();
    bool task_finished(int task) const;
    void task_perform(int task, const Job& job);
    void task_wait(int task);
};
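
Before diving into the implementation, here is a short sketch of how this interface is meant to be used. The example function and the printf calls are just placeholders standing in for real work:

#include <cstdio>

void example()
{
    ThreadPool pool;

    int task = pool.task_create();

    // Enqueue ten jobs attached to the same task.
    for (int i = 0; i < 10; ++i)
        pool.task_perform(task, [i] { std::printf("job %d\n", i); });

    // Optionally poll without blocking, e.g. to report progress.
    if (!pool.task_finished(task))
        std::printf("still working...\n");

    // Block until every job attached to the task has returned.
    pool.task_wait(task);
}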

The implementation

First, we're going to have to spawn actual threads. We will define an entry point for them, called worker_main. We also have to store the thread objects, so we can join them in the destructor. Because worker threads will mutate the thread pool's state, we will use a mutex, and we need a flag to signal the workers that they have to return when the thread pool is destroyed.

#include <vector>
#include <thread>
#include <functional>
#include <mutex>

class ThreadPool
{
public:
    using Job = std::function<void(void)>;

    ThreadPool();
    ~ThreadPool();

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    int  task_create();
    bool task_finished(int task) const;
    void task_perform(int task, const Job& job);
    void task_wait(int task);

private:
    void worker_main();

    bool                        _running;
    mutable std::mutex          _mutex;
    std::vector<std::thread>    _threads;
};

Now we can start the actual implementation:

#include <thread_pool.h>

ThreadPool::ThreadPool()
: _running(true)
{
    size_t thread_count;

    thread_count = std::thread::hardware_concurrency();

    if (thread_count == 0)
        thread_count = 1;

    _threads.reserve(thread_count);

    for (size_t i = 0; i < thread_count; ++i)
        _threads.emplace_back(&ThreadPool::worker_main, this);
}

ThreadPool::~ThreadPool()
{
    std::unique_lock<std::mutex> lock(_mutex);

    _running = false;

    lock.unlock();
    for (auto& t : _threads)
        t.join();
}

void ThreadPool::worker_main()
{
    while (_running)
    {

    }
}

The helper std::thread::hardware_concurrency returns a hint about the number of concurrent threads the hardware supports, typically the number of logical cores. As it may return 0 when this information is not available, we force the creation of at least one thread.

Now we have a system that spawns threads that do nothing but busy-wait, but at least they are properly joined when the pool is destroyed.

Let's implement the task and job system, starting with a few new private members:

size_t                      _task_size;
std::vector<int>            _free_tasks;
std::vector<unsigned int>   _task_pending_count;

size_t                      _job_size;
std::vector<int>            _job_task;
std::vector<Job>            _job_function;

This is a classic structure-of-arrays (SoA) design, which is a sane default. _free_tasks is the free list for tasks; we use it to recycle task slots. _task_pending_count is a per-task count of how many jobs are still pending for that task. _job_task maps every queued job to the task it belongs to, and _job_function stores the actual function attached to each job.
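
To make the bookkeeping concrete, here is a hypothetical snapshot of these members after one task has been created and three jobs have been enqueued for it (the values are illustrative):

// Hypothetical state after one task_create() and three task_perform(0, ...):
// _task_size          == 1
// _task_pending_count == { 3 }          (task 0 has 3 jobs pending)
// _free_tasks         == { }            (no task slot released yet)
// _job_size           == 3
// _job_task           == { 0, 0, 0 }    (every queued job belongs to task 0)
// _job_function       == { f0, f1, f2 } (the functions to execute)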

Now task_create is trivial to implement:

int ThreadPool::task_create()
{
    std::lock_guard<std::mutex> lock(_mutex);
    int task;

    if (_free_tasks.empty())
    {
        task = static_cast<int>(_task_size);
        _task_size++;
        _task_pending_count.resize(_task_size);
    }
    else
    {
        task = _free_tasks.back();
        _free_tasks.pop_back();
    }
    _task_pending_count[task] = 0u;

    return task;
}

task_finished too:

bool ThreadPool::task_finished(int task) const
{
    std::lock_guard<std::mutex> lock(_mutex);

    return _task_pending_count[task] == 0;
}

And we can start to implement task_perform:

void ThreadPool::task_perform(int task, const Job& job)
{
    std::lock_guard<std::mutex> lock(_mutex);

    _job_size++;
    _job_task.push_back(task);
    _job_function.push_back(job);
    _task_pending_count[task]++;
}

Now we need a way to make our workers sleep when the job queue is empty, and wake them up when we enqueue a job. This is exactly what std::condition_variable is for. Threads can sleep on a condition variable, and it is possible to wake them up from another thread. A condition variable must be used together with a mutex; here is how it works:

  • A lock is acquired before waiting on the condition variable.
  • While waiting for the condition variable, the lock is released.
  • When another thread notifies the condition variable, the waiting thread wakes up and re-acquires the lock before returning from the wait.

A thread may also wake up on its own because of implementation details: this is called a spurious wakeup. In our case it does not matter, since we check whether jobs are pending anyway.
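
As a standalone illustration of this pattern (separate from the pool itself), waiting and notifying typically look like this; ready is just a placeholder condition:

#include <condition_variable>
#include <mutex>

std::mutex              m;
std::condition_variable cv;
bool                    ready = false;

void waiter()
{
    std::unique_lock<std::mutex> lock(m);
    while (!ready)     // loop to handle spurious wakeups
        cv.wait(lock); // releases the lock while sleeping, re-acquires it before returning
}

void notifier()
{
    std::lock_guard<std::mutex> lock(m);
    ready = true;      // change the condition while holding the lock
    cv.notify_one();   // wake up one waiting thread
}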

We will need two condition variables: one for the worker threads, and one to implement task_wait.

std::condition_variable _cv_worker;
std::condition_variable _cv_master;

Now we can finally implement worker_main:

void ThreadPool::worker_main()
{
    std::unique_lock<std::mutex> lock(_mutex);

    while (_running)
    {
        // Sleep until a job is pending or the pool is shutting down;
        // the predicate guards against lost and spurious wakeups.
        _cv_worker.wait(lock, [this] { return !_running || _job_size > 0; });

        for (;;)
        {
            if (_job_size == 0)
                break;

            int task = _job_task.back();
            Job job = _job_function.back();
            _job_task.pop_back();
            _job_function.pop_back();
            _job_size--;
            lock.unlock();

            job();

            lock.lock();
            _task_pending_count[task]--;
            if (_task_pending_count[task] == 0)
                _cv_master.notify_all();
        }
    }
}

The code is quite straightforward: wait on the condition variable and, once woken up, loop to pop and execute the pending jobs. We release the lock while executing a job, otherwise only one job could run at a time. We call notify_all on the master condition variable when the last pending job of a task completes, which is what lets us implement task_wait:

void ThreadPool::task_wait(int task)
{
    std::unique_lock<std::mutex> lock(_mutex);

    while (_task_pending_count[task] > 0)
        _cv_master.wait(lock);

    _free_tasks.push_back(task);
}

Now we just have to add a call to notify_one on _cv_worker in task_perform to wake up a worker, fix the destructor to make sure all threads are woken up before we join them, and we're done!

Here is the complete code:

thread_pool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <thread>
#include <vector>
#include <functional>
#include <condition_variable>
#include <mutex>

class ThreadPool
{
public:
    using Job = std::function<void(void)>;

    ThreadPool();
    ~ThreadPool();

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    int     task_create();
    bool    task_finished(int task) const;
    void    task_perform(int task, const Job& job);
    void    task_wait(int task);

private:
    void worker_main();

    bool                        _running;
    mutable std::mutex          _mutex;
    std::vector<std::thread>    _threads;

    size_t                      _task_size;
    std::vector<int>            _free_tasks;
    std::vector<unsigned>       _task_pending_count;

    size_t                      _job_size;
    std::vector<int>            _job_task;
    std::vector<Job>            _job_function;

    std::condition_variable     _cv_worker;
    std::condition_variable     _cv_master;
};

#endif

thread_pool.cpp

#include <thread_pool.h>

ThreadPool::ThreadPool()
: _running(true)
, _task_size(0u)
, _job_size(0u)
{
    size_t thread_count;
    
    thread_count = std::thread::hardware_concurrency();

    if (thread_count == 0)
        thread_count = 1;

    _threads.reserve(thread_count);

    for (size_t i = 0; i < thread_count; ++i)
        _threads.emplace_back(&ThreadPool::worker_main, this);
}

ThreadPool::~ThreadPool()
{
    std::unique_lock<std::mutex> lock(_mutex);

    _running = false;

    _cv_worker.notify_all();
    lock.unlock();
    for (auto& t : _threads)
        t.join();
}

int ThreadPool::task_create()
{
    std::lock_guard<std::mutex> lock(_mutex);
    int task;

    if (_free_tasks.empty())
    {
        task = static_cast<int>(_task_size);
        _task_size++;
        _task_pending_count.resize(_task_size);
    }
    else
    {
        task = _free_tasks.back();
        _free_tasks.pop_back();
    }
    _task_pending_count[task] = 0u;

    return task;
}

bool ThreadPool::task_finished(int task) const
{
    std::lock_guard<std::mutex> lock(_mutex);

    return _task_pending_count[task] == 0u;
}

void ThreadPool::task_perform(int task, const Job& job)
{
    std::lock_guard<std::mutex> lock(_mutex);

    _job_task.push_back(task);
    _job_function.push_back(job);
    _task_pending_count[task]++;
    _job_size++;
    _cv_worker.notify_one();
}

void ThreadPool::task_wait(int task)
{
    std::unique_lock<std::mutex> lock(_mutex);

    while (_task_pending_count[task] > 0)
        _cv_master.wait(lock);

    _free_tasks.push_back(task);
}

void ThreadPool::worker_main()
{
    std::unique_lock<std::mutex> lock(_mutex);

    while (_running)
    {
        // Sleep until a job is pending or the pool is shutting down;
        // the predicate guards against lost and spurious wakeups.
        _cv_worker.wait(lock, [this] { return !_running || _job_size > 0; });

        for (;;)
        {
            if (_job_size == 0)
                break;

            int task = _job_task.back();
            Job job = _job_function.back();
            _job_task.pop_back();
            _job_function.pop_back();
            _job_size--;

            lock.unlock();

            job();

            lock.lock();
            _task_pending_count[task]--;
            if (_task_pending_count[task] == 0)
                _cv_master.notify_all();
        }
    }
}
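
To check that everything works, here is a small, hypothetical main.cpp exercising the pool; the atomic counter is only there to give the jobs something observable to do:

#include <atomic>
#include <cstdio>
#include <thread_pool.h>

int main()
{
    ThreadPool pool;
    std::atomic<int> counter(0);

    int task = pool.task_create();

    // Enqueue 100 jobs that each increment the shared counter.
    for (int i = 0; i < 100; ++i)
        pool.task_perform(task, [&counter] { counter++; });

    // Block until every job attached to the task has returned.
    pool.task_wait(task);

    std::printf("counter = %d\n", counter.load()); // prints 100
    return 0;
}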