/******************************************************************************
* *
* Copyright 2019 Max Planck Institute for Dynamics and Self-Organization *
* *
* This file is part of TurTLE. *
* *
* TurTLE is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published *
* by the Free Software Foundation, either version 3 of the License, *
* or (at your option) any later version. *
* *
* TurTLE is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with TurTLE. If not, see <http://www.gnu.org/licenses/>. *
* *
* Contact: Cristian.Lalescu@ds.mpg.de *
* *
******************************************************************************/
#ifndef P2P_DISTR_MPI_HPP
#define P2P_DISTR_MPI_HPP
#include <mpi.h>
#include <omp.h>
#include <vector>
#include <memory>
#include <array>
#include <utility>
#include <algorithm>
#include <limits>
#include <cassert>
#include <cmath>
#include "scope_timer.hpp"
#include "particles_utils.hpp"
#include "p2p_tree.hpp"
#include "lock_free_bool_array.hpp"
template <class partsize_t, class real_number>
class p2p_distr_mpi {
protected:
static const int MaxNbRhs = 10;
enum MpiTag{
TAG_NB_PARTICLES,
TAG_POSITION_PARTICLES,
TAG_INDEX_PARTICLES,
TAG_RESULT_PARTICLES,
};
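// Description of one exchange with a neighbouring process: either we send our
// top cell layers to it (isRecv == false) and later receive the computed rhs
// contributions back, or we receive its layers (isRecv == true), compute on
// them and send the results back.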
struct NeighborDescriptor{
partsize_t nbParticlesToExchange;
int destProc;
int nbLevelsToExchange;
bool isRecv;
int nbReceived;
std::unique_ptr<real_number[]> toRecvAndMerge;
std::unique_ptr<real_number[]> toCompute;
std::unique_ptr<real_number[]> results;
std::unique_ptr<partsize_t[]> indexes;
};
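// Action attached to a pending MPI request; executed once MPI_Waitany reports
// that the request has completed.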
enum Action{
NOTHING_TODO = 512,
RECV_PARTICLES,
COMPUTE_PARTICLES,
RELEASE_BUFFER_PARTICLES,
MERGE_PARTICLES
};
MPI_Comm current_com;
int my_rank;
int nb_processes;
int nb_processes_involved;
const std::pair<int,int> current_partition_interval;
const int current_partition_size;
const std::array<size_t,3> field_grid_dim;
std::unique_ptr<int[]> partition_interval_size_per_proc;
std::unique_ptr<int[]> partition_interval_offset_per_proc;
std::unique_ptr<partsize_t[]> current_offset_particles_for_partition;
std::vector<std::pair<Action,int>> whatNext;
std::vector<MPI_Request> mpiRequests;
std::vector<NeighborDescriptor> neigDescriptors;
std::array<real_number,3> spatial_box_width;
std::array<real_number,3> spatial_box_offset;
const real_number cutoff_radius_compute;
const int nb_cells_factor;
const real_number cutoff_radius;
std::array<long int,3> nb_cell_levels;
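// Reorder the fixed-size records of "data" starting at offsetIdx according to
// "permutation" (pairs of (cell index, source particle index)), using "buffer"
// as scratch space.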
template <class DataType, int sizeElement>
static void permute_copy(const partsize_t offsetIdx, const partsize_t nbElements,
const std::pair<long int,partsize_t> permutation[],
DataType data[], std::vector<unsigned char>* buffer){
buffer->resize(nbElements*sizeof(DataType)*sizeElement);
DataType* dataBuffer = reinterpret_cast<DataType*>(buffer->data());
// Permute
for(partsize_t idxPart = 0 ; idxPart < nbElements ; ++idxPart){
const partsize_t srcData = permutation[idxPart].second;
const partsize_t destData = idxPart;
for(int idxVal = 0 ; idxVal < sizeElement ; ++idxVal){
dataBuffer[destData*sizeElement + idxVal]
= data[srcData*sizeElement + idxVal];
}
}
// Copy back
for(partsize_t idxPart = 0 ; idxPart < nbElements ; ++idxPart){
const partsize_t srcData = idxPart;
const partsize_t destData = idxPart+offsetIdx;
for(int idxVal = 0 ; idxVal < sizeElement ; ++idxVal){
data[destData*sizeElement + idxVal]
= dataBuffer[srcData*sizeElement + idxVal];
}
}
}
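// Number of interaction cells per box edge: the largest integer such that
// spatial_box_width[IDXC_Z]/factor is still at least the cutoff radius.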
static int foundGridFactor(const real_number in_cutoff_radius, const std::array<real_number,3>& in_spatial_box_width){
int idx_factor = 1;
while(in_cutoff_radius <= in_spatial_box_width[IDXC_Z]/real_number(idx_factor+1)){
idx_factor += 1;
}
return idx_factor;
}
public:
////////////////////////////////////////////////////////////////////////////
p2p_distr_mpi(MPI_Comm in_current_com,
const std::pair<int,int>& in_current_partitions,
const std::array<size_t,3>& in_field_grid_dim,
const std::array<real_number,3>& in_spatial_box_width,
const std::array<real_number,3>& in_spatial_box_offset,
const real_number in_cutoff_radius)
: current_com(in_current_com),
my_rank(-1), nb_processes(-1),nb_processes_involved(-1),
current_partition_interval(in_current_partitions),
current_partition_size(current_partition_interval.second-current_partition_interval.first),
field_grid_dim(in_field_grid_dim),
spatial_box_width(in_spatial_box_width), spatial_box_offset(in_spatial_box_offset),
cutoff_radius_compute(in_cutoff_radius),
nb_cells_factor(foundGridFactor(in_cutoff_radius, in_spatial_box_width)),
cutoff_radius(in_spatial_box_width[IDXC_Z]/real_number(nb_cells_factor)){
AssertMpi(MPI_Comm_rank(current_com, &my_rank));
AssertMpi(MPI_Comm_size(current_com, &nb_processes));
partition_interval_size_per_proc.reset(new int[nb_processes]);
AssertMpi( MPI_Allgather( const_cast<int*>(&current_partition_size), 1, MPI_INT,
partition_interval_size_per_proc.get(), 1, MPI_INT,
current_com) );
assert(partition_interval_size_per_proc[my_rank] == current_partition_size);
partition_interval_offset_per_proc.reset(new int[nb_processes+1]);
partition_interval_offset_per_proc[0] = 0;
for(int idxProc = 0 ; idxProc < nb_processes ; ++idxProc){
partition_interval_offset_per_proc[idxProc+1] = partition_interval_offset_per_proc[idxProc] + partition_interval_size_per_proc[idxProc];
}
current_offset_particles_for_partition.reset(new partsize_t[current_partition_size+1]);
nb_processes_involved = nb_processes;
while(nb_processes_involved != 0 && partition_interval_size_per_proc[nb_processes_involved-1] == 0){
nb_processes_involved -= 1;
}
assert(nb_processes_involved != 0);
for(int idx_proc_involved = 0 ; idx_proc_involved < nb_processes_involved ; ++idx_proc_involved){
assert(partition_interval_size_per_proc[idx_proc_involved] != 0);
}
assert(int(field_grid_dim[IDXC_Z]) == partition_interval_offset_per_proc[nb_processes_involved]);
nb_cell_levels[IDXC_X] = nb_cells_factor;
nb_cell_levels[IDXC_Y] = nb_cells_factor;
nb_cell_levels[IDXC_Z] = nb_cells_factor;
}
virtual ~p2p_distr_mpi(){}
////////////////////////////////////////////////////////////////////////////
int getGridFactor() const{
return nb_cells_factor;
}
real_number getGridCutoff() const{
return cutoff_radius;
}
long int get_cell_coord_x_from_index(const long int index) const{
return index % nb_cell_levels[IDXC_X];
}
long int get_cell_coord_y_from_index(const long int index) const{
return (index % (nb_cell_levels[IDXC_X]*nb_cell_levels[IDXC_Y]))
/ nb_cell_levels[IDXC_X];
}
long int get_cell_coord_z_from_index(const long int index) const{
return index / (nb_cell_levels[IDXC_X]*nb_cell_levels[IDXC_Y]);
}
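// First/last interaction-cell layer (along z) overlapping the slab of field
// grid planes owned by a given process.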
long int first_cell_level_proc(const int dest_proc) const{
const real_number field_section_width_z = spatial_box_width[IDXC_Z]/real_number(field_grid_dim[IDXC_Z]);
return static_cast<long int>((field_section_width_z*real_number(partition_interval_offset_per_proc[dest_proc]))/cutoff_radius);
}
long int last_cell_level_proc(const int dest_proc) const{
const real_number field_section_width_z = spatial_box_width[IDXC_Z]/real_number(field_grid_dim[IDXC_Z]);
const long int limite = static_cast<long int>((field_section_width_z*real_number(partition_interval_offset_per_proc[dest_proc+1])
- std::numeric_limits<real_number>::epsilon())/cutoff_radius);
if(static_cast<real_number>(limite)*cutoff_radius
== field_section_width_z*real_number(partition_interval_offset_per_proc[dest_proc+1])){
return limite-1;
}
return limite;
}
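// Wrap a coordinate back into the periodic box [offset, offset + width).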
real_number apply_pbc(real_number pos, IDX_COMPONENT_3D dim) const{
while( pos < spatial_box_offset[dim] ){
pos += spatial_box_width[dim];
}
while( spatial_box_width[dim]+spatial_box_offset[dim] <= pos){
pos -= spatial_box_width[dim];
}
return pos;
}
std::array<long int, 3> get_cell_coordinate(const real_number pos_x, const real_number pos_y,
const real_number pos_z) const {
const real_number diff_x = apply_pbc(pos_x,IDXC_X) - spatial_box_offset[IDXC_X];
const real_number diff_y = apply_pbc(pos_y,IDXC_Y) - spatial_box_offset[IDXC_Y];
const real_number diff_z = apply_pbc(pos_z,IDXC_Z) - spatial_box_offset[IDXC_Z];
std::array<long int, 3> coord;
coord[IDXC_X] = static_cast<long int>(diff_x/cutoff_radius);
coord[IDXC_Y] = static_cast<long int>(diff_y/cutoff_radius);
coord[IDXC_Z] = static_cast<long int>(diff_z/cutoff_radius);
return coord;
}
long int get_cell_idx(const real_number pos_x, const real_number pos_y,
const real_number pos_z) const {
std::array<long int, 3> coord = get_cell_coordinate(pos_x, pos_y, pos_z);
return ((coord[IDXC_Z]*nb_cell_levels[IDXC_Y])+coord[IDXC_Y])*nb_cell_levels[IDXC_X]+coord[IDXC_X];
}
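// Squared distance between two points (both wrapped into the box), with
// optional shifts by whole box widths to account for periodic images.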
real_number compute_distance_r2(const real_number x1, const real_number y1, const real_number z1,
const real_number x2, const real_number y2, const real_number z2,
const real_number xshift_coef, const real_number yshift_coef, const real_number zshift_coef) const {
real_number diff_x = std::abs(apply_pbc(x1,IDXC_X)-apply_pbc(x2,IDXC_X)+xshift_coef*spatial_box_width[IDXC_X]);
assert(diff_x <= 2*cutoff_radius);
real_number diff_y = std::abs(apply_pbc(y1,IDXC_Y)-apply_pbc(y2,IDXC_Y)+yshift_coef*spatial_box_width[IDXC_Y]);
assert(diff_y <= 2*cutoff_radius);
real_number diff_z = std::abs(apply_pbc(z1,IDXC_Z)-apply_pbc(z2,IDXC_Z)+zshift_coef*spatial_box_width[IDXC_Z]);
assert(diff_z <= 2*cutoff_radius);
return (diff_x*diff_x) + (diff_y*diff_y) + (diff_z*diff_z);
}
template <class computer_class, int size_particle_positions, int size_particle_rhs>
void compute_distr(computer_class& in_computer,
const partsize_t current_my_nb_particles_per_partition[],
real_number particles_positions[],
std::unique_ptr<real_number[]> particles_current_rhs[], const int nb_rhs,
partsize_t inout_index_particles[]){
TIMEZONE("p2p_distr_mpi::compute_distr");
// Some processes might not be involved
if(nb_processes_involved <= my_rank){
return;
}
const long int my_top_z_cell_level = last_cell_level_proc(my_rank);
const long int my_down_z_cell_level = first_cell_level_proc(my_rank);
const long int my_nb_cell_levels = 1+my_top_z_cell_level-my_down_z_cell_level;
current_offset_particles_for_partition[0] = 0;
partsize_t myTotalNbParticles = 0;
for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
myTotalNbParticles += current_my_nb_particles_per_partition[idxPartition];
current_offset_particles_for_partition[idxPartition+1] = current_offset_particles_for_partition[idxPartition] + current_my_nb_particles_per_partition[idxPartition];
}
// Compute box idx for each particle
std::unique_ptr<long int[]> particles_coord(new long int[current_offset_particles_for_partition[current_partition_size]]);
{
for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
#pragma omp parallel for schedule(static)
for(partsize_t idxPart = current_offset_particles_for_partition[idxPartition] ; idxPart < current_offset_particles_for_partition[idxPartition+1] ; ++idxPart ){
particles_coord[idxPart] = get_cell_idx(particles_positions[(idxPart)*size_particle_positions + IDXC_X],
particles_positions[(idxPart)*size_particle_positions + IDXC_Y],
particles_positions[(idxPart)*size_particle_positions + IDXC_Z]);
assert(my_down_z_cell_level <= get_cell_coord_z_from_index(particles_coord[idxPart]));
assert(get_cell_coord_z_from_index(particles_coord[idxPart]) <= my_top_z_cell_level);
}
}
std::vector<std::pair<long int,partsize_t>> part_to_sort;
// Sort each partition in cells
for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
part_to_sort.clear();
for(partsize_t idxPart = current_offset_particles_for_partition[idxPartition] ; idxPart < current_offset_particles_for_partition[idxPartition+1] ; ++idxPart ){
part_to_sort.emplace_back();
part_to_sort.back().first = particles_coord[idxPart];
part_to_sort.back().second = idxPart;
}
assert(partsize_t(part_to_sort.size()) == (current_my_nb_particles_per_partition[idxPartition]));
std::sort(part_to_sort.begin(), part_to_sort.end(),
[](const std::pair<long int,partsize_t>& p1,
const std::pair<long int,partsize_t>& p2){
return p1.first < p2.first;
});
// Permute array using buffer
// permute 4th function parameter using buffer, based on information in first 3 parameters
std::vector<unsigned char> buffer;
permute_copy<real_number, size_particle_positions>(
current_offset_particles_for_partition[idxPartition],
current_my_nb_particles_per_partition[idxPartition],
part_to_sort.data(),
particles_positions,
&buffer);
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
permute_copy<real_number, size_particle_rhs>(
current_offset_particles_for_partition[idxPartition],
current_my_nb_particles_per_partition[idxPartition],
part_to_sort.data(),
particles_current_rhs[idx_rhs].get(),
&buffer);
}
permute_copy<partsize_t, 1>(
current_offset_particles_for_partition[idxPartition],
current_my_nb_particles_per_partition[idxPartition],
part_to_sort.data(),
inout_index_particles,
&buffer);
permute_copy<long int, 1>(
current_offset_particles_for_partition[idxPartition],
current_my_nb_particles_per_partition[idxPartition],
part_to_sort.data(),
particles_coord.get(),
&buffer);
}
}
// Build the tree
p2p_tree<std::vector<std::pair<partsize_t,partsize_t>>> my_tree(nb_cell_levels);
for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
long int current_cell_idx = -1;
partsize_t current_nb_particles_in_cell = 0;
partsize_t current_cell_offset = 0;
for(partsize_t idx_part = current_offset_particles_for_partition[idxPartition] ;
idx_part != current_offset_particles_for_partition[idxPartition+1]; ++idx_part){
if(particles_coord[idx_part] != current_cell_idx){
if(current_nb_particles_in_cell){
my_tree.getCell(current_cell_idx).emplace_back(current_cell_offset,current_nb_particles_in_cell);
}
current_cell_idx = particles_coord[idx_part];
current_nb_particles_in_cell = 1;
current_cell_offset = idx_part;
}
else{
current_nb_particles_in_cell += 1;
}
}
if(current_nb_particles_in_cell){
my_tree.getCell(current_cell_idx).emplace_back(current_cell_offset,current_nb_particles_in_cell);
}
}
// Offset per cell layers
long int previous_index = 0;
variable_used_only_in_assert(previous_index);
std::unique_ptr<partsize_t[]> particles_offset_layers(new partsize_t[my_nb_cell_levels+1]());
for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
for(partsize_t idx_part = current_offset_particles_for_partition[idxPartition] ;
idx_part != current_offset_particles_for_partition[idxPartition+1]; ++idx_part){
const long int part_box_z_index = get_cell_coord_z_from_index(particles_coord[idx_part]);
assert(my_down_z_cell_level <= part_box_z_index);
assert(part_box_z_index <= my_top_z_cell_level);
particles_offset_layers[part_box_z_index+1-my_down_z_cell_level] += 1;
assert(previous_index <= part_box_z_index);
previous_index = part_box_z_index;
}
}
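// Turn the per-layer particle counts into cumulative offsets.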
for(long int idx_layer = 0 ; idx_layer < my_nb_cell_levels ; ++idx_layer){
particles_offset_layers[idx_layer+1] += particles_offset_layers[idx_layer];
}
// Reset vectors
assert(whatNext.size() == 0);
assert(mpiRequests.size() == 0);
neigDescriptors.clear();
// Find process with at least one neighbor
{
int dest_proc = (my_rank+1)%nb_processes_involved;
while(dest_proc != my_rank
&& (my_top_z_cell_level == first_cell_level_proc(dest_proc)
|| (my_top_z_cell_level+1)%nb_cell_levels[IDXC_Z] == first_cell_level_proc(dest_proc))){
// Find if we have to send 1 or 2 cell levels
int nb_levels_to_send = 1;
if(my_nb_cell_levels > 1 // I have more than one level
&& (my_top_z_cell_level-1+2)%nb_cell_levels[IDXC_Z] <= last_cell_level_proc(dest_proc)){
nb_levels_to_send += 1;
}
NeighborDescriptor descriptor;
descriptor.destProc = dest_proc;
descriptor.nbLevelsToExchange = nb_levels_to_send;
descriptor.nbParticlesToExchange = particles_offset_layers[my_nb_cell_levels] - particles_offset_layers[my_nb_cell_levels-nb_levels_to_send];
descriptor.isRecv = false;
descriptor.nbReceived = 0;
neigDescriptors.emplace_back(std::move(descriptor));
dest_proc = (dest_proc+1)%nb_processes_involved;
}
int src_proc = (my_rank-1+nb_processes_involved)%nb_processes_involved;
while(src_proc != my_rank
&& (last_cell_level_proc(src_proc) == my_down_z_cell_level
|| (last_cell_level_proc(src_proc)+1)%nb_cell_levels[IDXC_Z] == my_down_z_cell_level)){
// Find if we have to send 1 or 2 cell levels
int nb_levels_to_recv = 1;
if(my_nb_cell_levels > 1 // I have more than one level
&& first_cell_level_proc(src_proc) <= (my_down_z_cell_level-1+2)%nb_cell_levels[IDXC_Z]){
nb_levels_to_recv += 1;
}
NeighborDescriptor descriptor;
descriptor.destProc = src_proc;
descriptor.nbLevelsToExchange = nb_levels_to_recv;
descriptor.nbParticlesToExchange = -1;
descriptor.isRecv = true;
descriptor.nbReceived = 0;
neigDescriptors.emplace_back(std::move(descriptor));
src_proc = (src_proc-1+nb_processes_involved)%nb_processes_involved;
}
}
//////////////////////////////////////////////////////////////////////
/// Exchange the number of particles in each partition
/// Could be restricted to the neighbour processes found above, but doing it for all should not be a problem
//////////////////////////////////////////////////////////////////////
assert(whatNext.size() == 0);
assert(mpiRequests.size() == 0);
#ifndef NDEBUG // Just for assertion
std::vector<int> willsend(nb_processes_involved, 0);
std::vector<int> willrecv(nb_processes_involved, 0);
#endif
for(int idxDescr = 0 ; idxDescr < int(neigDescriptors.size()) ; ++idxDescr){
NeighborDescriptor& descriptor = neigDescriptors[idxDescr];
if(descriptor.isRecv == false){
whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
mpiRequests.emplace_back();
AssertMpi(MPI_Isend(const_cast<partsize_t*>(&descriptor.nbParticlesToExchange),
1, particles_utils::GetMpiType(partsize_t()),
descriptor.destProc, TAG_NB_PARTICLES,
current_com, &mpiRequests.back()));
#ifndef NDEBUG // Just for assertion
willsend[descriptor.destProc] += 1;
#endif
if(descriptor.nbParticlesToExchange){
whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
mpiRequests.emplace_back();
assert(descriptor.nbParticlesToExchange*size_particle_positions < std::numeric_limits<int>::max());
AssertMpi(MPI_Isend(const_cast<real_number*>(&particles_positions[particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]*size_particle_positions]),
int(descriptor.nbParticlesToExchange*size_particle_positions), particles_utils::GetMpiType(real_number()),
descriptor.destProc, TAG_POSITION_PARTICLES,
current_com, &mpiRequests.back()));
whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
mpiRequests.emplace_back();
assert(descriptor.nbParticlesToExchange*size_particle_positions < std::numeric_limits<int>::max());
AssertMpi(MPI_Isend(const_cast<partsize_t*>(&inout_index_particles[particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]]),
int(descriptor.nbParticlesToExchange), particles_utils::GetMpiType(partsize_t()),
descriptor.destProc, TAG_INDEX_PARTICLES,
current_com, &mpiRequests.back()));
assert(descriptor.toRecvAndMerge == nullptr);
descriptor.toRecvAndMerge.reset(new real_number[descriptor.nbParticlesToExchange*size_particle_rhs]);
whatNext.emplace_back(std::pair<Action,int>{MERGE_PARTICLES, idxDescr});
mpiRequests.emplace_back();
assert(descriptor.nbParticlesToExchange*size_particle_rhs < std::numeric_limits<int>::max());
AssertMpi(MPI_Irecv(descriptor.toRecvAndMerge.get(), int(descriptor.nbParticlesToExchange*size_particle_rhs),
particles_utils::GetMpiType(real_number()), descriptor.destProc, TAG_RESULT_PARTICLES,
current_com, &mpiRequests.back()));
}
}
else{
#ifndef NDEBUG // Just for assertion
willrecv[descriptor.destProc] += 1;
#endif
whatNext.emplace_back(std::pair<Action,int>{RECV_PARTICLES, idxDescr});
mpiRequests.emplace_back();
AssertMpi(MPI_Irecv(&descriptor.nbParticlesToExchange,
1, particles_utils::GetMpiType(partsize_t()), descriptor.destProc, TAG_NB_PARTICLES,
current_com, &mpiRequests.back()));
}
}
#ifndef NDEBUG // Just for assertion
{
if(myrank == 0){
std::vector<int> willsendall(nb_processes_involved*nb_processes_involved, 0);// TODO debug
std::vector<int> willrecvall(nb_processes_involved*nb_processes_involved, 0);// TODO debug
MPI_Gather(willrecv.data(), nb_processes_involved, MPI_INT, willrecvall.data(),
nb_processes_involved, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Gather(willsend.data(), nb_processes_involved, MPI_INT, willsendall.data(),
nb_processes_involved, MPI_INT, 0, MPI_COMM_WORLD);
for(int idxproc = 0 ; idxproc < nb_processes_involved ; ++idxproc){
for(int idxtest = 0 ; idxtest < nb_processes_involved ; ++idxtest){
assert(willsendall[idxproc*nb_processes_involved + idxtest]
== willrecvall[idxtest*nb_processes_involved + idxproc]);
}
}
}
else{
MPI_Gather(willrecv.data(), nb_processes_involved, MPI_INT, nullptr,
0, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Gather(willsend.data(), nb_processes_involved, MPI_INT, nullptr,
0, MPI_INT, 0, MPI_COMM_WORLD);
}
}
#endif
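// A lock per cell index, so that concurrent tasks never accumulate into the
// rhs of the same cell's particles at the same time.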
lock_free_bool_array cells_locker(512);
std::vector<std::unique_ptr<computer_class>> computer_for_all_threads(omp_get_max_threads()-1);
for(int idxThread = 1 ; idxThread < omp_get_max_threads() ; ++idxThread){
computer_for_all_threads[idxThread-1].reset(new computer_class(in_computer));
}
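// The master thread drives the MPI exchange below and creates one task per
// cell-block of received particles; the other threads of the team pick up
// those tasks while they wait at the end of the parallel region.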
TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads())
#pragma omp parallel default(shared)
{
computer_class& computer_thread = (omp_get_thread_num() == 0 ? in_computer : *computer_for_all_threads[omp_get_thread_num()-1]);
#pragma omp master
{
while(mpiRequests.size()){
TIMEZONE("wait-loop");
assert(mpiRequests.size() == whatNext.size());
int idxDone = int(mpiRequests.size());
{
TIMEZONE("wait");
AssertMpi(MPI_Waitany(int(mpiRequests.size()), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE));
}
const std::pair<Action,int> releasedAction = whatNext[idxDone];
std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]);
std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]);
mpiRequests.pop_back();
whatNext.pop_back();
//////////////////////////////////////////////////////////////////////
/// Data to exchange particles
//////////////////////////////////////////////////////////////////////
if(releasedAction.first == RECV_PARTICLES){
TIMEZONE("post-recv-particles");
NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
assert(descriptor.isRecv);
const int destProc = descriptor.destProc;
const partsize_t NbParticlesToReceive = descriptor.nbParticlesToExchange;
assert(NbParticlesToReceive != -1);
assert(descriptor.toCompute == nullptr);
assert(descriptor.indexes == nullptr);
if(NbParticlesToReceive){
descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]);
whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second});
mpiRequests.emplace_back();
assert(NbParticlesToReceive*size_particle_positions < std::numeric_limits<int>::max());
AssertMpi(MPI_Irecv(descriptor.toCompute.get(), int(NbParticlesToReceive*size_particle_positions),
particles_utils::GetMpiType(real_number()), destProc, TAG_POSITION_PARTICLES,
current_com, &mpiRequests.back()));
descriptor.indexes.reset(new partsize_t[NbParticlesToReceive]);
whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second});
mpiRequests.emplace_back();
assert(NbParticlesToReceive*size_particle_positions < std::numeric_limits<int>::max());
AssertMpi(MPI_Irecv(descriptor.indexes.get(), int(NbParticlesToReceive),
particles_utils::GetMpiType(partsize_t()), destProc, TAG_INDEX_PARTICLES,
current_com, &mpiRequests.back()));
}
}
//////////////////////////////////////////////////////////////////////
/// Computation
//////////////////////////////////////////////////////////////////////
if(releasedAction.first == COMPUTE_PARTICLES){
NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
descriptor.nbReceived += 1;
assert(descriptor.nbReceived <= 2);
if(descriptor.nbReceived == 2){
TIMEZONE("compute-particles");
NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
assert(descriptor.isRecv);
const partsize_t NbParticlesToReceive = descriptor.nbParticlesToExchange;
assert(descriptor.toCompute != nullptr);
assert(descriptor.indexes != nullptr);
descriptor.results.reset(new real_number[NbParticlesToReceive*size_particle_rhs]);
computer_thread.template init_result_array<size_particle_rhs>(descriptor.results.get(), NbParticlesToReceive);
// Compute
partsize_t idxPart = 0;
while(idxPart != NbParticlesToReceive){
const long int current_cell_idx = get_cell_idx(descriptor.toCompute[idxPart*size_particle_positions + IDXC_X],
descriptor.toCompute[idxPart*size_particle_positions + IDXC_Y],
descriptor.toCompute[idxPart*size_particle_positions + IDXC_Z]);
partsize_t nb_parts_in_cell = 1;
while(idxPart+nb_parts_in_cell != NbParticlesToReceive
&& current_cell_idx == get_cell_idx(descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_X],
descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_Y],
descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_Z])){
nb_parts_in_cell += 1;
}
#pragma omp task default(shared) firstprivate(idxPart, nb_parts_in_cell, current_cell_idx)
{
const std::vector<std::pair<partsize_t,partsize_t>>* neighbors[27];
long int neighbors_indexes[27];
std::array<long int,3> shift[27];
const int nbNeighbors = my_tree.getNeighbors(
current_cell_idx,
neighbors,
neighbors_indexes,
shift,
true);
// with other interval
for(int idx_neighbor = 0 ; idx_neighbor < nbNeighbors ; ++idx_neighbor){
cells_locker.lock(neighbors_indexes[idx_neighbor]);
for(size_t idx_2 = 0 ; idx_2 < (*neighbors[idx_neighbor]).size() ; ++idx_2){
for(partsize_t idx_p1 = 0 ; idx_p1 < nb_parts_in_cell ; ++idx_p1){
for(partsize_t idx_p2 = 0 ; idx_p2 < (*neighbors[idx_neighbor])[idx_2].second ; ++idx_p2){
const real_number dist_r2 = compute_distance_r2(
descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_X],
descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_Y],
descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_Z],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
shift[idx_neighbor][IDXC_X], shift[idx_neighbor][IDXC_Y], shift[idx_neighbor][IDXC_Z]);
if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
descriptor.indexes[(idxPart+idx_p1)],
&descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions],
&descriptor.results[(idxPart+idx_p1)*size_particle_rhs],
inout_index_particles[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)],
&particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions],
&particles_current_rhs[0][((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_rhs],
dist_r2,
cutoff_radius_compute,
shift[idx_neighbor][IDXC_X],
shift[idx_neighbor][IDXC_Y],
shift[idx_neighbor][IDXC_Z]);
}
}
}
}
cells_locker.unlock(neighbors_indexes[idx_neighbor]);
}
}
idxPart += nb_parts_in_cell;
}
#pragma omp taskwait
// Send back
const int destProc = descriptor.destProc;
whatNext.emplace_back(std::pair<Action,int>{RELEASE_BUFFER_PARTICLES, releasedAction.second});
mpiRequests.emplace_back();
assert(NbParticlesToReceive*size_particle_rhs < std::numeric_limits<int>::max());
AssertMpi(MPI_Isend(descriptor.results.get(), int(NbParticlesToReceive*size_particle_rhs),
particles_utils::GetMpiType(real_number()), destProc, TAG_RESULT_PARTICLES,
current_com, &mpiRequests.back()));
delete[] descriptor.toCompute.release();
delete[] descriptor.indexes.release();
}
}
//////////////////////////////////////////////////////////////////////
/// Release memory that was sent back
//////////////////////////////////////////////////////////////////////
if(releasedAction.first == RELEASE_BUFFER_PARTICLES){
NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
assert(descriptor.results != nullptr);
assert(descriptor.isRecv);
delete[] descriptor.results.release();
}
//////////////////////////////////////////////////////////////////////
/// Merge
//////////////////////////////////////////////////////////////////////
if(releasedAction.first == MERGE_PARTICLES){
TIMEZONE("merge");
NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
assert(descriptor.isRecv == false);
assert(descriptor.toRecvAndMerge != nullptr);
computer_thread.template reduce_particles_rhs<size_particle_rhs>(&particles_current_rhs[0][particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]*size_particle_rhs],
descriptor.toRecvAndMerge.get(), descriptor.nbParticlesToExchange);
delete[] descriptor.toRecvAndMerge.release();
}
}
}
}
assert(whatNext.size() == 0);
assert(mpiRequests.size() == 0);
{
computer_class& computer_thread = (omp_get_thread_num() == 0 ? in_computer : *computer_for_all_threads[omp_get_thread_num()-1]);
// Compute self data
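// For every cell owned by this process: interactions between particles of the
// same cell, then with each neighbouring cell (each pair of cells is visited
// only once thanks to the index comparison below).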
for(const auto& iter_cell : my_tree){
TIMEZONE("proceed-leaf");
const long int currenct_cell_idx = iter_cell.first;
const std::vector<std::pair<partsize_t,partsize_t>>* intervals_ptr = &iter_cell.second;
#pragma omp task default(shared) firstprivate(currenct_cell_idx, intervals_ptr)
{
const std::vector<std::pair<partsize_t,partsize_t>>& intervals = (*intervals_ptr);
cells_locker.lock(currenct_cell_idx);
for(size_t idx_1 = 0 ; idx_1 < intervals.size() ; ++idx_1){
// self interval
for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
for(partsize_t idx_p2 = idx_p1+1 ; idx_p2 < intervals[idx_1].second ; ++idx_p2){
const real_number dist_r2 = compute_distance_r2(particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_X],
particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_Y],
particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_Z],
0, 0, 0);
if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
inout_index_particles[(intervals[idx_1].first+idx_p1)],
&particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
&particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
inout_index_particles[(intervals[idx_1].first+idx_p2)],
&particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions],
&particles_current_rhs[0][(intervals[idx_1].first+idx_p2)*size_particle_rhs],
dist_r2, cutoff_radius_compute, 0, 0, 0);
}
}
}
// with other interval
for(size_t idx_2 = idx_1+1 ; idx_2 < intervals.size() ; ++idx_2){
for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
for(partsize_t idx_p2 = 0 ; idx_p2 < intervals[idx_2].second ; ++idx_p2){
const real_number dist_r2 = compute_distance_r2(particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
0, 0, 0);
if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
inout_index_particles[(intervals[idx_1].first+idx_p1)],
&particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
&particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
inout_index_particles[(intervals[idx_2].first+idx_p2)],
&particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions],
&particles_current_rhs[0][(intervals[idx_2].first+idx_p2)*size_particle_rhs],
dist_r2, cutoff_radius_compute, 0, 0, 0);
}
}
}
}
}
const std::vector<std::pair<partsize_t,partsize_t>>* neighbors[27];
long int neighbors_indexes[27];
std::array<long int,3> shift[27];
const int nbNeighbors = my_tree.getNeighbors(currenct_cell_idx, neighbors, neighbors_indexes, shift, false);
for(size_t idx_1 = 0 ; idx_1 < intervals.size() ; ++idx_1){
// with other interval
for(int idx_neighbor = 0 ; idx_neighbor < nbNeighbors ; ++idx_neighbor){
if(currenct_cell_idx < neighbors_indexes[idx_neighbor]){
cells_locker.lock(neighbors_indexes[idx_neighbor]);
for(size_t idx_2 = 0 ; idx_2 < (*neighbors[idx_neighbor]).size() ; ++idx_2){
for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
for(partsize_t idx_p2 = 0 ; idx_p2 < (*neighbors[idx_neighbor])[idx_2].second ; ++idx_p2){
const real_number dist_r2 = compute_distance_r2(particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
shift[idx_neighbor][IDXC_X], shift[idx_neighbor][IDXC_Y], shift[idx_neighbor][IDXC_Z]);
if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
inout_index_particles[(intervals[idx_1].first+idx_p1)],
&particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
&particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
inout_index_particles[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)],
&particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions],
&particles_current_rhs[0][((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_rhs],
dist_r2, cutoff_radius_compute, shift[idx_neighbor][IDXC_X], shift[idx_neighbor][IDXC_Y], shift[idx_neighbor][IDXC_Z]);
}
}
}
}
cells_locker.unlock(neighbors_indexes[idx_neighbor]);
}
}
}
cells_locker.unlock(currenct_cell_idx);
}
}
}
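// Fold the per-thread computer copies back into the caller's computer.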
for(int idxThread = 1 ; idxThread < omp_get_max_threads() ; ++idxThread){
in_computer.merge(*computer_for_all_threads[idxThread-1]);
}
}
};
#endif