Alexandria 2.31.0
SDC-CH common library for the Euclid project
Loading...
Searching...
No Matches
ThreadPool.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2012-2022 Euclid Science Ground Segment
3 *
4 * This library is free software; you can redistribute it and/or modify it under
5 * the terms of the GNU Lesser General Public License as published by the Free
6 * Software Foundation; either version 3.0 of the License, or (at your option)
7 * any later version.
8 *
9 * This library is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 * details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this library; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
27#include <algorithm>
28#include <numeric>
29
30namespace Euclid {
31
32namespace {
33
34class Worker {
35
36public:
37 Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue, std::atomic<bool>& run_flag,
38 std::atomic<bool>& sleeping_flag, std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39 std::exception_ptr& exception_ptr)
40 : m_queue_mutex(queue_mutex)
41 , m_queue(queue)
42 , m_run_flag(run_flag)
43 , m_sleeping_flag(sleeping_flag)
44 , m_done_flag(done_flag)
45 , m_empty_queue_wait_time(empty_queue_wait_time)
46 , m_exception_ptr(exception_ptr) {}
47
48 void operator()() {
49 while (m_run_flag.get()) {
50 std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
51
52 {
54 // If an exception was thrown, stop just here
55 if (m_exception_ptr != nullptr) {
56 break;
57 }
58 // Check if there is anything it the queue to be done and get it
59 if (!m_queue.get().empty()) {
60 task_ptr = Euclid::make_unique<ThreadPool::Task>(m_queue.get().front());
61 m_queue.get().pop_front();
62 }
63 }
64
65 // If we have some work to do, do it. Otherwise sleep for some time.
66 if (task_ptr) {
67 try {
68 (*task_ptr)();
69 } catch (...) {
72 }
73 } else {
74 m_sleeping_flag.get() = true;
76 m_sleeping_flag.get() = false;
77 }
78 }
79 // Indicate that the worker is done
80 m_sleeping_flag.get() = true;
81 m_done_flag.get() = true;
82 m_run_flag.get() = false;
83 }
84
85private:
93};
94
95} // end of anonymous namespace
96
97ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
98 : m_worker_run_flags(thread_count)
99 , m_worker_sleeping_flags(thread_count)
100 , m_worker_done_flags(thread_count)
101 , m_empty_queue_wait_time(empty_queue_wait_time) {
102 for (unsigned int i = 0; i < thread_count; ++i) {
103 m_worker_run_flags.at(i) = true;
104 m_worker_sleeping_flags.at(i) = false;
105 m_worker_done_flags.at(i) = false;
108 }
109}
110
111namespace {
112
113void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
114 // Now wait until all the workers have finish any current tasks
115 for (auto& flag : worker_flags) {
116 while (!flag) {
118 }
119 }
120}
121
122} // namespace
123
126 if (m_exception_ptr) {
127 if (rethrow) {
129 } else {
130 return true;
131 }
132 }
133 return false;
134}
135
136size_t ThreadPool::queued() const {
138 return m_queue.size();
139}
140
146
149 return m_worker_done_flags.size() - done;
150}
151
152void ThreadPool::block(bool throw_on_exception) {
153 // Wait for the queue to be empty
154 bool queue_is_empty = false;
155 while (!queue_is_empty) {
156 {
158 if (m_exception_ptr != nullptr) {
159 break;
160 }
161 queue_is_empty = m_queue.empty();
162 }
163 if (!queue_is_empty) {
165 }
166 }
167 // Wait for the workers to finish the currently executing tasks
169 // Check if any worker finished with an exception
170 checkForException(throw_on_exception);
171}
172
174 // Wait for the pool to be done with anything queued
175 block(false);
176 // Tell the threads to finish
178 // Now wait until all the workers have exited
179 for (auto& worker : m_workers) {
180 worker.join();
181 }
182}
183
187 task();
188 } else {
189 m_queue.emplace_back(std::move(task));
190 }
191}
192
193} // namespace Euclid
std::reference_wrapper< std::mutex > m_queue_mutex
std::reference_wrapper< std::atomic< bool > > m_run_flag
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
std::reference_wrapper< std::atomic< bool > > m_done_flag
unsigned int m_empty_queue_wait_time
std::reference_wrapper< std::exception_ptr > m_exception_ptr
T accumulate(T... args)
T at(T... args)
T begin(T... args)
void submit(Task task)
Submit a task to be executed.
std::deque< Task > m_queue
Definition ThreadPool.h:114
size_t running() const
Return the number of running tasks.
void block(bool throw_on_exception=true)
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition ThreadPool.h:111
size_t queued() const
Return the number of queued tasks.
unsigned int m_empty_queue_wait_time
Definition ThreadPool.h:115
std::vector< std::thread > m_workers
Definition ThreadPool.h:113
std::mutex m_queue_mutex
Definition ThreadPool.h:109
std::vector< std::atomic< bool > > m_worker_run_flags
Definition ThreadPool.h:110
std::vector< std::atomic< bool > > m_worker_done_flags
Definition ThreadPool.h:112
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
std::exception_ptr m_exception_ptr
Definition ThreadPool.h:116
size_t activeThreads() const
Return the number of active workers (either running or sleeping)
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
T current_exception(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T fill(T... args)
T lock(T... args)
T move(T... args)
T rethrow_exception(T... args)
T size(T... args)
T sleep_for(T... args)