SOIL C++
C++ Unified Device Interface
Publisher.cpp
Go to the documentation of this file.
1#include "LocalException.h"
2#include "Publisher.h"
3#include <nlohmann/json.hpp>
4#include <regex>
5#include <chrono>
6#include <condition_variable>
7#define PAHO_MQTT_IMPORTS
8#include <mqtt/async_client.h>
9#include <boost/date_time/posix_time/posix_time.hpp>
10
11
12int MQTT::Publisher::instances = 0;
13
14
15
16void MQTT::Publisher::loop(void)
17{
19 while (worker_running.load(std::memory_order_consume))
20 {
21 if (queue_size.load(std::memory_order_consume) > 0)
22 {
23 std::unique_lock<std::mutex> queue_lock(queue_mutex);
24 message = queue.front();
25 queue.pop_front();
26 queue_size.store(static_cast<int>(queue.size()));
27 }
28 else
29 {
30 std::unique_lock<std::mutex> queue_lock(queue_mutex);
31 while (queue_size.load(std::memory_order_consume) == 0)
32 {
33 queue_condition.wait(queue_lock);
34 }
35 continue;
36 }
37 try {
38 auto paho_message = mqtt::make_message((configuration.root + message.topic).c_str(), message.message.c_str(), message.message.size(), message.qos, message.retain);
39 auto token = client->publish(paho_message);
40 }
41 catch (std::exception&)
42 {
43 /* pass */
44 }
45 }
46}
47
48void MQTT::Publisher::restart_worker(void)
49{
50 if (!worker_running.load(std::memory_order_consume))
51 {
52 std::unique_lock<std::mutex> queue_lock(queue_mutex);
53 queue.clear();
54 worker_running.store(true, std::memory_order_release);
55 worker.reset(new std::thread(&MQTT::Publisher::loop, this));
56 }
57}
58
59void MQTT::Publisher::stop_worker(void)
60{
61 worker_running.store(false, std::memory_order_release);
62 std::unique_lock<std::mutex> queue_lock(queue_mutex);
63 queue_condition.notify_all();
64}
65
66MQTT::Publisher::Publisher(std::string id, unsigned int buffer) : id(id), buffer(buffer)
67{
68 if (instances == 0)
69 {
70 /* pass */;
71 }
72 instances++;
73
74
75 worker_running.store(false, std::memory_order_release);
76 queue.resize(0);
77 queue_size.store(0, std::memory_order_release);
78
79}
80
81
83{
84 this->disconnect();
85 worker->detach();
86 instances--;
87 if (instances == 0)
88 {
89 /* pass */;
90 }
91}
92
94{
95 try
96 {
97 client.reset(new mqtt::async_client(configuration.uri(), id));
98
99 auto ssl = mqtt::ssl_options_builder();
100 auto options = mqtt::connect_options_builder();
101 options.user_name(configuration.username);
102 options.password(configuration.password);
103 options.clean_session(configuration.clean_session);
104 options.keep_alive_interval(std::chrono::seconds(configuration.keep_alive));
105 options.connect_timeout(std::chrono::seconds(configuration.connection_timeout_s));
106
107 if (configuration.ssl)
108 {
109 ssl.enable_server_cert_auth(configuration.verify);
110 ssl.trust_store(configuration.certificate_authority);
111 ssl.ssl_version(MQTT_SSL_VERSION_DEFAULT);
112 options.ssl(ssl.finalize());
113 }
114
115
116 auto token = client->connect(options.finalize());
117 auto response = token->get_connect_response();
118
119 if (!client->is_connected())
120 {
121 throw std::runtime_error(mqtt::exception(token->get_return_code()));
122 }
123
124 restart_worker();
125 }
126 catch (std::exception& exc)
127 {
128 throw MQTT::Exception(exc);
129 }
130}
131
133{
134 this->connect(this->configuration);
135}
136
137
139{
140 try
141 {
142 auto token = client->reconnect();
143 auto response = token->get_connect_response();
144
145 if (!client->is_connected())
146 {
147 throw std::runtime_error(mqtt::exception(token->get_return_code()));
148 }
149 restart_worker();
150 }
151 catch (std::exception& exc)
152 {
153 throw MQTT::Exception(exc);
154 }
155}
156
157void MQTT::Publisher::disconnect(unsigned int timeout)
158{
159 if (client.get() != NULL)
160 {
161 auto token = client->disconnect();
162 token->wait_for(timeout);
163 }
164 stop_worker();
165 client.reset();
166}
167
168
169void MQTT::Publisher::set_root_topic(std::string root_topic)
170{
171 configuration.root = root_topic;
172 std::string::iterator last_character = configuration.root.end() - 1;
173 if ((*last_character) != '/')
174 {
175 configuration.root += "/";
176 }
177}
178
179void MQTT::Publisher::set_buffer(unsigned int buffer)
180{
181 this->buffer = buffer;
182}
183
185{
186 try
187 {
188 return client->is_connected();
189 }
190 catch (std::exception& exc)
191 {
192 throw MQTT::Exception(exc);
193 }
194}
195
197{
198 this->configuration = configuration;
199 std::string::iterator last_character = this->configuration.root.end() - 1;
200 if ((*last_character) != '/')
201 {
202 this->configuration.root += "/";
203 }
204}
205
206
207bool MQTT::Publisher::publish(std::string topic, std::string message, int qos, bool retain)
208{
209 MQTT::MessageContainer mqtt_message = { topic, message, qos, retain };
210 if ((qos > 0) || (queue_size.load(std::memory_order_consume) <= static_cast<int>(buffer)))
211 {
212 std::unique_lock<std::mutex> queue_lock(queue_mutex);
213 queue.push_back(mqtt_message);
214 queue_size.store(static_cast<int>(queue.size()), std::memory_order_release);
215 queue_condition.notify_all();
216 return true;
217 }
218 else
219 {
220 return false;
221 }
222}
223
224bool MQTT::Publisher::publish(std::vector<std::string> topics, std::vector<std::string> messages, int qos, bool retain)
225{
226 int n = static_cast<int>(topics.size());
227 std::vector<MQTT::MessageContainer> mqtt_messages(n);
228 for (int i = 0; i < n; i++)
229 {
230 mqtt_messages.at(i) = { topics.at(i), messages.at(i), qos, retain };
231 }
232
233 if ((qos > 0) || (queue_size.load(std::memory_order_consume) <= static_cast<int>(buffer)))
234 {
235 std::unique_lock<std::mutex> queue_lock(queue_mutex);
236 queue.insert(queue.end(), mqtt_messages.begin(), mqtt_messages.end());
237 queue_size.store(static_cast<int>(queue.size()), std::memory_order_release);
238 queue_condition.notify_all();
239 return true;
240 }
241 else
242 {
243 return false;
244 }
245
246}
MQTT publishing configuration.
Definition: Configuration.h:14
std::string root
MQTT root topic.
Definition: Configuration.h:64
int keep_alive
Keep alive interval in seconds.
Definition: Configuration.h:72
bool ssl
Use secured connection.
Definition: Configuration.h:99
bool clean_session
Clean session flag.
Definition: Configuration.h:55
std::string password
Password for connecting to the MQTT broker.
Definition: Configuration.h:47
std::string certificate_authority
Path to CA PEM-file.
bool verify
Verify the broker's SSL certificate (passed to enable_server_cert_auth).
std::string uri()
URI Builder.
int connection_timeout_s
Connection timeout in seconds.
Definition: Configuration.h:90
std::string username
Username for connecting to the MQTT broker.
Definition: Configuration.h:39
MQTT Publisher Exception.
void connect()
Connect to broker.
Definition: Publisher.cpp:132
void reconnect(void)
Direct Reconnect.
Definition: Publisher.cpp:138
void configure(MQTT::Configuration configuration)
Register new configuration.
Definition: Publisher.cpp:196
bool publish(std::string topic, std::string message, int qos, bool retain)
Publish an MQTT message.
Definition: Publisher.cpp:207
void set_root_topic(std::string root_topic)
Set new root topic.
Definition: Publisher.cpp:169
~Publisher()
Destructor.
Definition: Publisher.cpp:82
void set_buffer(unsigned int buffer)
Set message queue size.
Definition: Publisher.cpp:179
bool is_connected(void)
Is connected?
Definition: Publisher.cpp:184
Publisher(std::string id, unsigned int buffer=1024)
Constructor.
Definition: Publisher.cpp:66
void disconnect(unsigned int timeout=10000)
Disconnect from the broker.
Definition: Publisher.cpp:157
Internal MQTT message struct.
int qos
Quality of Service.
std::string message
Message.
std::string topic
Message Topic.
bool retain
Retain Flag.