Skip to content
Snippets Groups Projects
Select Git revision
  • e9d41edc59ed5ffd63337e17126a525e78532705
  • master default protected
  • feature/add-dockerfile
  • feature/#16_Wait_until_a_connection_is_established
  • feature/#12_Make_transport_protocol_configurable
  • stable protected
  • feature/#1_Add_VTK_Demo
7 results

streaming_recording_backend.cpp

Blame
  • streaming_recording_backend.cpp 5.81 KiB
    /*
     *  streaming_recording_backend.cpp
     *
     *  This file is part of NEST.
     *
     *  Copyright (C) 2004 The NEST Initiative
     *
     *  NEST is free software: you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation, either version 2 of the License, or
     *  (at your option) any later version.
     *
     *  NEST is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with NEST.  If not, see <http://www.gnu.org/licenses/>.
     *
     */
    
    #include "streaming_recording_backend.h"
    
    #include "recording_device.h"
    #include "nesci/producer/nest_multimeter.hpp"
    #include "nesci/producer/spike_detector.hpp"
    
    #include <iostream>
    #include <memory>
    #include <sstream>
    #include <thread>
    
    namespace streamingnest {
    
    // Default constructor: performs no work of its own, so the
    // compiler-generated member initialisation is all that is needed.
    StreamingRecordingBackend::StreamingRecordingBackend() = default;
    
    // Lifecycle hook. At present it only reports its own invocation on
    // standard output.
    void StreamingRecordingBackend::initialize() {
      // Called once
      std::cout << "initialize()";
      std::cout << std::endl;  // terminate the line and flush
    }
    
    // Lifecycle hook. Logs its own invocation and the size reported by the
    // kernel's node manager on standard output.
    void StreamingRecordingBackend::prepare() {
      std::cout << "prepare()" << std::endl;
      // The ": " separator keeps the label and the number apart in the log line.
      std::cout << "Get the number of nodes: "
                << nest::kernel().node_manager.size() << std::endl;
    }
    
    // Lifecycle hook. At present it only reports its own invocation on
    // standard output.
    void StreamingRecordingBackend::cleanup() {
      std::ostream &log = std::cout;
      log << "cleanup()" << std::endl;
    }
    
    // Lifecycle hook. At present it only reports its own invocation on
    // standard output.
    void StreamingRecordingBackend::post_run_cleanup() {
      std::cout << "post_run_cleanup()";
      std::cout << std::endl;  // terminate the line and flush
    }
    
    // Registers `device` for the calling thread.
    //
    // The value names are converted to plain strings. Depending on the device
    // type, a nesci NestMultimeter or SpikeDetector producer is then stored in
    // `devices_` under the calling thread's id and the device name, and the
    // enrolment is logged on standard output. Any other device type is reported
    // on standard error and nothing is stored.
    //
    // device             - recording device to enrol
    // double_value_names - names of the double-valued quantities it records
    // long_value_names   - names of the long-valued quantities it records
    void StreamingRecordingBackend::enroll(
        const nest::RecordingDevice &device,
        const std::vector<Name> &double_value_names,
        const std::vector<Name> &long_value_names) {
      // Called per thread; enroll_mutex_ is held for the whole registration.
      std::lock_guard<std::mutex> lock_guard(enroll_mutex_);

      // Turns a list of names into the equivalent list of strings.
      const auto to_strings =
          [](const std::vector<Name> &names) -> std::vector<std::string> {
        std::vector<std::string> strings;
        strings.reserve(names.size());
        for (const auto &name : names) {
          strings.push_back(name.toString());
        }
        return strings;
      };

      std::vector<std::string> double_parameter_names =
          to_strings(double_value_names);
      std::vector<std::string> long_parameter_names =
          to_strings(long_value_names);

      const auto thread_id = std::this_thread::get_id();
      const auto device_type = device.get_type();

      if (device_type == nest::RecordingDevice::Type::MULTIMETER) {
        devices_[thread_id][device.get_name()] =
            std::make_unique<nesci::producer::NestMultimeter>(
                device.get_name(), double_parameter_names, long_parameter_names);

        // Log the enrolment followed by every recorded quantity.
        std::cout << "Thread " << thread_id << ": enrolled multimeter `"
                  << device.get_name() << "` recording ";
        for (const auto &recorded_double : double_value_names) {
          std::cout << '`' << recorded_double << "` ";
        }
        for (const auto &recorded_long : long_value_names) {
          std::cout << '`' << recorded_long << "` ";
        }
        std::cout << std::endl;
      } else if (device_type == nest::RecordingDevice::Type::SPIKE_DETECTOR) {
        devices_[thread_id][device.get_name()] =
            std::make_unique<nesci::producer::SpikeDetector>(device.get_name());

        std::cout << "Thread " << thread_id << ": enrolled spike detector `"
                  << device.get_name() << "`" << std::endl;
        std::cout << std::endl;
      } else {
        // Unknown device kind: report it and leave devices_ untouched.
        std::cerr << "Invalid recording device type for " << device.get_name()
                  << std::endl;
      }
    }
    
    // Records one event on the nesci producer that the calling thread enrolled
    // for `device`.
    //
    // The producer is looked up in `devices_` by the calling thread's id and the
    // device name. If either lookup fails, an error is printed on standard error
    // and nothing is recorded. Multimeters record time stamp, sender gid and both
    // value arrays; spike detectors record a datum built from time stamp and
    // sender gid. Any other device type is reported on standard error.
    //
    // device        - device the event belongs to
    // event         - event providing the time stamp and the sender gid
    // double_values - double-valued data, forwarded to multimeters only
    // long_values   - long-valued data, forwarded to multimeters only
    void StreamingRecordingBackend::write(const nest::RecordingDevice &device,
                                          const nest::Event &event,
                                          const std::vector<double> &double_values,
                                          const std::vector<long> &long_values) {
      // Step 1: find the device table of the calling thread.
      const auto per_thread = devices_.find(std::this_thread::get_id());
      if (per_thread == devices_.end()) {
        std::cerr << "Error: no devices assigned to this thread!" << std::endl;
        return;
      }

      // Step 2: find the producer enrolled under this device's name.
      auto &device_table = per_thread->second;
      const auto entry = device_table.find(device.get_name());
      if (entry == device_table.end()) {
        std::cerr << "Error: device not found in this thread (device = "
                  << device.get_name() << ")" << std::endl;
        return;
      }

      // Step 3: dispatch on the device type; the type selects the concrete
      // producer class the stored pointer is cast to.
      auto *const producer = entry->second.get();
      switch (device.get_type()) {
        case nest::RecordingDevice::Type::MULTIMETER: {
          auto *const multimeter =
              static_cast<nesci::producer::NestMultimeter *>(producer);
          multimeter->Record(event.get_stamp().get_ms(), event.get_sender_gid(),
                             double_values.data(), long_values.data());
          break;
        }

        case nest::RecordingDevice::Type::SPIKE_DETECTOR: {
          auto *const spike_detector =
              static_cast<nesci::producer::SpikeDetector *>(producer);
          nesci::producer::SpikeDetector::Datum datum{
              event.get_stamp().get_ms(), event.get_sender_gid()};
          spike_detector->Record(datum);
          break;
        }

        default:
          std::cerr << "Error: no device registered for `" << device.get_name()
                    << '`' << std::endl;
          break;
      }
    }
    
    // Sends the node of every device enrolled by the calling thread through
    // `relay_` and clears each device afterwards. Returns immediately when the
    // calling thread has no entry in `devices_`.
    void StreamingRecordingBackend::synchronize() {
      // Called per thread

      const auto thread_devices = devices_.find(std::this_thread::get_id());
      if (thread_devices == devices_.end()) {
        return;
      }

      for (const auto &device : thread_devices->second) {
        // Bind by const reference instead of copying: the node is only read by
        // Send(), and the device is not cleared until Send() has returned. If
        // node() returns by value, the temporary's lifetime is extended by the
        // reference, so this is safe either way.
        const auto &node = device.second->node();
        // TODO: check if node is empty
        {
          // Hold relay_mutex_ for the Send() call only.
          std::lock_guard<std::mutex> lock_guard(relay_mutex_);
          relay_.Send(node, false);
        }
        device.second->Clear();
      }
    }
    
    // Hook invoked for a single recording device. At present it only logs the
    // device name on standard output.
    //
    // device - recording device the call refers to
    void StreamingRecordingBackend::clear(const nest::RecordingDevice& device) {
      const auto &device_name = device.get_name();
      std::cout << "clear(" << device_name << ")" << std::endl;
    }
    
    // Lifecycle hook. At present it only reports its own invocation on
    // standard output.
    void StreamingRecordingBackend::finalize() {
      // Called once
      std::ostream &log = std::cout;
      log << "finalize()" << std::endl;
    }
    
    }  // namespace streamingnest