BS_thread_pool.hpp 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. #pragma once
  2. /**
  3. * @file BS_thread_pool.hpp
  4. * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)
  5. * @version 3.5.0
  6. * @date 2023-05-25
  7. * @copyright Copyright (c) 2023 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
  8. *
  9. * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer.
  10. */
  11. #define BS_THREAD_POOL_VERSION "v3.5.0 (2023-05-25)"
  12. #include <chrono> // std::chrono
  13. #include <condition_variable> // std::condition_variable
  14. #include <exception> // std::current_exception
  15. #include <functional> // std::bind, std::function, std::invoke
  16. #include <future> // std::future, std::promise
  17. #include <iostream> // std::cout, std::endl, std::flush, std::ostream
  18. #include <memory> // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr
  19. #include <mutex> // std::mutex, std::scoped_lock, std::unique_lock
  20. #include <queue> // std::queue
  21. #include <thread> // std::thread
  22. #include <type_traits> // std::common_type_t, std::conditional_t, std::decay_t, std::invoke_result_t, std::is_void_v
  23. #include <utility> // std::forward, std::move, std::swap
  24. #include <vector> // std::vector
  25. namespace BS
  26. {
  27. /**
  28. * @brief A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsigned int.
  29. */
  30. using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
  31. // ============================================================================================= //
  32. // Begin class multi_future //
  33. /**
  34. * @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
  35. *
  36. * @tparam T The return type of the futures.
  37. */
  38. template <typename T>
  39. class [[nodiscard]] multi_future
  40. {
  41. public:
  42. /**
  43. * @brief Construct a multi_future object with the given number of futures.
  44. *
  45. * @param num_futures_ The desired number of futures to store.
  46. */
  47. multi_future(const size_t num_futures_ = 0) : futures(num_futures_) {}
  48. /**
  49. * @brief Get the results from all the futures stored in this multi_future object, rethrowing any stored exceptions.
  50. *
  51. * @return If the futures return void, this function returns void as well. Otherwise, it returns a vector containing the results.
  52. */
  53. [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>> get()
  54. {
  55. if constexpr (std::is_void_v<T>)
  56. {
  57. for (size_t i = 0; i < futures.size(); ++i)
  58. futures[i].get();
  59. return;
  60. }
  61. else
  62. {
  63. std::vector<T> results(futures.size());
  64. for (size_t i = 0; i < futures.size(); ++i)
  65. results[i] = futures[i].get();
  66. return results;
  67. }
  68. }
  69. /**
  70. * @brief Get a reference to one of the futures stored in this multi_future object.
  71. *
  72. * @param i The index of the desired future.
  73. * @return The future.
  74. */
  75. [[nodiscard]] std::future<T>& operator[](const size_t i)
  76. {
  77. return futures[i];
  78. }
  79. /**
  80. * @brief Append a future to this multi_future object.
  81. *
  82. * @param future The future to append.
  83. */
  84. void push_back(std::future<T> future)
  85. {
  86. futures.push_back(std::move(future));
  87. }
  88. /**
  89. * @brief Get the number of futures stored in this multi_future object.
  90. *
  91. * @return The number of futures.
  92. */
  93. [[nodiscard]] size_t size() const
  94. {
  95. return futures.size();
  96. }
  97. /**
  98. * @brief Wait for all the futures stored in this multi_future object.
  99. */
  100. void wait() const
  101. {
  102. for (size_t i = 0; i < futures.size(); ++i)
  103. futures[i].wait();
  104. }
  105. private:
  106. /**
  107. * @brief A vector to store the futures.
  108. */
  109. std::vector<std::future<T>> futures;
  110. };
  111. // End class multi_future //
  112. // ============================================================================================= //
  113. // ============================================================================================= //
  114. // Begin class blocks //
  115. /**
  116. * @brief A helper class to divide a range into blocks. Used by parallelize_loop() and push_loop().
  117. *
  118. * @tparam T1 The type of the first index in the range. Should be a signed or unsigned integer.
  119. * @tparam T2 The type of the index after the last index in the range. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred.
  120. * @tparam T The common type of T1 and T2.
  121. */
  122. template <typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
  123. class [[nodiscard]] blocks
  124. {
  125. public:
  126. /**
  127. * @brief Construct a blocks object with the given specifications.
  128. *
  129. * @param first_index_ The first index in the range.
  130. * @param index_after_last_ The index after the last index in the range.
  131. * @param num_blocks_ The desired number of blocks to divide the range into.
  132. */
  133. blocks(const T1 first_index_, const T2 index_after_last_, const size_t num_blocks_) : first_index(static_cast<T>(first_index_)), index_after_last(static_cast<T>(index_after_last_)), num_blocks(num_blocks_)
  134. {
  135. if (index_after_last < first_index)
  136. std::swap(index_after_last, first_index);
  137. total_size = static_cast<size_t>(index_after_last - first_index);
  138. block_size = static_cast<size_t>(total_size / num_blocks);
  139. if (block_size == 0)
  140. {
  141. block_size = 1;
  142. num_blocks = (total_size > 1) ? total_size : 1;
  143. }
  144. }
  145. /**
  146. * @brief Get the first index of a block.
  147. *
  148. * @param i The block number.
  149. * @return The first index.
  150. */
  151. [[nodiscard]] T start(const size_t i) const
  152. {
  153. return static_cast<T>(i * block_size) + first_index;
  154. }
  155. /**
  156. * @brief Get the index after the last index of a block.
  157. *
  158. * @param i The block number.
  159. * @return The index after the last index.
  160. */
  161. [[nodiscard]] T end(const size_t i) const
  162. {
  163. return (i == num_blocks - 1) ? index_after_last : (static_cast<T>((i + 1) * block_size) + first_index);
  164. }
  165. /**
  166. * @brief Get the number of blocks. Note that this may be different than the desired number of blocks that was passed to the constructor.
  167. *
  168. * @return The number of blocks.
  169. */
  170. [[nodiscard]] size_t get_num_blocks() const
  171. {
  172. return num_blocks;
  173. }
  174. /**
  175. * @brief Get the total number of indices in the range.
  176. *
  177. * @return The total number of indices.
  178. */
  179. [[nodiscard]] size_t get_total_size() const
  180. {
  181. return total_size;
  182. }
  183. private:
  184. /**
  185. * @brief The size of each block (except possibly the last block).
  186. */
  187. size_t block_size = 0;
  188. /**
  189. * @brief The first index in the range.
  190. */
  191. T first_index = 0;
  192. /**
  193. * @brief The index after the last index in the range.
  194. */
  195. T index_after_last = 0;
  196. /**
  197. * @brief The number of blocks.
  198. */
  199. size_t num_blocks = 0;
  200. /**
  201. * @brief The total number of indices in the range.
  202. */
  203. size_t total_size = 0;
  204. };
  205. // End class blocks //
  206. // ============================================================================================= //
  207. // ============================================================================================= //
  208. // Begin class thread_pool //
  209. /**
  210. * @brief A fast, lightweight, and easy-to-use C++17 thread pool class.
  211. */
  212. class [[nodiscard]] thread_pool
  213. {
  214. public:
  215. // ============================
  216. // Constructors and destructors
  217. // ============================
  218. /**
  219. * @brief Construct a new thread pool.
  220. *
  221. * @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
  222. */
  223. thread_pool(const concurrency_t thread_count_ = 0) : thread_count(determine_thread_count(thread_count_)), threads(std::make_unique<std::thread[]>(determine_thread_count(thread_count_)))
  224. {
  225. create_threads();
  226. }
  227. /**
  228. * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
  229. */
  230. ~thread_pool()
  231. {
  232. wait_for_tasks();
  233. destroy_threads();
  234. }
  235. // =======================
  236. // Public member functions
  237. // =======================
  238. /**
  239. * @brief Get the number of tasks currently waiting in the queue to be executed by the threads.
  240. *
  241. * @return The number of queued tasks.
  242. */
  243. [[nodiscard]] size_t get_tasks_queued() const
  244. {
  245. const std::scoped_lock tasks_lock(tasks_mutex);
  246. return tasks.size();
  247. }
  248. /**
  249. * @brief Get the number of tasks currently being executed by the threads.
  250. *
  251. * @return The number of running tasks.
  252. */
  253. [[nodiscard]] size_t get_tasks_running() const
  254. {
  255. const std::scoped_lock tasks_lock(tasks_mutex);
  256. return tasks_running;
  257. }
  258. /**
  259. * @brief Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
  260. *
  261. * @return The total number of tasks.
  262. */
  263. [[nodiscard]] size_t get_tasks_total() const
  264. {
  265. const std::scoped_lock tasks_lock(tasks_mutex);
  266. return tasks_running + tasks.size();
  267. }
  268. /**
  269. * @brief Get the number of threads in the pool.
  270. *
  271. * @return The number of threads.
  272. */
  273. [[nodiscard]] concurrency_t get_thread_count() const
  274. {
  275. return thread_count;
  276. }
  277. /**
  278. * @brief Check whether the pool is currently paused.
  279. *
  280. * @return true if the pool is paused, false if it is not paused.
  281. */
  282. [[nodiscard]] bool is_paused() const
  283. {
  284. const std::scoped_lock tasks_lock(tasks_mutex);
  285. return paused;
  286. }
  287. /**
  288. * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks.
  289. *
  290. * @tparam F The type of the function to loop through.
  291. * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer.
  292. * @tparam T2 The type of the index after the last index in the loop. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred.
  293. * @tparam T The common type of T1 and T2.
  294. * @tparam R The return value of the loop function F (can be void).
  295. * @param first_index The first index in the loop.
  296. * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted.
  297. * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
  298. * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
  299. * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block.
  300. */
  301. template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
  302. [[nodiscard]] multi_future<R> parallelize_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0)
  303. {
  304. blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
  305. if (blks.get_total_size() > 0)
  306. {
  307. multi_future<R> mf(blks.get_num_blocks());
  308. for (size_t i = 0; i < blks.get_num_blocks(); ++i)
  309. mf[i] = submit(std::forward<F>(loop), blks.start(i), blks.end(i));
  310. return mf;
  311. }
  312. else
  313. {
  314. return multi_future<R>();
  315. }
  316. }
  317. /**
  318. * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks. This overload is used for the special case where the first index is 0.
  319. *
  320. * @tparam F The type of the function to loop through.
  321. * @tparam T The type of the loop indices. Should be a signed or unsigned integer.
  322. * @tparam R The return value of the loop function F (can be void).
  323. * @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted.
  324. * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
  325. * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
  326. * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block.
  327. */
  328. template <typename F, typename T, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
  329. [[nodiscard]] multi_future<R> parallelize_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0)
  330. {
  331. return parallelize_loop(0, index_after_last, std::forward<F>(loop), num_blocks);
  332. }
  333. /**
  334. * @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished.
  335. */
  336. void pause()
  337. {
  338. const std::scoped_lock tasks_lock(tasks_mutex);
  339. paused = true;
  340. }
  341. /**
  342. * @brief Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.
  343. */
  344. void purge()
  345. {
  346. const std::scoped_lock tasks_lock(tasks_mutex);
  347. while (!tasks.empty())
  348. tasks.pop();
  349. }
  350. /**
  351. * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
  352. *
  353. * @tparam F The type of the function to loop through.
  354. * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer.
  355. * @tparam T2 The type of the index after the last index in the loop. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred.
  356. * @tparam T The common type of T1 and T2.
  357. * @param first_index The first index in the loop.
  358. * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted.
  359. * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
  360. * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
  361. */
  362. template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
  363. void push_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0)
  364. {
  365. blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
  366. if (blks.get_total_size() > 0)
  367. {
  368. for (size_t i = 0; i < blks.get_num_blocks(); ++i)
  369. push_task(std::forward<F>(loop), blks.start(i), blks.end(i));
  370. }
  371. }
  372. /**
  373. * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. This overload is used for the special case where the first index is 0.
  374. *
  375. * @tparam F The type of the function to loop through.
  376. * @tparam T The type of the loop indices. Should be a signed or unsigned integer.
  377. * @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted.
  378. * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
  379. * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
  380. */
  381. template <typename F, typename T>
  382. void push_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0)
  383. {
  384. push_loop(0, index_after_last, std::forward<F>(loop), num_blocks);
  385. }
  386. /**
  387. * @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
  388. *
  389. * @tparam F The type of the function.
  390. * @tparam A The types of the arguments.
  391. * @param task The function to push.
  392. * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments.
  393. */
  394. template <typename F, typename... A>
  395. void push_task(F&& task, A&&... args)
  396. {
  397. {
  398. const std::scoped_lock tasks_lock(tasks_mutex);
  399. tasks.push(std::bind(std::forward<F>(task), std::forward<A>(args)...)); // cppcheck-suppress ignoredReturnValue
  400. }
  401. task_available_cv.notify_one();
  402. }
  403. /**
  404. * @brief Reset the number of threads in the pool. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
  405. *
  406. * @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
  407. */
  408. void reset(const concurrency_t thread_count_ = 0)
  409. {
  410. std::unique_lock tasks_lock(tasks_mutex);
  411. const bool was_paused = paused;
  412. paused = true;
  413. tasks_lock.unlock();
  414. wait_for_tasks();
  415. destroy_threads();
  416. thread_count = determine_thread_count(thread_count_);
  417. threads = std::make_unique<std::thread[]>(thread_count);
  418. paused = was_paused;
  419. create_threads();
  420. }
  421. /**
  422. * @brief Submit a function with zero or more arguments into the task queue. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes.
  423. *
  424. * @tparam F The type of the function.
  425. * @tparam A The types of the zero or more arguments to pass to the function.
  426. * @tparam R The return type of the function (can be void).
  427. * @param task The function to submit.
  428. * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments.
  429. * @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one.
  430. */
  431. template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
  432. [[nodiscard]] std::future<R> submit(F&& task, A&&... args)
  433. {
  434. std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
  435. push_task(
  436. [task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...), task_promise]
  437. {
  438. try
  439. {
  440. if constexpr (std::is_void_v<R>)
  441. {
  442. std::invoke(task_function);
  443. task_promise->set_value();
  444. }
  445. else
  446. {
  447. task_promise->set_value(std::invoke(task_function));
  448. }
  449. }
  450. catch (...)
  451. {
  452. try
  453. {
  454. task_promise->set_exception(std::current_exception());
  455. }
  456. catch (...)
  457. {
  458. }
  459. }
  460. });
  461. return task_promise->get_future();
  462. }
  463. /**
  464. * @brief Unpause the pool. The workers will resume retrieving new tasks out of the queue.
  465. */
  466. void unpause()
  467. {
  468. const std::scoped_lock tasks_lock(tasks_mutex);
  469. paused = false;
  470. }
  471. /**
  472. * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future.
  473. */
  474. void wait_for_tasks()
  475. {
  476. std::unique_lock tasks_lock(tasks_mutex);
  477. waiting = true;
  478. tasks_done_cv.wait(tasks_lock, [this] { return !tasks_running && (paused || tasks.empty()); });
  479. waiting = false;
  480. }
  481. /**
  482. * @brief Wait for tasks to be completed, but stop waiting after the specified duration has passed.
  483. *
  484. * @tparam R An arithmetic type representing the number of ticks to wait.
  485. * @tparam P An std::ratio representing the length of each tick in seconds.
  486. * @param duration The time duration to wait.
  487. * @return true if all tasks finished running, false if the duration expired but some tasks are still running.
  488. */
  489. template <typename R, typename P>
  490. bool wait_for_tasks_duration(const std::chrono::duration<R, P>& duration)
  491. {
  492. std::unique_lock tasks_lock(tasks_mutex);
  493. waiting = true;
  494. const bool status = tasks_done_cv.wait_for(tasks_lock, duration, [this] { return !tasks_running && (paused || tasks.empty()); });
  495. waiting = false;
  496. return status;
  497. }
  498. /**
  499. * @brief Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
  500. *
  501. * @tparam C The type of the clock used to measure time.
  502. * @tparam D An std::chrono::duration type used to indicate the time point.
  503. * @param timeout_time The time point at which to stop waiting.
  504. * @return true if all tasks finished running, false if the time point was reached but some tasks are still running.
  505. */
  506. template <typename C, typename D>
  507. bool wait_for_tasks_until(const std::chrono::time_point<C, D>& timeout_time)
  508. {
  509. std::unique_lock tasks_lock(tasks_mutex);
  510. waiting = true;
  511. const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time, [this] { return !tasks_running && (paused || tasks.empty()); });
  512. waiting = false;
  513. return status;
  514. }
  515. private:
  516. // ========================
  517. // Private member functions
  518. // ========================
  519. /**
  520. * @brief Create the threads in the pool and assign a worker to each thread.
  521. */
  522. void create_threads()
  523. {
  524. {
  525. const std::scoped_lock tasks_lock(tasks_mutex);
  526. workers_running = true;
  527. }
  528. for (concurrency_t i = 0; i < thread_count; ++i)
  529. {
  530. threads[i] = std::thread(&thread_pool::worker, this);
  531. }
  532. }
  533. /**
  534. * @brief Destroy the threads in the pool.
  535. */
  536. void destroy_threads()
  537. {
  538. {
  539. const std::scoped_lock tasks_lock(tasks_mutex);
  540. workers_running = false;
  541. }
  542. task_available_cv.notify_all();
  543. for (concurrency_t i = 0; i < thread_count; ++i)
  544. {
  545. threads[i].join();
  546. }
  547. }
  548. /**
  549. * @brief Determine how many threads the pool should have, based on the parameter passed to the constructor or reset().
  550. *
  551. * @param thread_count_ The parameter passed to the constructor or reset(). If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread.
  552. * @return The number of threads to use for constructing the pool.
  553. */
  554. [[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_) const
  555. {
  556. if (thread_count_ > 0)
  557. return thread_count_;
  558. else
  559. {
  560. if (std::thread::hardware_concurrency() > 0)
  561. return std::thread::hardware_concurrency();
  562. else
  563. return 1;
  564. }
  565. }
  566. /**
  567. * @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by push_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait_for_tasks() in case it is waiting.
  568. */
  569. void worker()
  570. {
  571. std::function<void()> task;
  572. while (true)
  573. {
  574. std::unique_lock tasks_lock(tasks_mutex);
  575. task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !workers_running; });
  576. if (!workers_running)
  577. break;
  578. if (paused)
  579. continue;
  580. task = std::move(tasks.front());
  581. tasks.pop();
  582. ++tasks_running;
  583. tasks_lock.unlock();
  584. task();
  585. tasks_lock.lock();
  586. --tasks_running;
  587. if (waiting && !tasks_running && (paused || tasks.empty()))
  588. tasks_done_cv.notify_all();
  589. }
  590. }
  591. // ============
  592. // Private data
  593. // ============
  594. /**
  595. * @brief A flag indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks.
  596. */
  597. bool paused = false;
  598. /**
  599. * @brief A condition variable to notify worker() that a new task has become available.
  600. */
  601. std::condition_variable task_available_cv = {};
  602. /**
  603. * @brief A condition variable to notify wait_for_tasks() that the tasks are done.
  604. */
  605. std::condition_variable tasks_done_cv = {};
  606. /**
  607. * @brief A queue of tasks to be executed by the threads.
  608. */
  609. std::queue<std::function<void()>> tasks = {};
  610. /**
  611. * @brief A counter for the total number of currently running tasks.
  612. */
  613. size_t tasks_running = 0;
  614. /**
  615. * @brief A mutex to synchronize access to the task queue by different threads.
  616. */
  617. mutable std::mutex tasks_mutex = {};
  618. /**
  619. * @brief The number of threads in the pool.
  620. */
  621. concurrency_t thread_count = 0;
  622. /**
  623. * @brief A smart pointer to manage the memory allocated for the threads.
  624. */
  625. std::unique_ptr<std::thread[]> threads = nullptr;
  626. /**
  627. * @brief A flag indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
  628. */
  629. bool waiting = false;
  630. /**
  631. * @brief A flag indicating to the workers to keep running. When set to false, the workers terminate permanently.
  632. */
  633. bool workers_running = false;
  634. };
  635. // End class thread_pool //
  636. // ============================================================================================= //
  637. // ============================================================================================= //
  638. // Begin class synced_stream //
  639. /**
  640. * @brief A helper class to synchronize printing to an output stream by different threads.
  641. */
  642. class [[nodiscard]] synced_stream
  643. {
  644. public:
  645. /**
  646. * @brief Construct a new synced stream.
  647. *
  648. * @param out_stream_ The output stream to print to. The default value is std::cout.
  649. */
  650. synced_stream(std::ostream& out_stream_ = std::cout) : out_stream(out_stream_) {}
  651. /**
  652. * @brief Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
  653. *
  654. * @tparam T The types of the items
  655. * @param items The items to print.
  656. */
  657. template <typename... T>
  658. void print(T&&... items)
  659. {
  660. const std::scoped_lock lock(stream_mutex);
  661. (out_stream << ... << std::forward<T>(items));
  662. }
  663. /**
  664. * @brief Print any number of items into the output stream, followed by a newline character. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
  665. *
  666. * @tparam T The types of the items
  667. * @param items The items to print.
  668. */
  669. template <typename... T>
  670. void println(T&&... items)
  671. {
  672. print(std::forward<T>(items)..., '\n');
  673. }
  674. /**
  675. * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::endl). Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise '\n' should be used instead.
  676. */
  677. inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
  678. /**
  679. * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::flush). Used to flush the stream.
  680. */
  681. inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);
  682. private:
  683. /**
  684. * @brief The output stream to print to.
  685. */
  686. std::ostream& out_stream;
  687. /**
  688. * @brief A mutex to synchronize printing.
  689. */
  690. mutable std::mutex stream_mutex = {};
  691. };
  692. // End class synced_stream //
  693. // ============================================================================================= //
  694. // ============================================================================================= //
  695. // Begin class timer //
  696. /**
  697. * @brief A helper class to measure execution time for benchmarking purposes.
  698. */
  699. class [[nodiscard]] timer
  700. {
  701. public:
  702. /**
  703. * @brief Start (or restart) measuring time.
  704. */
  705. void start()
  706. {
  707. start_time = std::chrono::steady_clock::now();
  708. }
  709. /**
  710. * @brief Stop measuring time and store the elapsed time since start().
  711. */
  712. void stop()
  713. {
  714. elapsed_time = std::chrono::steady_clock::now() - start_time;
  715. }
  716. /**
  717. * @brief Get the number of milliseconds that have elapsed between start() and stop().
  718. *
  719. * @return The number of milliseconds.
  720. */
  721. [[nodiscard]] std::chrono::milliseconds::rep ms() const
  722. {
  723. return (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)).count();
  724. }
  725. private:
  726. /**
  727. * @brief The time point when measuring started.
  728. */
  729. std::chrono::time_point<std::chrono::steady_clock> start_time = std::chrono::steady_clock::now();
  730. /**
  731. * @brief The duration that has elapsed between start() and stop().
  732. */
  733. std::chrono::duration<double> elapsed_time = std::chrono::duration<double>::zero();
  734. };
  735. // End class timer //
  736. // ============================================================================================= //
  737. } // namespace BS