Before diving into the code implementation, let me summarize what we’ve created: a C++17-compatible thread pool that not only manages a fixed set of worker threads but also supports task scheduling at specific future time points. This implementation leverages modern C++ features and provides a clean, reusable interface that would be suitable for high-performance applications.
The thread pool implementation follows these key principles:
- A fixed set of worker threads is created once, at construction, and reused for every task.
- Tasks live in a priority queue ordered by execution time, so the earliest deadline is always served first.
- Workers sleep on a condition variable rather than polling, waking only when work arrives or a deadline is reached.
- Each task's result is exposed through a std::future, keeping submission decoupled from execution.
The core interface is straightforward:
template <typename TaskType>
future<typename std::invoke_result<TaskType>::type>
addTask(TaskType task, time_point<steady_clock> execute_at);
This allows adding any callable object to be executed at a specific time point, returning a future for the eventual result.
struct Task {
std::function<void()> execute_cb;
time_point<steady_clock> execute_at;
};
struct TaskComparator {
bool operator()(const Task &a, const Task &b) {
return a.execute_at > b.execute_at;
}
};
priority_queue<Task, vector<Task>, TaskComparator> pq;
The implementation uses a priority queue with a custom comparator to ensure tasks are ordered by their execution time. Because std::priority_queue is a max-heap, the comparator returns true when a should sort below b; comparing execution times with > therefore places the task with the earliest execute_at at the top of the queue.
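To see the comparator's effect in isolation, here is a stand-alone rendering of the article's Task and TaskComparator; the helper firstScheduledOffset is invented for the demonstration and is not part of the pool:

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <vector>

using std::chrono::steady_clock;

struct Task {
  std::function<void()> execute_cb;
  steady_clock::time_point execute_at;
};

struct TaskComparator {
  bool operator()(const Task &a, const Task &b) const {
    return a.execute_at > b.execute_at;  // later deadline => lower priority
  }
};

// Pushes one task per offset (seconds from a common base time) and reports
// the offset of the task the queue surfaces first.
int firstScheduledOffset(const std::vector<int> &offsets_sec) {
  const auto base = steady_clock::now();
  std::priority_queue<Task, std::vector<Task>, TaskComparator> pq;
  for (int s : offsets_sec) {
    pq.push(Task{[] {}, base + std::chrono::seconds(s)});
  }
  return static_cast<int>(std::chrono::duration_cast<std::chrono::seconds>(
                              pq.top().execute_at - base)
                              .count());
}
```

Even though tasks are pushed in arbitrary order, the max-heap with the inverted comparator always surfaces the earliest deadline.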
void startThreads() {
for (int ii = 0; ii < num_threads; ++ii) {
thread_vec.emplace_back([this]() { worker(); });
}
}
Worker threads are created during pool initialization, with each thread executing the worker() function. This avoids the overhead of creating and destroying a thread for each task. The number of threads defaults to the hardware concurrency level, a reasonable default for CPU-bound workloads.
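One caveat worth encoding in such a default: std::thread::hardware_concurrency() is only a hint and may legally return 0. A minimal sketch of a guarded default (the fallback value of 2 is an assumption for illustration, not part of the article's code):

```cpp
#include <thread>

// Picks a default worker count, guarding against the case where
// hardware_concurrency() cannot determine the core count and returns 0.
unsigned defaultThreadCount() {
  unsigned n = std::thread::hardware_concurrency();  // may legally be 0
  return n == 0 ? 2 : n;  // fallback of 2 is an arbitrary assumption
}
```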
template <typename TaskType>
future<typename std::invoke_result<TaskType>::type>
addTask(TaskType task, time_point<steady_clock> execute_at) {
using result_type = typename std::invoke_result<TaskType>::type;
packaged_task<result_type()> ptask(std::move(task));
auto fut = ptask.get_future();
auto ptask_ptr =
make_shared<packaged_task<result_type()>>(std::move(ptask));
Task q_task;
q_task.execute_cb = [ptask_ptr = std::move(ptask_ptr)]() {
(*ptask_ptr)();
};
q_task.execute_at = execute_at;
{
unique_lock lk(qmtx);
pq.push(std::move(q_task));
}
cv.notify_one();
return fut;
}
This is where the magic happens. std::packaged_task wraps the callable so its return value (or any exception it throws) is delivered through the associated std::future, while the type-erased std::function stored in the queue only needs to be invocable as void().
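The packaged_task pattern from addTask() can be shown in isolation; the helper below is a minimal sketch, not pool code, demonstrating that the return value travels through the future regardless of where the task is eventually invoked:

```cpp
#include <future>

// Wraps a callable in a packaged_task, obtains its future, invokes it, and
// reads the result back through the future.
int runThroughPackagedTask() {
  std::packaged_task<int()> ptask([] { return 42; });
  std::future<int> fut = ptask.get_future();
  ptask();           // in the pool, a worker thread makes this call
  return fut.get();  // blocks until the task has produced its value
}
```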
void worker() {
  while (true) {
    unique_lock lk(qmtx);
    // Sleep until there is at least one task or termination is requested.
    cv.wait(lk, [this]() { return term || !pq.empty(); });
    if (term && pq.empty()) {
      return;  // queue drained during shutdown; the thread can exit
    }
    auto &task = pq.top();
    if (task.execute_at <= steady_clock::now()) {
      // top() returns a const reference, so the callable is copied here;
      // it must be taken before pop() destroys the queued Task.
      auto execute_cb = task.execute_cb;
      pq.pop();
      lk.unlock();  // never run user code while holding the queue lock
      execute_cb();
    } else {
      // Sleep until the earliest task is due, or until a new (possibly
      // earlier) task is pushed and notify_one() wakes us.
      cv.wait_until(lk, task.execute_at);
    }
  }
}
The worker thread algorithm:
1. Wait on the condition variable until the queue is non-empty or termination is requested.
2. Exit once termination is requested and the queue has been drained.
3. If the earliest task is due, remove it, release the lock, and execute it.
4. Otherwise, sleep with wait_until until that task's deadline (or until an earlier task arrives).

This design ensures threads aren't busy-waiting when there's no work to do, preserving CPU resources.
The thread pool uses a mutex (qmtx) to protect access to the shared priority queue. Instead of locking and unlocking the mutex directly, we use std::unique_lock:
unique_lock lk(qmtx);
// Critical section
Why std::unique_lock instead of std::lock_guard? While std::lock_guard provides simple RAII-based locking, std::unique_lock offers more flexibility:
- it can be unlocked before it goes out of scope, and re-locked later;
- it is the lock type std::condition_variable::wait() requires;
- it is movable, so lock ownership can be transferred.
As seen in our worker thread:
auto execute_cb = std::move(task.execute_cb);
pq.pop();
lk.unlock(); // Release lock before execution
execute_cb(); // Execute without holding the lock
This pattern prevents holding the lock during potentially long-running tasks, which would block other threads from accessing the queue.
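The extra flexibility unique_lock provides can be compressed into one self-contained function (a sketch for illustration, separate from the pool):

```cpp
#include <mutex>

// Shows the operations lock_guard lacks: explicit unlock, ownership query,
// and explicit re-lock before the RAII object is destroyed.
bool canUnlockEarly() {
  std::mutex m;
  std::unique_lock<std::mutex> lk(m);
  const bool held_before = lk.owns_lock();
  lk.unlock();  // lock_guard offers no equivalent operation
  const bool released = !lk.owns_lock();
  lk.lock();    // re-acquire before the lock goes out of scope
  return held_before && released && lk.owns_lock();
}
```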
The condition variable (cv) allows threads to wait efficiently until a condition is met:
cv.wait(lk, [this]() { return term || !pq.empty(); });
Here, threads wait until either termination is requested or there's a task in the queue. The condition variable integrates with the unique_lock to:
- atomically release the lock while the thread sleeps, so producers can push tasks;
- re-acquire the lock before the predicate is re-checked on wakeup;
- guard against spurious wakeups, since the predicate form of wait() only returns once the predicate holds.
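A tiny producer/consumer round trip makes the lock hand-off concrete; this is a stand-alone sketch with invented names, mirroring the pool's wait-with-predicate idiom:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// The consumer waits under the lock; wait() releases it while sleeping, which
// is what lets the producer acquire the same mutex, publish a value, and
// notify. The predicate also makes the code safe against spurious wakeups.
int waitForValue() {
  std::mutex m;
  std::condition_variable cv;
  int value = 0;
  std::thread producer([&] {
    std::unique_lock<std::mutex> lk(m);  // obtainable because wait() released it
    value = 7;
    cv.notify_one();
  });
  std::unique_lock<std::mutex> lk(m);
  cv.wait(lk, [&] { return value != 0; });
  producer.join();
  return value;
}
```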
We also use wait_until to have threads wake up at specific times for scheduled tasks:
cv.wait_until(lk, task.execute_at);
This is more efficient than continuously checking the time, as it leverages the operating system's timer facilities.
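The timing contract that the worker relies on can be checked directly: wait_until only reports std::cv_status::timeout once the deadline has actually passed. A small sketch (the loop guards against spurious wakeups, just like the worker's outer loop):

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Sleeps on an un-notified condition variable until a deadline and reports
// whether the deadline had truly elapsed when wait_until signaled timeout.
bool sleptUntilDeadline() {
  std::mutex m;
  std::condition_variable cv;
  std::unique_lock<std::mutex> lk(m);
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(10);
  // Spurious wakeups return cv_status::no_timeout; keep waiting until the
  // deadline is the reason we woke up.
  while (cv.wait_until(lk, deadline) != std::cv_status::timeout) {
  }
  return std::chrono::steady_clock::now() >= deadline;
}
```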
The provided BasicTest() function demonstrates two key capabilities:
void BasicTest() {
ThreadPool tp(std::thread::hardware_concurrency() /* num_threads */);
vector<future<int>> future_vec;
// Schedule 20 "long" tasks 10 seconds in the future
for (int ii = 0; ii < 20; ++ii) {
auto fut = tp.addTask(
[]() {
cout << " Long Task " << endl;
return 0;
},
steady_clock::now() + std::chrono::seconds(10));
future_vec.emplace_back(std::move(fut));
}
// Schedule 10 "short" tasks 5 seconds in the future
for (int ii = 0; ii < 10; ++ii) {
auto fut = tp.addTask(
[]() {
cout << " Short Task " << endl;
return 0;
},
steady_clock::now() + std::chrono::seconds(5));
future_vec.emplace_back(std::move(fut));
}
// Wait for all tasks to complete
for (auto &fut : future_vec) {
fut.get();
}
tp.terminate();
}
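The test calls tp.terminate(), which the article never shows. Below is a plausible sketch of its shape, reduced to a self-contained class so it can run on its own; the member names (term, qmtx, cv, thread_vec) follow the article, while the class itself and its simplified worker are assumptions:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal pool skeleton illustrating a terminate(): set the flag under the
// lock, wake every worker, then join them all.
class MiniPool {
 public:
  explicit MiniPool(unsigned n) {
    for (unsigned i = 0; i < n; ++i) {
      thread_vec.emplace_back([this] { worker(); });
    }
  }
  void terminate() {
    {
      std::unique_lock<std::mutex> lk(qmtx);
      term = true;  // workers test this flag in their wait predicate
    }
    cv.notify_all();  // wake every sleeping worker
    for (auto &t : thread_vec) {
      if (t.joinable()) t.join();
    }
  }
  bool terminated() const { return term; }

 private:
  void worker() {
    // The real pool's predicate also checks the task queue; this skeleton
    // only models the shutdown path.
    std::unique_lock<std::mutex> lk(qmtx);
    cv.wait(lk, [this] { return term; });
  }
  bool term = false;
  std::mutex qmtx;
  std::condition_variable cv;
  std::vector<std::thread> thread_vec;
};
```

Setting the flag while holding the mutex before notifying is what prevents a lost wakeup: a worker cannot be between checking the predicate and going to sleep while the flag changes.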
Expected output (simplified):
Start running basic test
(After 5 seconds)
Short Task
Short Task
...
(After 10 seconds)
Long Task
Long Task
...
For a more comprehensive test suite, one would also want to cover: termination while tasks are still pending, tasks that throw exceptions, many producers calling addTask concurrently, and tasks scheduled in the past (which should run immediately).
Several enhancements could be made to this thread pool:
- support for cancelling a scheduled task before it runs;
- dynamic resizing of the worker pool under load;
- a dedicated timer thread, so a worker isn't tied up sleeping in wait_until;
- per-thread queues or work stealing to reduce contention on the single mutex;
- a shutdown policy choice between draining pending tasks and discarding them.
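One property worth calling out when discussing robustness: because addTask wraps work in a std::packaged_task, an exception thrown inside a task is captured in the future's shared state and rethrown from get(), rather than escaping on a worker thread. A stand-alone sketch (the helper name is invented for the demonstration):

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Runs a throwing callable through a packaged_task and recovers the
// exception message from the associated future.
std::string exceptionTravelsThroughFuture() {
  std::packaged_task<int()> ptask(
      []() -> int { throw std::runtime_error("task failed"); });
  auto fut = ptask.get_future();
  ptask();  // the exception is stored in the shared state, not propagated here
  try {
    fut.get();  // rethrows the stored exception on the caller's side
  } catch (const std::runtime_error &e) {
    return e.what();
  }
  return "";
}
```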
This thread pool implementation showcases key C++17 features and threading concepts. Its elegant design provides type-safe task scheduling with minimal locking overhead. While there’s room for optimization, this core implementation serves as an excellent foundation for understanding concurrent programming patterns in modern C++.
In an interview setting, this code demonstrates:
- correct use of mutexes and condition variables, including predicate-based waits;
- move semantics and ownership transfer (packaged_task, shared_ptr lambda captures);
- template deduction with std::invoke_result to build a type-safe API;
- RAII-based management of locks and threads.
The balance between simplicity and functionality makes this implementation a solid starting point for many concurrent applications.