Commit 8b6dc90a authored by Ali Can Demiralp

Progress.

parent 41c9a87b
mpi_detach_boost_extensions.hpp
#ifndef MPI_DETACH_BOOST_EXTENSIONS_HPP
#define MPI_DETACH_BOOST_EXTENSIONS_HPP
#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD
#include <cstdint>
#include <functional>
#include <mutex>
#include <vector>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/mpi.hpp>
#include "mpi_detach.hpp"
#include "mpi_detach_future.hpp"
namespace boost
{
namespace mpi
{
// Callback interface.
// TODO: Implement for non-trivial requests.
inline void detach (request& request , const std::function<void()>& callback)
{
if (request.trivial())
MPI_Detach (&request.trivial().get(), [ ] (void* user_data)
{
auto function = static_cast<std::function<void()>*>(user_data);
(*function)();
delete function; // Free the heap copy once the callback has run.
}, new std::function<void()>(callback)); // Heap copy: a captured reference would dangle by the time the detach thread fires, and a capturing lambda cannot convert to the C callback pointer.
}
inline void detach_status (request& request , const std::function<void(const MPI_Status&)>& callback)
{
if (request.trivial())
MPI_Detach_status(&request.trivial().get(), [ ] (void* user_data, MPI_Status* status)
{
auto function = static_cast<std::function<void(const MPI_Status&)>*>(user_data);
(*function)(*status);
delete function;
}, new std::function<void(const MPI_Status&)>(callback));
}
inline void detach_each (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void()>& callback)
{
std::vector<MPI_Request> native_requests(requests.size());
std::vector<void*> user_data (requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
{
native_requests[i] = requests[i].get().trivial() ? requests[i].get().trivial().get() : MPI_REQUEST_NULL;
user_data [i] = new std::function<void()>(callback); // One heap copy per request, freed by the callback below.
}
MPI_Detach_each (static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data)
{
auto function = static_cast<std::function<void()>*>(user_data);
(*function)();
delete function;
}, user_data.data());
}
inline void detach_each_status(const std::vector<std::reference_wrapper<request>>& requests, const std::function<void(const MPI_Status&)>& callback)
{
std::vector<MPI_Request> native_requests(requests.size());
std::vector<void*> user_data (requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
{
native_requests[i] = requests[i].get().trivial() ? requests[i].get().trivial().get() : MPI_REQUEST_NULL;
user_data [i] = new std::function<void(const MPI_Status&)>(callback);
}
MPI_Detach_each_status(static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data, MPI_Status* status)
{
auto function = static_cast<std::function<void(const MPI_Status&)>*>(user_data);
(*function)(*status);
delete function;
}, user_data.data());
}
inline void detach_all (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void()>& callback)
{
std::vector<MPI_Request> native_requests(requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
native_requests[i] = requests[i].get().trivial() ? requests[i].get().trivial().get() : MPI_REQUEST_NULL;
MPI_Detach_all (static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data)
{
auto function = static_cast<std::function<void()>*>(user_data);
(*function)();
delete function;
}, new std::function<void()>(callback)); // Single heap copy: the callback fires once for the whole set.
}
inline void detach_all_status (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void(const std::vector<MPI_Status>&)>& callback)
{
std::vector<MPI_Request> native_requests(requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
native_requests[i] = requests[i].get().trivial() ? requests[i].get().trivial().get() : MPI_REQUEST_NULL;
MPI_Detach_all_status (static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data, std::int32_t count, MPI_Status* status)
{
auto function = static_cast<std::function<void(const std::vector<MPI_Status>&)>*>(user_data);
(*function)(std::vector<MPI_Status>(status, status + count));
delete function;
}, new std::function<void(const std::vector<MPI_Status>&)>(callback));
}
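// Usage sketch (hypothetical caller code, not part of this header): detach a
// request and run a callback once it completes, without ever calling wait().
//
//   boost::mpi::communicator world;
//   auto request = world.isend(1, 0, 42);
//   boost::mpi::detach(request, [ ] { std::cout << "Send complete.\n"; });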
// Convenience overloads for isend and irecv.
// Discuss: The user_data parameter is hidden, since the caller may instead capture any variable they like in a lambda.
// We could provide it in a strongly-typed (non-void*) manner through a template parameter if preferred, but these
// functions already take many parameters, and the user_data concept is not idiomatic C++ in the first place.
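// Usage sketch (hypothetical caller code): the buffer is captured directly in
// the lambda instead of being smuggled through a void* user_data parameter.
//
//   std::vector<float> buffer(1024);
//   boost::mpi::isend(world, 1, 0, buffer, [&buffer] ()
//   {
//     buffer.clear(); // Safe to reuse the buffer once the send has completed.
//   });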
inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback)
{
auto request = communicator.isend(destination, tag);
detach(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void()>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void()>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void()>& callback)
{
auto request = communicator.isend(destination, tag, value, count);
detach(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void()>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach(request, callback);
}
inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.isend(destination, tag);
detach_status(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach_status(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach_status(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.isend(destination, tag, value, count);
detach_status(request, callback);
}
template <typename type>
void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.isend(destination, tag, value);
detach_status(request, callback);
}
inline void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, const std::function<void()>& callback)
{
auto request = communicator.irecv(source, tag);
detach(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, type& value, const std::function<void()>& callback)
{
auto request = communicator.irecv(source, tag, value);
detach(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, type* value, std::int32_t count, const std::function<void()>& callback)
{
auto request = communicator.irecv(source, tag, value, count);
detach(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, std::vector<type>& value, const std::function<void()>& callback)
{
auto request = communicator.irecv(source, tag, value);
detach(request, callback);
}
inline void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.irecv(source, tag);
detach_status(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, type& value, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.irecv(source, tag, value);
detach_status(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.irecv(source, tag, value, count);
detach_status(request, callback);
}
template <typename type>
void irecv(const communicator& communicator, std::int32_t source, std::int32_t tag, std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback)
{
auto request = communicator.irecv(source, tag, value);
detach_status(request, callback);
}
// TODO: Future-returning versions of isend/irecv.
// TODO: ASIO service interface.
// Discuss: A promise is a freer packaged_task, which would enable us to transmit intermediate state through the unique_future before the function returns.
// The question is whether MPI_Test has any intermediate state worth passing to the user prior to completion of the request. Should we implement it?
// Discuss: std::async couples the creation and execution of the packaged_task, and could be used to remove the need for global state.
// The idea is to create one thread per detach call, but constructing a std::thread per call is probably prohibitively expensive. This approach seems suboptimal to me,
// and Joachim probably has reasons to use a single thread for the whole thing, but it is nevertheless worth asking the HPC people.
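// Sketch of the promise-based alternative discussed above (hypothetical, not
// part of this header): a promise would let the detach thread publish a value
// before the continuation runs, at the cost of managing the shared state by hand.
//
//   boost::promise<MPI_Status> promise;
//   boost::unique_future<MPI_Status> future = promise.get_future();
//   // ... on the detach thread, once MPI_Test reports completion:
//   // promise.set_value(status);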
// - Use the C interface directly for the Boost interface. Isend/irecv. Futures.
// - Boost IO service.
// - Writing.
// - Particle advector implementation using raw requests and .
inline void lol()
{
static std::once_flag flag;
std::call_once(flag, [ ] { }); // Scratch: std::call_once could lazily initialize the global detach state.
}
}
}
#endif
mpi_detach_future.hpp
#ifndef MPI_DETACH_FUTURE_HPP
#define MPI_DETACH_FUTURE_HPP
#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>
#include <optional>
#include <thread>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include <mpi.h>
#include <boost/thread.hpp>
#include <boost/mpi.hpp>
namespace mpi::detach::future
{
namespace detail
{
using namespace std::chrono_literals;
struct request
{
// Note: The void* is outdated practice, yet the alternative here would require std::any in the shared state, or the reflection proposal.
using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, MPI_Status*>()>>;
request (const MPI_Request& request, bool ignores_status = true, void* user_data = nullptr)
: native (request)
, user_data(user_data)
{
// Note: The lambdas capture this, which is safe because instances live in std::lists and are never moved after construction.
if (ignores_status)
task = boost::packaged_task<void*()>([this] ( ) { return request::user_data; });
else
task = boost::packaged_task<std::tuple<void*, MPI_Status*>()>([this] ( ) { return std::make_tuple(request::user_data, &status); });
}
request (const request& that) = delete ;
request ( request&& temp) = default;
~request () = default;
request& operator=(const request& that) = delete ;
request& operator=( request&& temp) = default;
[[nodiscard]]
bool ignores_status() const
{
return std::holds_alternative<boost::packaged_task<void*()>>(task);
}
MPI_Request native;
task_variant task;
void* user_data;
MPI_Status status {};
};
struct collective_request
{
// Note: The void* is outdated practice, yet the alternative here would require std::any in the shared state, or the reflection proposal.
using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>;
collective_request (std::vector<MPI_Request> requests, bool ignores_status = true, void* user_data = nullptr)
: native (std::move(requests))
, user_data(user_data)
{
if (!ignores_status)
stati.resize(native.size()); // Caution: Resizing native after construction leaves stati with a mismatched size.
if (ignores_status)
task = boost::packaged_task<void*()>([this] ( ) { return collective_request::user_data; });
else
task = boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>([this] ( ) { return std::make_tuple(collective_request::user_data, static_cast<std::int32_t>(stati.size()), stati.data()); });
}
collective_request (const collective_request& that) = delete ;
collective_request ( collective_request&& temp) = default;
~collective_request () = default;
collective_request& operator=(const collective_request& that) = delete ;
collective_request& operator=( collective_request&& temp) = default;
[[nodiscard]]
bool ignores_status() const
{
return std::holds_alternative<boost::packaged_task<void*()>>(task);
}
std::vector<MPI_Request> native;
task_variant task;
void* user_data;
std::vector<MPI_Status> stati {};
};
struct state
{
state () : detach_thread([this] { run(); })
{
}
state (const state& that) = delete;
state ( state&& temp) = delete;
~state ()
{
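// Drain: wake the detach thread until both queues are empty, then signal shutdown and join.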
{
std::unique_lock<std::mutex> lock(container_mutex);
while (!requests.empty() || !collective_requests.empty())
{
container_condition_variable.notify_one();
lock.unlock();
std::this_thread::sleep_for(2ms);
lock.lock ();
}
detach_thread_running = false;
container_condition_variable.notify_one();
}
detach_thread.join();
}
state& operator=(const state& that) = delete;
state& operator=( state&& temp) = delete;
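// The progress loop: newly detached requests are spliced from the shared lists into local active lists
// under the lock, then polled with MPI_Test / MPI_Testall, so enqueueing callers are never blocked by polling.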
void run()
{
while (detach_thread_running || !active_requests.empty() || !active_collective_requests.empty())
{
do
{
std::unique_lock<std::mutex> lock(container_mutex);
if (!requests .empty()) active_requests .splice(active_requests .begin(), requests);
if (!collective_requests.empty()) active_collective_requests.splice(active_collective_requests.begin(), collective_requests);
if (active_requests.empty() && active_collective_requests.empty())
while (detach_thread_running && requests.empty() && collective_requests.empty())
container_condition_variable.wait(lock);
}
while (detach_thread_running && active_requests.empty() && active_collective_requests.empty());
if (!active_requests .empty())
{
auto current = active_requests.begin();
auto end = active_requests.end ();
while (current != end)
{
auto done {0};
MPI_Test(&current->native, &done, current->ignores_status() ? MPI_STATUS_IGNORE : &current->status);
if (done)
{
current->ignores_status()
? std::get<boost::packaged_task<void*()>> (current->task)()
: std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(current->task)();
current = active_requests.erase(current);
}
else
++current;
}
}
if (!active_collective_requests.empty())
{
auto current = active_collective_requests.begin();
auto end = active_collective_requests.end ();
while (current != end)
{
auto done {0};
MPI_Testall(static_cast<std::int32_t>(current->native.size()), current->native.data(), &done, current->ignores_status() ? MPI_STATUSES_IGNORE : current->stati.data()); // Note: MPI_Testall requires the array form, MPI_STATUSES_IGNORE.
if (done)
{
current->ignores_status()
? std::get<boost::packaged_task<void*()>> (current->task)()
: std::get<boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>(current->task)();
current = active_collective_requests.erase(current);
}
else
++current;
}
}
std::this_thread::sleep_for(2ms);
}
}
std::atomic_bool detach_thread_running {true}; // Note: Declared before detach_thread so it is initialized before the thread starts reading it.
std::thread detach_thread;
std::mutex container_mutex;
std::condition_variable container_condition_variable;
std::list<request> requests {};
std::list<collective_request> collective_requests {};
std::list<request> active_requests {};
std::list<collective_request> active_collective_requests {};
};
inline std::optional<state> global_state; // Note: An inline variable with external linkage, lazily initialized on first use. Must be reset prior to MPI_Finalize.
}
// Note: If the test does not succeed immediately, this takes ownership of the request and invalidates it.
inline boost::unique_future<void*> MPI_Detach_future ( MPI_Request* request , void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
MPI_Test(request, &done, MPI_STATUS_IGNORE);
if (done)
return boost::make_future(data);
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
auto& instance = detail::global_state->requests.emplace_back(*request, true, data);
detail::global_state->container_condition_variable.notify_one();
*request = MPI_REQUEST_NULL;
return std::get<boost::packaged_task<void*()>>(instance.task).get_future();
}
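// Usage sketch (hypothetical caller code): detach a raw request and continue through the future.
//
//   MPI_Request request;
//   MPI_Isend(buffer, 1024, MPI_FLOAT, 1, 0, MPI_COMM_WORLD, &request);
//   auto future = MPI_Detach_future(&request, nullptr);
//   future.then([ ] (boost::unique_future<void*> result) { /* Runs once the send completes. */ });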
// Note: If the test does not succeed immediately, this takes ownership of the request and invalidates it.
inline boost::unique_future<std::tuple<void*, MPI_Status*>> MPI_Detach_status_future ( MPI_Request* request , void* data)
{
if (!detail::global_state) detail::global_state.emplace();
auto done {0};
static thread_local MPI_Status status {}; // Caution: thread_local keeps the returned pointer valid on the immediately-complete path; it is overwritten by the next call on this thread.
MPI_Test(request, &done, &status);
if (done)
return boost::make_future(std::make_tuple(data, &status));
std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
auto& instance = detail::global_state->requests.emplace_back(*request, false, data);
detail::global_state->container_condition_variable.notify_one();
*request = MPI_REQUEST_NULL;
return std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(instance.task).get_future();
}
inline std::int32_t MPI_Finalize () // Tears down the detach thread before finalizing MPI proper through the profiling interface.
{
detail::global_state.reset();
return PMPI_Finalize();
}
}
#endif
@@ -24,10 +24,12 @@ TEST_CASE("Detach boost extensions test", "mpi_detach_boost_extensions.hpp")
   std::vector<data> outgoing(1000);
   std::vector<data> incoming(1000);
-  auto send_request = communicator.isend(0, 42, outgoing);
-  auto recv_request = communicator.irecv(0, 42, incoming);
-  auto send_status  = send_request.test ();
-  auto recv_status  = recv_request.wait ();
+  boost::optional<boost::unique_future<void*>> first, second, third, fourth;
+  first  = mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr);
+  second = first->then([&] (boost::unique_future<void*>& first_result)
+  {
+    mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr);
+  });
   // communicator.isend(0, 42, outgoing, [&] (void* user_data)
   // {