28 #define THREAD_POOL_VERSION "v2.0.0 (2021-08-14)"
40 #include <type_traits>
46 namespace kvikio::third_party {
55 typedef std::uint_fast32_t ui32;
56 typedef std::uint_fast64_t ui64;
71 thread_pool(
const ui32& _thread_count = std::thread::hardware_concurrency())
72 : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()),
73 threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()])
101 const std::scoped_lock lock(queue_mutex);
147 template <
typename T1,
typename T2,
typename F>
149 const T2& index_after_last,
153 typedef std::common_type_t<T1, T2> T;
154 T the_first_index = (T)first_index;
155 T last_index = (T)index_after_last;
156 if (the_first_index == last_index)
return;
157 if (last_index < the_first_index) {
159 last_index = the_first_index;
160 the_first_index = temp;
163 if (num_blocks == 0) num_blocks = thread_count;
164 ui64 total_size = (ui64)(last_index - the_first_index + 1);
165 ui64 block_size = (ui64)(total_size / num_blocks);
166 if (block_size == 0) {
168 num_blocks = (ui32)total_size > 1 ? (ui32)total_size : 1;
170 std::atomic<ui32> blocks_running = 0;
171 for (ui32 t = 0; t < num_blocks; t++) {
172 T start = ((T)(t * block_size) + the_first_index);
174 (t == num_blocks - 1) ? last_index + 1 : ((T)((t + 1) * block_size) + the_first_index);
176 push_task([start, end, &loop, &blocks_running] {
181 while (blocks_running != 0) {
192 template <
typename F>
197 const std::scoped_lock lock(queue_mutex);
198 tasks.push(std::function<
void()>(task));
214 template <
typename F,
typename... A>
217 push_task([task, args...] { task(args...); });
232 void reset(
const ui32& _thread_count = std::thread::hardware_concurrency())
239 thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency();
240 threads.reset(
new std::thread[thread_count]);
256 template <
typename F,
258 typename = std::enable_if_t<
259 std::is_void_v<std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>>>
260 std::future<bool>
submit(
const F& task,
const A&... args)
262 std::shared_ptr<std::promise<bool>> task_promise(
new std::promise<bool>);
263 std::future<bool> future = task_promise->get_future();
264 push_task([task, args..., task_promise] {
267 task_promise->set_value(true);
270 task_promise->set_exception(std::current_exception());
290 template <
typename F,
292 typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>,
293 typename = std::enable_if_t<!std::is_void_v<R>>>
294 std::future<R>
submit(
const F& task,
const A&... args)
296 std::shared_ptr<std::promise<R>> task_promise(
new std::promise<R>);
297 std::future<R> future = task_promise->get_future();
298 push_task([task, args..., task_promise] {
300 task_promise->set_value(task(args...));
303 task_promise->set_exception(std::current_exception());
322 if (tasks_total == 0)
break;
324 if (get_tasks_running() == 0)
break;
339 std::atomic<bool> paused =
false;
347 ui32 sleep_duration = 1000;
357 void create_threads()
359 for (ui32 i = 0; i < thread_count; i++) {
360 threads[i] = std::thread(&thread_pool::worker,
this);
367 void destroy_threads()
369 for (ui32 i = 0; i < thread_count; i++) {
381 bool pop_task(std::function<
void()>& task)
383 const std::scoped_lock lock(queue_mutex);
387 task = std::move(tasks.front());
397 void sleep_or_yield()
400 std::this_thread::sleep_for(std::chrono::microseconds(sleep_duration));
402 std::this_thread::yield();
412 std::function<void()> task;
413 if (!paused && pop_task(task)) {
429 mutable std::mutex queue_mutex = {};
435 std::atomic<bool> running =
true;
440 std::queue<std::function<void()>> tasks = {};
450 std::unique_ptr<std::thread[]> threads;
456 std::atomic<ui32> tasks_total = 0;
A C++17 thread pool class. The user submits tasks to be executed into a queue. Whenever a thread becomes available, it pops a task from the queue and executes it.
~thread_pool()
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
ui32 get_tasks_running() const
Get the number of tasks currently being executed by the threads.
std::future< bool > submit(const F &task, const A &... args)
Submit a function with zero or more arguments and no return value into the task queue,...
thread_pool(const ui32 &_thread_count=std::thread::hardware_concurrency())
Construct a new thread pool.
std::atomic< bool > paused
An atomic variable indicating to the workers to pause. When set to true, the workers temporarily stop popping new tasks out of the queue.
void push_task(const F &task, const A &... args)
Push a function with arguments, but no return value, into the task queue.
void reset(const ui32 &_thread_count=std::thread::hardware_concurrency())
Reset the number of threads in the pool. Waits for all currently running tasks to be completed,...
void wait_for_tasks()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue.
ui64 get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
void push_task(const F &task)
Push a function with no arguments or return value into the task queue.
std::future< R > submit(const F &task, const A &... args)
Submit a function with zero or more arguments and a return value into the task queue,...
ui32 get_thread_count() const
Get the number of threads in the pool.
ui32 get_tasks_total() const
Get the total number of unfinished tasks - either still in the queue, or running in a thread.
void parallelize_loop(const T1 &first_index, const T2 &index_after_last, const F &loop, ui32 num_blocks=0)
Parallelize a loop by splitting it into blocks, submitting each block separately to the thread pool,...