diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..fe9c10eb14716f1ef0600a13b6aad3aecf7628c5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*build/* \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6298d39114237e437ead3374196b9fa2aad66ca1 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,82 @@ +################################################## Project ################################################## +cmake_minimum_required(VERSION 3.10 FATAL_ERROR) +project (boost_mpi_extensions VERSION 1.0 LANGUAGES CXX) +list (APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) +set_property (GLOBAL PROPERTY USE_FOLDERS ON) +set (CMAKE_CXX_STANDARD 17) + +include (set_max_warning_level) +set_max_warning_level () + +################################################## Options ################################################## +option(BUILD_TESTS "Build tests." OFF) + +################################################## Sources ################################################## +file(GLOB_RECURSE PROJECT_HEADERS include/*.h include/*.hpp) +file(GLOB_RECURSE PROJECT_CMAKE_UTILS cmake/*.cmake) +file(GLOB_RECURSE PROJECT_MISC *.md *.txt) +set (PROJECT_FILES + ${PROJECT_HEADERS} + ${PROJECT_CMAKE_UTILS} + ${PROJECT_MISC}) + +include (assign_source_group) +assign_source_group(${PROJECT_FILES}) + +################################################## Dependencies ################################################## +include(import_library) + +find_package (Boost REQUIRED date_time mpi regex) +import_library(Boost_INCLUDE_DIRS Boost_DATE_TIME_LIBRARY_DEBUG Boost_DATE_TIME_LIBRARY_RELEASE) +import_library(Boost_INCLUDE_DIRS Boost_MPI_LIBRARY_DEBUG Boost_MPI_LIBRARY_RELEASE) +import_library(Boost_INCLUDE_DIRS Boost_REGEX_LIBRARY_DEBUG Boost_REGEX_LIBRARY_RELEASE) +import_library(Boost_INCLUDE_DIRS Boost_SERIALIZATION_LIBRARY_DEBUG Boost_SERIALIZATION_LIBRARY_RELEASE) + +find_package (Catch2 REQUIRED) +list (APPEND PROJECT_LIBRARIES Catch2::Catch2) + +################################################## Targets ################################################## +add_library(${PROJECT_NAME} INTERFACE) +target_include_directories(${PROJECT_NAME} INTERFACE + $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> + $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}> + $<INSTALL_INTERFACE:include>) +target_include_directories(${PROJECT_NAME} INTERFACE ${PROJECT_INCLUDE_DIRS}) +target_link_libraries (${PROJECT_NAME} INTERFACE ${PROJECT_LIBRARIES}) +target_compile_definitions(${PROJECT_NAME} INTERFACE ${PROJECT_COMPILE_DEFINITIONS}) + +# Hack for header-only project to appear in the IDEs. +add_library(${PROJECT_NAME}_ STATIC ${PROJECT_FILES}) +target_include_directories(${PROJECT_NAME}_ PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR}) +target_include_directories(${PROJECT_NAME}_ PUBLIC ${PROJECT_INCLUDE_DIRS}) +target_link_libraries (${PROJECT_NAME}_ PUBLIC ${PROJECT_LIBRARIES}) +target_compile_definitions(${PROJECT_NAME}_ PUBLIC ${PROJECT_COMPILE_DEFINITIONS}) +set_target_properties (${PROJECT_NAME}_ PROPERTIES LINKER_LANGUAGE CXX) + +################################################## Testing ################################################## +if(BUILD_TESTS) + enable_testing () + set (TEST_MAIN_NAME catch_main) + set (TEST_MAIN_SOURCES tests/catch/main.cpp) + add_library (${TEST_MAIN_NAME} OBJECT ${TEST_MAIN_SOURCES}) + set_property (TARGET ${TEST_MAIN_NAME} PROPERTY FOLDER tests/catch) + assign_source_group(${TEST_MAIN_SOURCES}) + + file(GLOB PROJECT_TEST_CPPS tests/*.cpp) + foreach(_SOURCE ${PROJECT_TEST_CPPS}) + get_filename_component(_NAME ${_SOURCE} NAME_WE) + add_executable (${_NAME} ${_SOURCE} $<TARGET_OBJECTS:${TEST_MAIN_NAME}>) + target_link_libraries (${_NAME} ${PROJECT_NAME}) + add_test (${_NAME} ${_NAME}) + set_property (TARGET ${_NAME} PROPERTY FOLDER tests) + assign_source_group (${_SOURCE}) + endforeach() +endif() + +################################################## Installation ################################################## +install(TARGETS ${PROJECT_NAME} EXPORT ${PROJECT_NAME}-config) +install(DIRECTORY include/ DESTINATION include) +install(EXPORT ${PROJECT_NAME}-config DESTINATION cmake) +export (TARGETS ${PROJECT_NAME} FILE ${PROJECT_NAME}-config.cmake) diff --git a/bootstrap.bat b/bootstrap.bat new file mode 100644 index 0000000000000000000000000000000000000000..8a25d6f19868450308ed66836971850f3033ec45 --- /dev/null +++ b/bootstrap.bat @@ -0,0 +1,17 @@ + + +if not exist "build" mkdir build +cd build +if not exist "vcpkg" git clone https://github.com/Microsoft/vcpkg.git +cd vcpkg +if not exist "vcpkg.exe" call bootstrap-vcpkg.bat + +set VCPKG_COMMAND=vcpkg install --recurse +set VCPKG_DEFAULT_TRIPLET=x64-windows +rem Add your library ports here. +%VCPKG_COMMAND% boost[mpi] catch2 +cd .. + +cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake .. +cmake --build . --target ALL_BUILD --config Release +cd .. \ No newline at end of file diff --git a/bootstrap.sh b/bootstrap.sh new file mode 100644 index 0000000000000000000000000000000000000000..ab2a537abf55d6e1328b97157931b8a63c6bb5a9 --- /dev/null +++ b/bootstrap.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ ! -d "build" ]; then mkdir build fi +cd build +if [ ! -d "vcpkg" ]; then git clone https://github.com/Microsoft/vcpkg.git fi +cd vcpkg +if [ ! -f "vcpkg" ]; then ./bootstrap-vcpkg.sh fi + +VCPKG_COMMAND=vcpkg install --recurse +VCPKG_DEFAULT_TRIPLET=x64-linux +# Add your library ports here. +$VCPKG_COMMAND boost[mpi] catch2 +cd .. + +cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake .. +cmake --build . --target ALL_BUILD --config Release +cd .. \ No newline at end of file diff --git a/cmake/assign_source_group.cmake b/cmake/assign_source_group.cmake new file mode 100644 index 0000000000000000000000000000000000000000..49e5d72ea9f66aa93ce9ee1f0ece280ec7791e80 --- /dev/null +++ b/cmake/assign_source_group.cmake @@ -0,0 +1,17 @@ +# Assigns the given files to source groups identical to their location. +function(assign_source_group) + foreach(_SOURCE IN ITEMS ${ARGN}) + if (IS_ABSOLUTE "${_SOURCE}") + file(RELATIVE_PATH _SOURCE_REL "${CMAKE_CURRENT_SOURCE_DIR}" "${_SOURCE}") + else() + set(_SOURCE_REL "${_SOURCE}") + endif() + get_filename_component(_SOURCE_PATH "${_SOURCE_REL}" PATH) + if(WIN32) + string(REPLACE "/" "\\" _SOURCE_PATH_MSVC "${_SOURCE_PATH}") + source_group("${_SOURCE_PATH_MSVC}" FILES "${_SOURCE}") + else() + source_group("${_SOURCE_PATH}" FILES "${_SOURCE}") + endif() + endforeach() +endfunction(assign_source_group) diff --git a/cmake/import_library.cmake b/cmake/import_library.cmake new file mode 100644 index 0000000000000000000000000000000000000000..c876601c8c2678eff9e7132cbdb2a48eae124d57 --- /dev/null +++ b/cmake/import_library.cmake @@ -0,0 +1,23 @@ +# Imports a library which is not built with cmake. +# The include directories are appended to the PROJECT_INCLUDE_DIRS variable. +# The libraries are appended to the PROJECT_LIBRARIES variable. +# Usage: +# Header Only: +# import_library(INCLUDE_DIRS) +# Identical Debug and Release: +# import_library(INCLUDE_DIRS LIBRARIES) +# Separate Debug and Release: +# import_library(INCLUDE_DIRS DEBUG_LIBRARIES RELEASE_LIBRARIES) +function(import_library INCLUDE_DIRS) + set (PROJECT_INCLUDE_DIRS ${PROJECT_INCLUDE_DIRS} ${${INCLUDE_DIRS}} PARENT_SCOPE) + set (_EXTRA_ARGS ${ARGN}) + list(LENGTH _EXTRA_ARGS _EXTRA_ARGS_LENGTH) + if (_EXTRA_ARGS_LENGTH EQUAL 1) + list(GET _EXTRA_ARGS 0 _LIBRARIES) + set (PROJECT_LIBRARIES ${PROJECT_LIBRARIES} ${${_LIBRARIES}} PARENT_SCOPE) + elseif(_EXTRA_ARGS_LENGTH EQUAL 2) + list(GET _EXTRA_ARGS 0 _DEBUG_LIBRARIES ) + list(GET _EXTRA_ARGS 1 _RELEASE_LIBRARIES) + set (PROJECT_LIBRARIES ${PROJECT_LIBRARIES} debug ${${_DEBUG_LIBRARIES}} optimized ${${_RELEASE_LIBRARIES}} PARENT_SCOPE) + endif () +endfunction(import_library) diff --git a/cmake/set_max_warning_level.cmake b/cmake/set_max_warning_level.cmake new file mode 100644 index 0000000000000000000000000000000000000000..381f7325bb6123c6f2baba35c7422fbd62498d17 --- /dev/null +++ b/cmake/set_max_warning_level.cmake @@ -0,0 +1,11 @@ +function(set_max_warning_level) + if(MSVC) + if(CMAKE_CXX_FLAGS MATCHES "/W[0-4]") + string(REGEX REPLACE "/W[0-4]" "/W4" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4") + endif() + else() + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-long-long -pedantic") + endif() +endfunction() \ No newline at end of file diff --git a/include/boost_mpi_extensions/boost_mpi_extensions.hpp b/include/boost_mpi_extensions/boost_mpi_extensions.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e3a639ea7c6ec32fe032a9ff5b2737b8ccb91014 --- /dev/null +++ b/include/boost_mpi_extensions/boost_mpi_extensions.hpp @@ -0,0 +1,121 @@ +#ifndef BOOST_MPI_EXTENSIONS_HPP +#define BOOST_MPI_EXTENSIONS_HPP + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <cstdint> +#include <list> +#include <mutex> +#include <thread> + +#include <mpi.h> + +namespace boost::mpi +{ +namespace detail +{ +typedef void MPI_Detach_callback (void*); +typedef void MPI_Detach_callback_status (void*, MPI_Status*); +typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*); + +std::int32_t MPI_Detach ( MPI_Request* request, MPI_Detach_callback* callback, void* data); +std::int32_t MPI_Detach_status ( MPI_Request* request, MPI_Detach_callback_status* callback, void* data); +std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* array_of_data[]); +std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_status* callback, void* array_of_data[]); +std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data); +std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data); + +struct single_request +{ + single_request(MPI_Request* request, MPI_Detach_callback* callback, void* data) + : request (*request) + , callback (callback) + , callback_status(nullptr) + , status () + , data (data) + , status_ptr (MPI_STATUS_IGNORE) + { + *request = MPI_REQUEST_NULL; + } + single_request(MPI_Request* request, MPI_Detach_callback_status* callback, void* data) + : request (*request) + , callback (nullptr) + , callback_status(callback) + , status () + , data (data) + , status_ptr (&status) + { + *request = MPI_REQUEST_NULL; + } + + MPI_Request request; + MPI_Detach_callback* callback; + MPI_Detach_callback_status* callback_status; + MPI_Status status; + void* data; + MPI_Status* status_ptr; // Pointer to status or MPI_STATUS_IGNORE. +}; +struct all_request +{ + all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data) + : count (count) + , request (new MPI_Request[count]) + , callback (callback) + , callback_statuses(nullptr) + , statuses (MPI_STATUSES_IGNORE) + , data (data) + { + for (auto i = 0; i < count; i++) + { + request [i] = array_of_requests[i]; + array_of_requests[i] = MPI_REQUEST_NULL; + } + } + all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data) + : count (count) + , request (new MPI_Request[count]) + , callback (nullptr) + , callback_statuses(callback) + , statuses (new MPI_Status[count]) + , data (data) { + for (auto i = 0; i < count; i++) + { + request [i] = array_of_requests[i]; + array_of_requests[i] = MPI_REQUEST_NULL; + } + } + ~all_request() + { + delete[] request; + if (statuses != MPI_STATUSES_IGNORE) + delete[] statuses; + } + + std::int32_t count; + MPI_Request* request; + MPI_Detach_callback* callback; + MPI_Detach_callback_statuses* callback_statuses; + MPI_Status* statuses; + void* data; +}; + +struct state +{ + std::atomic<std::int32_t> running {1}; + std::thread detach_thread; + std::mutex list_mutex; + std::condition_variable list_condition_variable; + std::once_flag once_flag; + std::int32_t initialized {0}; + std::list<single_request*> singleRequestsQueue {}; + std::list<all_request*> allRequestsQueue {}; + std::list<single_request*> singleRequests {}; + std::list<all_request*> allRequests {}; +}; + +static state global_state; +} +} + +#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/detach.hpp b/include/boost_mpi_extensions/detach.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f881e99c199b72192207000cd95300c621749c4a --- /dev/null +++ b/include/boost_mpi_extensions/detach.hpp @@ -0,0 +1,204 @@ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +using namespace std::chrono_literals; + +void run(void) { + while (running || !singleRequests.empty() || !allRequests.empty()) { + do { + std::unique_lock<std::mutex> lck(listMtx); + if (!singleRequestsQueue.empty()) + singleRequests.splice(singleRequests.begin(), singleRequestsQueue); + if (!allRequestsQueue.empty()) + allRequests.splice(allRequests.begin(), allRequestsQueue); + if (singleRequests.empty() && allRequests.empty()) + while (running && singleRequestsQueue.empty() && + allRequestsQueue.empty()) { + listCv.wait(lck); + } + } while (running && singleRequests.empty() && allRequests.empty()); + if (!singleRequests.empty()) { + auto iter = singleRequests.begin(); + auto end = singleRequests.end(); + while (iter != end) { + int flag; + MPI_Test(&(*iter)->req, &flag, (*iter)->statusP); + if (flag) { // + if ((*iter)->callback) + (*iter)->callback((*iter)->data); + else + (*iter)->callback_status((*iter)->data, (*iter)->statusP); + delete (*iter); + iter = singleRequests.erase(iter); + } else { + iter++; + } + } + } + if (!allRequests.empty()){ + auto iter = allRequests.begin(); + auto end = allRequests.end(); + while (iter != end) { + int flag; + MPI_Testall((*iter)->count, (*iter)->req, &flag, (*iter)->statuses); + if (flag) { // + if ((*iter)->callback) + (*iter)->callback((*iter)->data); + else + (*iter)->callback_statuses((*iter)->data, (*iter)->count, (*iter)->statuses); + delete (*iter); + iter = allRequests.erase(iter); + } else { + iter++; + } + } + } + std::this_thread::sleep_for(2ms); + } +} + +// using __attribute__((constructor)) does not work, +// because c++ std library might not be initialized yet +void initDetach() { + initialized = 1; + detachThread = std::thread(run); +} + +// This function is assigned to execute after +// main using __attribute__((destructor)) +void finiDetach() { + { + std::unique_lock<std::mutex> lck(listMtx); + // make sure all requests get finally processed + // access to the *Queue lists is shared, so need to be locked + while (!singleRequestsQueue.empty() || !allRequestsQueue.empty()) + { + listCv.notify_one(); + lck.unlock(); + std::this_thread::sleep_for(2ms); + lck.lock(); + }// after while, lock is always set for changing running and notify + running = 0; + listCv.notify_one(); + }// now wait for the progress thread to finish, i.e. all requests are handled + detachThread.join(); +} + +int MPI_Detach(MPI_Request *request, MPI_Detach_callback *callback, + void *data) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + MPI_Test(request, &flag, MPI_STATUS_IGNORE); + if (flag) { + callback(data); + } else { + std::unique_lock<std::mutex> lck(listMtx); + singleRequestsQueue.push_back(new singleRequest(request, callback, data)); + listCv.notify_one(); + } + return MPI_SUCCESS; +} + +int MPI_Detach_status(MPI_Request *request, + MPI_Detach_callback_status *callback, void *data) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + MPI_Status status; + MPI_Test(request, &flag, &status); + if (flag) { + callback(data, &status); + } else { + std::unique_lock<std::mutex> lck(listMtx); + singleRequestsQueue.push_back(new singleRequest(request, callback, data)); + listCv.notify_one(); + } + return MPI_SUCCESS; +} + +int MPI_Detach_each(int count, MPI_Request* array_of_requests, + MPI_Detach_callback *callback, void *array_of_data[]) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + for (int i = 0; i < count; i++) { + MPI_Test(array_of_requests + i, &flag, MPI_STATUS_IGNORE); + if (flag) { + callback(array_of_data[i]); + } else { + std::unique_lock<std::mutex> lck(listMtx); + singleRequestsQueue.push_back( + new singleRequest(array_of_requests + i, callback, array_of_data[i])); + listCv.notify_one(); + } + } + return MPI_SUCCESS; +} + +int MPI_Detach_each_status(int count, MPI_Request* array_of_requests, + MPI_Detach_callback_status *callback, + void *array_of_data[]) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + MPI_Status status; + for (int i = 0; i < count; i++) { + MPI_Test(array_of_requests + i, &flag, &status); + if (flag) { + callback(array_of_data[i], &status); + } else { + std::unique_lock<std::mutex> lck(listMtx); + singleRequestsQueue.push_back( + new singleRequest(array_of_requests + i, callback, array_of_data[i])); + listCv.notify_one(); + } + } + return MPI_SUCCESS; +} + +int MPI_Detach_all(int count, MPI_Request* array_of_requests, + MPI_Detach_callback *callback, void *data) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + MPI_Testall(count, array_of_requests, &flag, MPI_STATUSES_IGNORE); + if (flag) { + callback(data); + } else { + std::unique_lock<std::mutex> lck(listMtx); + allRequestsQueue.push_back( + new allRequest(count, array_of_requests, callback, data)); + listCv.notify_one(); + } + return MPI_SUCCESS; +} + +int MPI_Detach_all_status(int count, MPI_Request* array_of_requests, + MPI_Detach_callback_statuses *callback, void *data) { + if (!initialized) + std::call_once(onceFlag, initDetach); + int flag; + MPI_Status statuses[count]; + MPI_Testall(count, array_of_requests, &flag, statuses); + if (flag) { + callback(data, count, statuses); + } else { + std::unique_lock<std::mutex> lck(listMtx); + allRequestsQueue.push_back( + new allRequest(count, array_of_requests, callback, data)); + listCv.notify_one(); + } + return MPI_SUCCESS; +} + +int MPI_Finalize(){ + // we need to make sure, all communication is finished + // before calling MPI_Finalize + if (initialized) + finiDetach(); + + return PMPI_Finalize(); +} diff --git a/tests/catch/main.cpp b/tests/catch/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0c7c351f437f5f43f3bb62beb254a9f1ecbec5a0 --- /dev/null +++ b/tests/catch/main.cpp @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp"