diff --git a/include/boost_mpi_extensions/mpi_detach.hpp b/include/boost_mpi_extensions/detach.hpp similarity index 82% rename from include/boost_mpi_extensions/mpi_detach.hpp rename to include/boost_mpi_extensions/detach.hpp index ec99cf4734d382cad4a6315fc3bf70b73a282a1a..bbfab1ec86623551bb6d23a53864c9a4b10abe83 100644 --- a/include/boost_mpi_extensions/mpi_detach.hpp +++ b/include/boost_mpi_extensions/detach.hpp @@ -1,6 +1,8 @@ #ifndef MPI_DETACH_HPP #define MPI_DETACH_HPP +#define BOOST_THREAD_VERSION 5 + #include <atomic> #include <chrono> #include <condition_variable> @@ -16,6 +18,11 @@ #include <mpi.h> +#include <boost/asio.hpp> +#include <boost/thread.hpp> +#include <boost/beast/core/span.hpp> +#include <boost/mpi.hpp> + namespace mpi::detach { namespace detail @@ -182,10 +189,52 @@ struct state }; inline std::optional<state> global_state; // Note: External-linkage optional used as a lazy-initialized stack variable. Must be reset prior to MPI_Finalize. + +class detach_service +{ +public: + explicit detach_service (const std::size_t thread_count = 1) + : work_guard_(boost::asio::make_work_guard(io_context_)) + { + for (auto i = 0; i < thread_count; ++i) + threads_.create_thread([&] () { io_context_.run(); }); + } + detach_service (const detach_service& that) = delete ; + detach_service ( detach_service&& temp) = delete ; + ~detach_service () = default; + detach_service& operator=(const detach_service& that) = delete ; + detach_service& operator=( detach_service&& temp) = delete ; + + void post_request () + { + + } + void post_collective_request() + { + + } + +protected: + boost::thread_group threads_ ; + boost::asio::io_context io_context_; + boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_; + + std::mutex container_mutex_; + std::condition_variable container_condition_variable_; + + std::list<request> requests_ {}; + std::list<collective_request> collective_requests_ {}; + std::list<request> active_requests_ {}; + 
std::list<collective_request> active_collective_requests_ {}; +}; } +typedef void MPI_Detach_callback (void*); +typedef void MPI_Detach_callback_status (void*, MPI_Status*); +typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*); + // Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. -inline std::int32_t MPI_Detach ( MPI_Request* request , std::function<void(void*)> callback, void* data) +inline std::int32_t MPI_Detach ( MPI_Request* request , MPI_Detach_callback* callback, void* data) { if (!detail::global_state) detail::global_state.emplace(); @@ -205,7 +254,7 @@ inline std::int32_t MPI_Detach ( MPI_Request* requ return MPI_SUCCESS; } // Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. -inline std::int32_t MPI_Detach_status ( MPI_Request* request , std::function<void(void*, MPI_Status*)> callback, void* data) +inline std::int32_t MPI_Detach_status ( MPI_Request* request , MPI_Detach_callback_status* callback, void* data) { if (!detail::global_state) detail::global_state.emplace(); @@ -226,7 +275,7 @@ inline std::int32_t MPI_Detach_status ( MPI_Request* requ return MPI_SUCCESS; } // Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. -inline std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* requests, std::function<void(void*)> callback, void** data) +inline std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void** data) { if (!detail::global_state) detail::global_state.emplace(); @@ -249,7 +298,7 @@ inline std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* requ return MPI_SUCCESS; } // Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. 
-inline std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* requests, std::function<void(void*, MPI_Status*)> callback, void** data) +inline std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* requests, MPI_Detach_callback_status* callback, void** data) { if (!detail::global_state) detail::global_state.emplace(); @@ -273,7 +322,7 @@ inline std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* requ return MPI_SUCCESS; } // Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. -inline std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* requests, std::function<void(void*)> callback, void* data) +inline std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* requests, MPI_Detach_callback* callback, void* data) { if (!detail::global_state) detail::global_state.emplace(); @@ -294,7 +343,7 @@ inline std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* requ return MPI_SUCCESS; } // Note: If the test does not succeed immediately, takes the ownership of the requests and invalidates them. -inline std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* requests, std::function<void(void*, std::int32_t, MPI_Status*)> callback, void* data) +inline std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* requests, MPI_Detach_callback_statuses* callback, void* data) { if (!detail::global_state) detail::global_state.emplace(); @@ -321,7 +370,6 @@ inline std::int32_t MPI_Finalize () return PMPI_Finalize(); } } - using namespace mpi::detach; // Note: Move to global namespace to match the MPI_ definitions. 
#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/detach_extensions.hpp b/include/boost_mpi_extensions/detach_extensions.hpp new file mode 100644 index 0000000000000000000000000000000000000000..4de623d46c673a404e88d462d0c942ecaa61c0eb --- /dev/null +++ b/include/boost_mpi_extensions/detach_extensions.hpp @@ -0,0 +1,754 @@ +#ifndef BOOST_MPI_DETACH_EXTENSIONS_HPP +#define BOOST_MPI_DETACH_EXTENSIONS_HPP + +#define BOOST_THREAD_VERSION 5 + +#include <boost/asio.hpp> +#include <boost/thread.hpp> +#include <boost/beast/core/span.hpp> +#include <boost/mpi.hpp> + +#include "mpi-detach.h" + +namespace boost::mpi +{ +using beast::span; // An independent std::span equivalent is not available in boost yet, hence we utilize beast's implementation. + +// Callback interface with unified call syntax for request. TODO: Implement for non-trivials. +inline void detach (request& request , const std::function<void()>& callback) +{ + struct state_type + { + std::function<void()> callback; + }; + + if (request.trivial()) + MPIX_Detach(&request.trivial().get(), [ ] (void* user_data, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(); + delete state; + }, new state_type {callback}); // MPI_Detach may complete past the scope of the callback, hence we need to re-instantiate the callback and couple it to the lifetime of MPI_Detach. 
+} +template <typename type> +void detach (request& request , const std::function<void(type&)>& callback, const type& data) +{ + struct state_type + { + std::function<void(type&)> callback; + type data ; + }; + + if (request.trivial()) + MPIX_Detach(&request.trivial().get(), [ ] (void* user_data, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data); + delete state; + }, new state_type {callback, data}); +} + +inline void detach_status (request& request , const std::function<void( const MPI_Status&)>& callback) +{ + struct state_type + { + std::function<void(const MPI_Status&)> callback; + }; + + if (request.trivial()) + MPIX_Detach_status(&request.trivial().get(), [ ] (void* user_data, MPI_Request* request, MPI_Status* status) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(*status); + delete state; + }, new state_type {callback}); +} +template <typename type> +void detach_status (request& request , const std::function<void(type&, const MPI_Status&)>& callback, const type& data) +{ + struct state_type + { + std::function<void(type&, const MPI_Status&)> callback; + type data ; + }; + + if (request.trivial()) + MPIX_Detach_status(&request.trivial().get(), [ ] (void* user_data, MPI_Request* request, MPI_Status* status) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data, *status); + delete state; + }, new state_type {callback, data}); +} + +inline void detach_each (span<request&>& requests, const std::function<void()>& callback) +{ + struct state_type + { + std::function<void()> callback; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + std::vector<state_type*> user_data (requests.size(), nullptr); + std::vector<void*> raw_user_data (requests.size(), nullptr); + for (auto i = 0; i < requests.size(); ++i) + { + if (requests.data()[i].trivial()) + { + native_requests[i] = requests.data()[i].trivial().get(); + 
user_data [i] = new state_type {callback}; + raw_user_data [i] = user_data[i]; + } + } + + MPIX_Detach_each(native_requests.size(), native_requests.data(), [ ] (void* user_data, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(); + delete state; + }, raw_user_data.data()); +} +template <typename type> +void detach_each (span<request&>& requests, const std::function<void(type&)>& callback, const span<type>& data) +{ + struct state_type + { + std::function<void(type&)> callback; + type data ; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + std::vector<state_type*> user_data (requests.size(), nullptr); + std::vector<void*> raw_user_data (requests.size(), nullptr); + for (auto i = 0; i < requests.size(); ++i) + { + if (requests.data()[i].trivial()) + { + native_requests[i] = requests.data()[i].trivial().get(); + user_data [i] = new state_type {callback, data.data_[i]}; + raw_user_data [i] = user_data[i]; + } + } + + MPIX_Detach_each(native_requests.size(), native_requests.data(), [ ] (void* user_data, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data); + delete state; + }, raw_user_data.data()); +} + +inline void detach_each_status(span<request&>& requests, const std::function<void( const MPI_Status&)>& callback) +{ + struct state_type + { + std::function<void(const MPI_Status&)> callback; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + std::vector<state_type*> user_data (requests.size(), nullptr); + std::vector<void*> raw_user_data (requests.size(), nullptr); + for (auto i = 0; i < requests.size(); ++i) + { + if (requests.data()[i].trivial()) + { + native_requests[i] = requests.data()[i].trivial().get(); + user_data [i] = new state_type {callback}; + raw_user_data [i] = user_data[i]; + } + } + + MPIX_Detach_each_status(native_requests.size(), native_requests.data(), [ ] (void* 
user_data, MPI_Request* request, MPI_Status* status) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(*status); + delete state; + }, raw_user_data.data()); +} +template <typename type> +void detach_each_status(span<request&>& requests, const std::function<void(type&, const MPI_Status&)>& callback, const span<type>& data) +{ + struct state_type + { + std::function<void(type&, const MPI_Status&)> callback; + type data ; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + std::vector<state_type*> user_data (requests.size(), nullptr); + std::vector<void*> raw_user_data (requests.size(), nullptr); + for (auto i = 0; i < requests.size(); ++i) + { + if (requests.data()[i].trivial()) + { + native_requests[i] = requests.data()[i].trivial().get(); + user_data [i] = new state_type {callback, data.data_[i]}; + raw_user_data [i] = user_data[i]; + } + } + + MPIX_Detach_each_status(native_requests.size(), native_requests.data(), [ ] (void* user_data, MPI_Request* request, MPI_Status* status) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data, *status); + delete state; + }, raw_user_data.data()); +} + +inline void detach_all (span<request&>& requests, const std::function<void()>& callback) +{ + struct state_type + { + std::function<void()> callback; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + for (auto i = 0; i < requests.size(); ++i) + if (requests.data()[i].trivial()) + native_requests[i] = requests.data()[i].trivial().get(); + + MPIX_Detach_all(native_requests.size(), native_requests.data(), [ ] (void* user_data, std::int32_t count, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(); + delete state; + }, new state_type {callback}); +} +template <typename type> +void detach_all (span<request&>& requests, const std::function<void(type&)>& callback, const type& data) +{ + struct state_type + { 
+ std::function<void(type&)> callback; + type data ; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + for (auto i = 0; i < requests.size(); ++i) + if (requests.data()[i].trivial()) + native_requests[i] = requests.data()[i].trivial().get(); + + MPIX_Detach_all(native_requests.size(), native_requests.data(), [ ] (void* user_data, std::int32_t count, MPI_Request* request) + { + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data); + delete state; + }, new state_type {callback, data}); +} + +inline void detach_all_status (span<request&>& requests, const std::function<void( const std::vector<MPI_Status>&)>& callback) +{ + struct state_type + { + std::function<void(const std::vector<MPI_Status>&)> callback; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + for (auto i = 0; i < requests.size(); ++i) + if (requests.data()[i].trivial()) + native_requests[i] = requests.data()[i].trivial().get(); + + MPIX_Detach_all_status(native_requests.size(), native_requests.data(), [ ] (void* user_data, std::int32_t count, MPI_Request* request, MPI_Status* status) + { + std::vector<MPI_Status> stati(count); + for (auto i = 0; i < count; ++i) + stati[i] = status[i]; + + auto* state = static_cast<state_type*>(user_data); + state->callback(stati); + delete state; + }, new state_type {callback}); +} +template <typename type> +void detach_all_status (span<request&>& requests, const std::function<void(type&, const std::vector<MPI_Status>&)>& callback, const type& data) +{ + struct state_type + { + std::function<void(type&, const std::vector<MPI_Status>&)> callback; + type data ; + }; + + std::vector<MPI_Request> native_requests(requests.size(), MPI_REQUEST_NULL); + for (auto i = 0; i < requests.size(); ++i) + if (requests.data()[i].trivial()) + native_requests[i] = requests.data()[i].trivial().get(); + + MPIX_Detach_all_status(native_requests.size(), native_requests.data(), [ ] (void* 
user_data, std::int32_t count, MPI_Request* request, MPI_Status* status) + { + std::vector<MPI_Status> stati(count); + for (auto i = 0; i < count; ++i) + stati[i] = status[i]; + + auto* state = static_cast<state_type*>(user_data); + state->callback(state->data, stati); + delete state; + }, new state_type {callback, data}); +} + +// Extension function imitators for request. +inline void operator/(request& request, const std::function<void()>& callback) +{ + detach (request, callback); +} +inline void operator/(request& request, const std::function<void(const MPI_Status&)>& callback) +{ + detach_status(request, callback); +} + +// Convenience for isend/irecv with unified call syntax for communicator. +inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback) +{ + auto request = communicator.isend(destination, tag); + detach(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void()>& callback) +{ + auto request = communicator.isend(destination, tag, value); + detach(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void()>& callback) +{ + auto request = communicator.isend(destination, tag, value); + detach(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void()>& callback) +{ + auto request = communicator.isend(destination, tag, value, count); + detach(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void()>& callback) +{ + auto request = 
communicator.isend(destination, tag, value); + detach(request, callback); +} + +template <typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value, count); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach<user_data_type>(request, callback, user_data); +} + +inline void isend(const communicator& 
communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.isend(destination, tag); + detach_status(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.isend(destination, tag, value); + detach_status(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.isend(destination, tag, value); + detach_status(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.isend(destination, tag, value, count); + detach_status(request, callback); +} +template <typename type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.isend(destination, tag, value); + detach_status(request, callback); +} + +template <typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const 
std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value, count); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.isend(destination, tag, value); + detach_status<user_data_type>(request, callback, user_data); +} + +inline void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, const std::function<void()>& callback) +{ + auto request = communicator.irecv(source, tag); + detach(request, callback); +} +template <typename type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type& value, const std::function<void()>& callback) +{ + auto request = communicator.irecv(source, tag, value); + detach(request, callback); +} +template <typename type> +void irecv(const 
communicator& communicator, std::int32_t source , std::int32_t tag, type* value, std::int32_t count, const std::function<void()>& callback) +{ + auto request = communicator.irecv(source, tag, value, count); + detach(request, callback); +} +template <typename type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, std::vector<type>& value, const std::function<void()>& callback) +{ + auto request = communicator.irecv(source, tag, value); + detach(request, callback); +} + +template <typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type& value, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type* value, std::int32_t count, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value, count); + detach<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, std::vector<type>& value, const std::function<void(user_data_type&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value); + detach<user_data_type>(request, callback, user_data); +} + +inline void irecv(const communicator& communicator, 
std::int32_t source , std::int32_t tag, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.irecv(source, tag); + detach_status(request, callback); +} +template <typename type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type& value, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.irecv(source, tag, value); + detach_status(request, callback); +} +template <typename type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.irecv(source, tag, value, count); + detach_status(request, callback); +} +template <typename type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback) +{ + auto request = communicator.irecv(source, tag, value); + detach_status(request, callback); +} + +template <typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type& value, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, type* value, std::int32_t count, const 
std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value, count); + detach_status<user_data_type>(request, callback, user_data); +} +template <typename type, typename user_data_type> +void irecv(const communicator& communicator, std::int32_t source , std::int32_t tag, std::vector<type>& value, const std::function<void(user_data_type&, const MPI_Status&)>& callback, const user_data_type& user_data) +{ + auto request = communicator.irecv(source, tag, value); + detach_status<user_data_type>(request, callback, user_data); +} + +// Future interface with unified call syntax for request. +inline future<void> detach (request& request ) +{ + auto* promise = new boost::promise<void>(); + auto future = promise->get_future(); + detach(request, [promise] () + { + promise->set_value(); + delete promise; + }); + return std::move(future); +} +template <typename type> +future<type> detach (request& request , const type& data) +{ + auto* promise = new boost::promise<type>(); + auto future = promise->get_future(); + detach<type>(request, [promise] (type& data) + { + promise->set_value(data); + delete promise; + }, data); + return std::move(future); +} + +inline future<MPI_Status> detach_status (request& request ) +{ + auto* promise = new boost::promise<MPI_Status>(); + auto future = promise->get_future(); + detach_status(request, [promise] (const MPI_Status& status) + { + promise->set_value(status); + delete promise; + }); + return std::move(future); +} +template <typename type> +future<std::pair<type, MPI_Status>> detach_status (request& request , const type& data) +{ + auto* promise = new boost::promise<std::pair<type, MPI_Status>>(); + auto future = promise->get_future(); + detach_status<type>(request, [promise] (type& data, const MPI_Status& status) + { + promise->set_value(std::make_pair(data, status)); + delete promise; + }, data); + return std::move(future); +} + +inline 
future<void> detach_each (span<request&>& requests)
{
  // Funnels the per-request completions into one promise that is fulfilled
  // exactly once, after the LAST request finishes. (The original called
  // set_value() on EVERY completion, which throws
  // boost::promise_already_satisfied from the second completion onward.)
  struct state_type
  {
    std::int32_t         counter; // Remaining completions. NOTE(review): assumes detach callbacks are serialized on a single progress thread; if they can run concurrently this must become std::atomic — confirm against the MPIX_Detach implementation.
    boost::promise<void> promise;
  };

  auto* state  = new state_type {static_cast<std::int32_t>(requests.size())}; // Explicit cast: size() is std::size_t and brace-init rejects the implicit narrowing to int32_t.
  auto  future = state->promise.get_future();
  if (requests.size() == 0)
  {
    state->promise.set_value(); // Nothing pending: resolve immediately so the future cannot block forever (the original leaked state here).
    delete state;
  }
  else
    detach_each(requests, [state] ()
    {
      if (--state->counter == 0) // Only the final completion fulfills the promise.
      {
        state->promise.set_value();
        delete state;
      }
    });
  return future; // Return by value: NRVO applies; std::move(future) would inhibit it.
}
template <typename type>
future<type> detach_each (span<request&>& requests, const span<type>& data)
{
  // As above; the promise carries the data of the LAST completing request.
  // NOTE(review): a single future<type> cannot deliver one value per request —
  // last-writer-wins is assumed here; confirm this is the intended contract.
  struct state_type
  {
    std::int32_t         counter; // See non-template overload for the threading assumption.
    boost::promise<type> promise;
  };

  auto* state  = new state_type {static_cast<std::int32_t>(requests.size())};
  auto  future = state->promise.get_future();
  if (requests.size() == 0)
  {
    state->promise.set_value(type()); // Empty case assumes type is default-constructible.
    delete state;
  }
  else
    detach_each<type>(requests, [state] (type& data)
    {
      if (--state->counter == 0)
      {
        state->promise.set_value(data);
        delete state;
      }
    }, data);
  return future;
}

inline future<MPI_Status> detach_each_status(span<request&>& requests)
{
  // Status variant: fulfilled exactly once, with the status of the LAST
  // completing request (same exactly-once fix as detach_each above).
  struct state_type
  {
    std::int32_t               counter;
    boost::promise<MPI_Status> promise;
  };

  auto* state  = new state_type {static_cast<std::int32_t>(requests.size())};
  auto  future = state->promise.get_future();
  if (requests.size() == 0)
  {
    state->promise.set_value(MPI_Status());
    delete state;
  }
  else
    detach_each_status(requests, [state] (const MPI_Status& status)
    {
      if (--state->counter == 0)
      {
        state->promise.set_value(status);
        delete state;
      }
    });
  return future;
}
template <typename type>
future<std::pair<type, MPI_Status>> detach_each_status(span<request&>& requests, const span<type>& data)
{
  // Combined data+status variant; same exactly-once fulfillment discipline.
  struct state_type
  {
    std::int32_t                                counter;
    boost::promise<std::pair<type, MPI_Status>> promise;
  };

  auto* state  = new state_type {static_cast<std::int32_t>(requests.size())};
  auto  future = state->promise.get_future();
  if (requests.size() == 0)
  {
    state->promise.set_value(std::pair<type, MPI_Status>());
    delete state;
  }
  else
    detach_each_status<type>(requests, [state] (type& data, const MPI_Status& status)
    {
      if (--state->counter == 0)
      {
        state->promise.set_value(std::make_pair(data, status));
        delete state;
      }
    }, data);
  return future;
}

inline future<void> detach_all (span<request&>& requests)
{
  // detach_all invokes its callback exactly once for the whole set, so a bare
  // heap promise owned by the callback suffices (no counter needed).
  auto* promise = new boost::promise<void>();
  auto  future  = promise->get_future();
  detach_all(requests, [promise] ()
  {
    promise->set_value();
    delete promise;
  });
  return future; // NRVO-friendly; std::move would pessimize.
}
template <typename type>
future<type> detach_all (span<request&>& requests, const type& data)
{
  // Single-shot completion carrying the user data back through the future.
  auto* promise = new boost::promise<type>();
  auto  future  = promise->get_future();
  detach_all<type>(requests, [promise] (type& data)
  {
    promise->set_value(data);
    delete promise;
  }, data);
  return future;
}

inline future<std::vector<MPI_Status>> detach_all_status (span<request&>& requests)
{
  // Single-shot completion delivering every request's status.
  auto* promise = new boost::promise<std::vector<MPI_Status>>();
  auto  future  = promise->get_future();
  detach_all_status(requests, [promise] (const std::vector<MPI_Status>& statuses)
  {
    promise->set_value(statuses);
    delete promise;
  });
  return future;
}
template <typename type>
future<std::pair<type, std::vector<MPI_Status>>> detach_all_status (span<request&>& requests, const type& data)
{
  // Single-shot completion delivering the user data plus all statuses.
  auto* promise = new boost::promise<std::pair<type, std::vector<MPI_Status>>>();
  auto  future  = promise->get_future();
  detach_all_status<type>(requests, [promise] (type& data, const std::vector<MPI_Status>& statuses)
  {
    promise->set_value(std::make_pair(data, statuses));
    delete promise;
  }, data);
  return future;
}

// Extension function imitators for request.
enum class future_type { };
inline future<void> operator| (request& request, future_type)
{
  return detach(request);
}
template <typename type>
future<type> operator| (request& request, const type& data)
{
  return detach<type>(request, data);
}

inline future<MPI_Status> operator||(request& request, future_type)
{
  return detach_status(request);
}
template <typename type>
future<std::pair<type, MPI_Status>> operator||(request& request, const type& data)
{
  return detach_status<type>(request, data);
}

// ASIO detach service.
+using namespace std::chrono_literals; + +class detach_service +{ +public: + template <typename representation, typename period> + explicit detach_service (const std::size_t thread_count = 1, const std::chrono::duration<representation, period>& interval = 16ms) + : work_guard_(make_work_guard(context_)) + , timer_ (context_, interval) + { + callback_ = [&, interval] (const boost::system::error_code& code) + { + MPIX_Progress(nullptr); + + // Periodicity. + timer_.expires_at(timer_.expiry() + interval); + timer_.async_wait(callback_); + }; + + timer_.async_wait(callback_); + + for (auto i = 0; i < thread_count; ++i) + threads_.create_thread([&] () { context_.run(); }); + } + detach_service (const detach_service& that) = delete ; + detach_service ( detach_service&& temp) = delete ; + ~detach_service () = default; + detach_service& operator=(const detach_service& that) = delete ; + detach_service& operator=( detach_service&& temp) = delete ; + +protected: + boost::thread_group threads_ ; + asio::io_context context_ ; + asio::executor_work_guard<asio::io_context::executor_type> work_guard_; + asio::steady_timer timer_ ; + std::function<void(const boost::system::error_code&)> callback_ ; +}; +} + +#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/detach_extensions_future.hpp b/include/boost_mpi_extensions/detach_extensions_future.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ba0744bd875c147ab5b2fb22bdb2a6d944cb2124 --- /dev/null +++ b/include/boost_mpi_extensions/detach_extensions_future.hpp @@ -0,0 +1,9 @@ +#ifndef BOOST_MPI_DETACH_EXTENSIONS_FUTURE_HPP +#define BOOST_MPI_DETACH_EXTENSIONS_FUTURE_HPP + +namespace boost::mpi +{ + +} + +#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/detach_service.hpp b/include/boost_mpi_extensions/detach_service.hpp new file mode 100644 index 0000000000000000000000000000000000000000..51cf3e7a4d820f0352cdd0270d6304f650edc747 --- /dev/null +++ 
b/include/boost_mpi_extensions/detach_service.hpp @@ -0,0 +1,9 @@ +#ifndef BOOST_MPI_DETACH_SERVICE_HPP +#define BOOST_MPI_DETACH_SERVICE_HPP + +namespace boost::mpi +{ + +} + +#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp b/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp deleted file mode 100644 index 91ac259dfc084b162b7e3799a3847cdb1d105e7c..0000000000000000000000000000000000000000 --- a/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp +++ /dev/null @@ -1,201 +0,0 @@ -#ifndef MPI_DETACH_BOOST_EXTENSIONS_HPP -#define MPI_DETACH_BOOST_EXTENSIONS_HPP - -#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP -#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION -#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK -#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD - -#include <boost/asio.hpp> -#include <boost/thread.hpp> -#include <boost/mpi.hpp> - -#include "mpi_detach.hpp" -#include "mpi_detach_future.hpp" - -namespace boost -{ -namespace mpi -{ -// Callback interface. -// TODO: Implement for non-trivials. -inline void detach (request& request , const std::function<void()>& callback) -{ - if (request.trivial()) - MPI_Detach(&request.trivial().get(), [&callback] (void* user_data) { callback(); }, nullptr); -} -inline void detach_status (request& request , const std::function<void(const MPI_Status&)>& callback) -{ - if (request.trivial()) - MPI_Detach_status(&request.trivial().get(), [&callback] (void* user_data, MPI_Status* status) { callback(*status); }, nullptr); -} -inline void detach_each (const std::vector<request&>& requests, const std::function<void()>& callback) -{ - std::vector<MPI_Request> native_requests(requests.size()); - std::vector<void*> user_data (requests.size(), nullptr); - for (auto i = 0; i < requests.size(); ++i) - native_requests[i] = requests[i].trivial() ? 
requests[i].trivial().get() : MPI_REQUEST_NULL; - MPI_Detach_each(native_requests.size(), native_requests.data(), [&callback] (void* user_data) { callback(); }, user_data.data()); -} -inline void detach_each_status(const std::vector<request&>& requests, const std::function<void(const MPI_Status&)>& callback) -{ - std::vector<MPI_Request> native_requests(requests.size()); - std::vector<void*> user_data (requests.size(), nullptr); - for (auto i = 0; i < requests.size(); ++i) - native_requests[i] = requests[i].trivial() ? requests[i].trivial().get() : MPI_REQUEST_NULL; - MPI_Detach_each_status(native_requests.size(), native_requests.data(), [&callback] (void* user_data, MPI_Status* status) { callback(*status); }, user_data.data()); -} -inline void detach_all (const std::vector<request&>& requests, const std::function<void()>& callback) -{ - std::vector<MPI_Request> native_requests(requests.size()); - for (auto i = 0; i < requests.size(); ++i) - native_requests[i] = requests[i].trivial() ? requests[i].trivial().get() : MPI_REQUEST_NULL; - MPI_Detach_all(native_requests.size(), native_requests.data(), [&callback] (void* user_data) { callback(); }, nullptr); -} -inline void detach_all_status (const std::vector<request&>& requests, const std::function<void(const std::vector<MPI_Status>&)>& callback) -{ - std::vector<MPI_Request> native_requests(requests.size()); - for (auto i = 0; i < requests.size(); ++i) - native_requests[i] = requests[i].trivial() ? requests[i].trivial().get() : MPI_REQUEST_NULL; - MPI_Detach_all_status(native_requests.size(), native_requests.data(), [&callback] (void* user_data, std::int32_t count, MPI_Status* status) - { - std::vector<MPI_Status> stati(count); - for (auto i = 0; i < count; ++ i) - stati[i] = status[i]; - callback(stati); - }, nullptr); -} - -// Convenience for isend and irecv. -// Discuss: User data is hidden, since the caller may instead capture any variable he/she likes in a lambda. 
We could instead provide it in a strong-typed (non-void*) manner through a template parameter if you like. -// Its just too many parameters already, and user_data concept is a non C++ in general. -inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback) -{ - auto request = communicator.isend(destination, tag); - detach(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void()>& callback) -{ - auto request = communicator.isend(destination, tag, value); - detach(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void()>& callback) -{ - auto request = communicator.isend(destination, tag, value); - detach(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void()>& callback) -{ - auto request = communicator.isend(destination, tag, value, count); - detach(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void()>& callback) -{ - auto request = communicator.isend(destination, tag, value); - detach(request, callback); -} - -inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.isend(destination, tag); - detach_status(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void(const MPI_Status&)>& 
callback) -{ - auto request = communicator.isend(destination, tag, value); - detach_status(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.isend(destination, tag, value); - detach_status(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.isend(destination, tag, value, count); - detach_status(request, callback); -} -template <typename type> -void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.isend(destination, tag, value); - detach_status(request, callback); -} - -inline void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback) -{ - auto request = communicator.irecv(destination, tag); - detach(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type& value, const std::function<void()>& callback) -{ - auto request = communicator.irecv(destination, tag, value); - detach(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type* value, std::int32_t count, const std::function<void()>& callback) -{ - auto request = communicator.irecv(destination, tag, value, count); - detach(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, std::vector<type>& value, const 
std::function<void()>& callback) -{ - auto request = communicator.irecv(destination, tag, value); - detach(request, callback); -} - -inline void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.irecv(destination, tag); - detach_status(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type& value, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.irecv(destination, tag, value); - detach_status(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.irecv(destination, tag, value, count); - detach_status(request, callback); -} -template <typename type> -void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback) -{ - auto request = communicator.irecv(destination, tag, value); - detach_status(request, callback); -} - -// TODO: Future versions of isend/irecv. -// TODO: ASIO service interface. - -// Discuss: Promise is a freer packaged_task which enables us to transmit intermediate state in the unique_future before the function returns. -// Question is, does MPI_Test have any intermediate state worth passing to the user prior to completion of request? Should we implement it? - -// Discuss: std::async couples the creation and execution of the packaged_task. It could be used to remove the need for global state. -// The idea is to create one thread per detach call. But regular calls to the std::thread constructor is probably horrible. 
This idea is suboptimal to me, -// and Joachim probably has reasons to use a single thread for the whole thing, but nevertheless worth asking the HPC guys. - -// - Use the C interface directly for the Boost interface. Isend/irecv. Futures. -// - Boost IO service. -// - Writing. -// - Particle advector implementation using raw requests and . -void lol() -{ - std::call_once() -} -} -} - -#endif \ No newline at end of file diff --git a/include/boost_mpi_extensions/mpi_detach_future.hpp b/include/boost_mpi_extensions/mpi_detach_future.hpp deleted file mode 100644 index 3ea836ba71a893283e5f09ea3a0cb50c3621b126..0000000000000000000000000000000000000000 --- a/include/boost_mpi_extensions/mpi_detach_future.hpp +++ /dev/null @@ -1,243 +0,0 @@ -#ifndef MPI_DETACH_FUTURE_HPP -#define MPI_DETACH_FUTURE_HPP - -#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP -#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION -#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK -#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD - -#include <atomic> -#include <chrono> -#include <condition_variable> -#include <cstdint> -#include <list> -#include <mutex> -#include <optional> -#include <thread> -#include <tuple> -#include <utility> -#include <variant> -#include <vector> - -#include <mpi.h> - -#include <boost/thread.hpp> -#include <boost/mpi.hpp> - -namespace mpi::detach::future -{ -namespace detail -{ -using namespace std::chrono_literals; - -struct request -{ - // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal. 
- using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, MPI_Status*>()>>; - - request (const MPI_Request& request, bool ignores_status = true, void* user_data = nullptr) - : native (request) - , user_data(user_data) - { - if (ignores_status) - task = boost::packaged_task<void*()>([&] ( ) { return request::user_data; }); - else - task = boost::packaged_task<std::tuple<void*, MPI_Status*>()>([&] ( ) { return std::make_tuple(request::user_data, &status); }); - } - request (const request& that) = delete ; - request ( request&& temp) = default; - ~request () = default; - request& operator=(const request& that) = delete ; - request& operator=( request&& temp) = default; - - [[nodiscard]] - bool ignores_status() const - { - return std::holds_alternative<boost::packaged_task<void*()>>(task); - } - - MPI_Request native; - task_variant task; - void* user_data; - MPI_Status status {}; -}; -struct collective_request -{ - // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal. - using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>; - - collective_request (std::vector<MPI_Request> requests, bool ignores_status = true, void* user_data = nullptr) - : native (std::move(requests)) - , user_data(user_data) - { - if (!ignores_status) - stati.resize(native.size()); // Bug prone: Resizing native post-constructor leads to inconsistent stati size. 
- - if (ignores_status) - task = boost::packaged_task<void*()>([&] ( ) { return collective_request::user_data; }); - else - task = boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>([&] ( ) { return std::make_tuple(collective_request::user_data, stati.size(), stati.data()); }); - } - collective_request (const collective_request& that) = delete ; - collective_request ( collective_request&& temp) = default; - ~collective_request () = default; - collective_request& operator=(const collective_request& that) = delete ; - collective_request& operator=( collective_request&& temp) = default; - - [[nodiscard]] - bool ignores_status() const - { - return std::holds_alternative<boost::packaged_task<void*()>>(task); - } - - std::vector<MPI_Request> native; - task_variant task; - void* user_data; - std::vector<MPI_Status> stati {}; -}; -struct state -{ - state () : detach_thread([this] { run(); }), detach_thread_running(true) - { - - } - state (const state& that) = delete; - state ( state&& temp) = delete; - ~state () - { - { - std::unique_lock<std::mutex> lock(container_mutex); - while (!requests.empty() || !collective_requests.empty()) - { - container_condition_variable.notify_one(); - lock.unlock(); - std::this_thread::sleep_for(2ms); - lock.lock (); - } - detach_thread_running = false; - container_condition_variable.notify_one(); - } - detach_thread.join(); - } - state& operator=(const state& that) = delete; - state& operator=( state&& temp) = delete; - - void run() - { - while (detach_thread_running || !active_requests.empty() || !active_collective_requests.empty()) - { - do - { - std::unique_lock<std::mutex> lock(container_mutex); - if (!requests .empty()) active_requests .splice(active_requests .begin(), requests); - if (!collective_requests.empty()) active_collective_requests.splice(active_collective_requests.begin(), collective_requests); - - if (active_requests.empty() && active_collective_requests.empty()) - while (detach_thread_running && 
requests.empty() && collective_requests.empty()) - container_condition_variable.wait(lock); - } - while (detach_thread_running && active_requests.empty() && active_collective_requests.empty()); - - if (!active_requests .empty()) - { - auto current = active_requests.begin(); - auto end = active_requests.end (); - while (current != end) - { - auto done {0}; - - MPI_Test(¤t->native, &done, current->ignores_status() ? MPI_STATUS_IGNORE : ¤t->status); - if (done) - { - current->ignores_status() - ? std::get<boost::packaged_task<void*()>> (current->task)() - : std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(current->task)(); - current = active_requests.erase(current); - } - else - ++current; - } - } - if (!active_collective_requests.empty()) - { - auto current = active_collective_requests.begin(); - auto end = active_collective_requests.end (); - while (current != end) - { - auto done {0}; - - MPI_Testall(static_cast<std::int32_t>(current->native.size()), current->native.data(), &done, current->ignores_status() ? MPI_STATUS_IGNORE : current->stati.data()); - if (done) - { - current->ignores_status() - ? std::get<boost::packaged_task<void*()>> (current->task)() - : std::get<boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>(current->task)(); - current = active_collective_requests.erase(current); - } - else - ++current; - } - } - - std::this_thread::sleep_for(2ms); - } - } - - std::thread detach_thread; - std::atomic_bool detach_thread_running; - - std::mutex container_mutex; - std::condition_variable container_condition_variable; - - std::list<request> requests {}; - std::list<collective_request> collective_requests {}; - std::list<request> active_requests {}; - std::list<collective_request> active_collective_requests {}; -}; - -inline std::optional<state> global_state; // Note: External-linkage optional used as a lazy-initialized stack variable. Must be reset prior to MPI_Finalize. 
-} - -// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. -inline boost::unique_future<void*> MPI_Detach_future ( MPI_Request* request , void* data) -{ - if (!detail::global_state) detail::global_state.emplace(); - - auto done {0}; - - MPI_Test(request, &done, MPI_STATUS_IGNORE); - if (done) - return boost::make_future(data); - - std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); - auto& instance = detail::global_state->requests.emplace_back(*request, true, data); - detail::global_state->container_condition_variable.notify_one(); - *request = MPI_REQUEST_NULL; - return std::get<boost::packaged_task<void*()>>(instance.task).get_future(); -} -// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it. -inline boost::unique_future<std::tuple<void*, MPI_Status*>> MPI_Detach_status_future ( MPI_Request* request , void* data) -{ - if (!detail::global_state) detail::global_state.emplace(); - - auto done {0}; - auto status {MPI_Status()}; - - MPI_Test(request, &done, &status); - if (done) - return boost::make_future(std::make_tuple(data, &status)); - - std::unique_lock<std::mutex> lock(detail::global_state->container_mutex); - auto& instance = detail::global_state->requests.emplace_back(*request, false, data); - detail::global_state->container_condition_variable.notify_one(); - *request = MPI_REQUEST_NULL; - return std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(instance.task).get_future(); -} -inline std::int32_t MPI_Finalize () -{ - detail::global_state.reset(); - return PMPI_Finalize(); -} -} - -#endif \ No newline at end of file diff --git a/tests/detach_boost_extensions_test.cpp b/tests/detach_boost_extensions_test.cpp deleted file mode 100644 index 3a08fedd59fe6d45bf9f8a0371629869b6f5d95d..0000000000000000000000000000000000000000 --- a/tests/detach_boost_extensions_test.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include "catch.hpp" - 
-#include <boost/serialization/vector.hpp> -#include <boost/mpi.hpp> - -#include <boost_mpi_extensions/mpi_detach_boost_extensions.hpp> - -struct data -{ - template<class archive_type> - void serialize(archive_type& archive, const std::uint32_t version) - { - archive& value; - } - - float value; -}; - -TEST_CASE("Detach boost extensions test", "mpi_detach_boost_extensions.hpp") -{ - boost::mpi::environment environment ; - boost::mpi::communicator communicator; - - std::vector<data> outgoing(1000); - std::vector<data> incoming(1000); - - boost::optional<boost::unique_future<void*>> first, second, third, fourth; - first = mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr); - second = first->then([&] (boost::unique_future<void*>& first_result) - { - mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr); - }); - - // communicator.isend(0, 42, outgoing, [&] (void* user_data) - // { - // - // }, nullptr); - // communicator.isend(0, 42, outgoing, [&] (void* user_data, MPI_Status* status) - // { - // - // }, nullptr); - // Callbacks, note these can be chained to implement complex scenarios, - // but leads to the problem called "Callback Hell" in JavaScript for complex scenarios. - // http://callbackhell.com/ - // - // Example: - // -> Send/recv current particle counts - // -> send/recv load-balanced particles - // -> send/recv out-of-bounds particles. - // is 6 MPI_Detach_all callbacks within one another, damaging readability/maintainability. - // - // See alternative below. - // Identical implementation for irecv, iprobe. Boost::MPI does not support more async operations so that's all. - // Note that in modern C++ the void* user_data parameter is redundant since lambdas can capture context. - // We only use MPI_Detach and MPI_Detach_status in the wrapper. 
- - // communicator(isend, 0, 42, outgoing) - // .then (irecv, 0, 42, outgoing) - // .then (isend, 0, 42, outgoing) - // .then (irecv, 0, 42, outgoing) - // An alternative to callback chains: - // C++17 contains a std::future for (return) values obtained from an async operation. - // This encapsulates an object which may not be yet instantiated, but will be in the future, upon the completion of the async op. - // The user may query, wait or extract a value from the future. - // - // There are three standard ways to interact with std::futures: std::async, std::packaged_task and std::promise. - // - // Boost::ASIO integration through io_service. - - mpi::detach::MPI_Finalize(); -} \ No newline at end of file diff --git a/tests/detach_extensions_future_test.cpp b/tests/detach_extensions_future_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/detach_extensions_test.cpp b/tests/detach_extensions_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/detach_service_test.cpp b/tests/detach_service_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391