diff --git a/streaming_recording_backend.cpp b/streaming_recording_backend.cpp index ad659713a3be20b9b287e79f543f66ca489f3e51..f6cf26da6dcd3eb83c68d0473b96bc68a737c5b3 100644 --- a/streaming_recording_backend.cpp +++ b/streaming_recording_backend.cpp @@ -27,33 +27,113 @@ #include <iostream> #include <memory> #include <sstream> +#include <thread> namespace streamingnest { StreamingRecordingBackend::StreamingRecordingBackend() {} +void StreamingRecordingBackend::initialize() { + // Called once + std::cout << "initialize()" << std::endl; +} + void StreamingRecordingBackend::enroll( const nest::RecordingDevice &device, const std::vector<Name> &double_value_names, const std::vector<Name> &long_value_names) { - std::cout << "double value names:" << std::endl; + // Called per thread + std::lock_guard<std::mutex> lock_guard(enroll_mutex_); + + std::vector<std::string> double_parameter_names; + double_parameter_names.reserve(double_value_names.size()); + for (const auto &value_name : double_value_names) { + double_parameter_names.push_back(value_name.toString()); + } + + std::vector<std::string> long_parameter_names; + long_parameter_names.reserve(long_value_names.size()); + for (const auto &value_name : long_value_names) { + long_parameter_names.push_back(value_name.toString()); + } + + if (device.get_type() == nest::RecordingDevice::Type::MULTIMETER) { + devices_[std::this_thread::get_id()][device.get_name()] = + std::make_unique<nesci::producer::NestMultimeter>( + device.get_name(), double_parameter_names, long_parameter_names); + } + + std::cout << std::this_thread::get_id() << ' '; + std::cout << device.get_name() << ' '; + std::cout << "double_value_names: "; for (const auto &name : double_value_names) { - std::cout << name << std::endl; + std::cout << name << ' '; } - std::cout << "\nlong value names:" << std::endl; + std::cout << ", long_value_names: "; for (const auto &name : long_value_names) { - std::cout << name << std::endl; + std::cout << name << ' '; } + std::cout << std::endl; } -void StreamingRecordingBackend::initialize() {} +void StreamingRecordingBackend::write(const nest::RecordingDevice &device, + const nest::Event &event, + const std::vector<double> &double_values, + const std::vector<long> &long_values) { + // Called per thread + // std::lock_guard<std::mutex> lock_guard(write_mutex_); + // std::cout << std::this_thread::get_id() << ' '; + // std::cout << device.get_name() << ' '; + // std::cout << event.get_sender_gid() << ' '; + // std::cout << event.get_stamp() << ' '; + // for (const auto value : double_values) { + // std::cout << value << ' '; + // } + // std::cout << ' '; + // for (const auto value : long_values) { + // std::cout << value << ' '; + // } + // std::cout << std::endl; + + const auto thread_devices = devices_.find(std::this_thread::get_id()); + if (thread_devices == devices_.end()) { + // std::cout << "Error: no devices assigned to this thread!" << std::endl; + return; + } + + const auto thread_device = thread_devices->second.find(device.get_name()); + if (thread_device == thread_devices->second.end()) { + // std::cout << "Error: device not found in this thread (device = " + // << device.get_name() << ")" << std::endl; + + return; + } + + auto &nesci_device = thread_device->second; -void StreamingRecordingBackend::finalize() {} + if (device.get_type() == nest::RecordingDevice::Type::MULTIMETER) { + auto multimeter = + static_cast<nesci::producer::NestMultimeter *>(nesci_device.get()); + multimeter->Record(event.get_stamp().get_ms(), event.get_sender_gid(), + double_values.data(), long_values.data()); + } +} -void StreamingRecordingBackend::synchronize() {} +void StreamingRecordingBackend::synchronize() { + // Called per thread + for (const auto &device : devices_.at(std::this_thread::get_id())) { + { + std::lock_guard<std::mutex> lock_guard(relay_mutex_); + relay_.Send(device.second->node(), false); + } + static_cast<nesci::producer::NestMultimeter *>(device.second.get()) + ->Clear(); + } +} -void StreamingRecordingBackend::write(const nest::RecordingDevice &, - const nest::Event &, - const std::vector<double> &, - const std::vector<long> &) {} +void StreamingRecordingBackend::finalize() { + // Called once + std::cout << "finalize()" << std::endl; } + +} // namespace streamingnest diff --git a/streaming_recording_backend.h b/streaming_recording_backend.h index 145fdfed9b5325c1606d8575d7607534097b60b3..ede20e6b3761b28fb4be86f4d6a153c2fae85778 100644 --- a/streaming_recording_backend.h +++ b/streaming_recording_backend.h @@ -25,6 +25,7 @@ #include <map> #include <memory> +#include <mutex> #include <string> #include <vector> @@ -55,8 +56,15 @@ class StreamingRecordingBackend : public nest::RecordingBackend { const std::vector<double> &, const std::vector<long> &) override; private: + std::mutex relay_mutex_; contra::Relay<contra::SharedMemoryTransport> relay_; - std::map<nest::index, std::unique_ptr<nesci::producer::Device>> recorders_; + + std::mutex write_mutex_; + + std::map<std::thread::id, + std::map<std::string, std::unique_ptr<nesci::producer::Device>>> + devices_; + std::mutex enroll_mutex_; }; } // namespace streamingnest