diff --git a/src/headset/remote_headset.cpp b/src/headset/remote_headset.cpp index edd8f34eff0023b5c30007cf4d59099072a075e2..7ca510ef00608f185769032d453b6aee6572d6d9 100644 --- a/src/headset/remote_headset.cpp +++ b/src/headset/remote_headset.cpp @@ -10,11 +10,6 @@ RemoteHeadset::RemoteHeadset() { buttons.fill(BUTTON_STATE_RELEASED); } - - for (std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>& buttons : this->worker_controller_buttons) - { - buttons.fill(BUTTON_STATE_RELEASED); - } } bool RemoteHeadset::on_setup_instance(lava::frame_config& config) @@ -51,16 +46,16 @@ bool RemoteHeadset::on_create() StereoStrategy::Ptr strategy = this->get_application()->get_stereo_strategy(); this->frame_id_count = strategy->get_max_remote_frame_ids(); - if (!this->create_server()) - { - lava::log()->error("Remote Headset: Can't create server!"); + this->transport = make_transport(); - return false; - } + if (!this->transport->create(this->server_port)) + { + lava::log()->error("Remote Headset: Can't create server!"); + } lava::log()->debug("Waiting for connection"); - if (!this->wait_setup()) + if (!this->transport->wait_setup()) { lava::log()->error("Remote Headset: Incomplete setup handshake!"); @@ -83,7 +78,7 @@ bool RemoteHeadset::on_create() return false; } - return !this->check_error(); + return true; } void RemoteHeadset::on_destroy() @@ -278,47 +273,6 @@ bool RemoteHeadset::is_remote() const return true; } -bool RemoteHeadset::create_server() -{ - this->server_endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), this->server_port); - this->server_acceptor = asio::ip::tcp::acceptor(this->server_context, this->server_endpoint); - - this->wait_connect(); - - this->worker_error = false; - this->worker_setup = false; - - this->worker_thread = std::thread([this]() - { - this->server_context.run(); - }); - - return true; -} - -void RemoteHeadset::destroy_server() -{ - this->server_context.stop(); - this->worker_thread.join(); - - if (this->client_socket.has_value()) - { - this->client_socket->close(); - } - - if (this->server_acceptor.has_value()) - { - this->server_acceptor->close(); - } - - for (const Transmission& transmission : this->worker_transmissions) - { - delete[] transmission.buffer; - } - - this->worker_transmissions.clear(); -} - bool RemoteHeadset::create_framebuffers() { this->framebuffers.resize(this->frame_id_count); @@ -361,7 +315,7 @@ bool RemoteHeadset::create_encoders() lava::renderer& renderer = this->get_application()->get_renderer(); uint32_t frame_count = this->get_application()->get_frame_count(); - if (!encoder->create(device, renderer, this->server_context.get_executor(), this->resolution, frame_count)) + if (!encoder->create(device, renderer, this->transport->get_context().get_executor(), this->resolution, frame_count)) { return false; } @@ -388,22 +342,6 @@ void RemoteHeadset::destroy_encoders() this->encoders.clear(); } -void RemoteHeadset::on_client_accept(asio::ip::tcp::socket& socket) -{ - if (this->client_socket.has_value()) - { - lava::log()->error("Remote Headset: Client already connected!"); - this->on_transport_error(); - - return; - } - - lava::log()->debug("Client connected"); - - this->client_socket = std::move(socket); - this->receive_header(); -} - void RemoteHeadset::on_packet_setup(const std::span<uint8_t>& buffer) { PacketSetup* packet = (PacketSetup*)buffer.data(); @@ -632,388 +570,6 @@ void RemoteHeadset::update_stage(lava::delta delta_time) } } -bool RemoteHeadset::check_error() -{ - std::unique_lock<std::mutex> lock(this->worker_mutex); - - return this->worker_error; -} - -bool RemoteHeadset::wait_setup() -{ - std::unique_lock<std::mutex> lock(this->worker_mutex); - - std::chrono::seconds wait_time(60); - std::chrono::high_resolution_clock::time_point last_time = std::chrono::high_resolution_clock::now(); - - while (!this->worker_setup && wait_time.count() > 0) - { - this->worker_setup_condition.wait_for(lock, wait_time); - - std::chrono::high_resolution_clock::time_point current_time = std::chrono::high_resolution_clock::now(); - wait_time -= std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time); - last_time = current_time; - } - - return this->worker_setup && !this->worker_error; -} - -void RemoteHeadset::wait_connect() -{ - this->server_acceptor->async_accept([this](const asio::error_code& error_code, asio::ip::tcp::socket socket) - { - if (error_code) - { - lava::log()->error("Remote Headset: Error during clinet accept! Message: {}", error_code.message()); - this->on_transport_error(); - - return; - } - - this->on_client_accept(socket); - }); -} - -void RemoteHeadset::receive_header() -{ - uint32_t buffer_size = sizeof(PacketHeader); - uint8_t* buffer_ptr = new uint8_t[buffer_size]; - - asio::async_read(this->client_socket.value(), asio::buffer(buffer_ptr, buffer_size), [this, buffer_ptr](const asio::error_code& error_code, std::size_t bytes) - { - if (error_code) - { - lava::log()->error("Remote Headset: Error during packet header receive! Message: {}", error_code.message()); - this->on_transport_error(); - } - - else - { - PacketHeader* header = (PacketHeader*)buffer_ptr; - this->receive_content(*header); - } - - delete[] buffer_ptr; - }); -} - -void RemoteHeadset::receive_datagram() -{ - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - this->client_socket->async_receive(asio::buffer(datagram.buffer, datagram.size), [this, datagram](const asio::error_code& error_code, std::size_t bytes) - { - if (error_code) - { - lava::log()->error("Remote Headset: Error during datagram receive! Message: {}", error_code.message()); - this->on_transport_error(); - - return; - } - - const PacketHeader* packet = (const PacketHeader*)datagram.buffer; - - switch (packet->id) - { - case PACKET_ID_SETUP: - case PACKET_ID_PROJECTION_CHANGE: - case PACKET_ID_HEAD_TRANSFORM: - case PACKET_ID_CONTROLLER_TRANSFORM: - case PACKET_ID_CONTROLLER_EVENT: - case PACKET_ID_CONTROLLER_BUTTON: - case PACKET_ID_CONTROLLER_THUMBSTICK: - case PACKET_ID_LATENCY: - this-> - case PACKET_ID_SETUP_COMPLETE: - lava::log()->error("Remote Headset: Received setup complete packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_CONFIG_NAL: - lava::log()->error("Remote Headset: Received frame config nal packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_NAL: - lava::log()->error("Remote Headset: Received frame nal packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_METADATA: - lava::log()->error("Remote Headset: Received frame metadata packet!"); - this->on_transport_error(); - return; - default: - lava::log()->error("Remote Headset: Received unknown packet!"); - this->on_transport_error(); - return; - } - - - - - - - }); - - - - - uint32_t buffer_size = sizeof(uint8_t) * header.size; - uint8_t* buffer_ptr = new uint8_t[buffer_size]; - - memcpy(buffer_ptr, &header, sizeof(PacketHeader)); - - asio::async_read(this->client_socket.value(), asio::buffer(buffer_ptr + sizeof(PacketHeader), buffer_size - sizeof(PacketHeader)), [this, header, buffer_ptr, buffer_size](const asio::error_code& error_code, std::size_t bytes) - { - if (error_code) - { - lava::log()->error("Remote Headset: Error during packet content receive! Message: {}", error_code.message()); - this->on_transport_error(); - - return; - } - - else - { - std::span<uint8_t> buffer(buffer_ptr, buffer_size); - - switch (header.id) - { - case PACKET_ID_SETUP: - this->on_packet_setup(buffer); - break; - case PACKET_ID_PROJECTION_CHANGE: - this->on_packet_projection_change(buffer); - break; - case PACKET_ID_HEAD_TRANSFORM: - this->on_packet_head_transform(buffer); - break; - case PACKET_ID_CONTROLLER_TRANSFORM: - this->on_packet_controller_transform(buffer); - break; - case PACKET_ID_CONTROLLER_EVENT: - this->on_packet_controller_event(buffer); - break; - case PACKET_ID_CONTROLLER_BUTTON: - this->on_packet_controller_button(buffer); - break; - case PACKET_ID_CONTROLLER_THUMBSTICK: - this->on_packet_controller_thumbstick(buffer); - break; - case PACKET_ID_LATENCY: - this->on_packet_latency(buffer); - break; - case PACKET_ID_SETUP_COMPLETE: - lava::log()->error("Remote Headset: Received setup complete packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_CONFIG_NAL: - lava::log()->error("Remote Headset: Received frame config nal packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_NAL: - lava::log()->error("Remote Headset: Received frame nal packet!"); - this->on_transport_error(); - return; - case PACKET_ID_FRAME_METADATA: - lava::log()->error("Remote Headset: Received frame metadata packet!"); - this->on_transport_error(); - return; - default: - lava::log()->error("Remote Headset: Received unknown packet!"); - this->on_transport_error(); - return; - } - - this->receive_header(); - } - - delete[] buffer_ptr; - }); -} - -void RemoteHeadset::send_datagram(const Datagram& datagram) -{ - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_send_datagrams.push_back(datagram); - - if (this->worker_send_datagrams.size() <= 1) - { - this->send_datagram_queue(); - } -} - -void RemoteHeadset::send_datagram_queue() -{ - if (!this->client_socket.has_value()) - { - return; - } - - if (this->worker_send_datagrams.empty()) - { - return; - } - - Datagram datagram = this->worker_send_datagrams.front(); - - this->client_socket->async_send(asio::buffer(datagram.buffer, datagram.size), [this, datagram](const asio::error_code& error_code, std::size_t bytes) - { - if (error_code) - { - lava::log()->error("Remote Headset: Error during send of datagram! Message: {}", error_code.message()); - this->on_transport_error(); - - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - this->worker_bits_send += bytes * 8; - - this->worker_send_datagrams.erase(this->worker_send_datagrams.begin()); - this->worker_datagram_pool.release_datagram(datagram); - - if (!this->worker_send_datagrams.empty()) - { - this->send_datagram_queue(); - } - lock.unlock(); - }); -} - -void RemoteHeadset::send_packet_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane) -{ - if (!this->client_socket.has_value()) - { - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - PacketSetupComplete* packet = (PacketSetupComplete*)datagram.buffer; - packet->id = PACKET_ID_SETUP_COMPLETE; - packet->size = sizeof(PacketSetupComplete); - packet->strategy_id = strategy_id; - packet->max_frame_ids = max_frame_ids; - packet->near_plane = near_plane; - packet->far_plane = far_plane; - - this->send_datagram(datagram); -} - -void RemoteHeadset::send_packet_stage_transform(const glm::mat4& stage_transform) -{ - if (!this->client_socket.has_value()) - { - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - PacketStageTransform* packet = (PacketStageTransform*)datagram.buffer; - packet->id = PACKET_ID_STAGE_TRANSFORM; - packet->size = sizeof(PacketStageTransform); - packet->stage_transform = stage_transform; - - this->send_datagram(datagram); -} - -void RemoteHeadset::send_packet_frame_config_nal(PacketFrameId frame_id, const std::span<uint8_t>& content) -{ - if (!this->client_socket.has_value()) - { - return; - } - - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - uint32_t size = sizeof(PacketFrameConfigNal) + sizeof(uint8_t) * content.size(); - - if (size > datagram.size) - { - lava::log()->error("Remote Headset: Frame config nal packet exceeds datagram size!"); - - return; - } - - PacketFrameConfigNal* packet = (PacketFrameConfigNal*)datagram.buffer; - packet->id = PACKET_ID_FRAME_CONFIG_NAL; - packet->size = size; - packet->frame_id = frame_id; - - memcpy(datagram.buffer + sizeof(PacketFrameConfigNal), content.data(), sizeof(uint8_t) * content.size()); - - this->send_datagram(datagram); -} - -void RemoteHeadset::send_packet_frame_nal(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, const std::span<uint8_t>& content) -{ - if (!this->client_socket.has_value()) - { - return; - } - - for (uint32_t offset = 0; offset < content.size();) - { - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - uint32_t size = glm::min(datagram.size, (uint32_t) content.size() - offset); - - PacketFrameNal* packet = (PacketFrameNal*)datagram.buffer; - packet->id = PACKET_ID_FRAME_NAL; - packet->size = size; - packet->frame_number = frame_number; - packet->frame_id = frame_id; - packet->transform_id = transform_id; - packet->payload_offset = offset; - packet->payload_size = content.size(); - - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*) content.data() + offset, sizeof(uint8_t) * size); - - this->send_datagram(datagram); - - offset += size; - } -} - -void RemoteHeadset::send_packet_frame_metadata(PacketFrameNumber frame_number, const std::span<uint8_t>& content) -{ - if (!this->client_socket.has_value()) - { - return; - } - - for (uint32_t offset = 0; offset < content.size();) - { - std::unique_lock<std::mutex> lock(this->worker_mutex); - Datagram datagram = this->worker_datagram_pool.acquire_datagram(); - lock.unlock(); - - uint32_t size = glm::min(datagram.size, (uint32_t)content.size() - offset); - - PacketFrameMetadata* packet = (PacketFrameMetadata*)datagram.buffer; - packet->id = PACKET_ID_FRAME_METADATA; - packet->size = size; - packet->frame_number = frame_number; - packet->payload_offset = offset; - packet->payload_size = content.size(); - - memcpy(datagram.buffer + sizeof(PacketFrameMetadata), (uint8_t*)content.data() + offset, sizeof(uint8_t) * size); - - this->send_datagram(datagram); - - offset += size; - } -} - bool RemoteHeadset::convert_strategy(StereoStrategyType strategy_type, PacketStereoStrategyId& strategy_id) { switch (strategy_type) diff --git a/src/headset/remote_headset.hpp b/src/headset/remote_headset.hpp index 3beb7e7c3803189f376eab36f862300613f561e6..9319f406ff2b7c2f87b79f9c84aa274d639a7742 100644 --- a/src/headset/remote_headset.hpp +++ b/src/headset/remote_headset.hpp @@ -1,21 +1,15 @@ #pragma once #include <liblava/lava.hpp> #include <glm/glm.hpp> -#include <asio.hpp> -#include <thread> #include <mutex> -#include <condition_variable> #include <vector> #include <array> -#include <span> -#include <optional> #include <memory> #include "headset.hpp" #include "strategy/stereo_strategy.hpp" -#include "stream/datagram.hpp" -#include "stream/packet.hpp" +#include "stream/transport.hpp" #include "utility/encoder.hpp" #include "utility/extern_fence.hpp" @@ -56,16 +50,12 @@ public: bool is_remote() const override; private: - bool create_server(); - void destroy_server(); - bool create_framebuffers(); void destroy_framebuffers(); bool create_encoders(); void destroy_encoders(); - void on_client_accept(asio::ip::tcp::socket& socket); void on_packet_setup(const std::span<uint8_t>& buffer); void on_packet_projection_change(const std::span <uint8_t>& buffer); void on_packet_head_transform(const std::span<uint8_t>& buffer); @@ -76,26 +66,6 @@ private: void on_packet_latency(const std::span<uint8_t>& buffer); void on_transport_error(); - void update_values(); - void update_stage(lava::delta delta_time); - bool check_error(); - - bool wait_setup(); - void wait_connect(); - - void receive_datagram(); - void send_datagram(const Datagram& datagram); - - void process_send_queue(); - void process_receive_queue(); - void send_datagram_queue(); //NOTE: Protected by wroker_mutex - - void send_packet_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane); - void send_packet_stage_transform(const glm::mat4& stage_transform); - void send_packet_frame_config_nal(PacketFrameId frame_id, const std::span<uint8_t>& content); - void send_packet_frame_nal(PacketFrameNumber frame_number, PacketFrameId frame_id, PacketTransformId transform_id, const std::span<uint8_t>& content); - void send_packet_frame_metadata(PacketFrameNumber frame_number, const std::span<uint8_t>& content); - static bool convert_strategy(StereoStrategyType strategy_type, PacketStereoStrategyId& strategy_id); private: @@ -104,14 +74,7 @@ private: const float near_plane = 0.1f; const float far_plane = 1000.0f; float movement_speed = 1.0f; - - asio::io_context server_context; - asio::ip::udp::endpoint server_endpoint; - std::optional<asio::ip::udp::socket> client_socket; - uint32_t transform_id = 0; - uint32_t bits_send = 0; - glm::uvec2 resolution; glm::vec3 stage_position = glm::vec3(0.0f); glm::mat4 view_matrix; @@ -122,11 +85,12 @@ private: std::array<PacketControllerState, 2> controller_states; std::array<glm::mat4, 2> controller_transforms; - std::array<std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>, 2> controller_buttons; std::array<glm::vec2, 2> controller_thumbsticks; + std::array<std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>, 2> controller_buttons; uint32_t frame_number = 0; uint32_t frame_id_count = 0; + uint32_t transform_id = 0; uint32_t encoder_mode = ENCODER_MODE_CONSTANT_QUALITY; uint32_t encoder_key_rate = 120; @@ -134,34 +98,11 @@ private: uint32_t encoder_frame_rate = 90; uint32_t encoder_quality = 35; + Transport::Ptr transport; + std::vector<lava::image::ptr> framebuffers; std::vector<Encoder::Ptr> encoders; Statistic::Ptr statistic_bitrate; LatencyStatistic::Ptr statistic_latency; - -private: - std::thread worker_thread; - std::mutex worker_mutex; - std::condition_variable worker_setup_condition; - - bool worker_error = false; //NOTE: Protected by wroker_mutex - bool worker_setup = false; //NOTE: Protected by wroker_mutex - - uint32_t worker_transform_id = 0; //NOTE: Protected by wroker_mutex - uint32_t worker_bits_send = 0; //NOTE: Protected by wroker_mutex - - glm::uvec2 worker_resolution; //NOTE: Protected by wroker_mutex - glm::mat4 worker_head_matrix; //NOTE: Protected by worker_mutex - std::array<glm::mat4, 2> worker_head_to_eye_matrices; //NOTE: Protected by worker_mutex - std::array<glm::mat4, 2> worker_projection_matrices; //NOTE: Protected by worker_mutex - - std::array<PacketControllerState, 2> worker_controller_states; //NOTE: Protected by worker_mutex - std::array<glm::mat4, 2> worker_controller_transforms; //NOTE: Protected by worker_mutex - std::array<std::array<PacketButtonState, BUTTON_ID_MAX_COUNT>, 2> worker_controller_buttons; //NOTE: Protected by worker_mutex - std::array<glm::vec2, 2> worker_controller_thumbsticks; //NOTE: Protected by worker_mutex - - DatagramPool worker_datagram_pool; - std::vector<Datagram> worker_receive_datagrams; //NOTE: Protected by worker_mutex - std::vector<Datagram> worker_send_datagrams; //NOTE: Protected by worker_mutex }; \ No newline at end of file diff --git a/src/stream/transport.cpp b/src/stream/transport.cpp index 7ab9dfbba8ddc103c77ea015c740c3bf30158578..a28219ed1dcc1f1c162d00537d2db6d3276ce627 100644 --- a/src/stream/transport.cpp +++ b/src/stream/transport.cpp @@ -3,12 +3,61 @@ bool Transport::create(uint32_t port_number) { + this->server_endpoint = asio::ip::udp::endpoint(asio::ip::udp::v4(), port_number); + this->server_socket = asio::ip::udp::socket(this->server_context); + this->server_socket.bind(this->server_endpoint); + this->setup_received = false; + + this->worker_thread = std::thread([this]() + { + this->server_context.run(); + }); + + this->receive_datagram(); + + return true; } void Transport::destroy() { + this->server_context.stop(); + this->worker_thread.join(); + + this->setup_received = false; + this->server_socket.close(); + + for (const Datagram& datagram : this->send_queue) + { + this->datagram_pool.release_datagram(datagram); + } + + for (const Datagram& datagram : this->receive_queue) + { + this->datagram_pool.release_datagram(datagram); + } + + this->send_queue.clear(); + this->receive_queue.clear(); +} + +bool Transport::wait_setup() +{ + std::unique_lock<std::mutex> lock(this->worker_mutex); + + std::chrono::seconds wait_time(60); + std::chrono::high_resolution_clock::time_point last_time = std::chrono::high_resolution_clock::now(); + + while (!this->setup_received && wait_time.count() > 0) + { + this->worker_setup_condition.wait_for(lock, wait_time); + + std::chrono::high_resolution_clock::time_point current_time = std::chrono::high_resolution_clock::now(); + wait_time -= std::chrono::duration_cast<std::chrono::seconds>(current_time - last_time); + last_time = current_time; + } + return this->setup_received; } bool Transport::send_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane) @@ -240,7 +289,7 @@ void Transport::receive_datagram() Datagram datagram = this->datagram_pool.acquire_datagram(); lock.unlock(); - this->server_socket.async_receive(asio::buffer(datagram.buffer, datagram.size), [this, datagram](const asio::error_code& error_code, std::size_t bytes) + this->server_socket.async_receive_from(asio::buffer(datagram.buffer, datagram.size), this->receive_endpoint, [this, datagram](const asio::error_code& error_code, std::size_t bytes) { if (error_code) { @@ -250,6 +299,21 @@ void Transport::receive_datagram() return; } + if (!this->client_endpoint.has_value()) + { + lava::log()->debug("Client connected"); + + this->client_endpoint = this->receive_endpoint; + } + + else + { + lava::log()->error("Transport: Client already connected!"); + this->on_transport_error(); + + return; + } + const PacketHeader* packet = (const PacketHeader*)datagram.buffer; switch (packet->id) @@ -286,22 +350,238 @@ void Transport::receive_datagram() return; } + this->process_receive_queue(); + this->receive_datagram(); + }); +} +void Transport::process_send_queue() +{ + if (this->send_queue.empty()) + { + return; + } + + if (!this->client_endpoint.has_value()) + { + return; + } + Datagram datagram = this->send_queue.front(); + + this->server_socket.async_send_to(asio::buffer(datagram.buffer, datagram.size), this->client_endpoint.value(), [this, datagram](const asio::error_code& error_code, std::size_t bytes) + { + if (error_code) + { + lava::log()->error("Transport: Error during send of datagram! Message: {}", error_code.message()); + this->on_transport_error(); + return; + } + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->bitrate += bytes * 8; //TODO:!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + this->send_queue.erase(this->send_queue.begin()); + this->datagram_pool.release_datagram(datagram); + + this->process_send_queue(); + lock.unlock(); }); } -void Transport::process_send_queue() +void Transport::process_receive_queue() { + if (this->receive_queue.empty()) + { + return; + } + + if (!this->setup_received) + { + const Datagram& datagram = this->receive_queue.back(); + const PacketHeader* packet = (const PacketHeader*)datagram.buffer; + + if (packet->id == PACKET_ID_SETUP) + { + this->parse_setup(datagram); + + this->setup_received = true; + this->worker_setup_condition.notify_all(); + + this->receive_queue.pop_back(); + + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->datagram_pool.release_datagram(datagram); + lock.unlock(); + } + } + + if (this->setup_received) + { + for (const Datagram& datagram : this->receive_queue) + { + const PacketHeader* packet = (const PacketHeader*)datagram.buffer; + + switch (packet->id) + { + case PACKET_ID_SETUP: + this->parse_setup(datagram); + break; + case PACKET_ID_PROJECTION_CHANGE: + this->parse_projection_change(datagram); + break; + case PACKET_ID_HEAD_TRANSFORM: + this->parse_head_transform(datagram); + break; + case PACKET_ID_CONTROLLER_TRANSFORM: + this->parse_controller_transform(datagram); + break; + case PACKET_ID_CONTROLLER_EVENT: + this->parse_controller_event(datagram); + break; + case PACKET_ID_CONTROLLER_BUTTON: + this->parse_controller_button(datagram); + break; + case PACKET_ID_CONTROLLER_THUMBSTICK: + this->parse_controller_thumbstick(datagram); + break; + case PACKET_ID_LATENCY: + this->parse_latency(datagram); + break; + default: + lava::log()->error("Transport: Unknown packet during receive queue process!"); + this->on_transport_error(); + return; + } + + std::unique_lock<std::mutex> lock(this->worker_mutex); + this->datagram_pool.release_datagram(datagram); + lock.unlock(); + } + this->receive_queue.clear(); + } } -void Transport::process_receive_queue() +void Transport::parse_setup(const Datagram& datagram) +{ + PacketSetup* packet = (PacketSetup*)datagram.buffer; + + if (packet->id != PACKET_ID_SETUP) + { + lava::log()->error("Transport: Wrong packet id for setup packet!"); + this->on_transport_error(); + + return; + } + + this->on_setup(packet->resolution); +} + +void Transport::parse_projection_change(const Datagram& datagram) +{ + PacketProjectionChange* packet = (PacketProjectionChange*)datagram.buffer; + + if (packet->id != PACKET_ID_PROJECTION_CHANGE) + { + lava::log()->error("Transport: Wrong packet id for projection change packet!"); + this->on_transport_error(); + + return; + } + + this->on_projection_change(packet->left_eye_projection, packet->right_eye_projection, packet->left_head_to_eye, packet->right_head_to_eye); +} + +void Transport::parse_head_transform(const Datagram& datagram) +{ + PacketHeadTransform* packet = (PacketHeadTransform*)datagram.buffer; + + if (packet->id != PACKET_ID_HEAD_TRANSFORM) + { + lava::log()->error("Transport: Wrong packet id for head transform packet!"); + this->on_transport_error(); + + return; + } + + this->on_head_transform(packet->transform_id, packet->head_transform); +} + +void Transport::parse_controller_transform(const Datagram& datagram) +{ + PacketControllerTransform* packet = (PacketControllerTransform*)datagram.buffer; + + if (packet->id != PACKET_ID_CONTROLLER_TRANSFORM) + { + lava::log()->error("Transport: Wrong packet id for controller transform packet!"); + this->on_transport_error(); + + return; + } + + this->on_controller_transform(packet->controller_id, packet->controller_transform); +} + +void Transport::parse_controller_event(const Datagram& datagram) +{ + PacketControllerEvent* packet = (PacketControllerEvent*)datagram.buffer; + + if (packet->id != PACKET_ID_CONTROLLER_EVENT) + { + lava::log()->error("Transport: Wrong packet id for controller event packet!"); + this->on_transport_error(); + + return; + } + + this->on_controller_event(packet->controller_id, packet->controller_state); +} + +void Transport::parse_controller_button(const Datagram& datagram) { + PacketControllerButton* packet = (PacketControllerButton*)datagram.buffer; + + if (packet->id != PACKET_ID_CONTROLLER_BUTTON) + { + lava::log()->error("Transport: Wrong packet id for controller button packet!"); + this->on_transport_error(); + + return; + } + + this->on_controller_button(packet->controller_id, packet->button_id, packet->button_state); +} + +void Transport::parse_controller_thumbstick(const Datagram& datagram) +{ + PacketControllerThumbstick* packet = (PacketControllerThumbstick*)datagram.buffer; + + if (packet->id != PACKET_ID_CONTROLLER_THUMBSTICK) + { + lava::log()->error("Transport: Wrong packet id for controller thumbstick packet!"); + this->on_transport_error(); + + return; + } + + this->on_controller_thumbstick(packet->controller_id, packet->thumbstick); +} + +void Transport::parse_latency(const Datagram& datagram) +{ + PacketLatency* packet = (PacketLatency*)datagram.buffer; + + if (packet->id != PACKET_ID_LATENCY) + { + lava::log()->error("Transport: Wrong packet id for latency packet!"); + this->on_transport_error(); + + return; + } + this->on_latency(packet->frame_number, packet->frame_id, packet->transform_id, packet->timestamp_transform_query, packet->timestamp_server_response, packet->timestamp_frame_decoded, packet->timestamp_command_construct, packet->timestamp_command_executed); } Transport::Ptr make_transport() diff --git a/src/stream/transport.hpp b/src/stream/transport.hpp index a698ef84fc7ca59165e36d44265146eb595f5837..c56f12fa70bc4f3c0191f7056608bf1cfbb2167d 100644 --- a/src/stream/transport.hpp +++ b/src/stream/transport.hpp @@ -28,6 +28,7 @@ public: bool create(uint32_t port_number); void destroy(); + bool wait_setup(); //NOTE: The following functions should be thread safe bool send_setup_complete(PacketStereoStrategyId strategy_id, uint32_t max_frame_ids, float near_plane, float far_plane); @@ -61,15 +62,22 @@ private: void parse_setup(const Datagram& datagram); void parse_projection_change(const Datagram& datagram); - //TODO: ...... + void parse_head_transform(const Datagram& datagram); + void parse_controller_transform(const Datagram& datagram); + void parse_controller_event(const Datagram& datagram); + void parse_controller_button(const Datagram& datagram); + void parse_controller_thumbstick(const Datagram& datagram); + void parse_latency(const Datagram& datagram); private: std::thread worker_thread; std::mutex worker_mutex; + std::condition_variable worker_setup_condition; asio::io_context server_context; asio::ip::udp::endpoint server_endpoint; asio::ip::udp::socket server_socket; + asio::ip::udp::endpoint receive_endpoint; std::optional<asio::ip::udp::endpoint> client_endpoint; DatagramPool datagram_pool; //NOTE: Protected by worker_mutex @@ -77,6 +85,7 @@ private: std::vector<Datagram> receive_queue; //NOTE: Owned by worker_thread uint32_t bitrate = 0; //NOTE: Protected by worker_mutex + bool setup_received = false; //NOTE: Protected by worker_mutex OnSetup on_setup; OnProjectionChange on_projection_change;