#include "LocalException.h" #include "Publisher.h" #include <nlohmann/json.hpp> #include <regex> #include <chrono> #include <condition_variable> #define PAHO_MQTT_IMPORTS #include <mqtt/async_client.h> #include <boost/date_time/posix_time/posix_time.hpp> int MQTT::Publisher::instances = 0; void MQTT::Publisher::loop(void) { MQTT::MessageContainer message; while (worker_running.load(std::memory_order_consume)) { if (queue_size.load(std::memory_order_consume) > 0) { std::unique_lock<std::mutex> queue_lock(queue_mutex); message = queue.front(); queue.pop_front(); queue_size.store(static_cast<int>(queue.size())); } else { std::unique_lock<std::mutex> queue_lock(queue_mutex); while (queue_size.load(std::memory_order_consume) == 0) { queue_condition.wait(queue_lock); } continue; } try { auto paho_message = mqtt::make_message((configuration.root + message.topic).c_str(), message.message.c_str(), message.message.size(), message.qos, message.retain); auto token = client->publish(paho_message); } catch (std::exception&) { /* pass */ } } } void MQTT::Publisher::restart_worker(void) { if (!worker_running.load(std::memory_order_consume)) { std::unique_lock<std::mutex> queue_lock(queue_mutex); queue.clear(); worker_running.store(true, std::memory_order_release); worker.reset(new std::thread(&MQTT::Publisher::loop, this)); } } void MQTT::Publisher::stop_worker(void) { worker_running.store(false, std::memory_order_release); std::unique_lock<std::mutex> queue_lock(queue_mutex); queue_condition.notify_all(); } MQTT::Publisher::Publisher(std::string id, unsigned int buffer) : id(id), buffer(buffer) { if (instances == 0) { /* pass */; } instances++; worker_running.store(false, std::memory_order_release); queue.resize(0); queue_size.store(0, std::memory_order_release); } MQTT::Publisher::~Publisher() { if (this->is_connected()) { this->disconnect(); } if (worker != nullptr) { worker->detach(); } instances--; if (instances == 0) { /* pass */; } } void MQTT::Publisher::connect(MQTT::Configuration configuration) { try { client.reset(new mqtt::async_client(configuration.uri(), id)); auto ssl = mqtt::ssl_options_builder(); auto options = mqtt::connect_options_builder(); options.user_name(configuration.username); options.password(configuration.password); options.clean_session(configuration.clean_session); options.keep_alive_interval(std::chrono::seconds(configuration.keep_alive)); options.connect_timeout(std::chrono::seconds(configuration.connection_timeout_s)); if (configuration.ssl) { ssl.enable_server_cert_auth(configuration.verify); ssl.trust_store(configuration.certificate_authority); ssl.ssl_version(MQTT_SSL_VERSION_DEFAULT); options.ssl(ssl.finalize()); } auto token = client->connect(options.finalize()); auto response = token->get_connect_response(); if (!client->is_connected()) { throw std::runtime_error(mqtt::exception(token->get_return_code())); } restart_worker(); } catch (std::exception& exc) { throw MQTT::Exception(exc); } } void MQTT::Publisher::connect() { this->connect(this->configuration); } void MQTT::Publisher::reconnect(void) { try { auto token = client->reconnect(); auto response = token->get_connect_response(); if (!client->is_connected()) { throw std::runtime_error(mqtt::exception(token->get_return_code())); } restart_worker(); } catch (std::exception& exc) { throw MQTT::Exception(exc); } } void MQTT::Publisher::disconnect(unsigned int timeout) { if (client.get() != NULL) { auto token = client->disconnect(); token->wait_for(timeout); } stop_worker(); client.reset(); } void MQTT::Publisher::set_root_topic(std::string root_topic) { configuration.root = root_topic; if (configuration.root.length() > 0) { std::string::iterator last_character = configuration.root.end() - 1; if ((*last_character) != '/') { configuration.root += "/"; } } } void MQTT::Publisher::set_buffer(unsigned int buffer) { this->buffer = buffer; } bool MQTT::Publisher::is_connected(void) { if (this->client == nullptr) { return false; } try { return client->is_connected(); } catch (std::exception& exc) { throw MQTT::Exception(exc); } } void MQTT::Publisher::configure(MQTT::Configuration configuration) { this->configuration = configuration; if (configuration.root.length() > 0) { std::string::iterator last_character = configuration.root.end() - 1; if ((*last_character) != '/') { configuration.root += "/"; } } } bool MQTT::Publisher::publish(std::string topic, std::string message, int qos, bool retain) { MQTT::MessageContainer mqtt_message = { topic, message, qos, retain }; if ((qos > 0) || (queue_size.load(std::memory_order_consume) <= static_cast<int>(buffer))) { std::unique_lock<std::mutex> queue_lock(queue_mutex); queue.push_back(mqtt_message); queue_size.store(static_cast<int>(queue.size()), std::memory_order_release); queue_condition.notify_all(); return true; } else { return false; } } bool MQTT::Publisher::publish(std::vector<std::string> topics, std::vector<std::string> messages, int qos, bool retain) { int n = static_cast<int>(topics.size()); std::vector<MQTT::MessageContainer> mqtt_messages(n); for (int i = 0; i < n; i++) { mqtt_messages.at(i) = { topics.at(i), messages.at(i), qos, retain }; } if ((qos > 0) || (queue_size.load(std::memory_order_consume) <= static_cast<int>(buffer))) { std::unique_lock<std::mutex> queue_lock(queue_mutex); queue.insert(queue.end(), mqtt_messages.begin(), mqtt_messages.end()); queue_size.store(static_cast<int>(queue.size()), std::memory_order_release); queue_condition.notify_all(); return true; } else { return false; } }