From 5a2dcf82a757d34226a244e7a72b8046162b6e51 Mon Sep 17 00:00:00 2001 From: Jens Koenen <koenen@vr.rwth-aachen.de> Date: Wed, 21 Sep 2022 15:33:53 +0200 Subject: [PATCH] Worked on udp-connection. --- src/headset/remote_headset.cpp | 361 +++++++++++++++------------------ src/headset/remote_headset.hpp | 60 +++--- src/stream/datagram.cpp | 5 +- src/stream/transport.cpp | 192 +++++++++++++++--- src/stream/transport.hpp | 49 +++-- 5 files changed, 410 insertions(+), 257 deletions(-) diff --git a/src/headset/remote_headset.cpp b/src/headset/remote_headset.cpp index 7ca510ef..69b59291 100644 --- a/src/headset/remote_headset.cpp +++ b/src/headset/remote_headset.cpp @@ -5,6 +5,7 @@ RemoteHeadset::RemoteHeadset() { this->controller_states.fill(CONTROLLER_STATE_DETACHED); + this->transport_controller_states.fill(CONTROLLER_STATE_DETACHED); for (std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>& buttons : this->controller_buttons) { @@ -40,22 +41,28 @@ bool RemoteHeadset::on_setup_device(lava::device::create_param& parameters) bool RemoteHeadset::on_create() { - this->statistic_bitrate = make_statistic("Bitrate (Mbps)", 128); + this->statistic_send_bitrate = make_statistic("Send-Bitrate (Mbps)", 128); + this->statistic_receive_bitrate = make_statistic("Receive-Bitrate (Mbps)", 128); this->statistic_latency = make_latency_statistic("latency"); StereoStrategy::Ptr strategy = this->get_application()->get_stereo_strategy(); this->frame_id_count = strategy->get_max_remote_frame_ids(); - this->transport = make_transport(); + if (!convert_strategy(this->get_application()->get_stereo_strategy_type(), this->strategy_id)) + { + return false; + } - if (!this->transport->create(this->server_port)) + if (!this->create_transport()) { lava::log()->error("Remote Headset: Can't create server!"); + + return false; } lava::log()->debug("Waiting for connection"); - if (!this->transport->wait_setup()) + if (!this->transport->wait_connect()) { lava::log()->error("Remote Headset: Incomplete setup handshake!"); @@ -83,18 +90,27 @@ bool RemoteHeadset::on_create() void RemoteHeadset::on_destroy() { + if (this->transport != nullptr) + { + this->transport->destroy(); + } + if (this->statistic_latency != nullptr) { statistic_latency->write_to_file(); } - this->destroy_server(); this->destroy_framebuffers(); this->destroy_encoders(); } bool RemoteHeadset::on_interface() { + if (this->transport->get_state() != TRANSPORT_STATE_CONNECTED) + { + return false; + } + ImGui::DragFloat("Movement Speed", &this->movement_speed, 1.0f, 0.0f, 1000.0); ImGui::Separator(); @@ -158,23 +174,30 @@ bool RemoteHeadset::on_interface() ImGui::Separator(); - this->statistic_bitrate->interface(); - this->statistic_latency->interface(); + this->statistic_send_bitrate->interface(); + this->statistic_receive_bitrate->interface(); - this->frame_number++; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->statistic_latency->interface(); + lock.unlock(); - return !this->check_error(); + return true; } bool RemoteHeadset::on_update(lava::delta delta_time) { + if (this->transport->get_state() != TRANSPORT_STATE_CONNECTED) + { + return false; + } + this->update_values(); + this->update_bitrates(delta_time); this->update_stage(delta_time); + + this->frame_number++; - double bitrate = ((double) this->bits_send / (double) delta_time) / (1000.0 * 1000.0); - this->statistic_bitrate->add_sample(bitrate); - - return !this->check_error(); + return true; } void RemoteHeadset::submit_frame(VkCommandBuffer command_buffer, VkImageLayout frame_layout, FrameId frame_id) @@ -200,10 +223,10 @@ void RemoteHeadset::submit_frame(VkCommandBuffer command_buffer, VkImageLayout f switch (type) { case ENCODER_FRAME_TYPE_CONFIG: - this->send_packet_frame_config_nal(frame_id, content); + this->transport->send_frame_config_nal(frame_id, content); break; case ENCODER_FRAME_TYPE_FRAME: - this->send_packet_frame_nal(frame_number, frame_id, transform_id, content); + this->transport->send_frame_nal(frame_number, frame_id, transform_id, content); break; case ENCODER_FRAME_TYPE_CONTROL: default: @@ -220,7 +243,7 @@ void RemoteHeadset::submit_frame(VkCommandBuffer command_buffer, VkImageLayout f void RemoteHeadset::submit_metadata(const FrameMetadata& metadata) { - this->send_packet_frame_metadata(this->frame_number, metadata); + this->transport->send_frame_metadata(this->frame_number, metadata); } VkFormat RemoteHeadset::get_format() const @@ -273,6 +296,55 @@ bool RemoteHeadset::is_remote() const return true; } +bool RemoteHeadset::create_transport() +{ + this->transport = make_transport(); + + this->transport->set_on_setup([this](const glm::u32vec2& resolution) + { + this->on_setup(resolution); + }); + this->transport->set_on_projection_change([this](const glm::mat4& left_eye_projection, const glm::mat4& right_eye_projection, const glm::mat4& left_head_to_eye, const glm::mat4& right_head_to_eye) + { + this->on_projection_change(left_eye_projection, right_eye_projection, left_head_to_eye, right_head_to_eye); + }); + this->transport->set_on_head_transform([this](PacketTransformId transform_id, const glm::mat4& head_transform) + { + this->on_head_transform(transform_id, head_transform); + }); + this->transport->set_on_controller_transform([this](PacketControllerId controller_id, const glm::mat4& controller_transform) + { + this->on_controller_transform(controller_id, controller_transform); + }); + this->transport->set_on_controller_event([this](PacketControllerId controller_id, PacketControllerState controller_state) + { + this->on_controller_event(controller_id, controller_state); + }); + this->transport->set_on_controller_button([this](PacketControllerId controller_id, PacketButtonId button_id, PacketButtonState button_state) + { + this->on_controller_button(controller_id, button_id, button_state); + }); + this->transport->set_on_controller_thumbstick([this](PacketControllerId controller_id, const glm::vec2& thumbstick) + { + this->on_controller_thumbstick(controller_id, thumbstick); + }); + this->transport->set_on_latency([this](PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, double timestamp_transfrom_query, double timestamp_server_response, double timestamp_frame_decoded, double timestamp_command_construct, double timestamp_command_executed) + { + this->on_latency(frame_number, frame_id, transform_id, timestamp_transfrom_query, timestamp_server_response, timestamp_frame_decoded, timestamp_command_construct, timestamp_command_executed); + }); + this->transport->set_on_transport_error([this]() + { + this->on_transport_error(); + }); + + if (!this->transport->create(this->server_port)) + { + return false; + } + + return true; +} + bool RemoteHeadset::create_framebuffers() { this->framebuffers.resize(this->frame_id_count); @@ -342,232 +414,133 @@ void RemoteHeadset::destroy_encoders() this->encoders.clear(); } -void RemoteHeadset::on_packet_setup(const std::span<uint8_t>& buffer) +void RemoteHeadset::update_values() { - PacketSetup* packet = (PacketSetup*)buffer.data(); + std::unique_lock<std::mutex> lock(this->transport_mutex); - if (packet->id != PACKET_ID_SETUP) - { - lava::log()->error("Remote Headset: Wrong packet id for setup packet!"); - this->on_transport_error(); + this->head_to_eye_matrices = this->transport_head_to_eye_matrices; + this->projection_matrices = this->transport_projection_matrices; + this->controller_states = this->transport_controller_states; +} - return; - } +void RemoteHeadset::update_bitrates(lava::delta delta_time) +{ + this->statistic_send_bitrate->add_sample(this->transport->get_bitrate_send()); + this->statistic_receive_bitrate->add_sample(this->transport->get_bitrate_received()); +} - PacketStereoStrategyId strategy_id; +void RemoteHeadset::update_stage(lava::delta delta_time) +{ + std::unique_lock<std::mutex> lock(this->transport_mutex); - if (!convert_strategy(this->get_application()->get_stereo_strategy_type(), strategy_id)) + for (uint32_t index = 0; index < this->controller_buttons.size(); index++) { - this->on_transport_error(); + const std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>& buttons = this->controller_buttons[index]; + glm::vec2 thumbstick = this->controller_thumbsticks[index]; - return; - } + glm::mat4 controller_transform = this->controller_transforms[index]; + glm::vec3 controller_forward = controller_transform[2]; + glm::vec3 controller_sideward = controller_transform[0]; - this->send_packet_setup_complete(strategy_id, this->frame_id_count, this->near_plane, this->far_plane); + float strength = this->movement_speed * delta_time; - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_resolution = packet->resolution; - this->worker_setup = true; - lock.unlock(); - - this->worker_setup_condition.notify_all(); -} + if (buttons[BUTTON_ID_TRIGGER] == BUTTON_STATE_PRESSED) + { + strength *= 2.0f; + } -void RemoteHeadset::on_packet_projection_change(const std::span<uint8_t>& buffer) -{ - PacketProjectionChange* packet = (PacketProjectionChange*)buffer.data(); + stage_position += controller_forward * thumbstick.y * strength; + stage_position -= controller_sideward * thumbstick.x * strength; + } - if (packet->id != PACKET_ID_PROJECTION_CHANGE) - { - lava::log()->error("Remote Headset: Wrong packet id for projection change packet!"); - this->on_transport_error(); + glm::mat4 stage_matrix = glm::translate(glm::mat4(1.0f), this->stage_position); + this->view_matrix = this->head_matrix * stage_matrix; - return; + for (uint32_t index = 0; index < this->controller_matrices.size(); index++) + { + this->controller_matrices[index] = glm::inverse(stage_matrix) * this->controller_transforms[index]; } - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_projection_matrices[EYE_LEFT] = packet->left_eye_projection; - this->worker_projection_matrices[EYE_RIGHT] = packet->right_eye_projection; - this->worker_head_to_eye_matrices[EYE_LEFT] = packet->left_head_to_eye; - this->worker_head_to_eye_matrices[EYE_RIGHT] = packet->right_head_to_eye; lock.unlock(); + + this->transport->send_stage_transform(stage_matrix); } -void RemoteHeadset::on_packet_head_transform(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_setup(const glm::u32vec2& resolution) { - PacketHeadTransform* packet = (PacketHeadTransform*)buffer.data(); - - if (packet->id != PACKET_ID_HEAD_TRANSFORM) - { - lava::log()->error("Remote Headset: Wrong packet id for head transform packet!"); - this->on_transport_error(); - - return; - } + this->transport->send_setup_complete(this->strategy_id, this->frame_id_count, this->near_plane, this->far_plane); - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_transform_id = packet->transform_id; - this->worker_head_matrix = packet->head_transform; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->resolution = resolution; lock.unlock(); } -void RemoteHeadset::on_packet_controller_transform(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_projection_change(const glm::mat4& left_eye_projection, const glm::mat4& right_eye_projection, const glm::mat4& left_head_to_eye, const glm::mat4& right_head_to_eye) { - PacketControllerTransform* packet = (PacketControllerTransform*)buffer.data(); - - if (packet->id != PACKET_ID_CONTROLLER_TRANSFORM) - { - lava::log()->error("Remote Headset: Wrong packet id for controller transform packet!"); - this->on_transport_error(); - - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_controller_transforms[packet->controller_id] = packet->controller_transform; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->transport_head_to_eye_matrices[EYE_LEFT] = left_head_to_eye; + this->transport_head_to_eye_matrices[EYE_RIGHT] = right_head_to_eye; + this->transport_projection_matrices[EYE_LEFT] = left_eye_projection; + this->transport_projection_matrices[EYE_RIGHT] = right_eye_projection; lock.unlock(); } -void RemoteHeadset::on_packet_controller_event(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_head_transform(PacketTransformId transform_id, const glm::mat4& head_transform) { - PacketControllerEvent* packet = (PacketControllerEvent*)buffer.data(); - - if (packet->id != PACKET_ID_CONTROLLER_EVENT) - { - lava::log()->error("Remote Headset: Wrong packet id for controller event packet!"); - this->on_transport_error(); - - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_controller_states[packet->controller_id] = packet->controller_state; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->transform_id = transform_id; + this->head_matrix = head_transform; lock.unlock(); } -void RemoteHeadset::on_packet_controller_button(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_controller_transform(PacketControllerId controller_id, const glm::mat4& controller_transform) { - PacketControllerButton* packet = (PacketControllerButton*)buffer.data(); - - if (packet->id != PACKET_ID_CONTROLLER_BUTTON) - { - lava::log()->error("Remote Headset: Wrong packet id for controller button packet!"); - this->on_transport_error(); - - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_controller_buttons[packet->controller_id][packet->button_id] = packet->button_state; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->controller_matrices[controller_id] = controller_transform; lock.unlock(); } -void RemoteHeadset::on_packet_controller_thumbstick(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_controller_event(PacketControllerId controller_id, PacketControllerState controller_state) { - PacketControllerThumbstick* packet = (PacketControllerThumbstick*) buffer.data(); - - if (packet->id != PACKET_ID_CONTROLLER_THUMBSTICK) - { - lava::log()->error("Remote Headset: Wrong packet id for controller thumbstick packet!"); - this->on_transport_error(); - - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_controller_thumbsticks[packet->controller_id] = packet->thumbstick; + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->transport_controller_states[controller_id] = controller_state; lock.unlock(); } -void RemoteHeadset::on_packet_latency(const std::span<uint8_t>& buffer) +void RemoteHeadset::on_controller_button(PacketControllerId controller_id, PacketButtonId button_id, PacketButtonState button_state) { - PacketLatency* packet = (PacketLatency*) buffer.data(); - - if (packet->id != PACKET_ID_LATENCY) - { - lava::log()->error("Remote Headset: Wrong packet id for latency packet!"); - this->on_transport_error(); - - return; - } - - LatencyCapture capture; - capture.frame_id = packet->frame_id; - capture.frame_number = packet->frame_number; - capture.transform_id = packet->transform_id; - capture.timestamp_transform_query = packet->timestamp_transform_query; - capture.timestamp_server_response = packet->timestamp_server_response; - capture.timestamp_frame_decoded = packet->timestamp_frame_decoded; - capture.timestamp_command_construct = packet->timestamp_command_construct; - capture.timestamp_command_executed = packet->timestamp_command_executed; - - this->statistic_latency->submit_capture(capture); + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->controller_buttons[controller_id][button_id] = button_state; + lock.unlock(); } -void RemoteHeadset::on_transport_error() +void RemoteHeadset::on_controller_thumbstick(PacketControllerId controller_id, const glm::vec2& thumbstick) { - lava::log()->error("Remote Headset: Transport error detected!"); - - std::unique_lock<std::mutex> lock(this->worker_mutex); - - this->worker_error = true; - - if (this->client_socket.has_value()) - { - this->client_socket->close(); - this->client_socket.reset(); - } + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->controller_thumbsticks[controller_id] = thumbstick; + lock.unlock(); } -void RemoteHeadset::update_values() +void RemoteHeadset::on_latency(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, double timestamp_transform_query, double timestamp_server_response, double timestamp_frame_decoded, double timestamp_command_construct, double timestamp_command_executed) { - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->bits_send = this->worker_bits_send; - this->worker_bits_send = 0; - - this->transform_id = this->worker_transform_id; - this->resolution = this->worker_resolution; - this->head_matrix = this->worker_head_matrix; - this->head_to_eye_matrices = this->worker_head_to_eye_matrices; - this->projection_matrices = this->worker_projection_matrices; - - this->controller_states = this->worker_controller_states; - this->controller_transforms = this->worker_controller_transforms; - this->controller_buttons = this->worker_controller_buttons; - this->controller_thumbsticks = this->worker_controller_thumbsticks; + LatencyCapture capture; + capture.frame_id = frame_id; + capture.frame_number = frame_number; + capture.transform_id = transform_id; + capture.timestamp_transform_query = timestamp_transform_query; + capture.timestamp_server_response = timestamp_server_response; + capture.timestamp_frame_decoded = timestamp_frame_decoded; + capture.timestamp_command_construct = timestamp_command_construct; + capture.timestamp_command_executed = timestamp_command_executed; + + std::unique_lock<std::mutex> lock(this->transport_mutex); + this->statistic_latency->submit_capture(capture); + lock.unlock(); } -void RemoteHeadset::update_stage(lava::delta delta_time) +void RemoteHeadset::on_transport_error() { - for (uint32_t index = 0; index < this->controller_buttons.size(); index++) - { - const std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>& buttons = this->controller_buttons[index]; - glm::vec2 thumbstick = this->controller_thumbsticks[index]; - - glm::mat4 controller_transform = this->controller_transforms[index]; - glm::vec3 controller_forward = controller_transform[2]; - glm::vec3 controller_sideward = controller_transform[0]; - - float strength = this->movement_speed * delta_time; - - if (buttons[BUTTON_ID_TRIGGER] == BUTTON_STATE_PRESSED) - { - strength *= 2.0f; - } - - stage_position += controller_forward * thumbstick.y * strength; - stage_position -= controller_sideward * thumbstick.x * strength; - } - - glm::mat4 stage_matrix = glm::translate(glm::mat4(1.0f), this->stage_position); - this->send_packet_stage_transform(stage_matrix); - - this->view_matrix = this->head_matrix * stage_matrix; - - for (uint32_t index = 0; index < this->controller_matrices.size(); index++) - { - this->controller_matrices[index] = glm::inverse(stage_matrix) * this->controller_transforms[index]; - } + lava::log()->error("Remote Headset: Transport error detected!"); } bool RemoteHeadset::convert_strategy(StereoStrategyType strategy_type, PacketStereoStrategyId& strategy_id) diff --git a/src/headset/remote_headset.hpp b/src/headset/remote_headset.hpp index 9319f406..1381e483 100644 --- a/src/headset/remote_headset.hpp +++ b/src/headset/remote_headset.hpp @@ -50,20 +50,26 @@ public: bool is_remote() const override; private: + bool create_transport(); + bool create_framebuffers(); void destroy_framebuffers(); bool create_encoders(); void destroy_encoders(); - void on_packet_setup(const std::span<uint8_t>& buffer); - void on_packet_projection_change(const std::span <uint8_t>& buffer); - void on_packet_head_transform(const std::span<uint8_t>& buffer); - void on_packet_controller_transform(const std::span<uint8_t>& buffer); - void on_packet_controller_event(const std::span<uint8_t>& buffer); - void on_packet_controller_button(const std::span<uint8_t>& buffer); - void on_packet_controller_thumbstick(const std::span<uint8_t>& buffer); - void on_packet_latency(const std::span<uint8_t>& buffer); + void update_values(); + void update_bitrates(lava::delta delta_time); + void update_stage(lava::delta delta_time); + + void on_setup(const glm::u32vec2& resolution); + void on_projection_change(const glm::mat4& left_eye_projection, const glm::mat4& right_eye_projection, const glm::mat4& left_head_to_eye, const glm::mat4& right_head_to_eye); + void on_head_transform(PacketTransformId transform_id, const glm::mat4& head_transform); + void on_controller_transform(PacketControllerId controller_id, const glm::mat4& controller_transform); + void on_controller_event(PacketControllerId controller_id, PacketControllerState controller_state); + void on_controller_button(PacketControllerId controller_id, PacketButtonId button_id, PacketButtonState button_state); + void on_controller_thumbstick(PacketControllerId controller_id, const glm::vec2& thumbstick); + void on_latency(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, double timestamp_transform_query, double timestamp_server_response, double timestamp_frame_decoded, double timestamp_command_construct, double timestamp_command_executed); void on_transport_error(); static bool convert_strategy(StereoStrategyType strategy_type, PacketStereoStrategyId& strategy_id); @@ -74,35 +80,41 @@ private: const float near_plane = 0.1f; const float far_plane = 1000.0f; float movement_speed = 1.0f; - - glm::uvec2 resolution; + glm::vec3 stage_position = glm::vec3(0.0f); glm::mat4 view_matrix; - glm::mat4 head_matrix; std::array<glm::mat4, 2> head_to_eye_matrices; std::array<glm::mat4, 2> projection_matrices; std::array<glm::mat4, 2> controller_matrices; - std::array<PacketControllerState, 2> controller_states; - std::array<glm::mat4, 2> controller_transforms; - std::array<glm::vec2, 2> controller_thumbsticks; - std::array<std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>, 2> controller_buttons; + Transport::Ptr transport; + std::mutex transport_mutex; + + glm::uvec2 resolution; //NOTE: Protected by transport_mutex + glm::mat4 head_matrix; //NOTE: Protected by transport_mutex + std::array<glm::mat4, 2> transport_head_to_eye_matrices; //NOTE: Protected by transport_mutex + std::array<glm::mat4, 2> transport_projection_matrices; //NOTE: Protected by transport_mutex + + std::array<PacketControllerState, 2> transport_controller_states; //NOTE: Protected by transport_mutex + std::array<glm::mat4, 2> controller_transforms; //NOTE: Protected by transport_mutex + std::array<glm::vec2, 2> controller_thumbsticks; //NOTE: Protected by transport_mutex + std::array<std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>, 2> controller_buttons; //NOTE: Protected by transport_mutex + + PacketStereoStrategyId strategy_id = STEREO_STRATEGY_ID_NATIVE_STEREO; uint32_t frame_number = 0; uint32_t frame_id_count = 0; - uint32_t transform_id = 0; + uint32_t transform_id = 0; //NOTE: Protected by transport_mutex + std::vector<lava::image::ptr> framebuffers; + std::vector<Encoder::Ptr> encoders; uint32_t encoder_mode = ENCODER_MODE_CONSTANT_QUALITY; uint32_t encoder_key_rate = 120; - uint32_t encoder_bit_rate = 6000; //NOTE: Bitrate in kbit per seconds + uint32_t encoder_bit_rate = 6000; //NOTE: Bitrate in kbit per seconds uint32_t encoder_frame_rate = 90; uint32_t encoder_quality = 35; - Transport::Ptr transport; - - std::vector<lava::image::ptr> framebuffers; - std::vector<Encoder::Ptr> encoders; - - Statistic::Ptr statistic_bitrate; - LatencyStatistic::Ptr statistic_latency; + Statistic::Ptr statistic_send_bitrate; + Statistic::Ptr statistic_receive_bitrate; + LatencyStatistic::Ptr statistic_latency; //NOTE: Protected by transport_mutex }; \ No newline at end of file diff --git a/src/stream/datagram.cpp b/src/stream/datagram.cpp index c7acc96c..e04c6275 100644 --- a/src/stream/datagram.cpp +++ b/src/stream/datagram.cpp @@ -1,10 +1,11 @@ #include "datagram.hpp" +#include <liblava/lava.hpp> DatagramPool::~DatagramPool() { for (const Datagram& datagam : this->pool) { - delete[] datagam.buffer; + lava::free_data(datagam.buffer); } this->pool.clear(); @@ -17,7 +18,7 @@ Datagram DatagramPool::acquire_datagram() if (this->pool.empty()) { datagram.size = DATAGRAM_SIZE; - datagram.buffer = new uint8_t[datagram.size]; + datagram.buffer = (uint8_t*)lava::alloc_data(datagram.size); } else diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index a28219ed..c6273b34 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -1,13 +1,16 @@ #include "transport.hpp" #include <liblava/lava.hpp> +#include <chrono> bool Transport::create(uint32_t port_number) { + this->server_timer = asio::high_resolution_timer(this->server_context); this->server_endpoint = asio::ip::udp::endpoint(asio::ip::udp::v4(), port_number); this->server_socket = asio::ip::udp::socket(this->server_context); - this->server_socket.bind(this->server_endpoint); + this->server_socket->open(asio::ip::udp::v4()); + this->server_socket->bind(this->server_endpoint); - this->setup_received = false; + this->set_state(TRANSPORT_STATE_WAITING); this->worker_thread = std::thread([this]() { @@ -24,8 +27,19 @@ void Transport::destroy() this->server_context.stop(); this->worker_thread.join(); - this->setup_received = false; - this->server_socket.close(); + this->client_endpoint.reset(); + + if (this->server_socket.has_value()) + { + this->server_socket->close(); + this->server_socket.reset(); + } + + if (this->server_timer.has_value()) + { + this->server_timer->cancel(); + this->server_timer.reset(); + } for (const Datagram& datagram : this->send_queue) { @@ -37,18 +51,38 @@ void Transport::destroy() this->datagram_pool.release_datagram(datagram); } + for (const Datagram& datagram : this->error_queue) + { + this->datagram_pool.release_datagram(datagram); + } + this->send_queue.clear(); this->receive_queue.clear(); + this->error_queue.clear(); + + this->bits_send = 0; + this->bits_received = 0; + this->bitrate_send = 0.0; + this->bitrate_received = 0.0; + + this->set_state(TRANSPORT_STATE_UNITIALIZED); } -bool Transport::wait_setup() +bool Transport::wait_connect() { + if (this->get_state() != TRANSPORT_STATE_WAITING) + { + lava::log()->error("Transport: Can't wait for connection!"); + + return false; + } + std::unique_lock<std::mutex> lock(this->worker_mutex); - std::chrono::seconds wait_time(60); + std::chrono::seconds wait_time(TRANSPORT_CLIENT_CONNECT_TIMEOUT); std::chrono::high_resolution_clock::time_point last_time = std::chrono::high_resolution_clock::now(); - while (!this->setup_received && wait_time.count() > 0) + while (this->state == TRANSPORT_STATE_WAITING && wait_time.count() > 0) { this->worker_setup_condition.wait_for(lock, wait_time); @@ -57,12 +91,14 @@ bool Transport::wait_setup() last_time = current_time; } - return this->setup_received; + lock.unlock(); + + return this->get_state() == TRANSPORT_STATE_CONNECTED; } bool Transport::send_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane) { - if (!this->client_endpoint.has_value()) + if (this->get_state() != TRANSPORT_STATE_CONNECTED) { lava::log()->error("Transport: Can't send setup complete packet since not connection is established!"); @@ -88,7 +124,7 @@ bool Transport::send_setup_complete(PacketStereoStrategyId strategy_id, uint32_t bool Transport::send_stage_transform(const glm::mat4& stage_transform) { - if (!this->client_endpoint.has_value()) + if (this->get_state() != TRANSPORT_STATE_CONNECTED) { lava::log()->error("Transport: Can't send stage transform packet since not connection is established!"); @@ -111,7 +147,7 @@ bool Transport::send_stage_transform(const glm::mat4& stage_transform) bool Transport::send_frame_config_nal(PacketFrameId frame_id, const std::span<uint8_t>& content) { - if (!this->client_endpoint.has_value()) + if (this->get_state() != TRANSPORT_STATE_CONNECTED) { lava::log()->error("Transport: Can't send frame config nal packet since not connection is established!"); @@ -145,7 +181,7 @@ bool Transport::send_frame_config_nal(PacketFrameId frame_id, const std::span<ui bool Transport::send_frame_nal(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, const std::span<uint8_t>& content) { - if (!this->client_endpoint.has_value()) + if (this->get_state() != TRANSPORT_STATE_CONNECTED) { lava::log()->error("Transport: Can't send frame nal packet since not connection is established!"); @@ -181,7 +217,7 @@ bool Transport::send_frame_nal(PacketFrameNumber frame_number, PacketFrameId fra bool Transport::send_frame_metadata(PacketFrameNumber frame_number, const std::span<uint8_t>& content) { - if (!this->client_endpoint.has_value()) + if (this->get_state() != TRANSPORT_STATE_CONNECTED) { lava::log()->error("Transport: Can't send frame metadata packet since not connection is established!"); @@ -228,7 +264,7 @@ void Transport::set_on_head_transform(OnHeadTransform function) this->on_head_transform = std::move(function); } -void Transport::set_on_controller_transfrom(OnControllerTransform function) +void Transport::set_on_controller_transform(OnControllerTransform function) { this->on_controller_transform = std::move(function); } @@ -263,15 +299,90 @@ asio::io_context& Transport::get_context() return this->server_context; } -uint32_t Transport::get_bitrate() +TransportState Transport::get_state() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + TransportState value = this->state; + lock.unlock(); + + return value; +} + +double Transport::get_bitrate_send() { std::unique_lock<std::mutex> lock(this->worker_mutex); - uint32_t value = this->bitrate; + double value = this->bitrate_send; lock.unlock(); return value; } +double Transport::get_bitrate_received() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + double value = this->bitrate_received; + lock.unlock(); + + return value; +} + +void Transport::set_state(TransportState state) +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->state = state; + lock.unlock(); +} + +void Transport::check_state() +{ + std::chrono::high_resolution_clock::time_point start_time = std::chrono::high_resolution_clock::now(); + + this->server_timer->expires_at(start_time + std::chrono::milliseconds(TRANSPORT_STATE_CHECK_INTERVALL)); + this->server_timer->async_wait([this, start_time](const asio::error_code& error_code) + { + if (error_code) + { + lava::log()->error("Transport: Error during state check! Message: {}", error_code.message()); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + return; + } + + std::chrono::high_resolution_clock::time_point end_time = std::chrono::high_resolution_clock::now(); + double delta_time = (double)std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() / 1000.0; //NOTE: delta_time in seconds + + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->bitrate_send = ((double)this->bits_send / (double)delta_time) / (1000.0 * 1000.0); + this->bitrate_received = ((double)this->bits_received / (double)delta_time) / (1000.0 * 1000.0); + lock.unlock(); + + if (this->bits_received > 0) + { + this->inactive_time = 0; + } + + else + { + this->inactive_time += delta_time; + } + + if (this->inactive_time > TRANSPORT_CLIENT_TIMEOUT) + { + lava::log()->error("Transport: Client timeout"); + this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + return; + } + + this->bits_send = 0; + this->bits_received = 0; + + this->check_state(); + }); +} + void Transport::send_datagram(const Datagram& datagram) { std::unique_lock<std::mutex> lock(this->worker_mutex); @@ -289,27 +400,38 @@ void Transport::receive_datagram() Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - this->server_socket.async_receive_from(asio::buffer(datagram.buffer, datagram.size), this->receive_endpoint, [this, datagram](const asio::error_code& error_code, std::size_t bytes) + this->server_socket->async_receive_from(asio::buffer(datagram.buffer, datagram.size), this->receive_endpoint, [this, datagram](const asio::error_code& error_code, std::size_t bytes) { if (error_code) { lava::log()->error("Transport: Error during datagram receive! Message: {}", error_code.message()); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + this->error_queue.push_back(datagram); return; } + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->bits_received += bytes * 8; + lock.unlock(); + if (!this->client_endpoint.has_value()) { lava::log()->debug("Client connected"); this->client_endpoint = this->receive_endpoint; + this->check_state(); } - else + else if (this->receive_endpoint != this->client_endpoint) { lava::log()->error("Transport: Client already connected!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + this->error_queue.push_back(datagram); return; } @@ -331,22 +453,32 @@ void Transport::receive_datagram() case PACKET_ID_SETUP_COMPLETE: lava::log()->error("Transport: Received setup complete packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); return; case PACKET_ID_FRAME_CONFIG_NAL: lava::log()->error("Transport: Received frame config nal packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); return; case PACKET_ID_FRAME_NAL: lava::log()->error("Transport: Received frame nal packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); return; case PACKET_ID_FRAME_METADATA: lava::log()->error("Transport: Received frame metadata packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); return; default: lava::log()->error("Transport: Received unknown packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + this->error_queue.push_back(datagram); return; } @@ -368,19 +500,23 @@ void Transport::process_send_queue() } Datagram datagram = this->send_queue.front(); + const PacketHeader* header = (const PacketHeader*)datagram.buffer; - this->server_socket.async_send_to(asio::buffer(datagram.buffer, datagram.size), this->client_endpoint.value(), [this, datagram](const asio::error_code& error_code, std::size_t bytes) + this->server_socket->async_send_to(asio::buffer(datagram.buffer, header->size), this->client_endpoint.value(), [this, datagram](const asio::error_code& error_code, std::size_t bytes) { if (error_code) { lava::log()->error("Transport: Error during send of datagram! Message: {}", error_code.message()); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); + + this->error_queue.push_back(datagram); return; } std::unique_lock<std::mutex> lock(this->worker_mutex); - this->bitrate += bytes * 8; //TODO:!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + this->bits_send += bytes * 8; this->send_queue.erase(this->send_queue.begin()); this->datagram_pool.release_datagram(datagram); @@ -397,18 +533,17 @@ void Transport::process_receive_queue() return; } - if (!this->setup_received) + if (this->get_state() == TRANSPORT_STATE_WAITING) { const Datagram& datagram = this->receive_queue.back(); const PacketHeader* packet = (const PacketHeader*)datagram.buffer; if (packet->id == PACKET_ID_SETUP) { + this->set_state(TRANSPORT_STATE_CONNECTED); this->parse_setup(datagram); - this->setup_received = true; this->worker_setup_condition.notify_all(); - this->receive_queue.pop_back(); std::unique_lock<std::mutex> lock(this->worker_mutex); @@ -417,7 +552,7 @@ void Transport::process_receive_queue() } } - if (this->setup_received) + if (this->get_state() == TRANSPORT_STATE_CONNECTED) { for (const Datagram& datagram : this->receive_queue) { @@ -452,6 +587,7 @@ void Transport::process_receive_queue() default: lava::log()->error("Transport: Unknown packet during receive queue process!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -472,6 +608,7 @@ void Transport::parse_setup(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for setup packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -487,6 +624,7 @@ void Transport::parse_projection_change(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for projection change packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -502,6 +640,7 @@ void Transport::parse_head_transform(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for head transform packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -517,6 +656,7 @@ void Transport::parse_controller_transform(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for controller transform packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -532,6 +672,7 @@ void Transport::parse_controller_event(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for controller event packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -547,6 +688,7 @@ void Transport::parse_controller_button(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for controller button packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -562,6 +704,7 @@ void Transport::parse_controller_thumbstick(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for controller thumbstick packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } @@ -577,6 +720,7 @@ void Transport::parse_latency(const Datagram& datagram) { lava::log()->error("Transport: Wrong packet id for latency packet!"); this->on_transport_error(); + this->set_state(TRANSPORT_STATE_ERROR); return; } diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index c56f12fa..7ea0bbd4 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -2,12 +2,24 @@ #include <glm/glm.hpp> #include <asio.hpp> #include <memory> -#include <optional> #include <span> +#include <optional> #include "datagram.hpp" #include "packet.hpp" +#define TRANSPORT_STATE_CHECK_INTERVALL 10 //NOTE: In milliseconds +#define TRANSPORT_CLIENT_CONNECT_TIMEOUT 60 //NOTE: In seconds +#define TRANSPORT_CLIENT_TIMEOUT 2 //NOTE: In seconds + +enum TransportState +{ + TRANSPORT_STATE_UNITIALIZED, + TRANSPORT_STATE_WAITING, + TRANSPORT_STATE_CONNECTED, + TRANSPORT_STATE_ERROR +}; + class Transport { public: @@ -20,7 +32,7 @@ public: typedef std::function<void(PacketControllerId controller_id, PacketControllerState controller_state)> OnControllerEvent; typedef std::function<void(PacketControllerId controller_id, PacketButtonId button_id, PacketButtonState button_state)> OnControllerButton; typedef std::function<void(PacketControllerId controller_id, const glm::vec2& thumbstick)> OnControllerThumbstick; - typedef std::function<void(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, double timestamp_transfrom_query, double timestamp_server_response, double timestamp_frame_decoded, double timestamp_command_construct, double timestamp_command_executed)> OnLatency; + typedef std::function<void(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, double timestamp_transform_query, double timestamp_server_response, double timestamp_frame_decoded, double timestamp_command_construct, double timestamp_command_executed)> OnLatency; typedef std::function<void()> OnTransportError; public: @@ -28,7 +40,7 @@ public: bool create(uint32_t port_number); void destroy(); - bool wait_setup(); + bool wait_connect(); //NOTE: The following functions should be thread safe bool send_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane); @@ -42,18 +54,23 @@ public: void set_on_setup(OnSetup function); void set_on_projection_change(OnProjectionChange function); void set_on_head_transform(OnHeadTransform function); - void set_on_controller_transfrom(OnControllerTransform function); + void set_on_controller_transform(OnControllerTransform function); void set_on_controller_event(OnControllerEvent function); void set_on_controller_button(OnControllerButton function); void set_on_controller_thumbstick(OnControllerThumbstick function); void set_on_latency(OnLatency function); void set_on_transport_error(OnTransportError function); - //NOTE: The following functions should be thread safe + //NOTE: The following function should be thread safe asio::io_context& get_context(); - uint32_t get_bitrate(); + TransportState get_state(); + double get_bitrate_send(); //NOTE: The bitrate is given in MBits/s + double get_bitrate_received(); //NOTE: The bitrate is given in MBits/s private: + void set_state(TransportState state); + void check_state(); + void send_datagram(const Datagram& datagram); void receive_datagram(); @@ -75,17 +92,23 @@ private: std::condition_variable worker_setup_condition; asio::io_context server_context; + std::optional<asio::high_resolution_timer> server_timer; asio::ip::udp::endpoint server_endpoint; - asio::ip::udp::socket server_socket; + std::optional<asio::ip::udp::socket> server_socket; asio::ip::udp::endpoint receive_endpoint; std::optional<asio::ip::udp::endpoint> client_endpoint; - DatagramPool datagram_pool; //NOTE: Protected by worker_mutex - std::vector<Datagram> send_queue; //NOTE: Protected by worker_mutex - std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread - - uint32_t bitrate = 0; //NOTE: Protected by worker_mutex - bool setup_received = false; //NOTE: Protected by worker_mutex + DatagramPool datagram_pool; //NOTE: Protected by worker_mutex + std::vector<Datagram> send_queue; //NOTE: Protected by worker_mutex + std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped + std::vector<Datagram> error_queue; //NOTE: Owned by worker_thread and accessible after the thread has been stoped + + TransportState state = TRANSPORT_STATE_UNITIALIZED; //NOTE: Protected by worker_mutex + uint32_t bits_send = 0; //NOTE: Owned by worker_thread + uint32_t bits_received = 0; //NOTE: Owned by worker_thread + double bitrate_send = 0.0; //NOTE: Protected by worker_mutex + double bitrate_received = 0.0; //NOTE: Protected by worker_mutex + double inactive_time = 0.0; //NOTE: Owned by worker_thread OnSetup on_setup; OnProjectionChange on_projection_change; -- GitLab