Skip to content
Snippets Groups Projects
Commit 7b5f0743 authored by Ali Can Demiralp's avatar Ali Can Demiralp
Browse files

Progress.

parent 9daeac76
No related branches found
No related tags found
No related merge requests found
......@@ -34,11 +34,11 @@ public:
{
using particle_map = tbb::concurrent_hash_map<relative_direction, std::vector<particle<vector3, integer>>>;
std::size_t particle_count = 0;
std::size_t curve_stride = 0;
std::size_t vertex_count = 0;
particle_map out_of_bounds_particles {};
particle_map neighbor_out_of_bounds_particles {};
std::size_t particle_count = 0;
std::size_t curve_stride = 0;
std::size_t vertex_count = 0;
particle_map out_of_bounds_particles {};
particle_map neighbor_out_of_bounds_particles{};
};
struct load_balancing_info
{
......@@ -71,13 +71,13 @@ protected:
bool check_completion ( const std::vector<particle<vector3, integer>>& active_particles);
void load_balance_distribute ( std::vector<particle<vector3, integer>>& active_particles);
round_info compute_round_info ( const std::vector<particle<vector3, integer>>& active_particles, const integral_curves_3d& integral_curves);
void allocate_integral_curves( const std::vector<particle<vector3, integer>>& active_particles, integral_curves_3d& integral_curves, const round_info& round_info);
void advect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& active_particles, std::vector<particle<vector3, integer>>& inactive_particles, integral_curves_3d& integral_curves, round_info& round_info);
void load_balance_collect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& inactive_particles, round_info& round_info);
void out_of_bounds_distribute( std::vector<particle<vector3, integer>>& active_particles, const round_info& round_info);
round_info compute_round_info ( const std::vector<particle<vector3, integer>>& active_particles);
void allocate_integral_curves( integral_curves_3d& integral_curves, const round_info& round_info);
void advect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& active_particles, std::vector<particle<vector3, integer>>& inactive_particles, integral_curves_3d& integral_curves, round_info& round_info);
void load_balance_collect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, round_info& round_info);
void out_of_bounds_distribute( std::vector<particle<vector3, integer>>& active_particles, const round_info& round_info);
void gather_particles ( std::vector<particle<vector3, integer>>& inactive_particles);
void prune_integral_curves ( integral_curves_3d& integral_curves);
void prune_integral_curves ( integral_curves_3d& integral_curves);
domain_partitioner* partitioner_ {};
integer particles_per_round_ {};
......
......@@ -63,31 +63,31 @@ std::int32_t pipeline::run(std::int32_t argc, char** argv)
particle_advector::round_info round_info;
recorder.record("4.1." + std::to_string(rounds) + ".load_balance_distribute" , [&] ()
{
advector.load_balance_distribute ( particles );
advector.load_balance_distribute ( particles );
});
recorder.record("4.2." + std::to_string(rounds) + ".compute_round_info" , [&] ()
{
round_info = advector.compute_round_info ( particles, output.integral_curves );
round_info = advector.compute_round_info ( particles );
});
recorder.record("4.3." + std::to_string(rounds) + ".allocate_integral_curves", [&] ()
{
advector.allocate_integral_curves( particles, output.integral_curves, round_info);
advector.allocate_integral_curves( output.integral_curves, round_info);
});
recorder.record("4.4." + std::to_string(rounds) + ".advect" , [&] ()
{
advector.advect (vector_fields, particles, output.particles, output.integral_curves, round_info);
advector.advect (vector_fields, particles, output.particles, output.integral_curves, round_info);
});
recorder.record("4.5." + std::to_string(rounds) + ".load_balance_collect" , [&] ()
{
advector.load_balance_collect (vector_fields, output.particles, round_info);
advector.load_balance_collect (vector_fields, round_info);
});
recorder.record("4.6." + std::to_string(rounds) + ".out_of_bounds_distribute", [&] ()
{
advector.out_of_bounds_distribute( particles, round_info);
advector.out_of_bounds_distribute( particles, round_info);
});
recorder.record("4.7." + std::to_string(rounds) + ".check_completion" , [&] ()
{
complete = advector.check_completion ( particles );
complete = advector.check_completion ( particles );
});
rounds++;
}
......
......@@ -39,10 +39,10 @@ particle_advector::output particle_advector::advect (const
while (!check_completion(particles))
{
load_balance_distribute ( particles );
auto round_info = compute_round_info ( particles, output.integral_curves );
allocate_integral_curves( particles, output.integral_curves, round_info);
auto round_info = compute_round_info ( particles );
allocate_integral_curves( output.integral_curves, round_info);
advect (vector_fields, particles, output.particles, output.integral_curves, round_info);
load_balance_collect (vector_fields, output.particles, round_info);
load_balance_collect (vector_fields, round_info);
out_of_bounds_distribute( particles, round_info);
}
gather_particles (output.particles );
......@@ -111,7 +111,7 @@ void particle_advector::load_balance_distribute (
#endif
}
}
particle_advector::round_info particle_advector::compute_round_info ( const std::vector<particle<vector3, integer>>& particles, const integral_curves_3d& integral_curves)
particle_advector::round_info particle_advector::compute_round_info ( const std::vector<particle<vector3, integer>>& particles)
{
round_info round_info;
round_info.particle_count = std::min(std::size_t(particles_per_round_), particles.size());
......@@ -124,20 +124,17 @@ particle_advector::round_info particle_advector::compute_round_info (
}
for (auto& partition : partitioner_->partitions())
{
round_info.out_of_bounds_particles .emplace(partition.first, std::vector<particle<vector3, integer>>());
round_info.neighbor_out_of_bounds_particles.emplace(partition.first, std::vector<particle<vector3, integer>>());
}
round_info.out_of_bounds_particles.emplace(partition.first, std::vector<particle<vector3, integer>>());
return round_info;
}
void particle_advector::allocate_integral_curves( const std::vector<particle<vector3, integer>>& particles, integral_curves_3d& integral_curves, const round_info& round_info)
void particle_advector::allocate_integral_curves( integral_curves_3d& integral_curves, const round_info& round_info)
{
if (!record_) return;
integral_curves.emplace_back().resize(round_info.vertex_count, invalid_value<vector3>());
}
void particle_advector::advect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& particles, std::vector<particle<vector3, integer>>& inactive_particles, integral_curves_3d& integral_curves, round_info& round_info)
void particle_advector::advect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& particles, std::vector<particle<vector3, integer>>& inactive_particles, integral_curves_3d& integral_curves, round_info& round_info)
{
tbb::mutex mutex;
tbb::parallel_for(std::size_t(0), round_info.particle_count, std::size_t(1), [&] (const std::size_t particle_index)
......@@ -218,7 +215,7 @@ void particle_advector::advect (const
});
particles.resize(particles.size() - round_info.particle_count);
}
void particle_advector::load_balance_collect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, std::vector<particle<vector3, integer>>& inactive_particles, round_info& round_info)
void particle_advector::load_balance_collect (const std::unordered_map<relative_direction, regular_vector_field_3d>& vector_fields, round_info& round_info)
{
if (load_balancer_ == load_balancer::none) return;
......@@ -227,34 +224,65 @@ void particle_advector::load_balance_collect (const
#ifdef DPA_USE_NEIGHBORHOOD_COLLECTIVES
// TODO: Neighborhood collectives.
#else
auto communicator = partitioner_->cartesian_communicator();
auto& partitions = partitioner_->partitions ();
round_info::particle_map outgoing_particles;
std::vector<particle<vector3, integer>> incoming_particles;
std::vector<particle<vector3, integer>> collected_particles;
std::vector<boost::mpi::request> requests;
// TODO: Collect particles.
for (auto& request : requests)
request.wait();
// Prepare.
{
for (auto& partition : partitioner_->partitions())
outgoing_particles.emplace(partition.first, std::vector<particle<vector3, integer>>());
auto& vector_field = vector_fields.at(relative_direction::center);
auto lower_bounds = vector_field.offset;
auto upper_bounds = vector_field.offset + vector_field.size;
for (auto& partition : partitioner_->partitions())
{
round_info::particle_map::accessor input_accessor;
round_info.out_of_bounds_particles.find(input_accessor, partition.first);
tbb::parallel_for(std::size_t(0), input_accessor->second.size(), std::size_t(1), [&] (const std::size_t index)
{
auto& particle = input_accessor->second[index];
if (particle.relative_direction != relative_direction::center)
{
round_info::particle_map::accessor output_accessor;
outgoing_particles.find(output_accessor, particle.relative_direction); // TODO: Inverse of particle.relative_direction.
output_accessor->second.push_back(particle);
}
});
}
}
tbb::mutex mutex;
tbb::parallel_for(std::size_t(0), collected_particles.size(), std::size_t(1), [&] (const std::size_t particle_index)
// Transmit.
{
auto& particle = collected_particles[particle_index];
std::vector<boost::mpi::request> requests;
particle.relative_direction = relative_direction::center;
auto communicator = partitioner_->cartesian_communicator();
auto& partitions = partitioner_->partitions ();
if (particle.remaining_iterations == 0)
for (auto& neighbor : outgoing_particles)
requests.push_back(communicator->isend(partitions.at(neighbor.first).rank, 0, neighbor.second));
for (auto& neighbor : outgoing_particles)
{
tbb::mutex::scoped_lock lock(mutex);
inactive_particles.push_back(particle);
std::vector<particle<vector3, integer>> temporary_particles;
communicator->recv(partitions.at(neighbor.first).rank, 0, temporary_particles);
incoming_particles.insert(incoming_particles.end(), temporary_particles.begin(), temporary_particles.end());
}
else if (!vector_field.contains(particle.position))
for (auto& request : requests)
request.wait();
}
// Process.
{
auto& vector_field = vector_fields.at(relative_direction::center);
auto lower_bounds = vector_field.offset;
auto upper_bounds = vector_field.offset + vector_field.size;
tbb::mutex mutex;
tbb::parallel_for(std::size_t(0), incoming_particles.size(), std::size_t(1), [&] (const std::size_t particle_index)
{
auto& particle = incoming_particles[particle_index];
particle.relative_direction = relative_direction::center;
std::optional<relative_direction> direction;
if (particle.position[0] < lower_bounds[0]) direction = relative_direction::negative_x;
else if (particle.position[0] > upper_bounds[0]) direction = relative_direction::positive_x;
......@@ -266,13 +294,8 @@ void particle_advector::load_balance_collect (const
round_info::particle_map::accessor accessor;
if (direction && round_info.out_of_bounds_particles.find(accessor, direction.value()))
accessor->second.push_back(particle);
else
{
tbb::mutex::scoped_lock lock(mutex);
inactive_particles.push_back(particle);
}
}
});
});
}
#endif
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment