Skip to content
Snippets Groups Projects
Commit fb26c9dd authored by Ali Can Demiralp's avatar Ali Can Demiralp
Browse files

Added initial sources.

parent 8b957a06
No related branches found
No related tags found
No related merge requests found
*build/*
\ No newline at end of file
################################################## Project ##################################################
cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
project (boost_mpi_extensions VERSION 1.0 LANGUAGES CXX)
list (APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)
set_property (GLOBAL PROPERTY USE_FOLDERS ON)
set (CMAKE_CXX_STANDARD 17)
include (set_max_warning_level)
set_max_warning_level ()
################################################## Options ##################################################
option(BUILD_TESTS "Build tests." OFF)
################################################## Sources ##################################################
file(GLOB_RECURSE PROJECT_HEADERS include/*.h include/*.hpp)
file(GLOB_RECURSE PROJECT_CMAKE_UTILS cmake/*.cmake)
file(GLOB_RECURSE PROJECT_MISC *.md *.txt)
set (PROJECT_FILES
${PROJECT_HEADERS}
${PROJECT_CMAKE_UTILS}
${PROJECT_MISC})
include (assign_source_group)
assign_source_group(${PROJECT_FILES})
################################################## Dependencies ##################################################
include(import_library)
find_package (Boost REQUIRED date_time mpi regex)
import_library(Boost_INCLUDE_DIRS Boost_DATE_TIME_LIBRARY_DEBUG Boost_DATE_TIME_LIBRARY_RELEASE)
import_library(Boost_INCLUDE_DIRS Boost_MPI_LIBRARY_DEBUG Boost_MPI_LIBRARY_RELEASE)
import_library(Boost_INCLUDE_DIRS Boost_REGEX_LIBRARY_DEBUG Boost_REGEX_LIBRARY_RELEASE)
import_library(Boost_INCLUDE_DIRS Boost_SERIALIZATION_LIBRARY_DEBUG Boost_SERIALIZATION_LIBRARY_RELEASE)
find_package (Catch2 REQUIRED)
list (APPEND PROJECT_LIBRARIES Catch2::Catch2)
################################################## Targets ##################################################
add_library(${PROJECT_NAME} INTERFACE)
target_include_directories(${PROJECT_NAME} INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
$<INSTALL_INTERFACE:include>)
target_include_directories(${PROJECT_NAME} INTERFACE ${PROJECT_INCLUDE_DIRS})
target_link_libraries (${PROJECT_NAME} INTERFACE ${PROJECT_LIBRARIES})
target_compile_definitions(${PROJECT_NAME} INTERFACE ${PROJECT_COMPILE_DEFINITIONS})
# Hack for header-only project to appear in the IDEs.
add_library(${PROJECT_NAME}_ STATIC ${PROJECT_FILES})
target_include_directories(${PROJECT_NAME}_ PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_BINARY_DIR})
target_include_directories(${PROJECT_NAME}_ PUBLIC ${PROJECT_INCLUDE_DIRS})
target_link_libraries (${PROJECT_NAME}_ PUBLIC ${PROJECT_LIBRARIES})
target_compile_definitions(${PROJECT_NAME}_ PUBLIC ${PROJECT_COMPILE_DEFINITIONS})
set_target_properties (${PROJECT_NAME}_ PROPERTIES LINKER_LANGUAGE CXX)
################################################## Testing ##################################################
if(BUILD_TESTS)
enable_testing ()
set (TEST_MAIN_NAME catch_main)
set (TEST_MAIN_SOURCES tests/catch/main.cpp)
add_library (${TEST_MAIN_NAME} OBJECT ${TEST_MAIN_SOURCES})
set_property (TARGET ${TEST_MAIN_NAME} PROPERTY FOLDER tests/catch)
assign_source_group(${TEST_MAIN_SOURCES})
file(GLOB PROJECT_TEST_CPPS tests/*.cpp)
foreach(_SOURCE ${PROJECT_TEST_CPPS})
get_filename_component(_NAME ${_SOURCE} NAME_WE)
add_executable (${_NAME} ${_SOURCE} $<TARGET_OBJECTS:${TEST_MAIN_NAME}>)
target_link_libraries (${_NAME} ${PROJECT_NAME})
add_test (${_NAME} ${_NAME})
set_property (TARGET ${_NAME} PROPERTY FOLDER tests)
assign_source_group (${_SOURCE})
endforeach()
endif()
################################################## Installation ##################################################
install(TARGETS ${PROJECT_NAME} EXPORT ${PROJECT_NAME}-config)
install(DIRECTORY include/ DESTINATION include)
install(EXPORT ${PROJECT_NAME}-config DESTINATION cmake)
export (TARGETS ${PROJECT_NAME} FILE ${PROJECT_NAME}-config.cmake)
if not exist "build" mkdir build
cd build
if not exist "vcpkg" git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
if not exist "vcpkg.exe" call bootstrap-vcpkg.bat
set VCPKG_COMMAND=vcpkg install --recurse
set VCPKG_DEFAULT_TRIPLET=x64-windows
rem Add your library ports here.
%VCPKG_COMMAND% boost[mpi] catch2
cd ..
cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake ..
cmake --build . --target ALL_BUILD --config Release
cd ..
\ No newline at end of file
#!/bin/bash
if [ ! -d "build" ]; then mkdir build fi
cd build
if [ ! -d "vcpkg" ]; then git clone https://github.com/Microsoft/vcpkg.git fi
cd vcpkg
if [ ! -f "vcpkg" ]; then ./bootstrap-vcpkg.sh fi
VCPKG_COMMAND=vcpkg install --recurse
VCPKG_DEFAULT_TRIPLET=x64-linux
# Add your library ports here.
$VCPKG_COMMAND boost[mpi] catch2
cd ..
cmake -Ax64 -DCMAKE_TOOLCHAIN_FILE=./vcpkg/scripts/buildsystems/vcpkg.cmake ..
cmake --build . --target ALL_BUILD --config Release
cd ..
\ No newline at end of file
# Assigns the given files to source groups identical to their location.
function(assign_source_group)
foreach(_SOURCE IN ITEMS ${ARGN})
if (IS_ABSOLUTE "${_SOURCE}")
file(RELATIVE_PATH _SOURCE_REL "${CMAKE_CURRENT_SOURCE_DIR}" "${_SOURCE}")
else()
set(_SOURCE_REL "${_SOURCE}")
endif()
get_filename_component(_SOURCE_PATH "${_SOURCE_REL}" PATH)
if(WIN32)
string(REPLACE "/" "\\" _SOURCE_PATH_MSVC "${_SOURCE_PATH}")
source_group("${_SOURCE_PATH_MSVC}" FILES "${_SOURCE}")
else()
source_group("${_SOURCE_PATH}" FILES "${_SOURCE}")
endif()
endforeach()
endfunction(assign_source_group)
# Imports a library which is not built with cmake.
# The include directories are appended to the PROJECT_INCLUDE_DIRS variable.
# The libraries are appended to the PROJECT_LIBRARIES variable.
# Usage:
# Header Only:
# import_library(INCLUDE_DIRS)
# Identical Debug and Release:
# import_library(INCLUDE_DIRS LIBRARIES)
# Separate Debug and Release:
# import_library(INCLUDE_DIRS DEBUG_LIBRARIES RELEASE_LIBRARIES)
function(import_library INCLUDE_DIRS)
set (PROJECT_INCLUDE_DIRS ${PROJECT_INCLUDE_DIRS} ${${INCLUDE_DIRS}} PARENT_SCOPE)
set (_EXTRA_ARGS ${ARGN})
list(LENGTH _EXTRA_ARGS _EXTRA_ARGS_LENGTH)
if (_EXTRA_ARGS_LENGTH EQUAL 1)
list(GET _EXTRA_ARGS 0 _LIBRARIES)
set (PROJECT_LIBRARIES ${PROJECT_LIBRARIES} ${${_LIBRARIES}} PARENT_SCOPE)
elseif(_EXTRA_ARGS_LENGTH EQUAL 2)
list(GET _EXTRA_ARGS 0 _DEBUG_LIBRARIES )
list(GET _EXTRA_ARGS 1 _RELEASE_LIBRARIES)
set (PROJECT_LIBRARIES ${PROJECT_LIBRARIES} debug ${${_DEBUG_LIBRARIES}} optimized ${${_RELEASE_LIBRARIES}} PARENT_SCOPE)
endif ()
endfunction(import_library)
function(set_max_warning_level)
if(MSVC)
if(CMAKE_CXX_FLAGS MATCHES "/W[0-4]")
string(REGEX REPLACE "/W[0-4]" "/W4" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4")
endif()
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wno-long-long -pedantic")
endif()
endfunction()
\ No newline at end of file
#ifndef BOOST_MPI_EXTENSIONS_HPP
#define BOOST_MPI_EXTENSIONS_HPP
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>
#include <thread>
#include <mpi.h>
namespace boost::mpi
{
namespace detail
{
typedef void MPI_Detach_callback (void*);
typedef void MPI_Detach_callback_status (void*, MPI_Status*);
typedef void MPI_Detach_callback_statuses(void*, std::int32_t, MPI_Status*);
std::int32_t MPI_Detach ( MPI_Request* request, MPI_Detach_callback* callback, void* data);
std::int32_t MPI_Detach_status ( MPI_Request* request, MPI_Detach_callback_status* callback, void* data);
std::int32_t MPI_Detach_each (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* array_of_data[]);
std::int32_t MPI_Detach_each_status(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_status* callback, void* array_of_data[]);
std::int32_t MPI_Detach_all (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data);
std::int32_t MPI_Detach_all_status (std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data);
struct single_request
{
single_request(MPI_Request* request, MPI_Detach_callback* callback, void* data)
: request (*request)
, callback (callback)
, callback_status(nullptr)
, status ()
, data (data)
, status_ptr (MPI_STATUS_IGNORE)
{
*request = MPI_REQUEST_NULL;
}
single_request(MPI_Request* request, MPI_Detach_callback_status* callback, void* data)
: request (*request)
, callback (nullptr)
, callback_status(callback)
, status ()
, data (data)
, status_ptr (&status)
{
*request = MPI_REQUEST_NULL;
}
MPI_Request request;
MPI_Detach_callback* callback;
MPI_Detach_callback_status* callback_status;
MPI_Status status;
void* data;
MPI_Status* status_ptr; // Pointer to status or MPI_STATUS_IGNORE.
};
struct all_request
{
all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback* callback, void* data)
: count (count)
, request (new MPI_Request[count])
, callback (callback)
, callback_statuses(nullptr)
, statuses (MPI_STATUSES_IGNORE)
, data (data)
{
for (auto i = 0; i < count; i++)
{
request [i] = array_of_requests[i];
array_of_requests[i] = MPI_REQUEST_NULL;
}
}
all_request(std::int32_t count, MPI_Request* array_of_requests, MPI_Detach_callback_statuses* callback, void* data)
: count (count)
, request (new MPI_Request[count])
, callback (nullptr)
, callback_statuses(callback)
, statuses (new MPI_Status[count])
, data (data) {
for (auto i = 0; i < count; i++)
{
request [i] = array_of_requests[i];
array_of_requests[i] = MPI_REQUEST_NULL;
}
}
~all_request()
{
delete[] request;
if (statuses != MPI_STATUSES_IGNORE)
delete[] statuses;
}
std::int32_t count;
MPI_Request* request;
MPI_Detach_callback* callback;
MPI_Detach_callback_statuses* callback_statuses;
MPI_Status* statuses;
void* data;
};
struct state
{
std::atomic<std::int32_t> running {1};
std::thread detach_thread;
std::mutex list_mutex;
std::condition_variable list_condition_variable;
std::once_flag once_flag;
std::int32_t initialized {0};
std::list<single_request*> singleRequestsQueue {};
std::list<all_request*> allRequestsQueue {};
std::list<single_request*> singleRequests {};
std::list<all_request*> allRequests {};
};
static state global_state;
}
}
#endif
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
using namespace std::chrono_literals;
void run(void) {
while (running || !singleRequests.empty() || !allRequests.empty()) {
do {
std::unique_lock<std::mutex> lck(listMtx);
if (!singleRequestsQueue.empty())
singleRequests.splice(singleRequests.begin(), singleRequestsQueue);
if (!allRequestsQueue.empty())
allRequests.splice(allRequests.begin(), allRequestsQueue);
if (singleRequests.empty() && allRequests.empty())
while (running && singleRequestsQueue.empty() &&
allRequestsQueue.empty()) {
listCv.wait(lck);
}
} while (running && singleRequests.empty() && allRequests.empty());
if (!singleRequests.empty()) {
auto iter = singleRequests.begin();
auto end = singleRequests.end();
while (iter != end) {
int flag;
MPI_Test(&(*iter)->req, &flag, (*iter)->statusP);
if (flag) { //
if ((*iter)->callback)
(*iter)->callback((*iter)->data);
else
(*iter)->callback_status((*iter)->data, (*iter)->statusP);
delete (*iter);
iter = singleRequests.erase(iter);
} else {
iter++;
}
}
}
if (!allRequests.empty()){
auto iter = allRequests.begin();
auto end = allRequests.end();
while (iter != end) {
int flag;
MPI_Testall((*iter)->count, (*iter)->req, &flag, (*iter)->statuses);
if (flag) { //
if ((*iter)->callback)
(*iter)->callback((*iter)->data);
else
(*iter)->callback_statuses((*iter)->data, (*iter)->count, (*iter)->statuses);
delete (*iter);
iter = allRequests.erase(iter);
} else {
iter++;
}
}
}
std::this_thread::sleep_for(2ms);
}
}
// using __attribute__((constructor)) does not work,
// because c++ std library might not be initialized yet
void initDetach() {
initialized = 1;
detachThread = std::thread(run);
}
// This function is assigned to execute after
// main using __attribute__((destructor))
void finiDetach() {
{
std::unique_lock<std::mutex> lck(listMtx);
// make sure all requests get finally processed
// access to the *Queue lists is shared, so need to be locked
while (!singleRequestsQueue.empty() || !allRequestsQueue.empty())
{
listCv.notify_one();
lck.unlock();
std::this_thread::sleep_for(2ms);
lck.lock();
}// after while, lock is always set for changing running and notify
running = 0;
listCv.notify_one();
}// now wait for the progress thread to finish, i.e. all requests are handled
detachThread.join();
}
int MPI_Detach(MPI_Request *request, MPI_Detach_callback *callback,
void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Test(request, &flag, MPI_STATUS_IGNORE);
if (flag) {
callback(data);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(new singleRequest(request, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_status(MPI_Request *request,
MPI_Detach_callback_status *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status status;
MPI_Test(request, &flag, &status);
if (flag) {
callback(data, &status);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(new singleRequest(request, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_each(int count, MPI_Request* array_of_requests,
MPI_Detach_callback *callback, void *array_of_data[]) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
for (int i = 0; i < count; i++) {
MPI_Test(array_of_requests + i, &flag, MPI_STATUS_IGNORE);
if (flag) {
callback(array_of_data[i]);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(
new singleRequest(array_of_requests + i, callback, array_of_data[i]));
listCv.notify_one();
}
}
return MPI_SUCCESS;
}
int MPI_Detach_each_status(int count, MPI_Request* array_of_requests,
MPI_Detach_callback_status *callback,
void *array_of_data[]) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status status;
for (int i = 0; i < count; i++) {
MPI_Test(array_of_requests + i, &flag, &status);
if (flag) {
callback(array_of_data[i], &status);
} else {
std::unique_lock<std::mutex> lck(listMtx);
singleRequestsQueue.push_back(
new singleRequest(array_of_requests + i, callback, array_of_data[i]));
listCv.notify_one();
}
}
return MPI_SUCCESS;
}
int MPI_Detach_all(int count, MPI_Request* array_of_requests,
MPI_Detach_callback *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Testall(count, array_of_requests, &flag, MPI_STATUSES_IGNORE);
if (flag) {
callback(data);
} else {
std::unique_lock<std::mutex> lck(listMtx);
allRequestsQueue.push_back(
new allRequest(count, array_of_requests, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Detach_all_status(int count, MPI_Request* array_of_requests,
MPI_Detach_callback_statuses *callback, void *data) {
if (!initialized)
std::call_once(onceFlag, initDetach);
int flag;
MPI_Status statuses[count];
MPI_Testall(count, array_of_requests, &flag, statuses);
if (flag) {
callback(data, count, statuses);
} else {
std::unique_lock<std::mutex> lck(listMtx);
allRequestsQueue.push_back(
new allRequest(count, array_of_requests, callback, data));
listCv.notify_one();
}
return MPI_SUCCESS;
}
int MPI_Finalize(){
// we need to make sure, all communication is finished
// before calling MPI_Finalize
if (initialized)
finiDetach();
return PMPI_Finalize();
}
#define CATCH_CONFIG_MAIN
#include "catch.hpp"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment