From 8b6dc90ad05e4e090a0bb5770d5fb34d7b358ce8 Mon Sep 17 00:00:00 2001
From: acdemiralp <demiralpali@gmail.com>
Date: Thu, 7 May 2020 23:42:17 +0200
Subject: [PATCH] Progress.

---
 .../mpi_detach_boost_extensions.hpp           | 185 +++++++++++++
 .../mpi_detach_future.hpp                     | 243 ++++++++++++++++++
 tests/detach_boost_extensions_test.cpp        |  10 +-
 3 files changed, 434 insertions(+), 4 deletions(-)
 create mode 100644 include/boost_mpi_extensions/mpi_detach_future.hpp

diff --git a/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp b/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp
index 54f4699..91ac259 100644
--- a/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp
+++ b/include/boost_mpi_extensions/mpi_detach_boost_extensions.hpp
@@ -1,15 +1,200 @@
 #ifndef MPI_DETACH_BOOST_EXTENSIONS_HPP
 #define MPI_DETACH_BOOST_EXTENSIONS_HPP
 
+#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP
+#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
+#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
+#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/thread.hpp>
 #include <boost/mpi.hpp>
 
 #include "mpi_detach.hpp"
+#include "mpi_detach_future.hpp"
 
 namespace boost
 {
 namespace mpi
 {
+// Callback interface.
+// TODO: Implement for non-trivials.
+// Detaches a trivial (raw-MPI-backed) request; `callback` is invoked by the
+// detach thread once the request completes. Non-trivial (serialized) requests
+// are silently ignored for now (see TODO above).
+inline void detach            (request&                     request , const std::function<void()>&                               callback)
+{
+  if (request.trivial())
+  {
+    // The C callback must be a capture-less lambda (it decays to a plain
+    // function pointer), and the caller's std::function may be destroyed long
+    // before completion — so copy it to the heap and pass it as user_data.
+    auto copy = new std::function<void()>(callback);
+    MPI_Detach(&request.trivial().get(), [ ] (void* user_data)
+    {
+      auto function = static_cast<std::function<void()>*>(user_data);
+      (*function)();
+      delete function; // Fires exactly once per detach.
+    }, copy);
+  }
+}
+// As detach, but forwards the completion MPI_Status to the callback.
+inline void detach_status     (request&                     request , const std::function<void(const MPI_Status&)>&              callback)
+{
+  if (request.trivial())
+  {
+    // Heap-copy the callback: the reference parameter dangles after return,
+    // and the C API requires a capture-less (function-pointer) callback.
+    auto copy = new std::function<void(const MPI_Status&)>(callback);
+    MPI_Detach_status(&request.trivial().get(), [ ] (void* user_data, MPI_Status* status)
+    {
+      auto function = static_cast<std::function<void(const MPI_Status&)>*>(user_data);
+      (*function)(*status);
+      delete function; // Fires exactly once per detach.
+    }, copy);
+  }
+}
+// Detaches several requests; `callback` is invoked once per request as each
+// completes. Non-trivial requests are substituted with MPI_REQUEST_NULL.
+// Note: std::vector cannot hold references, hence std::reference_wrapper.
+inline void detach_each       (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void()>& callback)
+{
+  std::vector<MPI_Request> native_requests(requests.size());
+  std::vector<void*>       user_data      (requests.size());
+  for (std::size_t i = 0; i < requests.size(); ++i)
+  {
+    request& current   = requests[i];
+    native_requests[i] = current.trivial() ? current.trivial().get() : MPI_REQUEST_NULL;
+    // One heap copy per request: each completion consumes and frees its own.
+    user_data      [i] = new std::function<void()>(callback);
+  }
+  MPI_Detach_each(static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data)
+  {
+    auto function = static_cast<std::function<void()>*>(user_data);
+    (*function)();
+    delete function;
+  }, user_data.data());
+}
+// As detach_each, but forwards each request's completion MPI_Status.
+// Note: std::vector cannot hold references, hence std::reference_wrapper.
+inline void detach_each_status(const std::vector<std::reference_wrapper<request>>& requests, const std::function<void(const MPI_Status&)>& callback)
+{
+  std::vector<MPI_Request> native_requests(requests.size());
+  std::vector<void*>       user_data      (requests.size());
+  for (std::size_t i = 0; i < requests.size(); ++i)
+  {
+    request& current   = requests[i];
+    native_requests[i] = current.trivial() ? current.trivial().get() : MPI_REQUEST_NULL;
+    // One heap copy per request: each completion consumes and frees its own.
+    user_data      [i] = new std::function<void(const MPI_Status&)>(callback);
+  }
+  MPI_Detach_each_status(static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data, MPI_Status* status)
+  {
+    auto function = static_cast<std::function<void(const MPI_Status&)>*>(user_data);
+    (*function)(*status);
+    delete function;
+  }, user_data.data());
+}
+// Detaches several requests; `callback` is invoked once, after ALL complete.
+// Note: std::vector cannot hold references, hence std::reference_wrapper.
+inline void detach_all        (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void()>& callback)
+{
+  std::vector<MPI_Request> native_requests(requests.size());
+  for (std::size_t i = 0; i < requests.size(); ++i)
+  {
+    request& current   = requests[i];
+    native_requests[i] = current.trivial() ? current.trivial().get() : MPI_REQUEST_NULL;
+  }
+  // Single heap copy: the all-complete callback fires exactly once.
+  auto copy = new std::function<void()>(callback);
+  MPI_Detach_all(static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data)
+  {
+    auto function = static_cast<std::function<void()>*>(user_data);
+    (*function)();
+    delete function;
+  }, copy);
+}
+// As detach_all, but forwards all completion statuses to the callback.
+// Note: std::vector cannot hold references, hence std::reference_wrapper.
+inline void detach_all_status (const std::vector<std::reference_wrapper<request>>& requests, const std::function<void(const std::vector<MPI_Status>&)>& callback)
+{
+  std::vector<MPI_Request> native_requests(requests.size());
+  for (std::size_t i = 0; i < requests.size(); ++i)
+  {
+    request& current   = requests[i];
+    native_requests[i] = current.trivial() ? current.trivial().get() : MPI_REQUEST_NULL;
+  }
+  // Single heap copy: the all-complete callback fires exactly once.
+  auto copy = new std::function<void(const std::vector<MPI_Status>&)>(callback);
+  MPI_Detach_all_status(static_cast<std::int32_t>(native_requests.size()), native_requests.data(), [ ] (void* user_data, std::int32_t count, MPI_Status* status)
+  {
+    auto function = static_cast<std::function<void(const std::vector<MPI_Status>&)>*>(user_data);
+    (*function)(std::vector<MPI_Status>(status, status + count));
+    delete function;
+  }, copy);
+}
+
+// Convenience for isend and irecv.
+// Discuss: User data is hidden, since the caller may instead capture any variable they like in a lambda. We could instead provide it in a strongly-typed (non-void*) manner through a template parameter if desired.
+// It is just too many parameters already, and the user_data concept is non-idiomatic in C++ in general.
+// isend overloads taking a plain completion callback (status is ignored).
+// Each posts the send through boost::mpi and immediately detaches the
+// resulting request; the callback fires once the transfer completes.
+inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback)
+{
+  auto pending = communicator.isend(destination, tag);
+  detach(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void()>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void()>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void()>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value, count);
+  detach(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void()>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach(pending, callback);
+}
+
+// isend overloads delivering the completion MPI_Status to the callback.
+inline void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.isend(destination, tag);
+  detach_status(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type& value, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach_status(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const skeleton_proxy<type>& value, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach_status(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value, count);
+  detach_status(pending, callback);
+}
+template <typename type>
+void isend(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.isend(destination, tag, value);
+  detach_status(pending, callback);
+}
 
+// irecv overloads taking a plain completion callback (status is ignored).
+// Each posts the receive through boost::mpi and immediately detaches the
+// resulting request; the callback fires once the transfer completes.
+inline void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void()>& callback)
+{
+  auto pending = communicator.irecv(destination, tag);
+  detach(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type& value, const std::function<void()>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value);
+  detach(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type* value, std::int32_t count, const std::function<void()>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value, count);
+  detach(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, std::vector<type>& value, const std::function<void()>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value);
+  detach(pending, callback);
+}
+
+// irecv overloads delivering the completion MPI_Status to the callback.
+inline void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.irecv(destination, tag);
+  detach_status(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type& value, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value);
+  detach_status(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, type* value, std::int32_t count, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value, count);
+  detach_status(pending, callback);
+}
+template <typename type>
+void irecv(const communicator& communicator, std::int32_t destination, std::int32_t tag, std::vector<type>& value, const std::function<void(const MPI_Status&)>& callback)
+{
+  auto pending = communicator.irecv(destination, tag, value);
+  detach_status(pending, callback);
+}
+
+// TODO: Future versions of isend/irecv.
+// TODO: ASIO service interface.
+
+// Discuss: Promise is a freer packaged_task which enables us to transmit intermediate state in the unique_future before the function returns.
+// Question is, does MPI_Test have any intermediate state worth passing to the user prior to completion of request? Should we implement it?
+
+// Discuss: std::async couples the creation and execution of the packaged_task. It could be used to remove the need for global state.
+// The idea is to create one thread per detach call. But regular calls to the std::thread constructor is probably horrible. This idea is suboptimal to me,
+// and Joachim probably has reasons to use a single thread for the whole thing, but nevertheless worth asking the HPC guys.
+
+// - Use the C interface directly for the Boost interface. Isend/irecv. Futures.
+// - Boost IO service.
+// - Writing.
+// - Particle advector implementation using raw requests and futures.
+// TODO(review): Leftover placeholder — `std::call_once` requires a
+// std::once_flag and a callable, so the original body did not compile.
+// Kept as an inline no-op so the header parses; remove before merging.
+inline void lol()
+{
+}
 }
 }
 
diff --git a/include/boost_mpi_extensions/mpi_detach_future.hpp b/include/boost_mpi_extensions/mpi_detach_future.hpp
new file mode 100644
index 0000000..3ea836b
--- /dev/null
+++ b/include/boost_mpi_extensions/mpi_detach_future.hpp
@@ -0,0 +1,243 @@
+#ifndef MPI_DETACH_FUTURE_HPP
+#define MPI_DETACH_FUTURE_HPP
+
+#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP
+#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
+#define BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
+#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <list>
+#include <mutex>
+#include <optional>
+#include <thread>
+#include <tuple>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include <mpi.h>
+
+#include <boost/thread.hpp>
+#include <boost/mpi.hpp>
+
+namespace mpi::detach::future
+{
+namespace detail
+{
+using namespace std::chrono_literals;
+
+// Wraps a native MPI_Request together with the packaged_task whose future is
+// handed to the user; the detach thread invokes the task upon completion.
+struct request
+{
+  // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal.
+  using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, MPI_Status*>()>>;
+
+  request           (const MPI_Request& request, bool ignores_status = true, void* user_data = nullptr)
+  : native   (request)
+  , user_data(user_data)
+  {
+    // The tasks capture `this`, so the object must never move (the stored
+    // lambda would keep pointing at the old location) — moves are deleted.
+    // this-> is required: the constructor parameter shadows the member.
+    if (ignores_status)
+      task = boost::packaged_task<void*()>([this] ( ) { return this->user_data; });
+    else
+      // NOTE(review): the future receives a pointer into this object; it dies
+      // when the detach thread erases the request, so consumers must copy the
+      // status immediately. Returning MPI_Status by value would be safer.
+      task = boost::packaged_task<std::tuple<void*, MPI_Status*>()>([this] ( ) { return std::make_tuple(this->user_data, &this->status); });
+  }
+  request           (const request&  that) = delete;
+  request           (      request&& temp) = delete; // Deleted: tasks capture `this`.
+ ~request           ()                     = default;
+  request& operator=(const request&  that) = delete;
+  request& operator=(      request&& temp) = delete; // Deleted: tasks capture `this`.
+
+  [[nodiscard]]
+  bool ignores_status() const
+  {
+    return std::holds_alternative<boost::packaged_task<void*()>>(task);
+  }
+
+  MPI_Request  native;
+  task_variant task;
+  void*        user_data;
+  MPI_Status   status {};
+};
+// Wraps a set of native MPI requests which complete as a unit (MPI_Testall).
+struct collective_request
+{
+  // Note: The void* is outdated practice, yet the alternative in this case requires std::any in state or the reflection proposal.
+  using task_variant = std::variant<boost::packaged_task<void*()>, boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>;
+
+  collective_request           (std::vector<MPI_Request> requests, bool ignores_status = true, void* user_data = nullptr)
+  : native   (std::move(requests))
+  , user_data(user_data)
+  {
+    if (!ignores_status)
+      stati.resize(native.size()); // Bug prone: Resizing native post-constructor leads to inconsistent stati size.
+
+    // The tasks capture `this`, so the object must never move (the stored
+    // lambda would keep pointing at the old location) — moves are deleted.
+    // this-> is required: the constructor parameter shadows the member.
+    if (ignores_status)
+      task = boost::packaged_task<void*()>([this] ( ) { return this->user_data; });
+    else
+      task = boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>([this] ( ) { return std::make_tuple(this->user_data, static_cast<std::int32_t>(stati.size()), stati.data()); });
+  }
+  collective_request           (const collective_request&  that) = delete;
+  collective_request           (      collective_request&& temp) = delete; // Deleted: tasks capture `this`.
+ ~collective_request           ()                                = default;
+  collective_request& operator=(const collective_request&  that) = delete;
+  collective_request& operator=(      collective_request&& temp) = delete; // Deleted: tasks capture `this`.
+
+  [[nodiscard]]
+  bool ignores_status() const
+  {
+    return std::holds_alternative<boost::packaged_task<void*()>>(task);
+  }
+
+  std::vector<MPI_Request> native;
+  task_variant             task;
+  void*                    user_data;
+  std::vector<MPI_Status>  stati {};
+};
+// Owns the single detach thread which polls outstanding requests and
+// fulfills their packaged_tasks upon completion.
+struct state
+{
+  // Note: detach_thread_running is declared before detach_thread below, so it
+  // is initialized first. The thread body reads the flag immediately, so the
+  // original ordering (thread first) raced against an uninitialized flag.
+  state           () : detach_thread_running(true), detach_thread([this] { run(); })
+  {
+
+  }
+  state           (const state&  that) = delete;
+  state           (      state&& temp) = delete;
+ ~state           ()
+  {
+    {
+      std::unique_lock<std::mutex> lock(container_mutex);
+      // Drain: keep waking the detach thread until every pending request has
+      // been consumed and completed, then request shutdown.
+      while (!requests.empty() || !collective_requests.empty())
+      {
+        container_condition_variable.notify_one();
+        lock.unlock();
+        std::this_thread::sleep_for(2ms);
+        lock.lock  ();
+      }
+      detach_thread_running = false;
+      container_condition_variable.notify_one();
+    }
+    detach_thread.join();
+  }
+  state& operator=(const state&  that) = delete;
+  state& operator=(      state&& temp) = delete;
+
+  // Detach thread main loop: splices newly-submitted requests into the active
+  // lists (under the lock), then polls them lock-free with MPI_Test(all),
+  // invoking and erasing each request's task as it completes.
+  void run()
+  {
+    while (detach_thread_running || !active_requests.empty() || !active_collective_requests.empty())
+    {
+      do
+      {
+        std::unique_lock<std::mutex> lock(container_mutex);
+        if (!requests           .empty()) active_requests           .splice(active_requests           .begin(), requests);
+        if (!collective_requests.empty()) active_collective_requests.splice(active_collective_requests.begin(), collective_requests);
+
+        if (active_requests.empty() && active_collective_requests.empty())
+          while (detach_thread_running && requests.empty() && collective_requests.empty())
+            container_condition_variable.wait(lock);
+      }
+      while (detach_thread_running && active_requests.empty() && active_collective_requests.empty());
+
+      if (!active_requests           .empty())
+      {
+        auto current = active_requests.begin();
+        auto end     = active_requests.end  ();
+        while (current != end)
+        {
+          auto done {0};
+
+          MPI_Test(&current->native, &done, current->ignores_status() ? MPI_STATUS_IGNORE : &current->status);
+          if (done)
+          {
+            current->ignores_status()
+              ? std::get<boost::packaged_task<void*()>>                         (current->task)()
+              : std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(current->task)();
+            current = active_requests.erase(current);
+          }
+          else
+            ++current;
+        }
+      }
+      if (!active_collective_requests.empty())
+      {
+        auto current = active_collective_requests.begin();
+        auto end     = active_collective_requests.end  ();
+        while (current != end)
+        {
+          auto done {0};
+
+          // Bug fix: MPI_Testall takes MPI_STATUSES_IGNORE (plural), not
+          // MPI_STATUS_IGNORE, when the statuses are not requested.
+          MPI_Testall(static_cast<std::int32_t>(current->native.size()), current->native.data(), &done, current->ignores_status() ? MPI_STATUSES_IGNORE : current->stati.data());
+          if (done)
+          {
+            current->ignores_status()
+              ? std::get<boost::packaged_task<void*()>>                                       (current->task)()
+              : std::get<boost::packaged_task<std::tuple<void*, std::int32_t, MPI_Status*>()>>(current->task)();
+            current = active_collective_requests.erase(current);
+          }
+          else
+            ++current;
+        }
+      }
+
+      std::this_thread::sleep_for(2ms);
+    }
+  }
+
+  std::atomic_bool              detach_thread_running; // Declared before detach_thread on purpose (initialization order).
+  std::thread                   detach_thread;
+
+  std::mutex                    container_mutex;
+  std::condition_variable       container_condition_variable;
+
+  std::list<request>            requests                   {};
+  std::list<collective_request> collective_requests        {};
+  std::list<request>            active_requests            {};
+  std::list<collective_request> active_collective_requests {};
+};
+
+inline std::optional<state> global_state; // Note: External-linkage optional used as a lazy-initialized stack variable. Must be reset prior to MPI_Finalize.
+}
+
+// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it (sets it to MPI_REQUEST_NULL).
+// Returns a future that holds the user `data` pointer once the request completes.
+inline boost::unique_future<void*>                                        MPI_Detach_future            (                    MPI_Request* request , void*  data)
+{
+  // NOTE(review): this lazy construction is not thread-safe — two threads
+  // detaching their first request concurrently may both emplace. Confirm
+  // whether single-threaded first use is an intended precondition.
+  if (!detail::global_state) detail::global_state.emplace();
+
+  auto done {0};
+
+  // Fast path: already complete — fulfill immediately, detach thread unused.
+  MPI_Test(request, &done, MPI_STATUS_IGNORE);
+  if (done)
+    return boost::make_future(data);
+
+  // Slow path: hand the request to the detach thread. The lock is held until
+  // return, so the element cannot be completed and erased by the detach
+  // thread before get_future() has been called on its task.
+  std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
+  auto& instance = detail::global_state->requests.emplace_back(*request, true, data);
+  detail::global_state->container_condition_variable.notify_one();
+  *request = MPI_REQUEST_NULL;
+  return std::get<boost::packaged_task<void*()>>(instance.task).get_future();
+}
+// Note: If the test does not succeed immediately, takes the ownership of the request and invalidates it.
+// NOTE(review): both return paths hand the consumer an MPI_Status* with a
+// problematic lifetime: the fast path points at the local `status` below
+// (dangling as soon as this function returns), and the slow path points at a
+// detail::request member that is destroyed when the detach thread erases the
+// entry. Returning MPI_Status by value in the tuple would fix both, but that
+// changes the public return type (and the detail::request task type), so it
+// is flagged here rather than applied.
+inline boost::unique_future<std::tuple<void*, MPI_Status*>>               MPI_Detach_status_future     (                    MPI_Request* request , void*  data)
+{
+  if (!detail::global_state) detail::global_state.emplace();
+
+  auto done   {0};
+  auto status {MPI_Status()};
+
+  MPI_Test(request, &done, &status);
+  if (done)
+    return boost::make_future(std::make_tuple(data, &status)); // Dangling pointer: see note above.
+
+  // Lock is held until return, so the element cannot be erased before
+  // get_future() has been called on its task.
+  std::unique_lock<std::mutex> lock(detail::global_state->container_mutex);
+  auto& instance = detail::global_state->requests.emplace_back(*request, false, data);
+  detail::global_state->container_condition_variable.notify_one();
+  *request = MPI_REQUEST_NULL;
+  return std::get<boost::packaged_task<std::tuple<void*, MPI_Status*>()>>(instance.task).get_future();
+}
+// Tears down the detach state (joining the detach thread and draining any
+// pending requests via state's destructor) before delegating finalization to
+// the profiling interface.
+inline std::int32_t                                                       MPI_Finalize                 ()
+{
+  detail::global_state.reset();
+  const auto result = PMPI_Finalize();
+  return result;
+}
+}
+
+#endif
\ No newline at end of file
diff --git a/tests/detach_boost_extensions_test.cpp b/tests/detach_boost_extensions_test.cpp
index db933aa..3a08fed 100644
--- a/tests/detach_boost_extensions_test.cpp
+++ b/tests/detach_boost_extensions_test.cpp
@@ -24,10 +24,12 @@ TEST_CASE("Detach boost extensions test", "mpi_detach_boost_extensions.hpp")
   std::vector<data> outgoing(1000);
   std::vector<data> incoming(1000);
 
-  auto send_request = communicator.isend(0, 42, outgoing);
-  auto recv_request = communicator.irecv(0, 42, incoming);
-  auto send_status  = send_request.test ();
-  auto recv_status  = recv_request.wait ();
+  boost::optional<boost::unique_future<void*>> first, second, third, fourth;
+  // The isend result is a temporary request; the pointer obtained from
+  // trivial() is only used within the MPI_Detach_future call, which copies
+  // (and nulls) the handle before the temporary is destroyed.
+  first  = mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr);
+  second = first->then([&] (boost::unique_future<void*>& first_result)
+  {
+    // NOTE(review): the future returned by this inner detach is discarded, so
+    // completion of the second send is never observed; `third` and `fourth`
+    // are declared but unused — presumably placeholders, confirm intent.
+    mpi::detach::future::MPI_Detach_future(&communicator.isend(0, 42, outgoing).trivial().get(), nullptr);
+  });
 
   // communicator.isend(0, 42, outgoing, [&] (void* user_data)
   // {
-- 
GitLab