From c04f6bf43e624aa87f9f1baae88e8f7d52d49af0 Mon Sep 17 00:00:00 2001 From: Jens Koenen <koenen@vr.rwth-aachen.de> Date: Wed, 5 Oct 2022 17:52:07 +0200 Subject: [PATCH] Worked on udp-connection. --- src/headset/headset.hpp | 1 + src/headset/openxr_headset.cpp | 20 +- src/headset/remote_headset.cpp | 17 +- src/headset/remote_headset.hpp | 5 +- .../depth_peeling_reprojection_stereo.cpp | 3 +- src/strategy/multi_view_stereo.cpp | 3 +- src/strategy/native_stereo_deferred.cpp | 3 +- src/strategy/native_stereo_forward.cpp | 3 +- src/stream/ring_buffer.cpp | 80 ++--- src/stream/ring_buffer.hpp | 11 +- src/stream/transport.cpp | 290 +++++++++++------- src/stream/transport.hpp | 19 +- src/utility/encoder.cpp | 1 + src/utility/extern_fence.cpp | 16 +- 14 files changed, 269 insertions(+), 203 deletions(-) diff --git a/src/headset/headset.hpp b/src/headset/headset.hpp index 89220b60..766d0950 100644 --- a/src/headset/headset.hpp +++ b/src/headset/headset.hpp @@ -87,6 +87,7 @@ public: // This function can be used to send metadata to a remote headset and is only relevant in case of an remote headset. // If the headset is an remote headset, this fuction has to be called every frame before submitting any frames. + // Submiting metadata with size equal to zero is invalid. virtual void submit_metadata(const FrameMetadata& metadata); // Returns the image format of the framebuffer images. diff --git a/src/headset/openxr_headset.cpp b/src/headset/openxr_headset.cpp index e38ef3ca..ff4e8af6 100644 --- a/src/headset/openxr_headset.cpp +++ b/src/headset/openxr_headset.cpp @@ -105,7 +105,7 @@ bool OpenXRHeadset::on_setup_instance(lava::frame_config& config) memset(&requirements, 0, sizeof(requirements)); requirements.type = XR_TYPE_GRAPHICS_REQUIREMENTS_VULKAN_KHR; - if(!this->check_result(xrGetVulkanGraphicsRequirementsKHR(this->instance, this->system, &requirements))) + if (!this->check_result(xrGetVulkanGraphicsRequirementsKHR(this->instance, this->system, &requirements))) { lava::log()->error("OpenXR: Can't get Vulkan requirements!"); @@ -157,7 +157,7 @@ bool OpenXRHeadset::on_setup_instance(lava::frame_config& config) bool OpenXRHeadset::on_setup_device(lava::device::create_param& parameters) { VkPhysicalDevice physical_device = VK_NULL_HANDLE; - if(!this->check_result(xrGetVulkanGraphicsDeviceKHR(this->instance, this->system, this->get_application()->get_instance().get(), &physical_device))) + if (!this->check_result(xrGetVulkanGraphicsDeviceKHR(this->instance, this->system, this->get_application()->get_instance().get(), &physical_device))) { if (physical_device != parameters.physical_device->get()) { @@ -210,7 +210,7 @@ bool OpenXRHeadset::on_create() create_info.createFlags = 0; create_info.systemId = this->system; - if(!this->check_result(xrCreateSession(this->instance, &create_info, &this->session))) + if (!this->check_result(xrCreateSession(this->instance, &create_info, &this->session))) { lava::log()->error("OpenXR: Can't create session!"); @@ -231,7 +231,7 @@ bool OpenXRHeadset::on_create() memset(&space_info.poseInReferenceSpace, 0, sizeof(space_info.poseInReferenceSpace)); space_info.poseInReferenceSpace.orientation.w = 1.0f; - if(!this->check_result(xrCreateReferenceSpace(this->session, &space_info, &this->space))) + if (!this->check_result(xrCreateReferenceSpace(this->session, &space_info, &this->space))) { lava::log()->error("OpenXR: Can't create reference space!"); @@ -779,7 +779,7 @@ bool OpenXRHeadset::release_image(FrameId frame_id) release_info.type = XR_TYPE_SWAPCHAIN_IMAGE_RELEASE_INFO; release_info.next = nullptr; - if(!this->check_result(xrReleaseSwapchainImage(this->swapchains[frame_id], &release_info))) + if (!this->check_result(xrReleaseSwapchainImage(this->swapchains[frame_id], &release_info))) { lava::log()->error("OpenXR: Can't release swapchain image!"); @@ -830,7 +830,7 @@ bool OpenXRHeadset::end_frame() end_info.layerCount = 1; end_info.layers = &layer_pointer; - if(!this->check_result(xrEndFrame(this->session, &end_info))) + if (!this->check_result(xrEndFrame(this->session, &end_info))) { lava::log()->error("OpenXR: Can't end frame!"); @@ -909,7 +909,7 @@ bool OpenXRHeadset::check_layer_support(const std::vector<const char*>& required bool OpenXRHeadset::check_extension_support(const std::vector<const char*>& required_extensions) const { uint32_t extension_count = 0; - if(!this->check_result(xrEnumerateInstanceExtensionProperties(nullptr, 0, &extension_count, nullptr))) + if (!this->check_result(xrEnumerateInstanceExtensionProperties(nullptr, 0, &extension_count, nullptr))) { return false; } @@ -919,7 +919,7 @@ bool OpenXRHeadset::check_extension_support(const std::vector<const char*>& requ extension_template.type = XR_TYPE_EXTENSION_PROPERTIES; std::vector<XrExtensionProperties> extension_list(extension_count, extension_template); - if(!this->check_result(xrEnumerateInstanceExtensionProperties(nullptr, extension_count, &extension_count, extension_list.data()))) + if (!this->check_result(xrEnumerateInstanceExtensionProperties(nullptr, extension_count, &extension_count, extension_list.data()))) { return false; } @@ -979,7 +979,7 @@ bool OpenXRHeadset::check_blend_mode_support(XrEnvironmentBlendMode required_mod bool OpenXRHeadset::check_view_type_support(XrViewConfigurationType required_type) const { uint32_t view_type_count = 0; - if(!this->check_result(xrEnumerateViewConfigurations(this->instance, this->system, 0, &view_type_count, nullptr))) + if (!this->check_result(xrEnumerateViewConfigurations(this->instance, this->system, 0, &view_type_count, nullptr))) { return false; } @@ -1006,7 +1006,7 @@ bool OpenXRHeadset::check_view_type_support(XrViewConfigurationType required_typ bool OpenXRHeadset::check_swapchain_format_support(int64_t required_format) const { uint32_t format_count = 0; - if(!this->check_result(xrEnumerateSwapchainFormats(this->session, 0, &format_count, nullptr))) + if (!this->check_result(xrEnumerateSwapchainFormats(this->session, 0, &format_count, nullptr))) { return false; } diff --git a/src/headset/remote_headset.cpp b/src/headset/remote_headset.cpp index 69b59291..29f4fffe 100644 --- a/src/headset/remote_headset.cpp +++ b/src/headset/remote_headset.cpp @@ -42,7 +42,9 @@ bool RemoteHeadset::on_setup_device(lava::device::create_param& parameters) bool RemoteHeadset::on_create() { this->statistic_send_bitrate = make_statistic("Send-Bitrate (Mbps)", 128); + this->statistic_send_queue_size = make_statistic("Send-Queue (Bytes)", 128); this->statistic_receive_bitrate = make_statistic("Receive-Bitrate (Mbps)", 128); + this->statistic_receive_queue_size = make_statistic("Receive-Queue (Bytes)", 128); this->statistic_latency = make_latency_statistic("latency"); StereoStrategy::Ptr strategy = this->get_application()->get_stereo_strategy(); @@ -141,7 +143,7 @@ bool RemoteHeadset::on_interface() { for (Encoder::Ptr encoder : this->encoders) { - encoder->set_bit_rate(this->encoder_bit_rate); + encoder->set_bit_rate(this->encoder_bit_rate * 1000); } } @@ -172,11 +174,19 @@ bool RemoteHeadset::on_interface() return false; } + if (ImGui::SliderInt("Send-Rate Limit", (int32_t*)&this->send_bit_rate_limit, 10, 10000)) + { + this->transport->set_max_bitrate_send(this->send_bit_rate_limit / 1000.0); + } + ImGui::Separator(); this->statistic_send_bitrate->interface(); this->statistic_receive_bitrate->interface(); + this->statistic_send_queue_size->interface(); + this->statistic_receive_queue_size->interface(); + std::unique_lock<std::mutex> lock(this->transport_mutex); this->statistic_latency->interface(); lock.unlock(); @@ -337,6 +347,8 @@ bool RemoteHeadset::create_transport() this->on_transport_error(); }); + this->transport->set_max_bitrate_send(this->send_bit_rate_limit / 1000.0); + if (!this->transport->create(this->server_port)) { return false; @@ -427,6 +439,9 @@ void RemoteHeadset::update_bitrates(lava::delta delta_time) { this->statistic_send_bitrate->add_sample(this->transport->get_bitrate_send()); this->statistic_receive_bitrate->add_sample(this->transport->get_bitrate_received()); + + this->statistic_send_queue_size->add_sample(this->transport->get_send_queue_size()); + this->statistic_receive_queue_size->add_sample(this->transport->get_receive_queue_size()); } void RemoteHeadset::update_stage(lava::delta delta_time) diff --git a/src/headset/remote_headset.hpp b/src/headset/remote_headset.hpp index 1381e483..119d1482 100644 --- a/src/headset/remote_headset.hpp +++ b/src/headset/remote_headset.hpp @@ -112,9 +112,12 @@ private: uint32_t encoder_key_rate = 120; uint32_t encoder_bit_rate = 6000; //NOTE: Bitrate in kbit per seconds uint32_t encoder_frame_rate = 90; - uint32_t encoder_quality = 35; + uint32_t encoder_quality = 51; + uint32_t send_bit_rate_limit = 8000; //NOTE: Bitrate in kbit per seconds Statistic::Ptr statistic_send_bitrate; + Statistic::Ptr statistic_send_queue_size; Statistic::Ptr statistic_receive_bitrate; + Statistic::Ptr statistic_receive_queue_size; LatencyStatistic::Ptr statistic_latency; //NOTE: Protected by transport_mutex }; \ No newline at end of file diff --git a/src/strategy/depth_peeling_reprojection_stereo.cpp b/src/strategy/depth_peeling_reprojection_stereo.cpp index bf71459f..c1dcc2c4 100644 --- a/src/strategy/depth_peeling_reprojection_stereo.cpp +++ b/src/strategy/depth_peeling_reprojection_stereo.cpp @@ -69,7 +69,8 @@ bool DepthPeelingReprojectionStereo::on_render(VkCommandBuffer command_buffer, l VK_PIPELINE_STAGE_TRANSFER_BIT, VK_PIPELINE_STAGE_BOTTOM_OF_PIPE_BIT, image_range); - this->get_headset()->submit_metadata(FrameMetadata()); + std::array<uint8_t, 1> metadata = { 0x00 }; + this->get_headset()->submit_metadata(metadata); this->render(command_buffer, frame); this->render_companion_window(command_buffer); diff --git a/src/strategy/multi_view_stereo.cpp b/src/strategy/multi_view_stereo.cpp index 3fe3e192..690cc214 100644 --- a/src/strategy/multi_view_stereo.cpp +++ b/src/strategy/multi_view_stereo.cpp @@ -91,7 +91,8 @@ bool MultiViewStereo::on_update(lava::delta delta_time) bool MultiViewStereo::on_render(VkCommandBuffer command_buffer, lava::index frame) { - this->get_headset()->submit_metadata(FrameMetadata()); + std::array<uint8_t, 1> metadata = { 0x00 }; + this->get_headset()->submit_metadata(metadata); this->get_pass_timer()->begin_pass(command_buffer, "shadow"); this->shadow_cache->compute_shadow(command_buffer, frame); diff --git a/src/strategy/native_stereo_deferred.cpp b/src/strategy/native_stereo_deferred.cpp index d8e5b1ba..87d65e20 100644 --- a/src/strategy/native_stereo_deferred.cpp +++ b/src/strategy/native_stereo_deferred.cpp @@ -81,7 +81,8 @@ bool NativeStereoDeferred::on_update(lava::delta delta_time) bool NativeStereoDeferred::on_render(VkCommandBuffer command_buffer, lava::index frame) { - this->get_headset()->submit_metadata(FrameMetadata()); + std::array<uint8_t, 1> metadata = { 0x00 }; + this->get_headset()->submit_metadata(metadata); this->get_pass_timer()->begin_pass(command_buffer, "shadow"); this->shadow_cache->compute_shadow(command_buffer, frame); diff --git a/src/strategy/native_stereo_forward.cpp b/src/strategy/native_stereo_forward.cpp index 4547c964..dfe86df3 100644 --- a/src/strategy/native_stereo_forward.cpp +++ b/src/strategy/native_stereo_forward.cpp @@ -81,7 +81,8 @@ bool NativeStereoForward::on_update(lava::delta delta_time) bool NativeStereoForward::on_render(VkCommandBuffer command_buffer, lava::index frame) { - this->get_headset()->submit_metadata(FrameMetadata()); + std::array<uint8_t, 1> metadata = { 0x00 }; + this->get_headset()->submit_metadata(metadata); this->get_pass_timer()->begin_pass(command_buffer, "shadow"); this->shadow_cache->compute_shadow(command_buffer, frame); diff --git a/src/stream/ring_buffer.cpp b/src/stream/ring_buffer.cpp index bc501408..3f28a9f9 100644 --- a/src/stream/ring_buffer.cpp +++ b/src/stream/ring_buffer.cpp @@ -3,22 +3,21 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) { - if(this->get_bytes() < size) + if (this->get_read_bytes() < size) { return false; } - uint32_t offset = 0; + uint32_t copy_offset = this->read_offset; - while(offset < size) + while (size > 0) { - uint32_t read_offset = (this->read_offset + offset) % this->data.size(); + uint32_t copy_size = glm::min(size, (uint32_t)this->ring_buffer.size() - copy_offset); + memcpy(buffer, this->ring_buffer.data() + copy_offset, copy_size); - uint32_t max_size = 0; - uint8_t* read_buffer = this->get_read_buffer(read_offset, max_size); - - memcpy(buffer + offset, read_buffer, glm::min(size - offset, max_size)); - offset += glm::min(size - offset, max_size); + copy_offset = (copy_offset + copy_size) % this->ring_buffer.size(); + buffer += copy_size; + size -= copy_size; } return true; @@ -26,70 +25,39 @@ bool RingBuffer::peak(uint8_t* buffer, uint32_t size) bool RingBuffer::read(uint8_t* buffer, uint32_t size) { - if(!this->peak(buffer, size)) + if (!this->peak(buffer, size)) { return false; } - this->read_offset = (this->read_offset + size) % this->data.size(); + this->read_offset = (this->read_offset + size) % this->ring_buffer.size(); + this->read_count -= size; return true; } bool RingBuffer::write(WriteFunction function) { - uint32_t max_size = 0; - uint8_t* write_buffer = this->get_write_buffer(this->write_offset, max_size); - uint32_t size = function(write_buffer, max_size); - - this->write_offset = (this->write_offset + size) % this->data.size(); - - return true; -} - -uint32_t RingBuffer::get_bytes() const -{ - if(this->read_offset <= this->write_offset) + if (this->get_write_bytes() < 0) { - return this->write_offset - this->read_offset; - } - - return (this->data.size() - this->read_offset) + this->write_offset; -} - -uint8_t* RingBuffer::get_write_buffer(uint32_t offset, uint32_t& max_size) -{ - if(this->read_offset <= offset) - { - max_size = this->data.size() - offset; - } - - else - { - max_size = this->read_offset - offset; + return false; } - uint32_t new_offset = (offset + max_size) % this->data.size(); + uint32_t offset = (this->read_offset + this->read_count) % this->ring_buffer.size(); + uint32_t max_size = glm::min(this->get_write_bytes(), (uint32_t)this->ring_buffer.size() - offset); + uint32_t size = function(this->ring_buffer.data() + offset, max_size); - if(new_offset == this->read_offset) - { - max_size -= 1; - } + this->read_count += size; - return this->data.data() + offset; + return true; } -uint8_t* RingBuffer::get_read_buffer(uint32_t offset, uint32_t& max_size) +uint32_t RingBuffer::get_read_bytes() const { - if(offset <= this->write_offset) - { - max_size = this->write_offset - offset; - } - - else - { - max_size = this->data.size() - offset; - } + return this->read_count; +} - return this->data.data() + offset; +uint32_t RingBuffer::get_write_bytes() const +{ + return this->ring_buffer.size() - this->read_count; } \ No newline at end of file diff --git a/src/stream/ring_buffer.hpp b/src/stream/ring_buffer.hpp index d0518538..d3dcff20 100644 --- a/src/stream/ring_buffer.hpp +++ b/src/stream/ring_buffer.hpp @@ -11,9 +11,9 @@ public: typedef std::function<uint32_t(uint8_t* buffer, uint32_t max_size)> WriteFunction; private: - std::array<uint8_t, RING_BUFFER_SIZE> data; + std::array<uint8_t, RING_BUFFER_SIZE> ring_buffer; uint32_t read_offset = 0; - uint32_t write_offset = 0; + uint32_t read_count = 0; public: RingBuffer() = default; @@ -22,9 +22,6 @@ public: bool read(uint8_t* buffer, uint32_t size); bool write(WriteFunction function); - uint32_t get_bytes() const; - -private: - uint8_t* get_write_buffer(uint32_t offset, uint32_t& max_size); - uint8_t* get_read_buffer(uint32_t offset, uint32_t& max_size); + uint32_t get_read_bytes() const; + uint32_t get_write_bytes() const; }; \ No newline at end of file diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index fb0d4f7e..77735e65 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -295,6 +295,12 @@ void Transport::set_on_transport_error(OnTransportError function) this->on_transport_error = std::move(function); } +void Transport::set_max_bitrate_send(double bitrate) +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->max_bitrate_send = bitrate; +} + asio::io_context& Transport::get_context() { return this->server_context; @@ -327,6 +333,45 @@ double Transport::get_bitrate_received() return value; } +double Transport::get_max_bitrate_send() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + double value = this->max_bitrate_send; + lock.unlock(); + + return value; +} + +uint32_t Transport::get_send_queue_size() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + uint32_t bytes = 0; + + for (const Datagram& datagram : this->send_queue) + { + const PacketHeader* header = (const PacketHeader*)datagram.buffer; + bytes += header->size; + } + lock.unlock(); + + return bytes; +} + +uint32_t Transport::get_receive_queue_size() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + uint32_t bytes = 0; + + for (const Datagram& datagram : this->receive_queue) + { + const PacketHeader* header = (const PacketHeader*)datagram.buffer; + bytes += header->size; + } + lock.unlock(); + + return bytes; +} + void Transport::set_state(TransportState state) { std::unique_lock<std::mutex> lock(this->worker_mutex); @@ -356,6 +401,11 @@ void Transport::check_state() std::unique_lock<std::mutex> lock(this->worker_mutex); this->bitrate_send = ((double)this->bits_send / (double)delta_time) / (1000.0 * 1000.0); this->bitrate_received = ((double)this->bits_received / (double)delta_time) / (1000.0 * 1000.0); + + if (!this->send_active && !this->send_queue.empty()) + { + this->process_send_queue(); + } lock.unlock(); if (this->bits_received > 0) @@ -389,7 +439,7 @@ void Transport::send_datagram(const Datagram& datagram) std::unique_lock<std::mutex> lock(this->worker_mutex); this->send_queue.push_back(datagram); - if (this->send_queue.size() <= 1) + if (!this->send_active) { this->process_send_queue(); } @@ -397,129 +447,93 @@ void Transport::send_datagram(const Datagram& datagram) void Transport::receive_datagram() { - this->server_socket->async_wait(asio::ip::udp::socket::wait_read, [this](const asio::error_code& error_code) + std::unique_lock<std::mutex> lock(this->worker_mutex); + Datagram datagram = this->datagram_pool.acquire_datagram(); + lock.unlock(); + + this->server_socket->async_receive_from(asio::buffer(datagram.buffer, datagram.size), this->receive_endpoint, [this, datagram](const asio::error_code& error_code, std::size_t bytes) { if (error_code) { - lava::log()->error("Transport: Error during datagram receive wait! Message: {}", error_code.message()); + lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); this->on_transport_error(); this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; } - this->receive_buffer.write([this](uint8_t* buffer, uint32_t max_size) -> uint32_t - { - asio::error_code error_code; - uint32_t bytes = this->server_socket->receive_from(asio::buffer(buffer, max_size), this->receive_endpoint, 0, error_code); - - if (error_code == asio::error::would_block) - { - return 0; - } - - if (error_code) - { - lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - - return 0; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->bits_received += bytes * 8; - lock.unlock(); - - if (!this->client_endpoint.has_value()) - { - lava::log()->debug("Transport: Client connected"); - - this->client_endpoint = this->receive_endpoint; - this->check_state(); - } - - else if (this->receive_endpoint != this->client_endpoint) - { - lava::log()->error("Transport: Client already connected!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - - return 0; - } + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->bits_received += bytes * 8; + lock.unlock(); - return bytes; - }); + if (!this->client_endpoint.has_value()) + { + lava::log()->debug("Transport: Client connected"); - PacketHeader header; + this->client_endpoint = this->receive_endpoint; + this->check_state(); + } - while (this->receive_buffer.peak((uint8_t*)&header, sizeof(header))) + else if (this->receive_endpoint != this->client_endpoint) { - if (this->receive_buffer.get_bytes() < header.size) - { - break; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->datagram_pool.acquire_datagram(); - lock.unlock(); + lava::log()->error("Transport: Client already connected!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); - if(!this->receive_buffer.read(datagram.buffer, header.size)) - { - lava::log()->error("Transport: Can't read from receive buffer!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); - return; - } + return; + } - switch (header.id) - { - case PACKET_ID_SETUP: - case PACKET_ID_PROJECTION_CHANGE: - case PACKET_ID_HEAD_TRANSFORM: - case PACKET_ID_CONTROLLER_TRANSFORM: - case PACKET_ID_CONTROLLER_EVENT: - case PACKET_ID_CONTROLLER_BUTTON: - case PACKET_ID_CONTROLLER_THUMBSTICK: - case PACKET_ID_LATENCY: - this->receive_queue.push_back(datagram); - break; - case PACKET_ID_SETUP_COMPLETE: - lava::log()->error("Transport: Received setup complete packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_CONFIG_NAL: - lava::log()->error("Transport: Received frame config nal packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_NAL: - lava::log()->error("Transport: Received frame nal packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - case PACKET_ID_FRAME_METADATA: - lava::log()->error("Transport: Received frame metadata packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - default: - lava::log()->error("Transport: Received unknown packet!"); - this->on_transport_error(); - this->set_state(TRANSPORT_STATE_ERROR); - this->error_queue.push_back(datagram); - return; - } + const PacketHeader* packet = (const PacketHeader*)datagram.buffer; - this->process_receive_queue(); + switch (packet->id) + { + case PACKET_ID_SETUP: + case PACKET_ID_PROJECTION_CHANGE: + case PACKET_ID_HEAD_TRANSFORM: + case PACKET_ID_CONTROLLER_TRANSFORM: + case PACKET_ID_CONTROLLER_EVENT: + case PACKET_ID_CONTROLLER_BUTTON: + case PACKET_ID_CONTROLLER_THUMBSTICK: + case PACKET_ID_LATENCY: + this->receive_queue.push_back(datagram); + break; + case PACKET_ID_SETUP_COMPLETE: + lava::log()->error("Transport: Received setup complete packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_CONFIG_NAL: + lava::log()->error("Transport: Received frame config nal packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_NAL: + lava::log()->error("Transport: Received frame nal packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + case PACKET_ID_FRAME_METADATA: + lava::log()->error("Transport: Received frame metadata packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; + default: + lava::log()->error("Transport: Received unknown packet!"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); + return; } - + + this->process_receive_queue(); this->receive_datagram(); }); } @@ -537,9 +551,51 @@ void Transport::process_send_queue() } Datagram datagram = this->send_queue.front(); - const PacketHeader* header = (const PacketHeader*)datagram.buffer; + const PacketHeader* header = (const PacketHeader*) datagram.buffer; - this->server_socket->async_send_to(asio::buffer(datagram.buffer, header->size), this->client_endpoint.value(), [this, datagram](const asio::error_code& error_code, std::size_t bytes) + this->server_socket->async_wait(asio::socket_base::wait_write, [this, datagram, header](const asio::error_code& error_code) + { + asio::error_code err; + uint32_t bytes = this->server_socket->send_to(asio::buffer(datagram.buffer, header->size), this->client_endpoint.value(), 0, err); + + std::unique_lock<std::mutex> lock(this->worker_mutex); + + if(err == asio::error::would_block) + { + this->process_send_queue(); + + return; + } + + if (err) { + lava::log()->error("Transport: Error during send of datagram! Message: {}", err.message()); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + return; + } + + + this->send_active = false; + this->bits_send += bytes * 8; + + this->send_queue.erase(this->send_queue.begin()); + this->datagram_pool.release_datagram(datagram); + + double wait = ((bytes * 8) / (1000.0 * 1000.0)) / this->max_bitrate_send; + double wait_mico = wait / (1000.0 * 1000.0); + + std::chrono::high_resolution_clock::time_point a = std::chrono::high_resolution_clock::now(); + + while (((uint64_t) std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - a).count()) / (1000.0 * 1000.0 * 1000.0) < wait) { + } + + this->process_send_queue(); + lock.unlock(); + }); + + + /*this->server_socket->async_send_to(asio::buffer(datagram.buffer, header->size), this->client_endpoint.value(), [this, datagram](const asio::error_code& error_code, std::size_t bytes) { if (error_code) { @@ -553,14 +609,28 @@ void Transport::process_send_queue() } std::unique_lock<std::mutex> lock(this->worker_mutex); + this->send_active = false; this->bits_send += bytes * 8; this->send_queue.erase(this->send_queue.begin()); this->datagram_pool.release_datagram(datagram); - + + + double wait = ((bytes * 8) / (1000.0 * 1000.0)) / this->max_bitrate_send; + double wait_mico = wait / (1000.0 * 1000.0); + + std::chrono::high_resolution_clock::time_point a = std::chrono::high_resolution_clock::now(); + + while (((uint64_t)std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - a).count()) / (1000.0 * 1000.0 * 1000.0) < wait) + { + + } + this->process_send_queue(); lock.unlock(); - }); + });*/ + + this->send_active = true; } void Transport::process_receive_queue() diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index a171a789..e2df6a32 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -5,7 +5,6 @@ #include <span> #include <optional> -#include "ring_buffer.hpp" #include "datagram.hpp" #include "packet.hpp" @@ -62,11 +61,18 @@ public: void set_on_latency(OnLatency function); void set_on_transport_error(OnTransportError function); + //NOTE: The following functions should be thread safe + void set_max_bitrate_send(double bitrate); //NOTE: The bitrate is given in MBits/s + //NOTE: The following function should be thread safe asio::io_context& get_context(); TransportState get_state(); - double get_bitrate_send(); //NOTE: The bitrate is given in MBits/s - double get_bitrate_received(); //NOTE: The bitrate is given in MBits/s + double get_bitrate_send(); //NOTE: The bitrate is given in MBits/s + double get_bitrate_received(); //NOTE: The bitrate is given in MBits/s + double get_max_bitrate_send(); //NOTE: The bitrate is given in MBits/s + + uint32_t get_send_queue_size(); //NOTE: In Bytes + uint32_t get_receive_queue_size(); //NOTE: In Bytes private: void set_state(TransportState state); @@ -75,7 +81,7 @@ private: void send_datagram(const Datagram& datagram); void receive_datagram(); - void process_send_queue(); //NOTE: Protected by worker_mutex + void process_send_queue(); //NOTE: Protected by worker_mutex void process_receive_queue(); void parse_setup(const Datagram& datagram); @@ -99,18 +105,19 @@ private: asio::ip::udp::endpoint receive_endpoint; std::optional<asio::ip::udp::endpoint> client_endpoint; - RingBuffer receive_buffer; //NOTE: Owned by worker_thread DatagramPool datagram_pool; //NOTE: Protected by worker_mutex std::vector<Datagram> send_queue; //NOTE: Protected by worker_mutex std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped std::vector<Datagram> error_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped - + TransportState state = TRANSPORT_STATE_UNINITIALIZED; //NOTE: Protected by worker_mutex uint32_t bits_send = 0; //NOTE: Owned by worker_thread uint32_t bits_received = 0; //NOTE: Owned by worker_thread double bitrate_send = 0.0; //NOTE: Protected by worker_mutex double bitrate_received = 0.0; //NOTE: Protected by worker_mutex + double max_bitrate_send = 1.0; //NOTE: Protected by worker_mutex double inactive_time = 0.0; //NOTE: Owned by worker_thread + bool send_active = false; //NOTE: Protected by worker_mutex OnSetup on_setup; OnProjectionChange on_projection_change; diff --git a/src/utility/encoder.cpp b/src/utility/encoder.cpp index 9843b6e7..48979081 100644 --- a/src/utility/encoder.cpp +++ b/src/utility/encoder.cpp @@ -2233,6 +2233,7 @@ void Encoder::invalidate_slots() bool Encoder::is_key_frame() const { + return true; return (this->frame_index % this->setting_key_rate) == 0; } diff --git a/src/utility/extern_fence.cpp b/src/utility/extern_fence.cpp index f55220ea..1cad711b 100644 --- a/src/utility/extern_fence.cpp +++ b/src/utility/extern_fence.cpp @@ -44,12 +44,12 @@ public: get_info.fence = fence; get_info.handleType = VK_EXTERNAL_FENCE_HANDLE_TYPE_OPAQUE_WIN32_BIT; - HANDLE object_handle = NULL; + HANDLE object_handle = NULL; - if (vkGetFenceWin32HandleKHR(device->get(), &get_info, &object_handle) != VK_SUCCESS) - { + if (vkGetFenceWin32HandleKHR(device->get(), &get_info, &object_handle) != VK_SUCCESS) + { return false; - } + } this->object = ObjectType(executor, object_handle); @@ -110,12 +110,12 @@ public: get_info.fence = fence; get_info.handleType = VK_EXTERNAL_FENCE_HANDLE_TYPE_OPAQUE_FD_BIT; - int file_descriptor = 0; + int file_descriptor = 0; - if (vkGetFenceFdKHR(device->get(), &get_info, &file_descriptor) != VK_SUCCESS) - { + if (vkGetFenceFdKHR(device->get(), &get_info, &file_descriptor) != VK_SUCCESS) + { return false; - } + } this->stream = StreamType(executor, file_descriptor); -- GitLab