3#include <nlohmann/json.hpp>
6#include <condition_variable>
7#define PAHO_MQTT_IMPORTS
8#include <mqtt/async_client.h>
9#include <boost/date_time/posix_time/posix_time.hpp>
// Out-of-class definition of the static member Publisher::instances, starting at zero.
// NOTE(review): presumably a count of live Publisher objects; no use of it is
// visible in this listing, so confirm against the class declaration.
12int MQTT::Publisher::instances = 0;
// Worker loop of the publisher. Runs for as long as worker_running is set.
// In the visible statements it copies the element at the front of the queue
// under queue_mutex, or blocks on queue_condition while queue_size is zero,
// and then builds a Paho message and passes it to client->publish().
// NOTE(review): this listing omits some original lines (the embedded numbering
// jumps, e.g. 24 -> 26 and 33 -> 38), so braces, branch keywords and possibly
// statements are missing. The comments below describe only what is visible.
16void MQTT::Publisher::loop(
void)
19 while (worker_running.load(std::memory_order_consume))
// queue_size is read here without holding queue_mutex.
21 if (queue_size.load(std::memory_order_consume) > 0)
23 std::unique_lock<std::mutex> queue_lock(queue_mutex);
// Copies the front element into `message`; the declaration of `message` and
// the removal of the front element are not visible in this listing.
24 message = queue.front();
// Refreshes the cached size from the container. No memory order is given, so
// this store uses the default ordering, unlike the release stores elsewhere.
26 queue_size.store(
static_cast<int>(queue.size()));
30 std::unique_lock<std::mutex> queue_lock(queue_mutex);
// Re-checks the predicate after every wake-up, which guards against spurious wake-ups.
// NOTE(review): the visible predicate tests only queue_size, not
// worker_running. Confirm that a stop request issued while the queue is empty
// can leave this wait, and that a fresh message is fetched after waking.
31 while (queue_size.load(std::memory_order_consume) == 0)
33 queue_condition.wait(queue_lock);
// The published topic is configuration.root concatenated with the message's
// own topic; payload, size, QoS and retain flag come from the message.
38 auto paho_message = mqtt::make_message((configuration.
root + message.
topic).c_str(), message.
message.c_str(), message.
message.size(), message.
qos, message.
retain);
// Asynchronous publish; the returned token is not used in the visible lines.
39 auto token = client->publish(paho_message);
// Catches any std::exception raised by the statements above; the handler body
// is not visible in this listing.
41 catch (std::exception&)
// Starts the worker thread when worker_running is currently false: sets the
// flag to true while holding queue_mutex and replaces `worker` with a new
// std::thread that runs Publisher::loop on this object.
// NOTE(review): some original lines are missing from this listing (numbering
// jumps 48 -> 50 -> 52 -> 54).
48void MQTT::Publisher::restart_worker(
void)
50 if (!worker_running.load(std::memory_order_consume))
52 std::unique_lock<std::mutex> queue_lock(queue_mutex);
54 worker_running.store(
true, std::memory_order_release);
// NOTE(review): `worker` is presumably a smart pointer to std::thread. If it
// still owns a joinable thread, destroying that thread here calls
// std::terminate, so confirm the previous worker is always joined first.
55 worker.reset(
new std::thread(&MQTT::Publisher::loop,
this));
// Requests the worker to stop: clears worker_running, then wakes every thread
// waiting on queue_condition while holding queue_mutex.
// NOTE(review): the rest of this function (original lines 64 onward) is not
// visible in this listing; presumably the worker thread is joined there, so confirm.
59void MQTT::Publisher::stop_worker(
void)
61 worker_running.store(
false, std::memory_order_release);
62 std::unique_lock<std::mutex> queue_lock(queue_mutex);
63 queue_condition.notify_all();
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the Publisher constructor body, so confirm.
// Puts the worker flag and the cached queue size into their idle state
// (not running, zero queued messages).
75 worker_running.store(
false, std::memory_order_release);
77 queue_size.store(0, std::memory_order_release);
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the connect routine that receives or uses a
// configuration, so confirm. Some original lines are missing (numbering jumps).
// Creates a new Paho async client for configuration.uri() with the client id `id`.
97 client.reset(
new mqtt::async_client(configuration.
uri(),
id));
// Builders for the SSL options and for the connect options.
99 auto ssl = mqtt::ssl_options_builder();
100 auto options = mqtt::connect_options_builder();
101 options.user_name(configuration.
username);
102 options.password(configuration.
password);
// keep_alive is interpreted as a number of seconds.
104 options.keep_alive_interval(std::chrono::seconds(configuration.
keep_alive));
// SSL options are only applied when configuration.ssl is set.
// NOTE(review): no use of certificate_authority, clean_session or
// connection_timeout_s is visible in this listing.
107 if (configuration.
ssl)
// configuration.verify == true turns server certificate authentication on.
109 ssl.enable_server_cert_auth(configuration.
verify);
111 ssl.ssl_version(MQTT_SSL_VERSION_DEFAULT);
112 options.ssl(ssl.finalize());
// Starts the asynchronous connect and asks the token for the connect response.
// NOTE(review): per the Paho documentation get_connect_response() blocks until
// the operation has completed, so confirm. `response` is not used in the visible lines.
116 auto token = client->connect(options.finalize());
117 auto response = token->get_connect_response();
// Treats "not connected after the attempt" as a failure.
119 if (!client->is_connected())
// NOTE(review): this builds a std::runtime_error from an mqtt::exception
// object. Presumably that goes through the std::runtime_error copy
// constructor and keeps only the message text, so confirm that is intended.
121 throw std::runtime_error(mqtt::exception(token->get_return_code()));
// Handler body is not visible in this listing.
126 catch (std::exception& exc)
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the parameterless connect(), so confirm.
// Connects using the configuration already stored on this object.
134 this->connect(this->configuration);
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is reconnect(), so confirm.
// Asks the existing client to reconnect and asks the token for the connect
// response; `response` is not used in the visible lines.
142 auto token = client->reconnect();
143 auto response = token->get_connect_response();
// Treats "not connected after the attempt" as a failure.
145 if (!client->is_connected())
// NOTE(review): this builds a std::runtime_error from an mqtt::exception
// object; presumably only the message text survives, so confirm that is intended.
147 throw std::runtime_error(mqtt::exception(token->get_return_code()));
// Handler body is not visible in this listing.
151 catch (std::exception& exc)
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is disconnect(timeout), so confirm.
// Only attempts a disconnect when a client object exists.
159 if (client.get() != NULL)
161 auto token = client->disconnect();
// Waits for the disconnect to finish, bounded by `timeout`.
// NOTE(review): presumably `timeout` is in milliseconds, so confirm against the
// Paho token::wait_for overload that is selected.
162 token->wait_for(timeout);
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is set_root_topic(root_topic), so confirm.
// Stores the new root topic and appends a '/' when it does not already end with one.
171 configuration.root = root_topic;
// NOTE(review): for an empty root, end() - 1 does not refer to a valid
// character. Confirm that callers never pass an empty root topic.
172 std::string::iterator last_character = configuration.root.end() - 1;
173 if ((*last_character) !=
'/')
175 configuration.root +=
"/";
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is set_buffer(buffer), so confirm.
// Stores the limit that the publish fragments compare queue_size against.
181 this->buffer = buffer;
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is is_connected(), so confirm.
// Reports the connection state of the Paho client.
188 return client->is_connected();
// Handler body (and the value returned on failure) is not visible in this listing.
190 catch (std::exception& exc)
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is configure(configuration), so confirm.
// Stores the new configuration and appends a '/' to its root topic when the
// root does not already end with one.
198 this->configuration = configuration;
// NOTE(review): for an empty root, end() - 1 does not refer to a valid
// character. Confirm that configurations always carry a non-empty root.
199 std::string::iterator last_character = this->configuration.
root.end() - 1;
200 if ((*last_character) !=
'/')
202 this->configuration.
root +=
"/";
// NOTE(review): the enclosing function header and the construction of
// `mqtt_message` are not visible in this listing; presumably this is the
// single-message publish(), so confirm.
// Messages with qos > 0 are always queued. Other messages are queued only
// while the cached queue size does not exceed `buffer`. The size check is made
// before queue_mutex is taken.
210 if ((qos > 0) || (queue_size.load(std::memory_order_consume) <=
static_cast<int>(buffer)))
212 std::unique_lock<std::mutex> queue_lock(queue_mutex);
213 queue.push_back(mqtt_message);
// Keeps the cached size in step with the container, then wakes any waiter on
// queue_condition.
214 queue_size.store(
static_cast<int>(queue.size()), std::memory_order_release);
215 queue_condition.notify_all();
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is a publish() overload taking several topics and messages
// with a shared qos and retain flag, so confirm.
// Builds one message container per topic, pairing topics and messages by index.
226 int n =
static_cast<int>(topics.size());
227 std::vector<MQTT::MessageContainer> mqtt_messages(n);
228 for (
int i = 0; i < n; i++)
// NOTE(review): the bounds-checked messages.at(i) presumably throws when
// `messages` holds fewer elements than `topics`; confirm the container types.
230 mqtt_messages.at(i) = { topics.at(i), messages.at(i), qos, retain };
// Messages with qos > 0 are always queued. Otherwise the whole batch is queued
// only while the cached queue size does not exceed `buffer`. The size check is
// made before queue_mutex is taken.
233 if ((qos > 0) || (queue_size.load(std::memory_order_consume) <=
static_cast<int>(buffer)))
235 std::unique_lock<std::mutex> queue_lock(queue_mutex);
// Appends the whole batch at the end of the queue.
236 queue.insert(queue.end(), mqtt_messages.begin(), mqtt_messages.end());
// Keeps the cached size in step with the container, then wakes any waiter on
// queue_condition.
237 queue_size.store(
static_cast<int>(queue.size()), std::memory_order_release);
238 queue_condition.notify_all();
MQTT publishing configuration.
std::string root
MQTT root topic.
int keep_alive
Keep alive interval in seconds.
bool ssl
Use secured connection.
bool clean_session
Clean session flag.
std::string password
Password for connecting to the MQTT broker.
std::string certificate_authority
Path to CA PEM-file.
bool verify
Verify the broker's server certificate (passed to enable_server_cert_auth); set to false to skip SSL verification.
std::string uri()
URI Builder.
int connection_timeout_s
Connection timeout in seconds.
std::string username
Username for connecting to the MQTT broker.
MQTT Publisher Exception.
void connect()
Connect to broker.
void reconnect(void)
Direct Reconnect.
void configure(MQTT::Configuration configuration)
Register new configuration.
bool publish(std::string topic, std::string message, int qos, bool retain)
Publish an MQTT message.
void set_root_topic(std::string root_topic)
Set new root topic.
void set_buffer(unsigned int buffer)
Set the message queue size limit (checked only for QoS 0 messages).
bool is_connected(void)
Is connected?
Publisher(std::string id, unsigned int buffer=1024)
Constructor.
void disconnect(unsigned int timeout=10000)
Disconnect from the broker.
Internal MQTT message struct.
int qos
Quality of Service.
std::string message
Message.
std::string topic
Message Topic.