Alexandria  2.25.0
SDC-CH common library for the Euclid project
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2021 Euclid Science Ground Segment
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 3.0 of the License, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
27 #include <algorithm>
28 #include <numeric>
29 
30 namespace Euclid {
31 
32 namespace {
33 
// Function object that services the shared task queue. It is constructed from
// references to the queue mutex, the task queue, three atomic flags (run,
// sleeping, done) and an exception pointer, plus a wait-time value. Calling it
// runs a loop that takes the front task of the queue and executes it.
34 class Worker {
35 
36 public:
 // Stores the arguments in the corresponding m_* members; does nothing else.
37  Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue, std::atomic<bool>& run_flag,
38  std::atomic<bool>& sleeping_flag, std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39  std::exception_ptr& exception_ptr)
40  : m_queue_mutex(queue_mutex)
41  , m_queue(queue)
42  , m_run_flag(run_flag)
43  , m_sleeping_flag(sleeping_flag)
44  , m_done_flag(done_flag)
45  , m_empty_queue_wait_time(empty_queue_wait_time)
46  , m_exception_ptr(exception_ptr) {}
47 
 // Worker loop. Iterates while the run flag is true and the exception pointer
 // is null; on exit it marks itself as sleeping and done and clears its run flag.
48  void operator()() {
49  while (m_run_flag.get() && m_exception_ptr == nullptr) {
50  // Check if there is anything in the queue to be done and get it
51  std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
 // NOTE(review): a line is missing from this listing here. `lock` (used below)
 // is not declared in the visible text; presumably it is a std::unique_lock on
 // m_queue_mutex -- confirm against the original source.
53  if (!m_queue.get().empty()) {
 // Build a separate Task object from the front element, then remove that
 // element from the queue.
54  task_ptr = Euclid::make_unique<ThreadPool::Task>(m_queue.get().front());
55  m_queue.get().pop_front();
56  }
 // Unlock before the task is invoked below, so the task body does not run
 // with `lock` held.
57  lock.unlock();
58 
59  // If we have some work to do, do it. Otherwise sleep for some time.
60  if (task_ptr) {
61  try {
62  (*task_ptr)();
63  } catch (...) {
 // NOTE(review): the handler body is missing from this listing; presumably it
 // stores std::current_exception() in m_exception_ptr, which would also end
 // the while loop above -- confirm.
65  }
66  } else {
 // Nothing was queued: flag this worker as sleeping for the duration of the wait.
67  m_sleeping_flag.get() = true;
 // NOTE(review): a line is missing from this listing here; presumably a
 // sleep_for based on m_empty_queue_wait_time (time unit not visible) -- confirm.
69  m_sleeping_flag.get() = false;
70  }
71  }
72  // Indicate that the worker is done
73  m_sleeping_flag.get() = true;
74  m_done_flag.get() = true;
75  m_run_flag.get() = false;
76  }
77 
78 private:
 // NOTE(review): the data-member declarations are missing from this listing.
 // The initializer list above names them: m_queue_mutex, m_queue, m_run_flag,
 // m_sleeping_flag, m_done_flag, m_empty_queue_wait_time, m_exception_ptr. The
 // .get() calls in operator() suggest the flag and queue members are reference
 // wrappers -- confirm against the original source.
86 };
87 
88 } // end of anonymous namespace
89 
// Constructs the pool state for thread_count workers: sizes the three per-worker
// flag vectors to thread_count, stores empty_queue_wait_time, and initialises
// each worker's flags to run = true, sleeping = false, done = false.
90 ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
91  : m_worker_run_flags(thread_count)
92  , m_worker_sleeping_flags(thread_count)
93  , m_worker_done_flags(thread_count)
94  , m_empty_queue_wait_time(empty_queue_wait_time) {
95  for (unsigned int i = 0; i < thread_count; ++i) {
96  m_worker_run_flags.at(i) = true;
97  m_worker_sleeping_flags.at(i) = false;
98  m_worker_done_flags.at(i) = false;
 // NOTE(review): the next lines are missing from this listing; presumably they
 // create a Worker bound to the i-th flags and start it as a thread stored in
 // m_workers -- confirm against the original source.
101  }
102 }
103 
104 namespace {
105 
// Visits the flags in order and, for each one, loops for as long as that flag
// reads false. Returns once every flag has been observed true.
106 void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
107  // Now wait until all the workers have finished any current tasks
108  for (auto& flag : worker_flags) {
109  while (!flag) {
 // NOTE(review): the loop body is missing from this listing; wait_time is not
 // used anywhere in the visible text, so presumably this is a sleep_for based
 // on wait_time (time unit not visible) -- confirm.
111  }
112  }
113 }
114 
115 } // namespace
116 
// Reports whether m_exception_ptr is set. Returns false when it is null, and
// true when it is set and rethrow is false.
117 bool ThreadPool::checkForException(bool rethrow) {
118  if (m_exception_ptr) {
119  if (rethrow) {
 // NOTE(review): this branch's body is missing from this listing; presumably
 // std::rethrow_exception(m_exception_ptr) -- confirm against the original source.
121  } else {
122  return true;
123  }
124  }
125  return false;
126 }
127 
// Returns the current number of elements in m_queue.
128 size_t ThreadPool::queued() const {
 // NOTE(review): a line is missing from this listing here; presumably it takes
 // a lock on m_queue_mutex before the queue is read -- confirm.
130  return m_queue.size();
131 }
132 
// Returns the number of worker slots minus `sleeping`.
133 size_t ThreadPool::running() const {
 // NOTE(review): the definition of `sleeping` is missing from this listing;
 // presumably it is the count of true entries in m_worker_sleeping_flags
 // (e.g. via std::accumulate) -- confirm against the original source.
136  return m_worker_sleeping_flags.size() - sleeping;
137 }
138 
141  return m_worker_done_flags.size() - done;
142 }
143 
145  // Wait for the queue to be empty
146  bool queue_is_empty = false;
147  while (!queue_is_empty && m_exception_ptr == nullptr) {
149  queue_is_empty = m_queue.empty();
150  lock.unlock();
151  if (!queue_is_empty) {
153  }
154  }
155  // Wait for the workers to finish the currently executing tasks
157  // Check if any worker finished with an exception
158  checkForException(true);
159 }
160 
162  // Stop all the workers. They will stop right after they finish the task
163  // they are currently running.
165  // Now wait until all the workers have finished any current tasks
167  for (auto& worker : m_workers) {
168  worker.join();
169  }
170 }
171 
174  if (m_worker_run_flags.empty()) {
175  task();
176  } else {
177  m_queue.emplace_back(std::move(task));
178  }
179 }
180 
181 } // namespace Euclid
std::reference_wrapper< std::mutex > m_queue_mutex
Definition: ThreadPool.cpp:79
std::reference_wrapper< std::atomic< bool > > m_run_flag
Definition: ThreadPool.cpp:81
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
Definition: ThreadPool.cpp:82
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
Definition: ThreadPool.cpp:80
std::reference_wrapper< std::atomic< bool > > m_done_flag
Definition: ThreadPool.cpp:83
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.cpp:84
std::reference_wrapper< std::exception_ptr > m_exception_ptr
Definition: ThreadPool.cpp:85
T accumulate(T... args)
T at(T... args)
T begin(T... args)
void submit(Task task)
Submit a task to be executed.
Definition: ThreadPool.cpp:172
std::deque< Task > m_queue
Definition: ThreadPool.h:114
size_t running() const
Return the number of running tasks.
Definition: ThreadPool.cpp:133
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition: ThreadPool.h:111
size_t queued() const
Return the number of queued tasks.
Definition: ThreadPool.cpp:128
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.h:115
std::vector< std::thread > m_workers
Definition: ThreadPool.h:113
std::mutex m_queue_mutex
Definition: ThreadPool.h:109
std::vector< std::atomic< bool > > m_worker_run_flags
Definition: ThreadPool.h:110
std::vector< std::atomic< bool > > m_worker_done_flags
Definition: ThreadPool.h:112
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
Definition: ThreadPool.cpp:117
virtual ~ThreadPool()
Definition: ThreadPool.cpp:161
std::exception_ptr m_exception_ptr
Definition: ThreadPool.h:116
size_t activeThreads() const
Return the number of active workers (either running or sleeping)
Definition: ThreadPool.cpp:139
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
Definition: ThreadPool.cpp:90
T current_exception(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T fill(T... args)
T lock(T... args)
T move(T... args)
T rethrow_exception(T... args)
T size(T... args)
T sleep_for(T... args)