tlx
Loading...
Searching...
No Matches
thread_pool.cpp
Go to the documentation of this file.
1/*******************************************************************************
2 * tlx/thread_pool.cpp
3 *
4 * Part of tlx - http://panthema.net/tlx
5 *
6 * Copyright (C) 2015-2019 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the Boost Software License, Version 1.0
9 ******************************************************************************/
10
11#include <tlx/thread_pool.hpp>
12
13#include <iostream>
14
15namespace tlx {
16
17ThreadPool::ThreadPool(size_t num_threads, InitThread&& init_thread)
18 : threads_(num_threads),
19 init_thread_(std::move(init_thread)) {
20 // immediately construct worker threads
21 for (size_t i = 0; i < num_threads; ++i)
22 threads_[i] = std::thread(&ThreadPool::worker, this, i);
23}
24
26 std::unique_lock<std::mutex> lock(mutex_);
27 // set stop-condition
28 terminate_ = true;
29 cv_jobs_.notify_all();
30 lock.unlock();
31
32 // all threads terminate, then we're done
33 for (size_t i = 0; i < threads_.size(); ++i)
34 threads_[i].join();
35}
36
38 std::unique_lock<std::mutex> lock(mutex_);
39 jobs_.emplace_back(std::move(job));
40 cv_jobs_.notify_one();
41}
42
44 std::unique_lock<std::mutex> lock(mutex_);
45 cv_finished_.wait(lock, [this]() { return jobs_.empty() && (busy_ == 0); });
46 std::atomic_thread_fence(std::memory_order_seq_cst);
47}
48
50 std::unique_lock<std::mutex> lock(mutex_);
51 cv_finished_.wait(lock, [this]() { return terminate_ && (busy_ == 0); });
52 std::atomic_thread_fence(std::memory_order_seq_cst);
53}
54
56 std::unique_lock<std::mutex> lock(mutex_);
57 // flag termination
58 terminate_ = true;
59 // wake up all worker threads and let them terminate.
60 cv_jobs_.notify_all();
61 // notify LoopUntilTerminate in case all threads are idle.
62 cv_finished_.notify_one();
63}
64
65size_t ThreadPool::done() const {
66 return done_;
67}
68
69size_t ThreadPool::size() const {
70 return threads_.size();
71}
72
73size_t ThreadPool::idle() const {
74 return idle_;
75}
76
78 return (idle_.load(std::memory_order_relaxed) != 0);
79}
80
81std::thread& ThreadPool::thread(size_t i) {
82 assert(i < threads_.size());
83 return threads_[i];
84}
85
86void ThreadPool::worker(size_t p) {
87 if (init_thread_)
88 init_thread_(p);
89
90 // lock mutex, it is released during condition waits
91 std::unique_lock<std::mutex> lock(mutex_);
92
93 while (true) {
94 // wait on condition variable until job arrives, frees lock
95 if (!terminate_ && jobs_.empty()) {
96 ++idle_;
97 cv_jobs_.wait(
98 lock, [this]() { return terminate_ || !jobs_.empty(); });
99 --idle_;
100 }
101
102 if (terminate_)
103 break;
104
105 if (!jobs_.empty()) {
106 // got work. set busy.
107 ++busy_;
108
109 {
110 // pull job.
111 Job job = std::move(jobs_.front());
112 jobs_.pop_front();
113
114 // release lock.
115 lock.unlock();
116
117 // execute job.
118 try {
119 job();
120 }
121 catch (std::exception& e) {
122 std::cerr << "EXCEPTION: " << e.what() << std::endl;
123 }
124 // destroy job by closing scope
125 }
126
127 // release memory the Job changed
128 std::atomic_thread_fence(std::memory_order_seq_cst);
129
130 ++done_;
131 --busy_;
132
133 // relock mutex before signaling condition.
134 lock.lock();
135 cv_finished_.notify_one();
136 }
137 }
138}
139
140} // namespace tlx
141
142/******************************************************************************/
void loop_until_terminate()
Loop until terminate flag was set.
std::atomic< size_t > idle_
Counter for number of idle threads waiting for a job.
std::atomic< size_t > busy_
Counter for number of threads busy.
std::atomic< size_t > done_
Counter for total number of jobs executed.
std::thread & thread(size_t i)
Return thread handle to thread i.
size_t size() const
Return number of threads in pool.
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
void worker(size_t p)
Worker function, one per thread is started.
simple_vector< std::thread > threads_
threads in pool
Delegate< void()> Job
Delegate< void(size_t)> InitThread
std::atomic< bool > terminate_
Flag whether to terminate.
std::condition_variable cv_jobs_
Condition variable used to notify that a new job has been inserted in the queue.
void terminate()
Terminate thread pool gracefully, wait until currently running jobs finish and then exit.
bool has_idle() const
true if any thread is idle (= waiting for jobs)
size_t done() const
Return number of jobs currently completed.
void loop_until_empty()
Loop until no more jobs are in the queue AND all threads are idle.
std::deque< Job > jobs_
Deque of scheduled jobs.
ThreadPool(size_t num_threads=std::thread::hardware_concurrency(), InitThread &&init_thread=InitThread())
Construct running thread pool of num_threads.
InitThread init_thread_
Run once per worker thread.
void enqueue(Job &&job)
enqueue a Job, the caller must pass in all context using captures.
~ThreadPool()
Stop processing jobs, terminate threads.
std::condition_variable cv_finished_
Condition variable to signal when a jobs finishes.
size_t idle() const
return number of idle threads in pool
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition join.cpp:16
STL namespace.