#include "LocalException.h"
#include "Publisher.h"
#include <nlohmann/json.hpp>
#include <regex>
#include <chrono>
#include <condition_variable>
#define PAHO_MQTT_IMPORTS
#include <mqtt/async_client.h>
#include <boost/date_time/posix_time/posix_time.hpp>


/* Count of live Publisher objects: incremented by the constructor, decremented by the destructor. */
int MQTT::Publisher::instances = 0;



void MQTT::Publisher::loop(void)
{
	MQTT::MessageContainer message;
	while (worker_running.load(std::memory_order_consume))
	{
		if (queue_size.load(std::memory_order_consume) > 0)
		{
			std::unique_lock<std::mutex> queue_lock(queue_mutex);
			message = queue.front();
			queue.pop_front();
			queue_size.store(static_cast<int>(queue.size()));
		}
		else
		{
			std::unique_lock<std::mutex> queue_lock(queue_mutex);
			while (queue_size.load(std::memory_order_consume) == 0)
			{
				queue_condition.wait(queue_lock);
			}
			continue;
		}
		try {
			auto paho_message = mqtt::make_message((configuration.root + message.topic).c_str(), message.message.c_str(), message.message.size(), message.qos, message.retain);
			auto token = client->publish(paho_message);
		}
		catch (std::exception&)
		{
			/* pass */
		}
	}
}

void MQTT::Publisher::restart_worker(void)
{
	if (!worker_running.load(std::memory_order_consume))
	{
		std::unique_lock<std::mutex> queue_lock(queue_mutex);
		queue.clear();
		worker_running.store(true, std::memory_order_release);
		worker.reset(new std::thread(&MQTT::Publisher::loop, this));
	}
}

void MQTT::Publisher::stop_worker(void)
{
	worker_running.store(false, std::memory_order_release);
	std::unique_lock<std::mutex> queue_lock(queue_mutex);
	queue_condition.notify_all();
}

/*
 * Creates an idle publisher.
 *
 * id     - stored in the `id` member (later passed to the Paho client in connect()).
 * buffer - stored in the `buffer` member (compared against the queue size in publish()).
 */
MQTT::Publisher::Publisher(std::string id, unsigned int buffer) : id(id), buffer(buffer)
{
	// Track the number of live publishers; no first-instance setup is performed.
	++instances;

	// Start with an empty queue and no worker running.
	queue.clear();
	queue_size.store(0, std::memory_order_release);
	worker_running.store(false, std::memory_order_release);
}


/*
 * Tears the publisher down.
 *
 * Disconnects if still connected, asks the worker to stop, and releases the
 * worker thread handle. is_connected() and disconnect() can throw; letting an
 * exception escape a destructor would terminate the program, so failures
 * during teardown are deliberately swallowed here.
 */
MQTT::Publisher::~Publisher()
{
	try
	{
		if (this->is_connected())
		{
			this->disconnect();
		}
	}
	catch (...)
	{
		/* Nothing useful can be done with a disconnect failure while destroying. */
	}

	// Always request a stop, even when the client was not connected, so the
	// worker is not left running against an object that is going away.
	try
	{
		stop_worker();
	}
	catch (...)
	{
		/* Same reasoning as above: never throw from the destructor. */
	}

	// detach() throws on a thread that is not joinable, so check first.
	if (worker != nullptr && worker->joinable())
	{
		worker->detach();
	}

	instances--;
}

/*
 * Creates a new Paho client for the given configuration, connects it and
 * starts the worker thread.
 *
 * The passed configuration supplies the URI, credentials, session, keep-alive,
 * timeout and (optionally) SSL settings. It is not stored in the member
 * `configuration`.
 *
 * Throws MQTT::Exception wrapping any std::exception raised along the way,
 * including the case where the client reports it is not connected afterwards.
 */
void MQTT::Publisher::connect(MQTT::Configuration configuration)
{
	try
	{
		client.reset(new mqtt::async_client(configuration.uri(), id));

		auto connect_options = mqtt::connect_options_builder();
		connect_options.user_name(configuration.username);
		connect_options.password(configuration.password);
		connect_options.clean_session(configuration.clean_session);
		connect_options.keep_alive_interval(std::chrono::seconds(configuration.keep_alive));
		connect_options.connect_timeout(std::chrono::seconds(configuration.connection_timeout_s));

		// SSL options are only built when the configuration asks for SSL.
		if (configuration.ssl)
		{
			auto ssl_options = mqtt::ssl_options_builder();
			ssl_options.enable_server_cert_auth(configuration.verify);
			ssl_options.trust_store(configuration.certificate_authority);
			ssl_options.ssl_version(MQTT_SSL_VERSION_DEFAULT);
			connect_options.ssl(ssl_options.finalize());
		}

		auto connect_token = client->connect(connect_options.finalize());
		// The response is fetched but not inspected.
		// NOTE(review): presumably this call waits for the connect to finish — confirm against the Paho docs.
		static_cast<void>(connect_token->get_connect_response());

		const bool connected = client->is_connected();
		if (!connected)
		{
			throw std::runtime_error(mqtt::exception(connect_token->get_return_code()));
		}

		restart_worker();
	}
	catch (std::exception& error)
	{
		throw MQTT::Exception(error);
	}
}

void MQTT::Publisher::connect()
{
	this->connect(this->configuration);
}


/*
 * Re-establishes the connection of the existing client and makes sure the
 * worker thread is running.
 *
 * Throws MQTT::Exception when no client exists (connect() has not been called,
 * or disconnect() released it), when the Paho call fails, or when the client
 * still reports that it is not connected afterwards.
 */
void MQTT::Publisher::reconnect(void)
{
	try
	{
		// Without this check a missing client would be dereferenced below.
		if (client == nullptr)
		{
			throw std::runtime_error("MQTT client has not been created; call connect() before reconnect()");
		}

		auto token = client->reconnect();
		// The response is fetched but not inspected.
		static_cast<void>(token->get_connect_response());

		if (!client->is_connected())
		{
			throw std::runtime_error(mqtt::exception(token->get_return_code()));
		}
		restart_worker();
	}
	catch (std::exception& exc)
	{
		throw MQTT::Exception(exc);
	}
}

/*
 * Disconnects the client (if one exists), stops the worker and releases the
 * client.
 *
 * timeout - passed unchanged to the disconnect token's wait_for().
 *
 * If the disconnect request or the wait throws, the worker is still stopped
 * and the client still released before the original exception is rethrown,
 * so a failed disconnect does not leave the worker running.
 */
void MQTT::Publisher::disconnect(unsigned int timeout)
{
	if (client != nullptr)
	{
		try
		{
			auto token = client->disconnect();
			token->wait_for(timeout);
		}
		catch (...)
		{
			// Perform the same cleanup as the normal path, then let the
			// caller see the original error.
			stop_worker();
			client.reset();
			throw;
		}
	}
	stop_worker();
	client.reset();
}


/*
 * Stores the root topic in the configuration.
 * A non-empty root that does not already end in '/' gets one appended;
 * an empty root is stored unchanged.
 */
void MQTT::Publisher::set_root_topic(std::string root_topic)
{
	std::string& root = configuration.root;
	root = root_topic;
	if (!root.empty() && root.back() != '/')
	{
		root += "/";
	}
}

void MQTT::Publisher::set_buffer(unsigned int buffer)
{
	this->buffer = buffer;
}

bool MQTT::Publisher::is_connected(void)
{
	if (this->client == nullptr)
	{
		return false;
	}
	try
	{
		return client->is_connected();
	}
	catch (std::exception& exc)
	{
		throw MQTT::Exception(exc);
	}
}

/*
 * Stores the given configuration in this publisher.
 *
 * A non-empty root topic that does not end in '/' gets one appended, the same
 * rule set_root_topic() applies. The root is normalised on the by-value
 * parameter *before* it is copied into the member, so the stored
 * configuration actually carries the trailing slash.
 */
void MQTT::Publisher::configure(MQTT::Configuration configuration)
{
	if (!configuration.root.empty() && configuration.root.back() != '/')
	{
		configuration.root += "/";
	}
	this->configuration = configuration;
}


/*
 * Queues a single message for the worker thread.
 *
 * A message with qos <= 0 is rejected (returns false) when the cached queue
 * size is greater than `buffer`; a message with qos > 0 is always queued.
 * Returns true when the message was queued.
 */
bool MQTT::Publisher::publish(std::string topic, std::string message, int qos, bool retain)
{
	MQTT::MessageContainer mqtt_message = { topic, message, qos, retain };

	// Only qos <= 0 messages are subject to the buffer limit.
	if ((qos <= 0) && (queue_size.load(std::memory_order_consume) > static_cast<int>(buffer)))
	{
		return false;
	}

	std::lock_guard<std::mutex> queue_lock(queue_mutex);
	queue.push_back(mqtt_message);
	const int queued = static_cast<int>(queue.size());
	queue_size.store(queued, std::memory_order_release);
	queue_condition.notify_all();
	return true;
}

bool MQTT::Publisher::publish(std::vector<std::string> topics, std::vector<std::string> messages, int qos, bool retain)
{
	int n = static_cast<int>(topics.size());
	std::vector<MQTT::MessageContainer> mqtt_messages(n);
	for (int i = 0; i < n; i++)
	{
		mqtt_messages.at(i) = { topics.at(i), messages.at(i), qos, retain };
	}
	
	if ((qos > 0) || (queue_size.load(std::memory_order_consume) <= static_cast<int>(buffer)))
	{
		std::unique_lock<std::mutex> queue_lock(queue_mutex);
		queue.insert(queue.end(), mqtt_messages.begin(), mqtt_messages.end());
		queue_size.store(static_cast<int>(queue.size()), std::memory_order_release);
		queue_condition.notify_all();
		return true;
	}
	else
	{
		return false;
	}

}