diff --git a/CMakeLists.txt b/CMakeLists.txt index 24a305fcc2517aa091b7392299613b41b02f039e..21f1dc887c9f8156a0088183bfce401b7e2d12d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -706,6 +706,7 @@ message("======================================================================= #Stream ${SRC_DIR}/stream/datagram.hpp ${SRC_DIR}/stream/datagram.cpp ${SRC_DIR}/stream/packet.hpp + ${SRC_DIR}/stream/ring_buffer.hpp ${SRC_DIR}/stream/ring_buffer.cpp ${SRC_DIR}/stream/transport.hpp ${SRC_DIR}/stream/transport.cpp ${SRC_DIR}/stream/types.hpp diff --git a/src/stream/ring_buffer.cpp b/src/stream/ring_buffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..62685a9b0a7beaba2f368f81decbd0d96605d665 --- /dev/null +++ b/src/stream/ring_buffer.cpp @@ -0,0 +1,91 @@ +#include "ring_buffer.hpp" +#include <glm/glm.hpp> + +bool RingBuffer::peak(uint8_t* buffer, uint32_t size) +{ + if (this->get_bytes() < size) + { + return false; + } + + while (size > 0) + { + uint32_t max_size = 0; + uint8_t* read_buffer = this->get_read_buffer(max_size); + + memcpy(buffer, read_buffer, glm::min(size, max_size)); + size -= glm::min(size, max_size); + } + + return true; +} + +bool RingBuffer::read(uint8_t* buffer, uint32_t size) +{ + if (!this->peak(buffer, size)) + { + return false; + } + + this->read_offset = (this->read_offset + size) % this->data.size(); + + return true; +} + +bool RingBuffer::write(WriteFunction function) +{ + uint32_t max_size = 0; + uint8_t* write_buffer = this->get_write_buffer(max_size); + uint32_t size = function(write_buffer, max_size); + + this->write_offset = (this->write_offset + size) % this->data.size(); + + return true; +} + +uint32_t RingBuffer::get_bytes() const +{ + if (this->read_offset <= this->write_offset) + { + return this->write_offset - this->read_offset; + } + + return (this->data.size() - this->read_offset) + this->write_offset; +} + +uint8_t* RingBuffer::get_write_buffer(uint32_t& max_size) +{ + if (this->read_offset <= this->write_offset) + { + max_size = this->data.size() - this->write_offset; + } + + else + { + max_size = this->read_offset - this->write_offset; + } + + uint32_t new_offset = (this->write_offset + max_size) % this->data.size(); + + if (new_offset == this->read_offset) + { + max_size -= 1; + } + + return this->data.data() + this->write_offset; +} + +uint8_t* RingBuffer::get_read_buffer(uint32_t& max_size) +{ + if (this->read_offset <= this->write_offset) + { + max_size = this->write_offset - this->read_offset; + } + + else + { + max_size = this->data.size() - this->read_offset; + } + + return this->data.data() + this->read_offset; +} \ No newline at end of file diff --git a/src/stream/ring_buffer.hpp b/src/stream/ring_buffer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..3e2c7030f8b390198a90cd37a547420c291773d3 --- /dev/null +++ b/src/stream/ring_buffer.hpp @@ -0,0 +1,30 @@ +#pragma once +#include <functional> +#include <array> +#include <cstdint> + +#define RING_BUFFER_SIZE 1024 + +class RingBuffer +{ +public: + typedef std::function<uint32_t(uint8_t* buffer, uint32_t max_size)> WriteFunction; + +private: + std::array<uint8_t, RING_BUFFER_SIZE> data; + uint32_t read_offset = 0; + uint32_t write_offset = 0; + +public: + RingBuffer() = default; + + bool peak(uint8_t* buffer, uint32_t size); + bool read(uint8_t* buffer, uint32_t size); + bool write(WriteFunction function); + + uint32_t get_bytes() const; + +private: + uint8_t* get_write_buffer(uint32_t& max_size); + uint8_t* get_read_buffer(uint32_t& max_size); +}; \ No newline at end of file diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index c6273b346275ca757b5b63c2b5ca9dc1f8dd9072..2e5825f1bbe55064f7b00e4d1ba508eb23343386 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -65,7 +65,7 @@ void Transport::destroy() this->bitrate_send = 0.0; this->bitrate_received = 0.0; - this->set_state(TRANSPORT_STATE_UNITIALIZED); + this->set_state(TRANSPORT_STATE_UNINITIALIZED); } bool Transport::wait_connect() @@ -337,7 +337,7 @@ void Transport::check_state() { std::chrono::high_resolution_clock::time_point start_time = std::chrono::high_resolution_clock::now(); - this->server_timer->expires_at(start_time + std::chrono::milliseconds(TRANSPORT_STATE_CHECK_INTERVALL)); + this->server_timer->expires_at(start_time + std::chrono::milliseconds(TRANSPORT_STATE_CHECK_INTERVAL)); this->server_timer->async_wait([this, start_time](const asio::error_code& error_code) { if (error_code) @@ -396,93 +396,124 @@ void Transport::send_datagram(const Datagram& datagram) void Transport::receive_datagram() { - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->datagram_pool.acquire_datagram(); - lock.unlock(); - - this->server_socket->async_receive_from(asio::buffer(datagram.buffer, datagram.size), this->receive_endpoint, [this, datagram](const asio::error_code& error_code, std::size_t bytes) + this->server_socket->async_wait(asio::ip::udp::socket::wait_read, [this](const asio::error_code& error_code) { if (error_code) { - lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); + lava::log()->error("Transport: Error during datagram receive wait! Message: {}", error_code.message()); this->on_transport_error(); this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; } - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->bits_received += bytes * 8; - lock.unlock(); - - if (!this->client_endpoint.has_value()) + this->receive_buffer.write([this](uint8_t* buffer, uint32_t max_size) -> uint32_t { - lava::log()->debug("Client connected"); + asio::error_code error_code; + uint32_t bytes = this->server_socket->receive_from(asio::buffer(buffer, max_size), this->receive_endpoint, 0, error_code); - this->client_endpoint = this->receive_endpoint; - this->check_state(); - } + if (error_code) + { + lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); - else if (this->receive_endpoint != this->client_endpoint) + return 0; + } + + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->bits_received += bytes * 8; + lock.unlock(); + + if (!this->client_endpoint.has_value()) + { + lava::log()->debug("Transport: Client connected"); + + this->client_endpoint = this->receive_endpoint; + this->check_state(); + } + + else if (this->receive_endpoint != this->client_endpoint) + { + lava::log()->error("Transport: Client already connected!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + return 0; + } + + return bytes; + }); + + PacketHeader header; + + while (this->receive_buffer.peak((uint8_t*)&header, sizeof(header))) { - lava::log()->error("Transport: Client already connected!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); + if (this->receive_buffer.get_bytes() < header.size) + { + break; + } - this->error_queue.push_back(datagram); + std::unique_lock<std::mutex> lock(this->worker_mutex); + Datagram datagram = this->datagram_pool.acquire_datagram(); + lock.unlock(); - return; - } + if(!this->receive_buffer.read(datagram.buffer, header.size)) + { + lava::log()->error("Transport: Can't read from receive buffer!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); - const PacketHeader* packet = (const PacketHeader*)datagram.buffer; + return; + } - switch (packet->id) - { - case PACKET_ID_SETUP: - case PACKET_ID_PROJECTION_CHANGE: - case PACKET_ID_HEAD_TRANSFORM: - case PACKET_ID_CONTROLLER_TRANSFORM: - case PACKET_ID_CONTROLLER_EVENT: - case PACKET_ID_CONTROLLER_BUTTON: - case PACKET_ID_CONTROLLER_THUMBSTICK: - case PACKET_ID_LATENCY: - this->receive_queue.push_back(datagram); - break; - case PACKET_ID_SETUP_COMPLETE: - lava::log()->error("Transport: Received setup complete packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_CONFIG_NAL: - lava::log()->error("Transport: Received frame config nal packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_NAL: - lava::log()->error("Transport: Received frame nal packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_METADATA: - lava::log()->error("Transport: Received frame metadata packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - default: - lava::log()->error("Transport: Received unknown packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - } + switch (header.id) + { + case PACKET_ID_SETUP: + case PACKET_ID_PROJECTION_CHANGE: + case PACKET_ID_HEAD_TRANSFORM: + case PACKET_ID_CONTROLLER_TRANSFORM: + case PACKET_ID_CONTROLLER_EVENT: + case PACKET_ID_CONTROLLER_BUTTON: + case PACKET_ID_CONTROLLER_THUMBSTICK: + case PACKET_ID_LATENCY: + this->receive_queue.push_back(datagram); + break; + case PACKET_ID_SETUP_COMPLETE: + lava::log()->error("Transport: Received setup complete packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_CONFIG_NAL: + lava::log()->error("Transport: Received frame config nal packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_NAL: + lava::log()->error("Transport: Received frame nal packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_METADATA: + lava::log()->error("Transport: Received frame metadata packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + default: + lava::log()->error("Transport: Received unknown packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + } - this->process_receive_queue(); + this->process_receive_queue(); + } + this->receive_datagram(); }); } diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index 7ea0bbd43c48949fcfcf1fb236c4e7fd79f826f2..2fd871d64d029270b6005319e335489c4f53bdc1 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -5,16 +5,17 @@ #include <span> #include <optional> +#include "ring_buffer.hpp" #include "datagram.hpp" #include "packet.hpp" -#define TRANSPORT_STATE_CHECK_INTERVALL 10 //NOTE: In milliseconds +#define TRANSPORT_STATE_CHECK_INTERVAL 10 //NOTE: In milliseconds #define TRANSPORT_CLIENT_CONNECT_TIMEOUT 60 //NOTE: In seconds #define TRANSPORT_CLIENT_TIMEOUT 2 //NOTE: In seconds enum TransportState { - TRANSPORT_STATE_UNITIALIZED, + TRANSPORT_STATE_UNINITIALIZED, TRANSPORT_STATE_WAITING, TRANSPORT_STATE_CONNECTED, TRANSPORT_STATE_ERROR @@ -98,17 +99,18 @@ private: asio::ip::udp::endpoint receive_endpoint; std::optional<asio::ip::udp::endpoint> client_endpoint; - DatagramPool datagram_pool; //NOTE: Protected by worker_mutex - std::vector<Datagram> send_queue; //NOTE: Protected by worker_mutex - std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped - std::vector<Datagram> error_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped - - TransportState state = TRANSPORT_STATE_UNITIALIZED; //NOTE: Protected by worker_mutex - uint32_t bits_send = 0; //NOTE: Owned by worker_thread - uint32_t bits_received = 0; //NOTE: Owned by worker_thread - double bitrate_send = 0.0; //NOTE: Protected by worker_mutex - double bitrate_received = 0.0; //NOTE: Protected by worker_mutex - double inactive_time = 0.0; //NOTE: Owned by worker_thread + RingBuffer receive_buffer; //NOTE: Owned by worker_thread + DatagramPool datagram_pool; //NOTE: Protected by worker_mutex + std::vector<Datagram> send_queue; //NOTE: Protected by worker_mutex + std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped + std::vector<Datagram> error_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped + + TransportState state = TRANSPORT_STATE_UNINITIALIZED; //NOTE: Protected by worker_mutex + uint32_t bits_send = 0; //NOTE: Owned by worker_thread + uint32_t bits_received = 0; //NOTE: Owned by worker_thread + double bitrate_send = 0.0; //NOTE: Protected by worker_mutex + double bitrate_received = 0.0; //NOTE: Protected by worker_mutex + double inactive_time = 0.0; //NOTE: Owned by worker_thread OnSetup on_setup; OnProjectionChange on_projection_change;