Commit 11b68183 authored by Ali Can Demiralp

Making it.

parent fb26c9dd
################################################## Project ##################################################
cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
project (boost_mpi_extensions VERSION 1.0 LANGUAGES C CXX)
list (APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)
set_property (GLOBAL PROPERTY USE_FOLDERS ON)
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_BUILD_TYPE Release)
include (set_max_warning_level)
set_max_warning_level ()
@@ -35,6 +36,9 @@ import_library(Boost_INCLUDE_DIRS Boost_SERIALIZATION_LIBRARY_DEBUG Boost_SERIAL
find_package (Catch2 REQUIRED)
list (APPEND PROJECT_LIBRARIES Catch2::Catch2)
find_package (MPI REQUIRED)
list (APPEND PROJECT_LIBRARIES MPI::MPI_C)
################################################## Targets ##################################################
add_library(${PROJECT_NAME} INTERFACE)
target_include_directories(${PROJECT_NAME} INTERFACE
@@ -61,6 +65,7 @@ if(BUILD_TESTS)
set (TEST_MAIN_NAME catch_main)
set (TEST_MAIN_SOURCES tests/catch/main.cpp)
add_library (${TEST_MAIN_NAME} OBJECT ${TEST_MAIN_SOURCES})
target_link_libraries (${TEST_MAIN_NAME} ${PROJECT_NAME} ${PROJECT_LIBRARIES})
set_property (TARGET ${TEST_MAIN_NAME} PROPERTY FOLDER tests/catch)
assign_source_group (${TEST_MAIN_SOURCES})
@@ -68,7 +73,7 @@ if(BUILD_TESTS)
foreach(_SOURCE ${PROJECT_TEST_CPPS})
get_filename_component(_NAME ${_SOURCE} NAME_WE)
add_executable (${_NAME} ${_SOURCE} $<TARGET_OBJECTS:${TEST_MAIN_NAME}>)
target_link_libraries (${_NAME} ${PROJECT_NAME} ${PROJECT_LIBRARIES})
add_test (${_NAME} ${_NAME})
set_property (TARGET ${_NAME} PROPERTY FOLDER tests)
assign_source_group (${_SOURCE})
...
@@ -9,7 +9,7 @@ if not exist "vcpkg.exe" call bootstrap-vcpkg.bat
set VCPKG_COMMAND=vcpkg install --recurse
set VCPKG_DEFAULT_TRIPLET=x64-windows
rem Add your library ports here.
%VCPKG_COMMAND% boost[mpi] catch2 mpi
cd ..
cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake ..
...
@@ -9,7 +9,7 @@ if [ ! -f "vcpkg" ]; then ./bootstrap-vcpkg.sh fi
VCPKG_COMMAND=vcpkg install --recurse
VCPKG_DEFAULT_TRIPLET=x64-linux
# Add your library ports here.
$VCPKG_COMMAND boost[mpi] catch2 mpi
cd ..
cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake ..
...
#ifndef BOOST_MPI_EXTENSIONS_HPP
#define BOOST_MPI_EXTENSIONS_HPP
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>
#include <thread>
#include <mpi.h>
namespace boost::mpi
{
namespace detail
{
typedef void MPI_Detach_callback (void*);
typedef void MPI_Detach_callback_status (void*, MPI_Status*);
typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*);
std::int32_t MPI_Detach ( MPI_Request* request, MPI_Detach_callback* callback, void* data);
std::int32_t MPI_Detach_status ( MPI_Request* request, MPI_Detach_callback_status* callback, void* data);
std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* array_of_data[]);
std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_status* callback, void* array_of_data[]);
std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data);
std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data);
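// Both request wrappers below take ownership of the caller's handles: the MPI_Request values are copied in and the originals are reset to MPI_REQUEST_NULL.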
struct single_request
{
single_request(MPI_Request* request, MPI_Detach_callback* callback, void* data)
: request (*request)
, callback (callback)
, callback_status(nullptr)
, status ()
, data (data)
, status_ptr (MPI_STATUS_IGNORE)
{
*request = MPI_REQUEST_NULL;
}
single_request(MPI_Request* request, MPI_Detach_callback_status* callback, void* data)
: request (*request)
, callback (nullptr)
, callback_status(callback)
, status ()
, data (data)
, status_ptr (&status)
{
*request = MPI_REQUEST_NULL;
}
MPI_Request request;
MPI_Detach_callback* callback;
MPI_Detach_callback_status* callback_status;
MPI_Status status;
void* data;
MPI_Status* status_ptr; // Pointer to status or MPI_STATUS_IGNORE.
};
struct all_request
{
all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data)
: count (count)
, request (new MPI_Request[count])
, callback (callback)
, callback_statuses(nullptr)
, statuses (MPI_STATUSES_IGNORE)
, data (data)
{
for (auto i = 0; i < count; i++)
{
request [i] = array_of_requests[i];
array_of_requests[i] = MPI_REQUEST_NULL;
}
}
all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data)
: count (count)
, request (new MPI_Request[count])
, callback (nullptr)
, callback_statuses(callback)
, statuses (new MPI_Status[count])
, data (data) {
for (auto i = 0; i < count; i++)
{
request [i] = array_of_requests[i];
array_of_requests[i] = MPI_REQUEST_NULL;
}
}
~all_request()
{
delete[] request;
if (statuses != MPI_STATUSES_IGNORE)
delete[] statuses;
}
std::int32_t count;
MPI_Request* request;
MPI_Detach_callback* callback;
MPI_Detach_callback_statuses* callback_statuses;
MPI_Status* statuses;
void* data;
};
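// Shared state for the detach progress thread: new work is appended to the *Queue lists under list_mutex and later moved to the working lists for testing.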
struct state
{
std::atomic<std::int32_t> running {1};
std::thread detach_thread;
std::mutex list_mutex;
std::condition_variable list_condition_variable;
std::once_flag once_flag;
std::int32_t initialized {0};
std::list<single_request*> singleRequestsQueue {};
std::list<all_request*> allRequestsQueue {};
std::list<single_request*> singleRequests {};
std::list<all_request*> allRequests {};
};
static state global_state;
}
}
#endif
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
using namespace std::chrono_literals;
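// Note: The request wrappers (singleRequest, allRequest) and the shared globals used below (running, initialized, onceFlag, detachThread, listMtx, listCv, singleRequests, allRequests and their *Queue counterparts) are assumed to be declared in this file's accompanying header.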
void run(void) {
while (running || !singleRequests.empty() || !allRequests.empty()) {
do {
std::unique_lock<std::mutex> lck(listMtx);
if (!singleRequestsQueue.empty())
singleRequests.splice(singleRequests.begin(), singleRequestsQueue);
if (!allRequestsQueue.empty())
allRequests.splice(allRequests.begin(), allRequestsQueue);
if (singleRequests.empty() && allRequests.empty())
while (running && singleRequestsQueue.empty() &&
allRequestsQueue.empty()) {
listCv.wait(lck);
}
} while (running && singleRequests.empty() && allRequests.empty());
if (!singleRequests.empty()) {
auto iter = singleRequests.begin();
auto end = singleRequests.end();
while (iter != end) {
int flag;
MPI_Test(&(*iter)->req, &flag, (*iter)->statusP);
if (flag) {
if ((*iter)->callback)
(*iter)->callback((*iter)->data);
else
(*iter)->callback_status((*iter)->data, (*iter)->statusP);
delete (*iter);
iter = singleRequests.erase(iter);
} else {
iter++;
}
}
}
if (!allRequests.empty()){
auto iter = allRequests.begin();
auto end = allRequests.end();
while (iter != end) {
int flag;
MPI_Testall((*iter)->count, (*iter)->req, &flag, (*iter)->statuses);
if (flag) {
if ((*iter)->callback)
(*iter)->callback((*iter)->data);
else
(*iter)->callback_statuses((*iter)->data, (*iter)->count, (*iter)->statuses);
delete (*iter);
iter = allRequests.erase(iter);
} else {
iter++;
}
}
}
std::this_thread::sleep_for(2ms);
}
}
// Using __attribute__((constructor)) does not work here,
// because the C++ standard library might not be initialized yet.
void initDetach() {
initialized = 1;
detachThread = std::thread(run);
}
// This function is executed after main, either via __attribute__((destructor))
// or from the MPI_Finalize wrapper below.
void finiDetach() {
{
std::unique_lock<std::mutex> lck(listMtx);
// Make sure all pending requests get processed before shutting down.
// Access to the *Queue lists is shared, so it needs to be locked.
while (!singleRequestsQueue.empty() || !allRequestsQueue.empty())
{
listCv.notify_one();
lck.unlock();
std::this_thread::sleep_for(2ms);
lck.lock();
} // After the loop the lock is still held, so running can be changed and the thread notified safely.
running = 0;
listCv.notify_one();
} // Now wait for the progress thread to finish, i.e. until all requests have been handled.
detachThread.join();
}
int MPI_Detach(MPI_Request *request, MPI_Detach_callback *callback,
void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Test(request, &flag, MPI_STATUS_IGNORE);
if (flag) {
callback(data);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(new singleRequest(request, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_status(MPI_Request *request,
MPI_Detach_callback_status *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status status;
MPI_Test(request, &flag, &status);
if (flag) {
callback(data, &status);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(new singleRequest(request, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_each(int count, MPI_Request* array_of_requests,
MPI_Detach_callback *callback, void *array_of_data[]) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
for (int i = 0; i < count; i++) {
MPI_Test(array_of_requests + i, &flag, MPI_STATUS_IGNORE);
if (flag) {
callback(array_of_data[i]);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(
new singleRequest(array_of_requests + i, callback, array_of_data[i]));
listCv.notify_one();
}
}
return MPI_SUCCESS;
}
int MPI_Detach_each_status(int count, MPI_Request* array_of_requests,
MPI_Detach_callback_status *callback,
void *array_of_data[]) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status status;
for (int i = 0; i < count; i++) {
MPI_Test(array_of_requests + i, &flag, &status);
if (flag) {
callback(array_of_data[i], &status);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(
new singleRequest(array_of_requests + i, callback, array_of_data[i]));
listCv.notify_one();
}
}
return MPI_SUCCESS;
}
int MPI_Detach_all(int count, MPI_Request* array_of_requests,
MPI_Detach_callback *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Testall(count, array_of_requests, &flag, MPI_STATUSES_IGNORE);
if (flag) {
callback(data);
} else {
std::unique_lock<std::mutex> lck(listMtx);
allRequestsQueue.push_back(
new allRequest(count, array_of_requests, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_all_status(int count, MPI_Request* array_of_requests,
MPI_Detach_callback_statuses *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status statuses[count];
MPI_Testall(count, array_of_requests, &flag, statuses);
if (flag) {
callback(data, count, statuses);
} else {
std::unique_lock<std::mutex> lck(listMtx);
allRequestsQueue.push_back(
new allRequest(count, array_of_requests, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Finalize(){
// We need to make sure all communication is finished
// before calling MPI_Finalize.
if (initialized)
finiDetach();
return PMPI_Finalize();
}
#ifndef MPI_DETACH_HPP
#define MPI_DETACH_HPP
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <variant>
#include <vector>
#include <mpi.h>
namespace mpi::detach
{
namespace detail
{
using namespace std::chrono_literals;
struct request
{
// Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal.
using callback_variant = std::variant<std::function<void(void*)>, std::function<void(void*, MPI_Status*)>>;
request (const MPI_Request& request, callback_variant callback, void* user_data = nullptr)
: native (request)
, callback (std::move(callback))
, user_data(user_data)
{
}
request (const request& that) = default;
request ( request&& temp) = default;
~request () = default;
request& operator=(const request& that) = default;
request& operator=( request&& temp) = default;
[[nodiscard]]
bool ignores_status() const
{
return std::holds_alternative<std::function<void(void*)>>(callback);
}
MPI_Request native;
callback_variant callback;
void* user_data;
MPI_Status status {};
};
struct collective_request
{
// Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal.
using callback_variant = std::variant<std::function<void(void*)>, std::function<void(void*, std::int32_t, MPI_Status*)>>;
collective_request (std::vector<MPI_Request> requests, callback_variant callback, void* user_data = nullptr)
: native (std::move(requests))
, callback (std::move(callback))
, user_data(user_data)
{
if (!ignores_status())
stati.resize(native.size()); // Caution: If native is resized after construction, stati no longer matches its size.
}
collective_request (const collective_request& that) = default;
collective_request ( collective_request&& temp) = default;
~collective_request () = default;
collective_request& operator=(const collective_request& that) = default;
collective_request& operator=( collective_request&& temp) = default;
[[nodiscard]]
bool ignores_status() const
{
return std::holds_alternative<std::function<void(void*)>>(callback);
}
std::vector<MPI_Request> native;
callback_variant callback;
void* user_data;
std::vector<MPI_Status> stati {};
};
struct state
{
state () : detach_thread_running(true), detach_thread([this] { run(); }) // detach_thread_running must be initialized before detach_thread starts (members initialize in declaration order).
{
}
state (const state& that) = delete;
state ( state&& temp) = delete;
~state ()
{
{
std::unique_lock<std::mutex> lock(container_mutex);
while (!requests.empty() || !collective_requests.empty())
{
container_condition_variable.notify_one();
lock.unlock();
std::this_thread::sleep_for(2ms);
lock.lock ();
}
detach_thread_running = false;
container_condition_variable.notify_one();
}
detach_thread.join();
}
state& operator=(const state& that) = delete;
state& operator=( state&& temp) = delete;
void run()
{
while (detach_thread_running || !active_requests.empty() || !active_collective_requests.empty())
{
do
{
std::unique_lock<std::mutex> lock(container_mutex);
if (!requests .empty()) active_requests .splice(active_requests .begin(), requests);
if (!collective_requests.empty()) active_collective_requests.splice(active_collective_requests.begin(), collective_requests);
if (active_requests.empty() && active_collective_requests.empty())
while (detach_thread_running && requests.empty() && collective_requests.empty())
container_condition_variable.wait(lock);
}
while (detach_thread_running && active_requests.empty() && active_collective_requests.empty());
if (!active_requests .empty())
{
auto current = active_requests.begin();
auto end = active_requests.end ();
while (current != end)
{
auto done {0};
MPI_Test(&current->native, &done, current->ignores_status() ? MPI_STATUS_IGNORE : &current->status);
if (done)
{
current->ignores_status()
? std::get<std::function<void(void*)>> (current->callback)(current->user_data)
: std::get<std::function<void(void*, MPI_Status*)>>(current->callback)(current->user_data, &current->status);
current = active_requests.erase(current);
}
else
++current;
}
}
if (!active_collective_requests.empty())
{
auto current = active_collective_requests.begin();
auto end = active_collective_requests.end ();
while (current != end)
{
auto done {0};
MPI_Testall(static_cast<std::int32_t>(current->native.size()), current->native.data(), &done, current->ignores_status() ? MPI_STATUSES_IGNORE : current->stati.data());
if (done)
{
current->ignores_status()
? std::get<std::function<void(void*)>> (current->callback)(current->user_data)
: std::get<std::function<void(void*, std::int32_t, MPI_Status*)>>(current->callback)(current->user_data, static_cast<std::int32_t>(current->native.size()), current->stati.data());
current = active_collective_requests.erase(current);
}
else
++current;
}
}
std::this_thread::sleep_for(2ms);
}
}
std::atomic_bool detach_thread_running; // Declared before detach_thread so it is initialized before run() starts reading it.
std::thread detach_thread;
std::mutex container_mutex;
std::condition_variable container_condition_variable;
std::list<request> requests {};
std::list<collective_request> collective_requests {};
std::list<request> active_requests {};
std::list<collective_request> active_collective_requests {};
};
inline std::optional<state> global_state; // Note: An external-linkage inline variable used for lazily-constructed global state. It must be reset before MPI is finalized.
}
typedef void MPI_Detach_callback (void*);
typedef void MPI_Detach_callback_status (void*, MPI_Status*);
typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*);
// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it.
inline std::int32_t MPI_Detach ( MPI_Request* request , MPI_Detach_callback* callback, void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
MPI_Test(request, &done, MPI_STATUS_IGNORE);
if (done)
callback(data);
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->requests.emplace_back(*request, callback, data);
detail::global_state->container_condition_variable.notify_one();
*request = MPI_REQUEST_NULL;
}
return MPI_SUCCESS;
}
// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it.
inline std::int32_t MPI_Detach_status ( MPI_Request* request , MPI_Detach_callback_status* callback, void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
auto status {MPI_Status()};
MPI_Test(request, &done, &status);
if (done)
callback(data, &status);
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->requests.emplace_back(*request, callback, data);
detail::global_state->container_condition_variable.notify_one();
*request = MPI_REQUEST_NULL;
}
return MPI_SUCCESS;
}
// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them.
inline std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void** data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
for (auto i = 0; i < count; ++i)
{
MPI_Test(&requests[i], &done, MPI_STATUS_IGNORE);
if (done)
callback(data[i]);
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->requests.emplace_back(requests[i], callback, data[i]);
detail::global_state->container_condition_variable.notify_one();
requests[i] = MPI_REQUEST_NULL;
}
}
return MPI_SUCCESS;
}
// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them.
inline std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* requests, MPI_Detach_callback_status* callback, void** data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
auto status {MPI_Status()};
for (auto i = 0; i < count; ++i)
{
MPI_Test(&requests[i], &done, &status);
if (done)
callback(data[i], &status);
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->requests.emplace_back(requests[i], callback, data[i]);
detail::global_state->container_condition_variable.notify_one();
requests[i] = MPI_REQUEST_NULL;
}
}
return MPI_SUCCESS;
}
// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them.
inline std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
MPI_Testall(count, requests, &done, MPI_STATUSES_IGNORE);
if (done)
callback(data);
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->collective_requests.emplace_back(std::vector<MPI_Request>(requests, requests + count), callback, data);
detail::global_state->container_condition_variable.notify_one();
for (auto i = 0; i < count; ++i)
requests[i] = MPI_REQUEST_NULL;
}
return MPI_SUCCESS;
}
// Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them.
inline std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* requests, MPI_Detach_callback_statuses* callback, void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
auto stati {std::vector<MPI_Status>(count)};
MPI_Testall(count, requests, &done, stati.data());
if (done)
callback(data, count, stati.data());
else
{
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
detail::global_state->collective_requests.emplace_back(std::vector<MPI_Request>(requests, requests + count), callback, data);
detail::global_state->container_condition_variable.notify_one();
for (auto i = 0; i < count; ++i)
requests[i] = MPI_REQUEST_NULL;
}
return MPI_SUCCESS;
}
inline std::int32_t MPI_Finalize ()
{
detail::global_state.reset();
return PMPI_Finalize();
}
}
using namespace mpi::detach; // Note: Brings the MPI_Detach* definitions into the global namespace to match the standard MPI_ naming.
#endif
\ No newline at end of file
#ifndef MPI_DETACH_BOOST_EXTENSIONS_HPP
#define MPI_DETACH_BOOST_EXTENSIONS_HPP
#include <boost/mpi.hpp>
#include "mpi_detach.hpp"
namespace boost
{
namespace mpi
{
}
}
#endif
\ No newline at end of file
#include "catch.hpp"
#include <boost/serialization/vector.hpp>
#include <boost/mpi.hpp>
#include <boost_mpi_extensions/mpi_detach_boost_extensions.hpp>
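#include <future> // Assumed addition: needed only for the std::promise / std::future sketch further below.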
struct data
{
template<class archive_type>
void serialize(archive_type& archive, const std::uint32_t version)
{
archive& value;
}
float value;
};
TEST_CASE("Detach boost extensions test", "mpi_detach_boost_extensions.hpp")
{
boost::mpi::environment environment ;
boost::mpi::communicator communicator;
std::vector<data> outgoing(1000);
std::vector<data> incoming(1000);
auto send_request = communicator.isend(0, 42, outgoing);
auto recv_request = communicator.irecv(0, 42, incoming);
auto send_status = send_request.test ();
auto recv_status = recv_request.wait ();
// communicator.isend(0, 42, outgoing, [&] (void* user_data)
// {
//
// }, nullptr);
// communicator.isend(0, 42, outgoing, [&] (void* user_data, MPI_Status* status)
// {
//
// }, nullptr);
// Callbacks can be chained to implement complex scenarios,
// but this leads to the problem known as "callback hell" in JavaScript.
// http://callbackhell.com/
//
// Example:
// -> Send/recv current particle counts,
// -> send/recv load-balanced particles,
// -> send/recv out-of-bounds particles
// amounts to 6 nested MPI_Detach_all callbacks, damaging readability and maintainability.
//
// See the alternative below.
// The implementation is identical for irecv and iprobe; Boost.MPI does not expose any further asynchronous operations.
// Note that in modern C++ the void* user_data parameter is redundant since lambdas can capture context.
// We only use MPI_Detach and MPI_Detach_status in the wrapper.
// communicator(isend, 0, 42, outgoing)
// .then (irecv, 0, 42, outgoing)
// .then (isend, 0, 42, outgoing)
// .then (irecv, 0, 42, outgoing)
// An alternative to callback chains:
// The standard library (since C++11) provides std::future for (return) values obtained from an asynchronous operation.
// It encapsulates a value which may not exist yet, but will become available upon completion of the asynchronous operation.
// The user may query, wait on, or extract the value from the future.
//
// There are three standard ways to create std::futures: std::async, std::packaged_task and std::promise.
//
// Boost::ASIO integration through io_service.
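// A minimal sketch of the futures alternative (an illustrative addition, not part of the original test):
// MPI_Detach_status is wrapped in a std::promise so completion can be awaited through a std::future
// instead of nesting callbacks. Assumes the <future> include above; the tag 7 is arbitrary.
{
  auto value    = 42;
  auto received = 0;
  auto request  = MPI_Request();
  auto promise  = std::promise<MPI_Status>();
  auto future   = promise.get_future();
  MPI_Isend        (&value   , 1, MPI_INT, 0, 7, MPI_COMM_SELF, &request);
  MPI_Detach_status(&request , [] (void* user_data, MPI_Status* status)
  {
    static_cast<std::promise<MPI_Status>*>(user_data)->set_value(*status);
  }, &promise);
  MPI_Recv         (&received, 1, MPI_INT, 0, 7, MPI_COMM_SELF, MPI_STATUS_IGNORE);
  future.wait(); // Returns once the immediate test or the detach thread has invoked the callback.
}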
mpi::detach::MPI_Finalize();
}
\ No newline at end of file
#include "catch.hpp"
#include <array>
#include <iostream>
#include <boost_mpi_extensions/mpi_detach.hpp>
void detach_callback (void* data)
{
std::cout << "detach_callback: " << static_cast<char*>(data) << std::endl;
}
void detach_status_callback(void* data, MPI_Status* status)
{
std::cout << "detach_stati_callback: " << static_cast<char*>(data) << " Tag: " << status->MPI_TAG << "\n";
}
void detach_stati_callback (void* data, std::int32_t count, MPI_Status* status)
{
std::cout << "detach_stati_callback: " << static_cast<char*>(data) << " Tags: ";
for (auto i = 0; i < count; i++)
std::cout << status[i].MPI_TAG << " ";
std::cout << "\n";
}
TEST_CASE("MPI_Detach", "MPI_Detach")
{
auto argc = 1;
char name[] = "detach_test"; // A string literal is const and cannot initialize a char*, hence the mutable buffer.
auto argv = std::vector<char*>{name};
auto argvp = argv.data();
MPI_Init(&argc, &argvp);
{
auto source = 0;
auto target = 0;
auto request = MPI_Request();
MPI_Isend (&source , 1, MPI_INT, 0, 23, MPI_COMM_SELF, &request);
MPI_Detach(&request, detach_callback, const_cast<char*>("Sent data with MPI_Isend.")); // const_cast: the detach API takes void*, which a string literal cannot bind to directly.
MPI_Recv (&target , 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
{
auto source = 0;
auto target = 0;
auto request = MPI_Request();
MPI_Isend (&source , 1, MPI_INT, 0, 23, MPI_COMM_SELF, &request);
MPI_Detach_status(&request, detach_status_callback, const_cast<char*>("Sent data with MPI_Isend."));
MPI_Recv (&target , 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
{
auto sources = std::array<int , 10> {};
auto targets = std::array<int , 10> {};
auto requests = std::array<MPI_Request, 10> {};
for (auto i = 0; i < 10; ++i)
MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]);
MPI_Detach_all(10, requests.data(), detach_callback, const_cast<char*>("Sent 10 data with MPI_Isend."));
for (auto i = 0; i < 10; ++i)
MPI_Recv(&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
{
auto sources = std::array<int , 10> {};
auto targets = std::array<int , 10> {};
auto requests = std::array<MPI_Request, 10> {};
for (auto i = 0; i < 10; ++i)
MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]);
MPI_Detach_all_status(10, requests.data(), detach_stati_callback, const_cast<char*>("Sent 10 data with MPI_Isend."));
for (auto i = 0; i < 10; ++i)
MPI_Recv (&targets[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
{
auto sources = std::array<int , 10> {};
auto targets = std::array<int , 10> {};
auto requests = std::array<MPI_Request, 10> {};
const char* data[] =
{
"Sent data 1 with MPI_Isend.",
"Sent data 2 with MPI_Isend.",
"Sent data 3 with MPI_Isend.",
"Sent data 4 with MPI_Isend.",
"Sent data 5 with MPI_Isend.",
"Sent data 6 with MPI_Isend.",
"Sent data 7 with MPI_Isend.",
"Sent data 8 with MPI_Isend.",
"Sent data 9 with MPI_Isend.",
"Sent data 10 with MPI_Isend."
};
for (auto i = 0; i < 10; ++i)
MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]);
MPI_Detach_each(10, requests.data(), detach_callback, reinterpret_cast<void**>(&data));
for (auto i = 0; i < 10; ++i)
MPI_Recv (&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
{
auto sources = std::array<int , 10> {};
auto targets = std::array<int , 10> {};
auto requests = std::array<MPI_Request, 10> {};
const char* data[] =
{
"Sent data 1 with MPI_Isend.",
"Sent data 2 with MPI_Isend.",
"Sent data 3 with MPI_Isend.",
"Sent data 4 with MPI_Isend.",
"Sent data 5 with MPI_Isend.",
"Sent data 6 with MPI_Isend.",
"Sent data 7 with MPI_Isend.",
"Sent data 8 with MPI_Isend.",
"Sent data 9 with MPI_Isend.",
"Sent data 10 with MPI_Isend."
};
for (auto i = 0; i < 10; ++i)
MPI_Isend(&sources[i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, &requests[i]);
MPI_Detach_each_status(10, requests.data(), detach_status_callback, reinterpret_cast<void**>(&data));
for (auto i = 0; i < 10; ++i)
MPI_Recv (&targets [i], 1, MPI_INT, 0, 23, MPI_COMM_SELF, MPI_STATUS_IGNORE);
}
mpi::detach::MPI_Finalize();
}