34#ifndef __mqtt_async_client_h
35#define __mqtt_async_client_h
66#if defined(PAHO_MQTTPP_VERSIONS)
68const uint32_t PAHO_MQTTPP_VERSION = 0x01050000;
70const string PAHO_MQTTPP_VERSION_STR(
"Paho MQTT C++ (mqttpp) v. 1.5.0");
72const string PAHO_MQTTPP_COPYRIGHT(
"Copyright (c) 2013-2024 Frank Pagliughi");
79const string COPYRIGHT(
"Copyright (c) 2013-2024 Frank Pagliughi");
124 using ptr_t = std::shared_ptr<async_client>;
139 using guard = std::unique_lock<std::mutex>;
141 using unique_lock = std::unique_lock<std::mutex>;
144 mutable std::mutex lock_;
152 std::unique_ptr<MQTTClient_persistence> persist_{};
166 connect_options connOpts_;
170 std::list<token_ptr> pendingTokens_;
172 std::list<delivery_token_ptr> pendingDeliveryTokens_;
177 static void on_connected(
void* context,
char* cause);
178 static void on_connection_lost(
void* context,
char* cause);
179 static void on_disconnected(
180 void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
182 static int on_message_arrived(
183 void* context,
char* topicName,
int topicLen, MQTTAsync_message* msg
185 static void on_delivery_complete(
void* context, MQTTAsync_token tok);
186 static int on_update_connection(
void* context, MQTTAsync_connectData* cdata);
192 virtual void remove_token(
token* tok)
override;
193 virtual void remove_token(
token_ptr tok) { remove_token(tok.get()); }
197 async_client() =
delete;
198 async_client(
const async_client&) =
delete;
199 async_client& operator=(
const async_client&) =
delete;
202 static void check_ret(
int rc) {
203 if (rc != MQTTASYNC_SUCCESS)
226 explicit async_client(
const string& serverURI,
const string& clientId =
string{})
241 const string& serverURI,
const string& clientId,
const persistence_type& persistence
243 : createOpts_{serverURI, clientId, persistence} {
260 const string& serverURI,
const string& clientId,
int maxBufferedMessages,
263 : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
279 const string& serverURI,
const string& clientId,
const create_options& opts,
282 : createOpts_{serverURI, clientId, opts, persistence} {
431 template <
class Rep,
class Period>
465 template <
class Rep,
class Period>
467 const std::chrono::duration<Rep, Period>& timeout,
void* userContext,
544 string_ref topic,
const void* payload,
size_t n,
int qos,
bool retained,
603 string_ref topic,
const void* payload,
size_t n,
int qos,
bool retained,
639 const string& topicFilter,
int qos,
661 const string& topicFilter,
int qos,
void* userContext,
iaction_listener& cb,
681 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
704 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
795 if (que_) que_->clear();
805 return !que_ || que_->closed();
815 return !que_ || que_->done();
826 return (que_) ? que_->size() : 0;
849 template <
typename Rep,
class Period>
851 event* evt,
const std::chrono::duration<Rep, Period>& relTime
857 return que_->try_get_for(evt, relTime);
870 template <
typename Rep,
class Period>
874 que_->try_get_for(&evt, relTime);
888 template <
class Clock,
class Duration>
890 event* evt,
const std::chrono::time_point<Clock, Duration>& absTime
896 return que_->try_get_until(evt, absTime);
909 template <
class Clock,
class Duration>
914 que_->try_get_until(&evt, absTime);
942 template <
typename Rep,
class Period>
955 if (
const auto* pval = evt.get_message_if()) {
956 *msg = std::move(*pval);
960 if (evt.is_any_disconnect()) {
973 template <
typename Rep,
class Period>
975 const std::chrono::duration<Rep, Period>& relTime
988 template <
class Clock,
class Duration>
1001 if (
const auto* pval = evt.get_message_if()) {
1002 *msg = std::move(*pval);
// Test for a disconnect event un-negated, exactly as the duration-based
// overload does at line 960; the previous `!` inverted this check.
1006 if (evt.is_any_disconnect()) {
1019 template <
class Clock,
class Duration>
1021 const std::chrono::time_point<Clock, Duration>& absTime
Definition async_client.h:121
void set_connection_lost_handler(connection_handler cb)
bool try_consume_event_until(event *evt, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:889
delivery_token_ptr publish(const_message_ptr msg, void *userContext, iaction_listener &cb) override
bool consumer_done() noexcept override
Definition async_client.h:814
async_client(const string &serverURI, const string &clientId, const persistence_type &persistence)
Definition async_client.h:240
void set_disconnected_handler(disconnected_handler cb)
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:1020
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
token_ptr disconnect(disconnect_options opts) override
bool try_consume_message(const_message_ptr *msg) override
token_ptr disconnect() override
Definition async_client.h:399
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition async_client.h:555
delivery_token_ptr publish(string_ref topic, binary_ref payload, int qos, bool retained, const properties &props=properties()) override
token_ptr subscribe(const string &topicFilter, int qos, void *userContext, iaction_listener &cb, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
std::shared_ptr< async_client > ptr_t
Definition async_client.h:124
bool consumer_closed() noexcept override
Definition async_client.h:804
void stop_consuming() override
connect_options get_connect_options() const
Definition async_client.h:522
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:943
token_ptr disconnect(int timeout) override
Definition async_client.h:418
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:135
void set_message_callback(message_handler cb)
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition async_client.h:466
token_ptr connect(connect_options options, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, const properties &props=properties()) override
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition async_client.h:133
std::function< void(const_message_ptr)> message_handler
Definition async_client.h:129
std::size_t consumer_queue_size() const override
Definition async_client.h:825
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition async_client.h:582
string get_client_id() const override
Definition async_client.h:501
async_client(const string &serverURI, const string &clientId, int maxBufferedMessages, const persistence_type &persistence=persistence_type{})
Definition async_client.h:259
std::unique_ptr< thread_queue< event > > consumer_queue_type
Definition async_client.h:126
void set_callback(callback &cb) override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:989
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, void *userContext, iaction_listener &cb, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
const_message_ptr consume_message() override
token_ptr reconnect() override
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:974
void set_connected_handler(connection_handler cb)
bool try_consume_event_for(event *evt, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:850
async_client(const create_options &opts)
Definition async_client.h:293
token_ptr disconnect(int timeout, void *userContext, iaction_listener &cb) override
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, const properties &props=properties()) override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, void *userContext, iaction_listener &cb, const properties &props=properties()) override
void clear_consumer() override
Definition async_client.h:794
async_client(const string &serverURI, const string &clientId, const create_options &opts, const persistence_type &persistence)
Definition async_client.h:278
bool try_consume_event(event *evt) override
int mqtt_version() const noexcept
Definition async_client.h:516
token_ptr unsubscribe(const string &topicFilter, void *userContext, iaction_listener &cb, const properties &props=properties()) override
event try_consume_event_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:910
event consume_event() override
string get_server_uri() const override
Definition async_client.h:506
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition async_client.h:432
async_client(const string &serverURI, const string &clientId=string{})
Definition async_client.h:226
delivery_token_ptr get_pending_delivery_token(int msgID) const override
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition async_client.h:484
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition async_client.h:384
void disable_callbacks() override
void set_update_connection_handler(update_connection_handler cb)
token_ptr connect() override
bool is_connected() const override
Definition async_client.h:530
event try_consume_event_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:871
delivery_token_ptr publish(const_message_ptr msg) override
void start_consuming() override
token_ptr connect(connect_options options) override
std::function< void(const string &cause)> connection_handler
Definition async_client.h:131
Definition connect_options.h:571
Definition connect_options.h:50
Definition create_options.h:60
const string & get_server_uri() const noexcept
Definition create_options.h:199
const string & get_client_id() const noexcept
Definition create_options.h:209
Definition disconnect_options.h:41
Definition exception.h:48
Definition iaction_listener.h:50
Definition iasync_client.h:60
std::vector< int > qos_collection
Definition iasync_client.h:66
static constexpr bool DFLT_RETAINED
Definition message.h:62
static constexpr int DFLT_QOS
Definition message.h:60
Definition properties.h:293
Definition thread_queue.h:43
Definition subscribe_options.h:49
Definition async_client.h:60
ReasonCode
Definition reason_code.h:39
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:127
constexpr no_persistence NO_PERSISTENCE
Definition create_options.h:43
bool to_bool(int n)
Definition types.h:107
token::ptr_t token_ptr
Definition token.h:513
async_client::ptr_t async_client_ptr
Definition async_client.h:1030
const uint32_t VERSION
Definition async_client.h:75
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:259
message::const_ptr_t const_message_ptr
Definition message.h:377
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.0")
std::variant< no_persistence, string, iclient_persistence * > persistence_type
Definition create_options.h:52
const string COPYRIGHT("Copyright (c) 2013-2024 Frank Pagliughi")
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:95