25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
29#include <condition_variable>
84template <
typename T,
class Container = std::deque<T>>
100 mutable std::mutex lock_;
102 std::condition_variable notEmptyCond_;
104 std::condition_variable notFullCond_;
111 std::queue<T, Container> que_;
114 using guard = std::lock_guard<std::mutex>;
116 using unique_guard = std::unique_lock<std::mutex>;
119 bool is_done()
const {
return closed_ && que_.empty(); }
178 notFullCond_.notify_all();
179 notEmptyCond_.notify_all();
208 while (!que_.empty()) que_.pop();
209 notFullCond_.notify_all();
218 unique_guard g{lock_};
219 notFullCond_.wait(g, [
this] {
return que_.size() < cap_ || closed_; });
223 que_.emplace(std::move(val));
224 notEmptyCond_.notify_one();
234 if (que_.size() >= cap_ || closed_)
237 que_.emplace(std::move(val));
238 notEmptyCond_.notify_one();
250 template <
typename Rep,
class Period>
252 unique_guard g{lock_};
253 bool to = !notFullCond_.wait_for(g, relTime, [
this] {
254 return que_.size() < cap_ || closed_;
259 que_.emplace(std::move(val));
260 notEmptyCond_.notify_one();
273 template <
class Clock,
class Duration>
275 value_type val,
const std::chrono::time_point<Clock, Duration>& absTime
277 unique_guard g{lock_};
278 bool to = !notFullCond_.wait_until(g, absTime, [
this] {
279 return que_.size() < cap_ || closed_;
285 que_.emplace(std::move(val));
286 notEmptyCond_.notify_one();
299 unique_guard g{lock_};
300 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
304 *val = std::move(que_.front());
306 notFullCond_.notify_one();
316 unique_guard g{lock_};
317 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
323 notFullCond_.notify_one();
342 *val = std::move(que_.front());
344 notFullCond_.notify_one();
357 template <
typename Rep,
class Period>
362 unique_guard g{lock_};
363 notEmptyCond_.wait_for(g, relTime, [
this] {
return !que_.empty() || closed_; });
368 *val = std::move(que_.front());
370 notFullCond_.notify_one();
383 template <
class Clock,
class Duration>
385 value_type* val,
const std::chrono::time_point<Clock, Duration>& absTime
390 unique_guard g{lock_};
391 notEmptyCond_.wait_until(g, absTime, [
this] {
return !que_.empty() || closed_; });
395 *val = std::move(que_.front());
397 notFullCond_.notify_one();
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:165
void capacity(size_type cap)
Definition thread_queue.h:157
bool done() const
Definition thread_queue.h:198
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:232
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:358
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:251
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:334
thread_queue()
Definition thread_queue.h:126
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:274
void clear()
Definition thread_queue.h:206
bool closed() const
Definition thread_queue.h:188
bool get(value_type *val)
Definition thread_queue.h:295
bool empty() const
Definition thread_queue.h:139
void close()
Definition thread_queue.h:175
size_type capacity() const
Definition thread_queue.h:147
thread_queue(size_t cap)
Definition thread_queue.h:133
void put(value_type val)
Definition thread_queue.h:217
value_type get()
Definition thread_queue.h:315
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:384
Definition async_client.h:60