PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
client.h
Go to the documentation of this file.
1
7
8/*******************************************************************************
9 * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
10 *
11 * All rights reserved. This program and the accompanying materials
12 * are made available under the terms of the Eclipse Public License v2.0
13 * and Eclipse Distribution License v1.0 which accompany this distribution.
14 *
15 * The Eclipse Public License is available at
16 * http://www.eclipse.org/legal/epl-v20.html
17 * and the Eclipse Distribution License is available at
18 * http://www.eclipse.org/org/documents/edl-v10.php.
19 *
20 * Contributors:
21 * Frank Pagliughi - initial implementation and documentation
22 *******************************************************************************/
23
24#ifndef __mqtt_client_h
25#define __mqtt_client_h
26
27#include <future>
28
29#include "mqtt/async_client.h"
30
31namespace mqtt {
32
34
39class client : private callback
40{
42 PAHO_MQTTPP_EXPORT static const std::chrono::seconds DFLT_TIMEOUT;
44 PAHO_MQTTPP_EXPORT static const int DFLT_QOS; // =1;
45
47 async_client cli_;
49 std::chrono::milliseconds timeout_;
51 callback* userCallback_;
52
63 template <typename T>
64 std::shared_ptr<T> ptr(const T& val) {
65 return std::shared_ptr<T>(const_cast<T*>(&val), [](T*) {});
66 }
67
68 // User callbacks
69 // Most are launched in a separate thread, for convenience, except
70 // message_arrived, for performance.
71 void connected(const string& cause) override {
72 std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
73 }
74 void connection_lost(const string& cause) override {
75 std::async(std::launch::async, &callback::connection_lost, userCallback_, cause)
76 .wait();
77 }
78 void message_arrived(const_message_ptr msg) override {
79 userCallback_->message_arrived(msg);
80 }
81 void delivery_complete(delivery_token_ptr tok) override {
82 std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok)
83 .wait();
84 }
85
87 client() = delete;
88 client(const async_client&) = delete;
89 client& operator=(const async_client&) = delete;
90
91public:
93 using ptr_t = std::shared_ptr<client>;
96
99
112 const string& serverURI, const string& clientId,
113 iclient_persistence* persistence = nullptr
114 );
125 client(const string& serverURI, const string& clientId, const string& persistDir);
141 const string& serverURI, const string& clientId, int maxBufferedMessages,
142 iclient_persistence* persistence = nullptr
143 );
157 const string& serverURI, const string& clientId, int maxBufferedMessages,
158 const string& persistDir
159 );
174 const string& serverURI, const string& clientId, const create_options& opts,
175 iclient_persistence* persistence = nullptr
176 );
180 virtual ~client() {}
198 virtual void disconnect();
205 virtual void disconnect(int timeoutMS);
212 template <class Rep, class Period>
213 void disconnect(const std::chrono::duration<Rep, Period>& to) {
215 }
216
220 virtual string get_client_id() const { return cli_.get_client_id(); }
225 virtual string get_server_uri() const { return cli_.get_server_uri(); }
230 virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
236 connect_options get_connect_options() const { return cli_.get_connect_options(); }
246 const string& top, int qos = message::DFLT_QOS, bool retained = message::DFLT_RETAINED
247 ) {
248 return topic(cli_, top, qos, retained);
249 }
250
255 virtual bool is_connected() const { return cli_.is_connected(); }
262 cli_.set_update_connection_handler(cb);
263 }
264
274 virtual void publish(
275 string_ref top, const void* payload, size_t n, int qos, bool retained
276 ) {
277 if (!cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_))
278 throw timeout_error();
279 }
280
287 virtual void publish(string_ref top, const void* payload, size_t n) {
288 if (!cli_.publish(std::move(top), payload, n)->wait_for(timeout_))
289 throw timeout_error();
290 }
291
295 virtual void publish(const_message_ptr msg) {
296 if (!cli_.publish(msg)->wait_for(timeout_))
297 throw timeout_error();
298 }
299
306 virtual void publish(const message& msg) { cli_.publish(ptr(msg))->wait(); }
312 virtual void set_callback(callback& cb);
317 virtual void set_timeout(int timeoutMS) {
318 timeout_ = std::chrono::milliseconds(timeoutMS);
319 }
320
324 template <class Rep, class Period>
325 void set_timeout(const std::chrono::duration<Rep, Period>& to) {
326 timeout_ = to_milliseconds(to);
327 }
328
336 const string& topicFilter, const subscribe_options& opts = subscribe_options(),
337 const properties& props = properties()
338 );
348 const string& topicFilter, int qos,
349 const subscribe_options& opts = subscribe_options(),
350 const properties& props = properties()
351 );
361 const string_collection& topicFilters,
362 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
363 const properties& props = properties()
364 );
374 const string_collection& topicFilters, const qos_collection& qos,
375 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
376 const properties& props = properties()
377 );
385 const string& topicFilter, const properties& props = properties()
386 );
394 const string_collection& topicFilters, const properties& props = properties()
395 );
401 virtual void start_consuming() { cli_.start_consuming(); }
407 virtual void stop_consuming() { cli_.stop_consuming(); }
413 virtual const_message_ptr consume_message() { return cli_.consume_message(); }
421 return cli_.try_consume_message(msg);
422 }
423
430 template <typename Rep, class Period>
432 const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
433 ) {
434 return cli_.try_consume_message_for(msg, relTime);
435 }
436
443 template <class Clock, class Duration>
445 const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
446 ) {
447 return cli_.try_consume_message_until(msg, absTime);
448 }
449};
450
453
455} // namespace mqtt
456
457#endif // __mqtt_client_h
Definition async_client.h:121
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:135
Definition callback.h:43
virtual void connected(const string &)
Definition callback.h:61
virtual void connection_lost(const string &)
Definition callback.h:65
virtual void delivery_complete(delivery_token_ptr)
Definition callback.h:74
virtual void message_arrived(const_message_ptr)
Definition callback.h:69
virtual subscribe_response subscribe(const string &topicFilter, const subscribe_options &opts=subscribe_options(), const properties &props=properties())
std::shared_ptr< client > ptr_t
Definition client.h:93
virtual void stop_consuming()
Definition client.h:407
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition client.h:444
virtual bool try_consume_message(const_message_ptr *msg)
Definition client.h:420
virtual void publish(const message &msg)
Definition client.h:306
void set_update_connection_handler(update_connection_handler cb)
Definition client.h:261
virtual void disconnect(int timeoutMS)
virtual void publish(const_message_ptr msg)
Definition client.h:295
virtual subscribe_response subscribe(const string_collection &topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties())
client(const string &serverURI, const string &clientId, int maxBufferedMessages, const string &persistDir)
virtual void disconnect()
virtual const_message_ptr consume_message()
Definition client.h:413
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition client.h:431
virtual void set_callback(callback &cb)
void disconnect(const std::chrono::duration< Rep, Period > &to)
Definition client.h:213
virtual string get_server_uri() const
Definition client.h:225
virtual void start_consuming()
Definition client.h:401
virtual connect_response connect(connect_options opts)
virtual subscribe_response subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties())
virtual unsubscribe_response unsubscribe(const string_collection &topicFilters, const properties &props=properties())
async_client::update_connection_handler update_connection_handler
Definition client.h:98
virtual topic get_topic(const string &top, int qos=message::DFLT_QOS, bool retained=message::DFLT_RETAINED)
Definition client.h:245
void set_timeout(const std::chrono::duration< Rep, Period > &to)
Definition client.h:325
connect_options get_connect_options() const
Definition client.h:236
virtual std::chrono::milliseconds get_timeout() const
Definition client.h:230
virtual subscribe_response subscribe(const string_collection &topicFilters, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties())
virtual bool is_connected() const
Definition client.h:255
client(const string &serverURI, const string &clientId, iclient_persistence *persistence=nullptr)
virtual unsubscribe_response unsubscribe(const string &topicFilter, const properties &props=properties())
virtual void publish(string_ref top, const void *payload, size_t n, int qos, bool retained)
Definition client.h:274
virtual connect_response connect()
async_client::qos_collection qos_collection
Definition client.h:95
client(const string &serverURI, const string &clientId, int maxBufferedMessages, iclient_persistence *persistence=nullptr)
virtual connect_response reconnect()
virtual void publish(string_ref top, const void *payload, size_t n)
Definition client.h:287
client(const string &serverURI, const string &clientId, const string &persistDir)
virtual ~client()
Definition client.h:180
virtual string get_client_id() const
Definition client.h:220
virtual void set_timeout(int timeoutMS)
Definition client.h:317
client(const string &serverURI, const string &clientId, const create_options &opts, iclient_persistence *persistence=nullptr)
Definition connect_options.h:50
Definition server_response.h:73
Definition create_options.h:60
std::vector< int > qos_collection
Definition iasync_client.h:66
Definition iclient_persistence.h:70
Definition message.h:57
static constexpr bool DFLT_RETAINED
Definition message.h:62
static constexpr int DFLT_QOS
Definition message.h:60
Definition properties.h:293
Definition string_collection.h:45
Definition subscribe_options.h:49
Definition exception.h:196
Definition topic.h:45
Definition server_response.h:172
#define PAHO_MQTTPP_EXPORT
Definition export.h:40
Definition async_client.h:60
client::ptr_t client_ptr
Definition client.h:452
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:127
message::const_ptr_t const_message_ptr
Definition message.h:377
std::chrono::milliseconds to_milliseconds(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:84
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:95
buffer_ref< char > string_ref
Definition buffer_ref.h:297
Definition server_response.h:131