Skip to content
Snippets Groups Projects
Commit 6982dd58 authored by Simon Oehrl's avatar Simon Oehrl
Browse files

Merge branch 'feature/add-database' into 'develop'

Feature/add database

See merge request VR-Group/in-situ-pipeline/insite-nest-module!10
parents f1d5c5b7 8e2d2be8
No related branches found
No related tags found
1 merge request!10Feature/add database
Pipeline #163421 passed
...@@ -105,6 +105,7 @@ project( ${MODULE_NAME} CXX ) ...@@ -105,6 +105,7 @@ project( ${MODULE_NAME} CXX )
# Add cpprestsdk # Add cpprestsdk
find_package(cpprestsdk REQUIRED) find_package(cpprestsdk REQUIRED)
find_package(libpqxx REQUIRED)
# Get the Python executable (for help generation). # Get the Python executable (for help generation).
execute_process( execute_process(
...@@ -283,7 +284,7 @@ if ( BUILD_SHARED_LIBS ) ...@@ -283,7 +284,7 @@ if ( BUILD_SHARED_LIBS )
install( TARGETS ${MODULE_NAME}_module install( TARGETS ${MODULE_NAME}_module
DESTINATION ${CMAKE_INSTALL_LIBDIR} DESTINATION ${CMAKE_INSTALL_LIBDIR}
) )
target_link_libraries(${MODULE_NAME}_module PRIVATE cpprestsdk::cpprest) target_link_libraries(${MODULE_NAME}_module PRIVATE cpprestsdk::cpprest pqxx)
endif () endif ()
# Build dynamic/static library for standard linking from NEST. # Build dynamic/static library for standard linking from NEST.
......
...@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \ ...@@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \
libwebsocketpp-dev openssl libssl-dev ninja-build \ libwebsocketpp-dev openssl libssl-dev ninja-build \
openmpi-bin libopenmpi-dev openmpi-bin libopenmpi-dev
RUN pip3 install Cython RUN pip3 install Cython
RUN git clone --single-branch --branch nest-3 https://github.com/nest/nest-simulator.git nest && \ RUN git clone --single-branch --branch nest-3 https://github.com/nest/nest-simulator.git nest && \
cd nest && \ cd nest && \
git checkout 5c0f41230dda9e4b99b8df89729ea43b340246ad && \ git checkout 5c0f41230dda9e4b99b8df89729ea43b340246ad && \
...@@ -20,6 +21,7 @@ RUN cmake \ ...@@ -20,6 +21,7 @@ RUN cmake \
-DCMAKE_BUILD_TYPE=Release \ -DCMAKE_BUILD_TYPE=Release \
/nest /nest
RUN ninja && ninja install RUN ninja && ninja install
RUN git clone --single-branch --branch v2.10.14 --recurse-submodules https://github.com/microsoft/cpprestsdk.git /cpprestsdk RUN git clone --single-branch --branch v2.10.14 --recurse-submodules https://github.com/microsoft/cpprestsdk.git /cpprestsdk
WORKDIR /cpprestsdk-build WORKDIR /cpprestsdk-build
RUN cmake \ RUN cmake \
...@@ -28,6 +30,17 @@ RUN cmake \ ...@@ -28,6 +30,17 @@ RUN cmake \
-DBUILD_TESTS=OFF \ -DBUILD_TESTS=OFF \
/cpprestsdk /cpprestsdk
RUN ninja && ninja install RUN ninja && ninja install
RUN apt-get update && apt-get install -y libpq-dev postgresql-server-dev-all
RUN git clone --single-branch --branch 6.4.6 https://github.com/jtv/libpqxx.git /libpqxx
WORKDIR /libpqxx-build
RUN cmake \
-G Ninja \
-DCMAKE_BUILD_TYPE=Release \
-DBUILD_TESTS=OFF \
/libpqxx
RUN ninja && ninja install
COPY . /insite COPY . /insite
WORKDIR /insite-build WORKDIR /insite-build
RUN cmake \ RUN cmake \
......
...@@ -20,26 +20,18 @@ namespace insite { ...@@ -20,26 +20,18 @@ namespace insite {
RecordingBackendInsite::RecordingBackendInsite() RecordingBackendInsite::RecordingBackendInsite()
: data_storage_("tgest"), : data_storage_("tgest"),
http_server_("http://0.0.0.0:" + get_port_string(), &data_storage_), http_server_("http://0.0.0.0:" + get_port_string(), &data_storage_),
info_node_("http://info-node:8080"), database_connection_("postgresql://postgres@database") {
address_("insite-nest-module:" + get_port_string()) { pqxx::work txn(database_connection_);
web::uri_builder builder("/node"); simulation_node_id_ = txn.exec1(
builder.append_query("node_type", "nest_simulation", true); "INSERT INTO nest_simulation_node (address) "
builder.append_query("address", address_, true); "VALUES ('http://insite-nest-module:" +
get_port_string() +
try { "') "
info_node_.request(web::http::methods::PUT, builder.to_string()) "RETURNING id;")[0]
.then([](const web::http::http_response& response) { .as<int>();
if (response.status_code() != web::http::status_codes::OK) { std::cout << "Simulation node registered to database. Node ID: "
throw std::runtime_error(response.to_string()); << simulation_node_id_ << std::endl;
} txn.commit();
})
.wait();
} catch (const std::exception& exception) {
std::cerr << "Failed to register to info node: \n"
<< exception.what() << "\n"
<< std::endl;
throw;
}
} }
RecordingBackendInsite::~RecordingBackendInsite() throw() {} RecordingBackendInsite::~RecordingBackendInsite() throw() {}
...@@ -76,12 +68,32 @@ void RecordingBackendInsite::set_value_names( ...@@ -76,12 +68,32 @@ void RecordingBackendInsite::set_value_names(
if (device.get_type() == nest::RecordingDevice::MULTIMETER) { if (device.get_type() == nest::RecordingDevice::MULTIMETER) {
auto& multimeter = multimeter_infos_.at(device.get_node_id()); auto& multimeter = multimeter_infos_.at(device.get_node_id());
for (auto& name : double_value_names) std::stringstream multimeter_query;
multimeter.double_attributes.push_back(name.toString()); multimeter_query << "INSERT INTO nest_multimeter (id, attributes) "
for (auto& name : long_value_names) << "VALUES (" << device.get_node_id() << ",\'{";
multimeter.long_attributes.push_back(name.toString());
bool first = true;
for (auto& name : double_value_names) {
const auto& name_string = name.toString();
multimeter.double_attributes.push_back(name_string);
multimeter_query << (first ? "" : ",") << '\"' << name_string << "\"";
first = false;
}
for (auto& name : long_value_names) {
const auto& name_string = name.toString();
multimeter.long_attributes.push_back(name_string);
multimeter_query << (first ? "" : ",") << '\"' << name_string << "\"";
first = false;
}
multimeter_query << "}\') ON CONFLICT DO NOTHING;";
multimeter.needs_update = true; multimeter.needs_update = true;
pqxx::work txn(database_connection_);
txn.exec0(multimeter_query.str());
txn.commit();
} }
} }
...@@ -102,63 +114,58 @@ void RecordingBackendInsite::post_run_hook() { ...@@ -102,63 +114,58 @@ void RecordingBackendInsite::post_run_hook() {
void RecordingBackendInsite::post_step_hook() { void RecordingBackendInsite::post_step_hook() {
// Send simulation time // Send simulation time
{ {
web::uri_builder builder("/current_time"); pqxx::work txn(database_connection_);
builder.append_query("time", latest_simulation_time_, false); txn.exec0(
builder.append_query("node_address", address_, true); "UPDATE nest_simulation_node "
"SET current_simulation_time = " +
info_node_.request(web::http::methods::PUT, builder.to_string()) std::to_string(latest_simulation_time_) +
.then([](const web::http::http_response& response) { ""
if (response.status_code() != web::http::status_codes::OK) { "WHERE id = " +
std::cerr << "Failed to send time to info node: \n" std::to_string(simulation_node_id_));
<< response.to_string() << "\n" txn.commit();
<< std::endl;
throw std::runtime_error(response.to_string());
}
}).wait(); // TODO: this wait definitely needs to go!
} }
if (new_neuron_infos_.size() > 0) { if (new_neuron_infos_.size() > 0) {
// Send new gids std::stringstream neuron_query;
web::uri_builder builder("/gids"); neuron_query << "INSERT INTO nest_neuron (id, simulation_node_id, "
"population_id, position) "
<< "VALUES ";
for (auto& neuron_info : new_neuron_infos_) { for (auto& neuron_info : new_neuron_infos_) {
builder.append_query("gids", neuron_info.gid, false); const bool first = neuron_info.gid == new_neuron_infos_[0].gid;
} if (!first) {
builder.append_query("address", address_, true); neuron_query << ",";
}
info_node_.request(web::http::methods::PUT, builder.to_string())
.then([](const web::http::http_response& response) { uint64_t population_id = 0;
if (response.status_code() != web::http::status_codes::OK) { for (const nest::NodeIDTriple& node_id_triple :
std::cerr << "Failed to send gids to info node: \n" *neuron_info.gid_collection.get()) {
<< response.to_string() << "\n" population_id ^= node_id_triple.node_id * 938831;
<< std::endl; }
throw std::runtime_error(response.to_string());
} neuron_query << "(" << neuron_info.gid << "," << simulation_node_id_
}).wait(); << "," << population_id % 0x800000;
// Send new properties const auto position_size = neuron_info.position.size();
builder = web::uri_builder("/neuron_properties"); if (position_size > 0) {
web::json::value request_body = assert(position_size <= 3);
web::json::value::array(new_neuron_infos_.size()); neuron_query << ",\'{";
for (std::size_t i = 0; i < new_neuron_infos_.size(); ++i) { for (size_t i = 0; i < position_size; ++i) {
auto& neuron_info = new_neuron_infos_[i]; if (i > 0) {
request_body[i]["gid"] = neuron_info.gid; neuron_query << ",";
request_body[i]["properties"] = web::json::value(); }
if (!neuron_info.position.empty()) neuron_query << neuron_info.position[i];
request_body[i]["properties"]["position"] = }
web::json::value::array(std::vector<web::json::value>( neuron_query << "}\'";
neuron_info.position.begin(), neuron_info.position.end())); } else {
} neuron_query << ",NULL";
}
info_node_ neuron_query << ")";
.request(web::http::methods::PUT, builder.to_string(), request_body) }
.then([](const web::http::http_response& response) { neuron_query << ";";
if (response.status_code() != web::http::status_codes::OK) {
std::cerr << "Failed to send neuron properties to info node: \n" pqxx::work txn(database_connection_);
<< response.to_string() << "\n" txn.exec0(neuron_query.str());
<< std::endl; txn.commit();
throw std::runtime_error(response.to_string());
}
}).wait();
neuron_infos_.insert(neuron_infos_.end(), new_neuron_infos_.begin(), neuron_infos_.insert(neuron_infos_.end(), new_neuron_infos_.begin(),
new_neuron_infos_.end()); new_neuron_infos_.end());
...@@ -169,64 +176,28 @@ void RecordingBackendInsite::post_step_hook() { ...@@ -169,64 +176,28 @@ void RecordingBackendInsite::post_step_hook() {
// Send multimeter info // Send multimeter info
for (auto& kvp : multimeter_infos_) { for (auto& kvp : multimeter_infos_) {
auto& multimeter = kvp.second; auto& multimeter = kvp.second;
if (!multimeter.needs_update) if (!multimeter.needs_update) continue;
continue;
multimeter.needs_update = false; multimeter.needs_update = false;
web::uri_builder builder("/multimeter_info"); if (multimeter.gids.size() > 0) {
builder.append_query("id", multimeter.device_id, true); std::stringstream neuron_multimeter_query;
for (auto attribute : multimeter.double_attributes) neuron_multimeter_query
builder.append_query("attributes", attribute, true); << "INSERT INTO nest_neuron_multimeter (neuron_id, multimeter_id) "
for (auto attribute : multimeter.long_attributes) << "VALUES ";
builder.append_query("attributes", attribute, true);
for (auto gid : multimeter.gids) for (const auto& neuron_id : multimeter.gids) {
builder.append_query("gids", gid, false); const bool first = neuron_id == multimeter.gids[0];
neuron_multimeter_query << (first ? "" : ",") << "(" << neuron_id << ","
try { << multimeter.device_id << ")";
info_node_.request(web::http::methods::PUT, builder.to_string()) }
.then([](const web::http::http_response& response) {
if (response.status_code() != web::http::status_codes::OK) { neuron_multimeter_query << " ON CONFLICT DO NOTHING;";
throw std::runtime_error(response.to_string());
} pqxx::work txn(database_connection_);
}) txn.exec0(neuron_multimeter_query.str());
.wait(); txn.commit();
} catch (const std::exception& exception) {
std::cerr << "Failed to put multimeter info: \n"
<< exception.what() << "\n"
<< std::endl;
throw;
}
}
// Send new collections
for (const auto& node_collection : node_collections_to_register_) {
web::uri_builder builder("/populations");
for (auto node_id_triple : *node_collection) {
builder.append_query("gids", node_id_triple.node_id, false);
}
// Initialize to "invalid" state
registered_node_collections_[node_collection] = -1;
info_node_.request(web::http::methods::PUT, builder.to_string())
.then([this,
node_collection](const web::http::http_response& response) {
if (response.status_code() != web::http::status_codes::OK) {
std::cerr << "Failed to send population to info node: \n"
<< response.to_string() << "\n"
<< std::endl;
throw std::runtime_error(response.to_string());
} else {
response.extract_json().then(
[this, node_collection](const web::json::value& population_id) {
std::cout << "Got id for node collection " << node_collection
<< ": " << population_id << std::endl;
registered_node_collections_[node_collection] =
population_id.as_number().to_int64();
});
} }
})
.wait(); // Wait because it may cause a race condition
} }
node_collections_to_register_.clear();
} }
void RecordingBackendInsite::write(const nest::RecordingDevice& device, void RecordingBackendInsite::write(const nest::RecordingDevice& device,
...@@ -251,12 +222,12 @@ void RecordingBackendInsite::write(const nest::RecordingDevice& device, ...@@ -251,12 +222,12 @@ void RecordingBackendInsite::write(const nest::RecordingDevice& device,
} }
for (std::size_t i = 0; i < double_values.size(); ++i) for (std::size_t i = 0; i < double_values.size(); ++i)
data_storage_.AddMultimeterMeasurement(device_id, data_storage_.AddMultimeterMeasurement(
multimeter.double_attributes[i], time_stamp, sender_gid, device_id, multimeter.double_attributes[i], time_stamp, sender_gid,
double_values[i]); double_values[i]);
for (std::size_t i = 0; i < long_values.size(); ++i) for (std::size_t i = 0; i < long_values.size(); ++i)
data_storage_.AddMultimeterMeasurement(device_id, data_storage_.AddMultimeterMeasurement(
multimeter.long_attributes[i], time_stamp, sender_gid, device_id, multimeter.long_attributes[i], time_stamp, sender_gid,
double(long_values[i])); double(long_values[i]));
} }
latest_simulation_time_ = std::max(latest_simulation_time_, time_stamp); latest_simulation_time_ = std::max(latest_simulation_time_, time_stamp);
...@@ -278,20 +249,6 @@ void RecordingBackendInsite::write(const nest::RecordingDevice& device, ...@@ -278,20 +249,6 @@ void RecordingBackendInsite::write(const nest::RecordingDevice& device,
std::lower_bound(new_neuron_infos_.begin(), new_neuron_infos_.end(), std::lower_bound(new_neuron_infos_.begin(), new_neuron_infos_.end(),
neuron_info), neuron_info),
neuron_info); neuron_info);
// Check if the node collection (population) is already sent to the
// info-node
const auto sender_node_collection = event.get_sender().get_nc();
if (registered_node_collections_.count(sender_node_collection) == 0 &&
!binary_search(node_collections_to_register_.begin(),
node_collections_to_register_.end(),
sender_node_collection)) {
node_collections_to_register_.insert(
std::lower_bound(node_collections_to_register_.begin(),
node_collections_to_register_.end(),
sender_node_collection),
sender_node_collection);
}
} }
} }
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <unordered_map> #include <unordered_map>
#include <cpprest/http_client.h> #include <cpprest/http_client.h>
#include <pqxx/pqxx>
#include "data_storage.hpp" #include "data_storage.hpp"
#include "http_server.hpp" #include "http_server.hpp"
...@@ -62,12 +63,10 @@ class RecordingBackendInsite : public nest::RecordingBackend { ...@@ -62,12 +63,10 @@ class RecordingBackendInsite : public nest::RecordingBackend {
DataStorage data_storage_; DataStorage data_storage_;
HttpServer http_server_; HttpServer http_server_;
web::http::client::http_client info_node_; pqxx::connection database_connection_;
std::string address_; int simulation_node_id_;
std::vector<NeuronInfo> neuron_infos_; std::vector<NeuronInfo> neuron_infos_;
std::vector<NeuronInfo> new_neuron_infos_; std::vector<NeuronInfo> new_neuron_infos_;
std::unordered_map<nest::NodeCollectionPTR, int64_t> registered_node_collections_;
std::vector<nest::NodeCollectionPTR> node_collections_to_register_;
std::unordered_map<nest::index, MultimeterInfo> multimeter_infos_; std::unordered_map<nest::index, MultimeterInfo> multimeter_infos_;
double latest_simulation_time_ = 0; double latest_simulation_time_ = 0;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment