From e1e4676e62a8d0a7e85c007fedfdfba6e0c2c370 Mon Sep 17 00:00:00 2001 From: Jens Koenen <koenen@vr.rwth-aachen.de> Date: Wed, 28 Sep 2022 17:42:52 +0200 Subject: [PATCH] Worked on udp-connection. Ring-Buffer still broken. --- src/stream/ring_buffer.cpp | 44 +++++++++++++++++++++----------------- src/stream/ring_buffer.hpp | 4 ++-- src/stream/transport.cpp | 26 +++++++++++++--------- src/stream/transport.hpp | 2 +- 4 files changed, 43 insertions(+), 33 deletions(-) diff --git a/src/stream/ring_buffer.cpp b/src/stream/ring_buffer.cpp index 62685a9b..bc501408 100644 --- a/src/stream/ring_buffer.cpp +++ b/src/stream/ring_buffer.cpp @@ -3,18 +3,22 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) { - if (this->get_bytes() < size) + if(this->get_bytes() < size) { return false; } - while (size > 0) + uint32_t offset = 0; + + while(offset < size) { + uint32_t read_offset = (this->read_offset + offset) % this->data.size(); + uint32_t max_size = 0; - uint8_t* read_buffer = this->get_read_buffer(max_size); + uint8_t* read_buffer = this->get_read_buffer(read_offset, max_size); - memcpy(buffer, read_buffer, glm::min(size, max_size)); - size -= glm::min(size, max_size); + memcpy(buffer + offset, read_buffer, glm::min(size - offset, max_size)); + offset += glm::min(size - offset, max_size); } return true; @@ -22,7 +26,7 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) bool RingBuffer::read(uint8_t* buffer, uint32_t size) { - if (!this->peak(buffer, size)) + if(!this->peak(buffer, size)) { return false; } @@ -35,7 +39,7 @@ bool RingBuffer::read(uint8_t* buffer, uint32_t size) bool RingBuffer::write(WriteFunction function) { uint32_t max_size = 0; - uint8_t* write_buffer = this->get_write_buffer(max_size); + uint8_t* write_buffer = this->get_write_buffer(this->write_offset, max_size); uint32_t size = function(write_buffer, max_size); this->write_offset = (this->write_offset + size) % this->data.size(); @@ -45,7 +49,7 @@ bool RingBuffer::write(WriteFunction function) uint32_t RingBuffer::get_bytes() const { - if (this->read_offset <= this->write_offset) + if(this->read_offset <= this->write_offset) { return this->write_offset - this->read_offset; } @@ -53,39 +57,39 @@ uint32_t RingBuffer::get_bytes() const return (this->data.size() - this->read_offset) + this->write_offset; } -uint8_t* RingBuffer::get_write_buffer(uint32_t& max_size) +uint8_t* RingBuffer::get_write_buffer(uint32_t offset, uint32_t& max_size) { - if (this->read_offset <= this->write_offset) + if(this->read_offset <= offset) { - max_size = this->data.size() - this->write_offset; + max_size = this->data.size() - offset; } else { - max_size = this->read_offset - this->write_offset; + max_size = this->read_offset - offset; } - uint32_t new_offset = (this->write_offset + max_size) % this->data.size(); + uint32_t new_offset = (offset + max_size) % this->data.size(); - if (new_offset == this->read_offset) + if(new_offset == this->read_offset) { max_size -= 1; } - return this->data.data() + this->write_offset; + return this->data.data() + offset; } -uint8_t* RingBuffer::get_read_buffer(uint32_t& max_size) +uint8_t* RingBuffer::get_read_buffer(uint32_t offset, uint32_t& max_size) { - if (this->read_offset <= this->write_offset) + if(offset <= this->write_offset) { - max_size = this->write_offset - this->read_offset; + max_size = this->write_offset - offset; } else { - max_size = this->data.size() - this->read_offset; + max_size = this->data.size() - offset; } - return this->data.data() + this->read_offset; + return this->data.data() + offset; } \ No newline at end of file diff --git a/src/stream/ring_buffer.hpp b/src/stream/ring_buffer.hpp index 3e2c7030..d0518538 100644 --- a/src/stream/ring_buffer.hpp +++ b/src/stream/ring_buffer.hpp @@ -25,6 +25,6 @@ public: uint32_t get_bytes() const; private: - uint8_t* get_write_buffer(uint32_t& max_size); - uint8_t* get_read_buffer(uint32_t& max_size); + uint8_t* get_write_buffer(uint32_t offset, uint32_t& max_size); + uint8_t* get_read_buffer(uint32_t offset, uint32_t& max_size); }; \ No newline at end of file diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index 2e5825f1..fb0d4f7e 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -9,16 +9,17 @@ bool Transport::create(uint32_t port_number) this->server_socket = asio::ip::udp::socket(this->server_context); this->server_socket->open(asio::ip::udp::v4()); this->server_socket->bind(this->server_endpoint); + this->server_socket->non_blocking(true); this->set_state(TRANSPORT_STATE_WAITING); + this->receive_datagram(); + this->worker_thread = std::thread([this]() { this->server_context.run(); }); - this->receive_datagram(); - return true; } @@ -194,22 +195,22 @@ bool Transport::send_frame_nal(PacketFrameNumber frame_number, PacketFrameId fra Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - uint32_t size = glm::min(datagram.size, (uint32_t) content.size() - offset); + uint32_t content_size = glm::min(datagram.size - (uint32_t) sizeof(PacketFrameNal), (uint32_t) content.size() - offset); PacketFrameNal* packet = (PacketFrameNal*)datagram.buffer; packet->id = PACKET_ID_FRAME_NAL; - packet->size = size; + packet->size = sizeof(PacketFrameNal) + content_size; packet->frame_number = frame_number; packet->frame_id = frame_id; packet->transform_id = transform_id; packet->payload_offset = offset; packet->payload_size = content.size(); - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*) content.data() + offset, sizeof(uint8_t) * size); + memcpy(datagram.buffer + sizeof(PacketFrameNal), (uint8_t*) content.data() + offset, sizeof(uint8_t) * content_size); this->send_datagram(datagram); - offset += size; + offset += content_size; } return true; @@ -230,20 +231,20 @@ bool Transport::send_frame_metadata(PacketFrameNumber frame_number, const std::s Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - uint32_t size = glm::min(datagram.size, (uint32_t)content.size() - offset); + uint32_t content_size = glm::min(datagram.size - (uint32_t)sizeof(PacketFrameMetadata), (uint32_t)content.size() - offset); PacketFrameMetadata* packet = (PacketFrameMetadata*)datagram.buffer; packet->id = PACKET_ID_FRAME_METADATA; - packet->size = size; + packet->size = sizeof(PacketFrameMetadata) + content_size; packet->frame_number = frame_number; packet->payload_offset = offset; packet->payload_size = content.size(); - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*)content.data() + offset, sizeof(uint8_t) * size); + memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*)content.data() + offset, sizeof(uint8_t) * content_size); this->send_datagram(datagram); - offset += size; + offset += content_size; } return true; @@ -412,6 +413,11 @@ void Transport::receive_datagram() asio::error_code error_code; uint32_t bytes = this->server_socket->receive_from(asio::buffer(buffer, max_size), this->receive_endpoint, 0, error_code); + if (error_code == asio::error::would_block) + { + return 0; + } + if (error_code) { lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index 2fd871d6..a171a789 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -11,7 +11,7 @@ #define TRANSPORT_STATE_CHECK_INTERVAL 10 //NOTE: In milliseconds #define TRANSPORT_CLIENT_CONNECT_TIMEOUT 60 //NOTE: In seconds -#define TRANSPORT_CLIENT_TIMEOUT 2 //NOTE: In seconds +#define TRANSPORT_CLIENT_TIMEOUT 20 //NOTE: In seconds enum TransportState { -- GitLab