diff --git a/src/stream/ring_buffer.cpp b/src/stream/ring_buffer.cpp index 62685a9b0a7beaba2f368f81decbd0d96605d665..bc501408205984c8fe378219f75e8bbb2fe42a8d 100644 --- a/src/stream/ring_buffer.cpp +++ b/src/stream/ring_buffer.cpp @@ -3,18 +3,22 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) { - if (this->get_bytes() < size) + if(this->get_bytes() < size) { return false; } - while (size > 0) + uint32_t offset = 0; + + while(offset < size) { + uint32_t read_offset = (this->read_offset + offset) % this->data.size(); + uint32_t max_size = 0; - uint8_t* read_buffer = this->get_read_buffer(max_size); + uint8_t* read_buffer = this->get_read_buffer(read_offset, max_size); - memcpy(buffer, read_buffer, glm::min(size, max_size)); - size -= glm::min(size, max_size); + memcpy(buffer + offset, read_buffer, glm::min(size - offset, max_size)); + offset += glm::min(size - offset, max_size); } return true; @@ -22,7 +26,7 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) bool RingBuffer::read(uint8_t* buffer, uint32_t size) { - if (!this->peak(buffer, size)) + if(!this->peak(buffer, size)) { return false; } @@ -35,7 +39,7 @@ bool RingBuffer::read(uint8_t* buffer, uint32_t size) bool RingBuffer::write(WriteFunction function) { uint32_t max_size = 0; - uint8_t* write_buffer = this->get_write_buffer(max_size); + uint8_t* write_buffer = this->get_write_buffer(this->write_offset, max_size); uint32_t size = function(write_buffer, max_size); this->write_offset = (this->write_offset + size) % this->data.size(); @@ -45,7 +49,7 @@ bool RingBuffer::write(WriteFunction function) uint32_t RingBuffer::get_bytes() const { - if (this->read_offset <= this->write_offset) + if(this->read_offset <= this->write_offset) { return this->write_offset - this->read_offset; } @@ -53,39 +57,39 @@ uint32_t RingBuffer::get_bytes() const return (this->data.size() - this->read_offset) + this->write_offset; } -uint8_t* RingBuffer::get_write_buffer(uint32_t& max_size) +uint8_t* RingBuffer::get_write_buffer(uint32_t offset, uint32_t& max_size) { - if (this->read_offset <= this->write_offset) + if(this->read_offset <= offset) { - max_size = this->data.size() - this->write_offset; + max_size = this->data.size() - offset; } else { - max_size = this->read_offset - this->write_offset; + max_size = this->read_offset - offset; } - uint32_t new_offset = (this->write_offset + max_size) % this->data.size(); + uint32_t new_offset = (offset + max_size) % this->data.size(); - if (new_offset == this->read_offset) + if(new_offset == this->read_offset) { max_size -= 1; } - return this->data.data() + this->write_offset; + return this->data.data() + offset; } -uint8_t* RingBuffer::get_read_buffer(uint32_t& max_size) +uint8_t* RingBuffer::get_read_buffer(uint32_t offset, uint32_t& max_size) { - if (this->read_offset <= this->write_offset) + if(offset <= this->write_offset) { - max_size = this->write_offset - this->read_offset; + max_size = this->write_offset - offset; } else { - max_size = this->data.size() - this->read_offset; + max_size = this->data.size() - offset; } - return this->data.data() + this->read_offset; + return this->data.data() + offset; } \ No newline at end of file diff --git a/src/stream/ring_buffer.hpp b/src/stream/ring_buffer.hpp index 3e2c7030f8b390198a90cd37a547420c291773d3..d0518538b645cf70c73aa2e34fa4e01ad8a176ca 100644 --- a/src/stream/ring_buffer.hpp +++ b/src/stream/ring_buffer.hpp @@ -25,6 +25,6 @@ public: uint32_t get_bytes() const; private: - uint8_t* get_write_buffer(uint32_t& max_size); - uint8_t* get_read_buffer(uint32_t& max_size); + uint8_t* get_write_buffer(uint32_t offset, uint32_t& max_size); + uint8_t* get_read_buffer(uint32_t offset, uint32_t& max_size); }; \ No newline at end of file diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index 2e5825f1bbe55064f7b00e4d1ba508eb23343386..fb0d4f7ee6cccd879e3ebfe9df3b309d153e997d 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -9,16 +9,17 @@ bool Transport::create(uint32_t port_number) this->server_socket = asio::ip::udp::socket(this->server_context); this->server_socket->open(asio::ip::udp::v4()); this->server_socket->bind(this->server_endpoint); + this->server_socket->non_blocking(true); this->set_state(TRANSPORT_STATE_WAITING); + this->receive_datagram(); + this->worker_thread = std::thread([this]() { this->server_context.run(); }); - this->receive_datagram(); - return true; } @@ -194,22 +195,22 @@ bool Transport::send_frame_nal(PacketFrameNumber frame_number, PacketFrameId fra Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - uint32_t size = glm::min(datagram.size, (uint32_t) content.size() - offset); + uint32_t content_size = glm::min(datagram.size - (uint32_t) sizeof(PacketFrameNal), (uint32_t) content.size() - offset); PacketFrameNal* packet = (PacketFrameNal*)datagram.buffer; packet->id = PACKET_ID_FRAME_NAL; - packet->size = size; + packet->size = sizeof(PacketFrameNal) + content_size; packet->frame_number = frame_number; packet->frame_id = frame_id; packet->transform_id = transform_id; packet->payload_offset = offset; packet->payload_size = content.size(); - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*) content.data() + offset, sizeof(uint8_t) * size); + memcpy(datagram.buffer + sizeof(PacketFrameNal), (uint8_t*) content.data() + offset, sizeof(uint8_t) * content_size); this->send_datagram(datagram); - offset += size; + offset += content_size; } return true; @@ -230,20 +231,20 @@ bool Transport::send_frame_metadata(PacketFrameNumber frame_number, const std::s Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - uint32_t size = glm::min(datagram.size, (uint32_t)content.size() - offset); + uint32_t content_size = glm::min(datagram.size - (uint32_t)sizeof(PacketFrameMetadata), (uint32_t)content.size() - offset); PacketFrameMetadata* packet = (PacketFrameMetadata*)datagram.buffer; packet->id = PACKET_ID_FRAME_METADATA; - packet->size = size; + packet->size = sizeof(PacketFrameMetadata) + content_size; packet->frame_number = frame_number; packet->payload_offset = offset; packet->payload_size = content.size(); - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*)content.data() + offset, sizeof(uint8_t) * size); + memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*)content.data() + offset, sizeof(uint8_t) * content_size); this->send_datagram(datagram); - offset += size; + offset += content_size; } return true; @@ -412,6 +413,11 @@ void Transport::receive_datagram() asio::error_code error_code; uint32_t bytes = this->server_socket->receive_from(asio::buffer(buffer, max_size), this->receive_endpoint, 0, error_code); + if (error_code == asio::error::would_block) + { + return 0; + } + if (error_code) { lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index 2fd871d64d029270b6005319e335489c4f53bdc1..a171a78914faeca71bb3c14821c38953742fd7c3 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -11,7 +11,7 @@ #define TRANSPORT_STATE_CHECK_INTERVAL 10 //NOTE: In milliseconds #define TRANSPORT_CLIENT_CONNECT_TIMEOUT 60 //NOTE: In seconds -#define TRANSPORT_CLIENT_TIMEOUT 2 //NOTE: In seconds +#define TRANSPORT_CLIENT_TIMEOUT 20 //NOTE: In seconds enum TransportState {