diff --git a/CMakeLists.txt b/CMakeLists.txt index 6298d39114237e437ead3374196b9fa2aad66ca1..cbfb6e08c231dbef48c7cd0447db71e73c46bf4f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,10 @@ ################################################## Project ################################################## cmake_minimum_required(VERSION 3.10 FATAL_ERROR) -project (boost_mpi_extensions VERSION 1.0 LANGUAGES CXX) +project (boost_mpi_extensions VERSION 1.0 LANGUAGES C CXX) list (APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) set_property (GLOBAL PROPERTY USE_FOLDERS ON) set (CMAKE_CXX_STANDARD 17) +set (CMAKE_BUILD_TYPE Release) include (set_max_warning_level) set_max_warning_level () @@ -35,6 +36,9 @@ import_library(Boost_INCLUDE_DIRS Boost_SERIALIZATION_LIBRARY_DEBUG Boost_SERIAL find_package (Catch2 REQUIRED) list (APPEND PROJECT_LIBRARIES Catch2::Catch2) +find_package (MPI REQUIRED) +list (APPEND PROJECT_LIBRARIES MPI::MPI_C) + ################################################## Targets ################################################## add_library(${PROJECT_NAME} INTERFACE) target_include_directories(${PROJECT_NAME} INTERFACE @@ -57,18 +61,19 @@ set_target_properties (${PROJECT_NAME}_ PROPERTIES LINKER_LANGUAGE CXX) ################################################## Testing ################################################## if(BUILD_TESTS) - enable_testing () - set (TEST_MAIN_NAME catch_main) - set (TEST_MAIN_SOURCES tests/catch/main.cpp) - add_library (${TEST_MAIN_NAME} OBJECT ${TEST_MAIN_SOURCES}) - set_property (TARGET ${TEST_MAIN_NAME} PROPERTY FOLDER tests/catch) - assign_source_group(${TEST_MAIN_SOURCES}) + enable_testing () + set (TEST_MAIN_NAME catch_main) + set (TEST_MAIN_SOURCES tests/catch/main.cpp) + add_library (${TEST_MAIN_NAME} OBJECT ${TEST_MAIN_SOURCES}) + target_link_libraries (${TEST_MAIN_NAME} ${PROJECT_NAME} ${PROJECT_LIBRARIES}) + set_property (TARGET ${TEST_MAIN_NAME} PROPERTY FOLDER tests/catch) + assign_source_group (${TEST_MAIN_SOURCES}) file(GLOB PROJECT_TEST_CPPS tests/*.cpp) foreach(_SOURCE ${PROJECT_TEST_CPPS}) get_filename_component(_NAME ${_SOURCE} NAME_WE) add_executable (${_NAME} ${_SOURCE} $<TARGET_OBJECTS:${TEST_MAIN_NAME}>) - target_link_libraries (${_NAME} ${PROJECT_NAME}) + target_link_libraries (${_NAME} ${PROJECT_NAME} ${PROJECT_LIBRARIES}) add_test (${_NAME} ${_NAME}) set_property (TARGET ${_NAME} PROPERTY FOLDER tests) assign_source_group (${_SOURCE}) diff --git a/bootstrap.bat b/bootstrap.bat index 8a25d6f19868450308ed66836971850f3033ec45..461ecb42c21787b0a378901532ffae3d3b18fdb3 100644 --- a/bootstrap.bat +++ b/bootstrap.bat @@ -9,7 +9,7 @@ if not exist "vcpkg.exe" call bootstrap-vcpkg.bat set VCPKG_COMMAND=vcpkg install --recurse set VCPKG_DEFAULT_TRIPLET=x64-windows rem Add your library ports here. -%VCPKG_COMMAND% boost[mpi] catch2 +%VCPKG_COMMAND% boost[mpi] catch2 mpi cd .. cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake .. diff --git a/bootstrap.sh b/bootstrap.sh index ab2a537abf55d6e1328b97157931b8a63c6bb5a9..85a54a3600a2fd46ab3c9aede459246e95b3c51e 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -9,7 +9,7 @@ if [ ! -f "vcpkg" ]; then ./bootstrap-vcpkg.sh fi VCPKG_COMMAND=vcpkg install --recurse VCPKG_DEFAULT_TRIPLET=x64-linux # Add your library ports here. -$VCPKG_COMMAND boost[mpi] catch2 +$VCPKG_COMMAND boost[mpi] catch2 mpi cd .. cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake .. 
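For context on the Testing hunk above: catch_main is built once as an OBJECT library, linked against the project and its libraries (which now include MPI::MPI_C), and every test executable reuses its object file through $<TARGET_OBJECTS:catch_main>, so the Catch2 entry point is compiled a single time. A minimal sketch of what tests/catch/main.cpp presumably contains (the file itself is not shown in this diff):

    #define CATCH_CONFIG_MAIN  // Let the single-header Catch2 generate main() in this translation unit only.
    #include "catch.hpp"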
diff --git a/include/boost_mpi_extensions/boost_mpi_extensions.hpp b/include/boost_mpi_extensions/boost_mpi_extensions.hpp deleted file mode 100644 index e3a639ea7c6ec32fe032a9ff5b2737b8ccb91014..0000000000000000000000000000000000000000 --- a/include/boost_mpi_extensions/boost_mpi_extensions.hpp +++ /dev/null @@ -1,121 +0,0 @@ -#ifndef BOOST_MPI_EXTENSIONS_HPP -#define BOOST_MPI_EXTENSIONS_HPP - -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <cstdint> -#include <list> -#include <mutex> -#include <thread> - -#include <mpi.h> - -namespace boost::mpi -{ -namespace detail -{ -typedef void MPI_Detach_callback (void*); -typedef void MPI_Detach_callback_status (void*, MPI_Status*); -typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*); - -std::int32_t MPI_Detach ( MPI_Request* request, MPI_Detach_callback* callback, void* data); -std::int32_t MPI_Detach_status ( MPI_Request* request, MPI_Detach_callback_status* callback, void* data); -std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* array_of_data[]); -std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_status* callback, void* array_of_data[]); -std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data); -std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data); - -struct single_request -{ - single_request(MPI_Request* request, MPI_Detach_callback* callback, void* data) - : request (*request) - , callback (callback) - , callback_status(nullptr) - , status () - , data (data) - , status_ptr (MPI_STATUS_IGNORE) - { - *request = MPI_REQUEST_NULL; - } - single_request(MPI_Request* request, MPI_Detach_callback_status* callback, void* data) - : request (*request) - , callback (nullptr) - , callback_status(callback) - , status () - , data (data) - , status_ptr (&status) - { - *request = MPI_REQUEST_NULL; - } - - MPI_Request request; - MPI_Detach_callback* callback; - MPI_Detach_callback_status* callback_status; - MPI_Status status; - void* data; - MPI_Status* status_ptr; // Pointer to status or MPI_STATUS_IGNORE. 
-}; -struct all_request -{ - all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data) - : count (count) - , request (new MPI_Request[count]) - , callback (callback) - , callback_statuses(nullptr) - , statuses (MPI_STATUSES_IGNORE) - , data (data) - { - for (auto i = 0; i < count; i++) - { - request [i] = array_of_requests[i]; - array_of_requests[i] = MPI_REQUEST_NULL; - } - } - all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data) - : count (count) - , request (new MPI_Request[count]) - , callback (nullptr) - , callback_statuses(callback) - , statuses (new MPI_Status[count]) - , data (data) { - for (auto i = 0; i < count; i++) - { - request [i] = array_of_requests[i]; - array_of_requests[i] = MPI_REQUEST_NULL; - } - } - ~all_request() - { - delete[] request; - if (statuses != MPI_STATUSES_IGNORE) - delete[] statuses; - } - - std::int32_t count; - MPI_Request* request; - MPI_Detach_callback* callback; - MPI_Detach_callback_statuses* callback_statuses; - MPI_Status* statuses; - void* data; -}; - -struct state -{ - std::atomic<std::int32_t> running {1}; - std::thread detach_thread; - std::mutex list_mutex; - std::condition_variable list_condition_variable; - std::once_flag once_flag; - std::int32_t initialized {0}; - std::list<single_request*> singleRequestsQueue {}; - std::list<all_request*> allRequestsQueue {}; - std::list<single_request*> singleRequests {}; - std::list<all_request*> allRequests {}; -}; - -static state global_state; -} -} - -#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/detach.hpp b/include/boost_mpi_extensions/detach.hpp deleted file mode 100644 index f881e99c199b72192207000cd95300c621749c4a..0000000000000000000000000000000000000000 --- a/include/boost_mpi_extensions/detach.hpp +++ /dev/null @@ -1,204 +0,0 @@ - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -using namespace std::chrono_literals; - -void run(void) { - while (running || !singleRequests.empty() || !allRequests.empty()) { - do { - std::unique_lock<std::mutex> lck(listMtx); - if (!singleRequestsQueue.empty()) - singleRequests.splice(singleRequests.begin(), singleRequestsQueue); - if (!allRequestsQueue.empty()) - allRequests.splice(allRequests.begin(), allRequestsQueue); - if (singleRequests.empty() && allRequests.empty()) - while (running && singleRequestsQueue.empty() && - allRequestsQueue.empty()) { - listCv.wait(lck); - } - } while (running && singleRequests.empty() && allRequests.empty()); - if (!singleRequests.empty()) { - auto iter = singleRequests.begin(); - auto end = singleRequests.end(); - while (iter != end) { - int flag; - MPI_Test(&(*iter)->req, &flag, (*iter)->statusP); - if (flag) { // - if ((*iter)->callback) - (*iter)->callback((*iter)->data); - else - (*iter)->callback_status((*iter)->data, (*iter)->statusP); - delete (*iter); - iter = singleRequests.erase(iter); - } else { - iter++; - } - } - } - if (!allRequests.empty()){ - auto iter = allRequests.begin(); - auto end = allRequests.end(); - while (iter != end) { - int flag; - MPI_Testall((*iter)->count, (*iter)->req, &flag, (*iter)->statuses); - if (flag) { // - if ((*iter)->callback) - (*iter)->callback((*iter)->data); - else - (*iter)->callback_statuses((*iter)->data, (*iter)->count, (*iter)->statuses); - delete (*iter); - iter = allRequests.erase(iter); - } else { - iter++; - } - } - } - std::this_thread::sleep_for(2ms); - } -} - -// using __attribute__((constructor)) does not work, 
-// because c++ std library might not be initialized yet -void initDetach() { - initialized = 1; - detachThread = std::thread(run); -} - -// This function is assigned to execute after -// main using __attribute__((destructor)) -void finiDetach() { - { - std::unique_lock<std::mutex> lck(listMtx); - // make sure all requests get finally processed - // access to the *Queue lists is shared, so need to be locked - while (!singleRequestsQueue.empty() || !allRequestsQueue.empty()) - { - listCv.notify_one(); - lck.unlock(); - std::this_thread::sleep_for(2ms); - lck.lock(); - }// after while, lock is always set for changing running and notify - running = 0; - listCv.notify_one(); - }// now wait for the progress thread to finish, i.e. all requests are handled - detachThread.join(); -} - -int MPI_Detach(MPI_Request *request, MPI_Detach_callback *callback, - void *data) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - MPI_Test(request, &flag, MPI_STATUS_IGNORE); - if (flag) { - callback(data); - } else { - std::unique_lock<std::mutex> lck(listMtx); - singleRequestsQueue.push_back(new singleRequest(request, callback, data)); - listCv.notify_one(); - } - return MPI_SUCCESS; -} - -int MPI_Detach_status(MPI_Request *request, - MPI_Detach_callback_status *callback, void *data) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - MPI_Status status; - MPI_Test(request, &flag, &status); - if (flag) { - callback(data, &status); - } else { - std::unique_lock<std::mutex> lck(listMtx); - singleRequestsQueue.push_back(new singleRequest(request, callback, data)); - listCv.notify_one(); - } - return MPI_SUCCESS; -} - -int MPI_Detach_each(int count, MPI_Request* array_of_requests, - MPI_Detach_callback *callback, void *array_of_data[]) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - for (int i = 0; i < count; i++) { - MPI_Test(array_of_requests + i, &flag, MPI_STATUS_IGNORE); - if (flag) { - callback(array_of_data[i]); - } else { - std::unique_lock<std::mutex> lck(listMtx); - singleRequestsQueue.push_back( - new singleRequest(array_of_requests + i, callback, array_of_data[i])); - listCv.notify_one(); - } - } - return MPI_SUCCESS; -} - -int MPI_Detach_each_status(int count, MPI_Request* array_of_requests, - MPI_Detach_callback_status *callback, - void *array_of_data[]) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - MPI_Status status; - for (int i = 0; i < count; i++) { - MPI_Test(array_of_requests + i, &flag, &status); - if (flag) { - callback(array_of_data[i], &status); - } else { - std::unique_lock<std::mutex> lck(listMtx); - singleRequestsQueue.push_back( - new singleRequest(array_of_requests + i, callback, array_of_data[i])); - listCv.notify_one(); - } - } - return MPI_SUCCESS; -} - -int MPI_Detach_all(int count, MPI_Request* array_of_requests, - MPI_Detach_callback *callback, void *data) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - MPI_Testall(count, array_of_requests, &flag, MPI_STATUSES_IGNORE); - if (flag) { - callback(data); - } else { - std::unique_lock<std::mutex> lck(listMtx); - allRequestsQueue.push_back( - new allRequest(count, array_of_requests, callback, data)); - listCv.notify_one(); - } - return MPI_SUCCESS; -} - -int MPI_Detach_all_status(int count, MPI_Request* array_of_requests, - MPI_Detach_callback_statuses *callback, void *data) { - if (!initialized) - std::call_once(onceFlag, initDetach); - int flag; - MPI_Status statuses[count]; - MPI_Testall(count, 
array_of_requests, &flag, statuses); - if (flag) { - callback(data, count, statuses); - } else { - std::unique_lock<std::mutex> lck(listMtx); - allRequestsQueue.push_back( - new allRequest(count, array_of_requests, callback, data)); - listCv.notify_one(); - } - return MPI_SUCCESS; -} - -int MPI_Finalize(){ - // we need to make sure, all communication is finished - // before calling MPI_Finalize - if (initialized) - finiDetach(); - - return PMPI_Finalize(); -} diff --git a/include/boost_mpi_extensions/mpi_detach.hpp b/include/boost_mpi_extensions/mpi_detach.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b7118bb291f1d5edf0e275b4ce9bbca6627883aa --- /dev/null +++ b/include/boost_mpi_extensions/mpi_detach.hpp @@ -0,0 +1,331 @@ +#ifndef MPI_DETACH_HPP +#define MPI_DETACH_HPP + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <cstdint> +#include <functional> +#include <list> +#include <mutex> +#include <optional> +#include <thread> +#include <utility> +#include <variant> +#include <vector> + +#include <mpi.h> + +namespace mpi::detach +{ +namespace detail +{ +using namespace std::chrono_literals; + +struct request +{ + // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal. + using callback_variant = std::variant<std::function<void(void*)>, std::function<void(void*, MPI_Status*)>>; + + request (const MPI_Request& request, callback_variant callback, void* user_data = nullptr) + : native (request) + , callback (std::move(callback)) + , user_data(user_data) + { + + } + request (const request& that) = default; + request ( request&& temp) = default; + ~request () = default; + request& operator=(const request& that) = default; + request& operator=( request&& temp) = default; + + [[nodiscard]] + bool ignores_status() const + { + return std::holds_alternative<std::function<void(void*)>>(callback); + } + + MPI_Request native; + callback_variant callback; + void* user_data; + MPI_Status status {}; +}; +struct collective_request +{ + // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal. + using callback_variant = std::variant<std::function<void(void*)>, std::function<void(void*, std::int32_t, MPI_Status*)>>; + + collective_request (std::vector<MPI_Request> requests, callback_variant callback, void* user_data = nullptr) + : native (std::move(requests)) + , callback (std::move(callback)) + , user_data(user_data) + { + if (!ignores_status()) + stati.resize(native.size()); // Bug prone: Resizing native post-constructor leads to inconsistent stati size. 
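+    // A sketch illustrating the note above (not part of the original patch): because the
+    // callbacks are stored as std::function, a capturing lambda matching the void(void*)
+    // signature could carry its own context and make the void* user_data redundant.
+    // Given some MPI_Request named native:
+    //
+    //   auto payload = std::make_shared<std::vector<float>>(1000);
+    //   detail::request r {native, std::function<void(void*)>(
+    //     [payload] (void*) { /* payload is kept alive by the capture */ })};
+    //
+    // The MPI_Detach-style entry points further below nevertheless keep the void*
+    // parameter to stay signature-compatible with the C-style MPI_Detach proposal.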
+ } + collective_request (const collective_request& that) = default; + collective_request ( collective_request&& temp) = default; + ~collective_request () = default; + collective_request& operator=(const collective_request& that) = default; + collective_request& operator=( collective_request&& temp) = default; + + [[nodiscard]] + bool ignores_status() const + { + return std::holds_alternative<std::function<void(void*)>>(callback); + } + + std::vector<MPI_Request> native; + callback_variant callback; + void* user_data; + std::vector<MPI_Status> stati {}; +}; +struct state +{ + state () : detach_thread([this] { run(); }), detach_thread_running(true) + { + + } + state (const state& that) = delete; + state ( state&& temp) = delete; + ~state () + { + { + std::unique_lock<std::mutex> lock(container_mutex); + while (!requests.empty() || !collective_requests.empty()) + { + container_condition_variable.notify_one(); + lock.unlock(); + std::this_thread::sleep_for(2ms); + lock.lock (); + } + detach_thread_running = false; + container_condition_variable.notify_one(); + } + detach_thread.join(); + } + state& operator=(const state& that) = delete; + state& operator=( state&& temp) = delete; + + void run() + { + while (detach_thread_running || !active_requests.empty() || !active_collective_requests.empty()) + { + do + { + std::unique_lock<std::mutex> lock(container_mutex); + if (!requests .empty()) active_requests .splice(active_requests .begin(), requests); + if (!collective_requests.empty()) active_collective_requests.splice(active_collective_requests.begin(), collective_requests); + + if (active_requests.empty() && active_collective_requests.empty()) + while (detach_thread_running && requests.empty() && collective_requests.empty()) + container_condition_variable.wait(lock); + } + while (detach_thread_running && active_requests.empty() && active_collective_requests.empty()); + + if (!active_requests .empty()) + { + auto current = active_requests.begin(); + auto end = active_requests.end (); + while (current != end) + { + auto done {0}; + + MPI_Test(¤t->native, &done, current->ignores_status() ? MPI_STATUS_IGNORE : ¤t->status); + if (done) + { + current->ignores_status() + ? std::get<std::function<void(void*)>> (current->callback)(current->user_data) + : std::get<std::function<void(void*, MPI_Status*)>>(current->callback)(current->user_data, ¤t->status); + current = active_requests.erase(current); + } + else + ++current; + } + } + if (!active_collective_requests.empty()) + { + auto current = active_collective_requests.begin(); + auto end = active_collective_requests.end (); + while (current != end) + { + auto done {0}; + + MPI_Testall(static_cast<std::int32_t>(current->native.size()), current->native.data(), &done, current->ignores_status() ? MPI_STATUS_IGNORE : current->stati.data()); + if (done) + { + current->ignores_status() + ? 
std::get<std::function<void(void*)>> (current->callback)(current->user_data) + : std::get<std::function<void(void*, std::int32_t, MPI_Status*)>>(current->callback)(current->user_data, static_cast<std::int32_t>(current->native.size()), current->stati.data()); + current = active_collective_requests.erase(current); + } + else + ++current; + } + } + + std::this_thread::sleep_for(2ms); + } + } + + std::thread detach_thread; + std::atomic_bool detach_thread_running; + + std::mutex container_mutex; + std::condition_variable container_condition_variable; + + std::list<request> requests {}; + std::list<collective_request> collective_requests {}; + std::list<request> active_requests {}; + std::list<collective_request> active_collective_requests {}; +}; + +inline std::optional<state> global_state; // Note: External-linkage optional used as a lazy-initialized stack variable. Must be reset prior to MPI_Finalize. +} + +typedef void MPI_Detach_callback (void*); +typedef void MPI_Detach_callback_status (void*, MPI_Status*); +typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*); + +// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. +inline std::int32_t MPI_Detach ( MPI_Request* request , MPI_Detach_callback* callback, void* data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + + MPI_Test(request, &done, MPI_STATUS_IGNORE); + if (done) + callback(data); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->requests.emplace_back(*request, callback, data); + detail::global_state->container_condition_variable.notify_one(); + *request = MPI_REQUEST_NULL; + } + + return MPI_SUCCESS; +} +// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. +inline std::int32_t MPI_Detach_status ( MPI_Request* request , MPI_Detach_callback_status* callback, void* data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + auto status {MPI_Status()}; + + MPI_Test(request, &done, &status); + if (done) + callback(data, &status); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->requests.emplace_back(*request, callback, data); + detail::global_state->container_condition_variable.notify_one(); + *request = MPI_REQUEST_NULL; + } + + return MPI_SUCCESS; +} +// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. +inline std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void** data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + + for (auto i = 0; i < count; ++i) + { + MPI_Test(&requests[i], &done, MPI_STATUS_IGNORE); + if (done) + callback(data[i]); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->requests.emplace_back(requests[i], callback, data[i]); + detail::global_state->container_condition_variable.notify_one(); + requests[i] = MPI_REQUEST_NULL; + } + } + + return MPI_SUCCESS; +} +// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. 
+inline std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* requests, MPI_Detach_callback_status* callback, void** data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + auto status {MPI_Status()}; + + for (auto i = 0; i < count; ++i) + { + MPI_Test(&requests[i], &done, &status); + if (done) + callback(data[i], &status); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->requests.emplace_back(requests[i], callback, data[i]); + detail::global_state->container_condition_variable.notify_one(); + requests[i] = MPI_REQUEST_NULL; + } + } + + return MPI_SUCCESS; +} +// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. +inline std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void* data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + + MPI_Testall(count, requests, &done, MPI_STATUSES_IGNORE); + if (done) + callback(data); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->collective_requests.emplace_back(std::vector<MPI_Request>(requests, requests + count), callback, data); + detail::global_state->container_condition_variable.notify_one(); + for (auto i = 0; i < count; ++i) + requests[i] = MPI_REQUEST_NULL; + } + + return MPI_SUCCESS; +} +// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. +inline std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* requests, MPI_Detach_callback_statuses* callback, void* data) +{ + if (!detail::global_state) detail::global_state.emplace(); + + auto done {0}; + auto stati {std::vector<MPI_Status>(count)}; + + MPI_Testall(count, requests, &done, stati.data()); + if (done) + callback(data, count, stati.data()); + else + { + std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); + detail::global_state->collective_requests.emplace_back(std::vector<MPI_Request>(requests, requests + count), callback, data); + detail::global_state->container_condition_variable.notify_one(); + for (auto i = 0; i < count; ++i) + requests[i] = MPI_REQUEST_NULL; + } + + return MPI_SUCCESS; +} +inline std::int32_t MPI_Finalize () +{ + detail::global_state.reset(); + return PMPI_Finalize(); +} +} + +using namespace mpi::detach; // Note: Move to global namespace to match the MPI_ definitions. 
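+
+// Usage sketch (an illustration, not part of the original patch) of the ownership and
+// shutdown rules stated in the notes above; value, callback and user_data are placeholders:
+//
+//   MPI_Request request;
+//   MPI_Isend(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
+//   MPI_Detach(&request, callback, user_data); // request is MPI_REQUEST_NULL afterwards;
+//                                              // do not MPI_Wait/MPI_Test on it again.
+//   // ... other work; the detach thread completes the request and invokes the callback ...
+//   mpi::detach::MPI_Finalize();               // Resets global_state, draining pending
+//                                              // requests and joining the detach thread,
+//                                              // before delegating to PMPI_Finalize.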
+ +#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp b/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp new file mode 100644 index 0000000000000000000000000000000000000000..54f4699649a10eba12bf9744e5b2ce654b50fed5 --- /dev/null +++ b/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp @@ -0,0 +1,16 @@ +#ifndef MPI_DETACH_BOOST_EXTENSIONS_HPP +#define MPI_DETACH_BOOST_EXTENSIONS_HPP + +#include <boost/mpi.hpp> + +#include "mpi_detach.hpp" + +namespace boost +{ +namespace mpi +{ + +} +} + +#endif \ No newline at end of file diff --git a/tests/detach_boost_extensions_test.cpp b/tests/detach_boost_extensions_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..db933aa6f348f7bc1026a872d35b0ca34ff185b1 --- /dev/null +++ b/tests/detach_boost_extensions_test.cpp @@ -0,0 +1,69 @@ +#include "catch.hpp" + +#include <boost/serialization/vector.hpp> +#include <boost/mpi.hpp> + +#include <boost_mpi_extensions/mpi_detach_boost_extensions.hpp> + +struct data +{ + template<class archive_type> + void serialize(archive_type& archive, const std::uint32_t version) + { + archive& value; + } + + float value; +}; + +TEST_CASE("Detach boost extensions test", "mpi_detach_boost_extensions.hpp") +{ + boost::mpi::environment environment ; + boost::mpi::communicator communicator; + + std::vector<data> outgoing(1000); + std::vector<data> incoming(1000); + + auto send_request = communicator.isend(0, 42, outgoing); + auto recv_request = communicator.irecv(0, 42, incoming); + auto send_status = send_request.test (); + auto recv_status = recv_request.wait (); + + // communicator.isend(0, 42, outgoing, [&] (void* user_data) + // { + // + // }, nullptr); + // communicator.isend(0, 42, outgoing, [&] (void* user_data, MPI_Status* status) + // { + // + // }, nullptr); + // Callbacks, note these can be chained to implement complex scenarios, + // but leads to the problem called "Callback Hell" in JavaScript for complex scenarios. + // http://callbackhell.com/ + // + // Example: + // -> Send/recv current particle counts + // -> send/recv load-balanced particles + // -> send/recv out-of-bounds particles. + // is 6 MPI_Detach_all callbacks within one another, damaging readability/maintainability. + // + // See alternative below. + // Identical implementation for irecv, iprobe. Boost::MPI does not support more async operations so that's all. + // Note that in modern C++ the void* user_data parameter is redundant since lambdas can capture context. + // We only use MPI_Detach and MPI_Detach_status in the wrapper. + + // communicator(isend, 0, 42, outgoing) + // .then (irecv, 0, 42, outgoing) + // .then (isend, 0, 42, outgoing) + // .then (irecv, 0, 42, outgoing) + // An alternative to callback chains: + // C++17 contains a std::future for (return) values obtained from an async operation. + // This encapsulates an object which may not be yet instantiated, but will be in the future, upon the completion of the async op. + // The user may query, wait or extract a value from the future. + // + // There are three standard ways to interact with std::futures: std::async, std::packaged_task and std::promise. + // + // Boost::ASIO integration through io_service. 
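+  // A sketch of that future-based direction (an illustration, not implemented in this patch;
+  // the helper name detach_as_future is made up, and it would need <future> and <memory>):
+  //
+  //   std::future<MPI_Status> detach_as_future(MPI_Request* request)
+  //   {
+  //     auto* promise = new std::promise<MPI_Status>(); // Ownership passes to the callback.
+  //     auto  future  = promise->get_future();
+  //     MPI_Detach_status(request, [] (void* user_data, MPI_Status* status)
+  //     {
+  //       std::unique_ptr<std::promise<MPI_Status>> owned(
+  //         static_cast<std::promise<MPI_Status>*>(user_data));
+  //       owned->set_value(*status); // Fulfills the future once the request completes.
+  //     }, promise);
+  //     return future;
+  //   }
+  //
+  // std::future has been available since C++11; the capture-less lambda converts to the
+  // MPI_Detach_callback_status* function pointer the API expects.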
+ + mpi::detach::MPI_Finalize(); +} \ No newline at end of file diff --git a/tests/detach_test.cpp b/tests/detach_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a09ad3ac79e1757f9071366cda7840726fa45a91 --- /dev/null +++ b/tests/detach_test.cpp @@ -0,0 +1,117 @@ +#include "catch.hpp" + +#include <array> +#include <iostream> + +#include <boost_mpi_extensions/mpi_detach.hpp> + +void detach_callback (void* data) +{ + std::cout << "detach_callback: " << static_cast<char*>(data) << std::endl; +} +void detach_status_callback(void* data, MPI_Status* status) +{ + std::cout << "detach_stati_callback: " << static_cast<char*>(data) << " Tag: " << status->MPI_TAG << "\n"; +} +void detach_stati_callback (void* data, std::int32_t count, MPI_Status* status) +{ + std::cout << "detach_stati_callback: " << static_cast<char*>(data) << " Tags: "; + for (auto i = 0; i < count; i++) + std::cout << status[i].MPI_TAG << " "; + std::cout << "\n"; +} + +TEST_CASE("MPI_Detach", "MPI_Detach") +{ + auto argc = 1; + auto argv = std::vector<char*>{"detach_test"}; + auto argvp = argv.data(); + MPI_Init(&argc, &argvp); + + { + auto source = 0; + auto target = 0; + auto request = MPI_Request(); + MPI_Isend (&source , 1, MPI_INT, 0, 23, MPI_COMM_SELF, &request); + MPI_Detach(&request, detach_callback, "Sent data with MPI_Isend."); + MPI_Recv (&target , 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + { + auto source = 0; + auto target = 0; + auto request = MPI_Request(); + MPI_Isend (&source , 1, MPI_INT, 0, 23, MPI_COMM_SELF, &request); + MPI_Detach_status(&request, detach_status_callback, "Sent data with MPI_Isend."); + MPI_Recv (&target , 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + { + auto sources = std::array<int , 10> {}; + auto targets = std::array<int , 10> {}; + auto requests = std::array<MPI_Request, 10> {}; + for (auto i = 0; i < 10; ++i) + MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]); + MPI_Detach_all(10, requests.data(), detach_callback, "Sent 10 data with MPI_Isend."); + for (auto i = 0; i < 10; ++i) + MPI_Recv(&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + { + auto sources = std::array<int , 10> {}; + auto targets = std::array<int , 10> {}; + auto requests = std::array<MPI_Request, 10> {}; + for (auto i = 0; i < 10; ++i) + MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]); + MPI_Detach_all_status(10, requests.data(), detach_stati_callback, "Sent 10 data with MPI_Isend."); + for (auto i = 0; i < 10; ++i) + MPI_Recv (&targets[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + { + auto sources = std::array<int , 10> {}; + auto targets = std::array<int , 10> {}; + auto requests = std::array<MPI_Request, 10> {}; + const char* data[] = + { + "Sent data 1 with MPI_Isend.", + "Sent data 2 with MPI_Isend.", + "Sent data 3 with MPI_Isend.", + "Sent data 4 with MPI_Isend.", + "Sent data 5 with MPI_Isend.", + "Sent data 6 with MPI_Isend.", + "Sent data 7 with MPI_Isend.", + "Sent data 8 with MPI_Isend.", + "Sent data 9 with MPI_Isend.", + "Sent data 10 with MPI_Isend." 
+ }; + + for (auto i = 0; i < 10; ++i) + MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]); + MPI_Detach_each(10, requests.data(), detach_callback, reinterpret_cast<void**>(&data)); + for (auto i = 0; i < 10; ++i) + MPI_Recv (&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + { + auto sources = std::array<int , 10> {}; + auto targets = std::array<int , 10> {}; + auto requests = std::array<MPI_Request, 10> {}; + const char* data[] = + { + "Sent data 1 with MPI_Isend.", + "Sent data 2 with MPI_Isend.", + "Sent data 3 with MPI_Isend.", + "Sent data 4 with MPI_Isend.", + "Sent data 5 with MPI_Isend.", + "Sent data 6 with MPI_Isend.", + "Sent data 7 with MPI_Isend.", + "Sent data 8 with MPI_Isend.", + "Sent data 9 with MPI_Isend.", + "Sent data 10 with MPI_Isend." + }; + + for (auto i = 0; i < 10; ++i) + MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]); + MPI_Detach_each_status(10, requests.data(), detach_status_callback, reinterpret_cast<void**>(&data)); + for (auto i = 0; i < 10; ++i) + MPI_Recv (&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE); + } + + mpi::detach::MPI_Finalize(); +}
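The reinterpret_cast<void**>(&data) calls in the last two blocks above rely on the array of pointers and the pointer-to-array sharing an address. A per-element alternative (an illustration, not part of this patch, reusing the data, requests and detach_callback names from the block above) would be:

    std::array<void*, 10> payloads {};
    for (auto i = 0; i < 10; ++i)
      payloads[i] = const_cast<char*>(data[i]); // Drop const explicitly for the void* API.
    MPI_Detach_each(10, requests.data(), detach_callback, payloads.data());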