/******************************************************************************
*                                                                             *
*  Copyright 2019 Max Planck Institute for Dynamics and Self-Organization    *
*                                                                             *
*  This file is part of TurTLE.                                              *
*                                                                             *
*  TurTLE is free software: you can redistribute it and/or modify            *
*  it under the terms of the GNU General Public License as published         *
*  by the Free Software Foundation, either version 3 of the License,         *
*  or (at your option) any later version.                                    *
*                                                                             *
*  TurTLE is distributed in the hope that it will be useful,                 *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of            *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the             *
*  GNU General Public License for more details.                              *
*                                                                             *
*  You should have received a copy of the GNU General Public License         *
*  along with TurTLE.  If not, see <http://www.gnu.org/licenses/>            *
*                                                                             *
*  Contact: Cristian.Lalescu@ds.mpg.de                                       *
*                                                                             *
******************************************************************************/

#ifndef P2P_DISTR_MPI_HPP
#define P2P_DISTR_MPI_HPP

#include <mpi.h>
#include <omp.h>

#include <vector>
#include <memory>
#include <array>
#include <utility>
#include <algorithm>
#include <limits>
#include <cmath>
#include <cassert>

#include "scope_timer.hpp"
#include "particles/particles_utils.hpp"
#include "particles/p2p/p2p_tree.hpp"
#include "particles/lock_free_bool_array.hpp"

template <class partsize_t, class real_number>
class p2p_distr_mpi {
protected:
    static const int MaxNbRhs = 10;

    enum MpiTag{
        TAG_NB_PARTICLES,
        TAG_POSITION_PARTICLES,
        TAG_INDEX_PARTICLES,
        TAG_RESULT_PARTICLES,
    };

    struct NeighborDescriptor{
        partsize_t nbParticlesToExchange;
        int destProc;
        int nbLevelsToExchange;
        bool isRecv;
        int nbReceived;

        std::unique_ptr<real_number[]> toRecvAndMerge;
        std::unique_ptr<real_number[]> toCompute;
        std::unique_ptr<real_number[]> results;
        std::unique_ptr<partsize_t[]> indexes;
    };

    enum Action{
        NOTHING_TODO = 512,
        RECV_PARTICLES,
        COMPUTE_PARTICLES,
        RELEASE_BUFFER_PARTICLES,
        MERGE_PARTICLES
    };

    MPI_Comm current_com;

    int my_rank;
    int nb_processes;
    int nb_processes_involved;

    const std::pair<int,int> current_partition_interval;
    const int current_partition_size;
    const std::array<size_t,3> field_grid_dim;

    std::unique_ptr<int[]> partition_interval_size_per_proc;
    std::unique_ptr<int[]> partition_interval_offset_per_proc;

    std::unique_ptr<partsize_t[]> current_offset_particles_for_partition;

    std::vector<std::pair<Action,int>> whatNext;
    std::vector<MPI_Request> mpiRequests;
    std::vector<NeighborDescriptor> neigDescriptors;

    std::array<real_number,3> spatial_box_width;
    std::array<real_number,3> spatial_box_offset;

    const long double cutoff_radius_compute;
    const int nb_cells_factor;
    const long double cutoff_radius;
    std::array<long int,3> nb_cell_levels;

    template <int sizeElement, class DataType>
    static void permute_copy(const partsize_t offsetIdx,
                             const partsize_t nbElements,
                             const std::pair<long int,partsize_t> permutation[],
                             DataType data[],
                             std::vector<unsigned char>* buffer){
        buffer->resize(nbElements*sizeof(DataType)*sizeElement);
        DataType* dataBuffer = reinterpret_cast<DataType*>(buffer->data());

        // Permute
        for(partsize_t idxPart = 0 ; idxPart < nbElements ; ++idxPart){
            const partsize_t srcData = permutation[idxPart].second;
            const partsize_t destData = idxPart;
            for(int idxVal = 0 ; idxVal < sizeElement ; ++idxVal){
                dataBuffer[destData*sizeElement + idxVal]
                    = data[srcData*sizeElement + idxVal];
            }
        }

        // Copy back
        for(partsize_t idxPart = 0 ; idxPart < nbElements ; ++idxPart){
            const partsize_t srcData = idxPart;
            const partsize_t destData = idxPart+offsetIdx;
            for(int idxVal = 0 ; idxVal < sizeElement ; ++idxVal){
                data[destData*sizeElement + idxVal]
                    = dataBuffer[srcData*sizeElement + idxVal];
            }
        }

        // Clean up memory
        buffer->resize(0);
    }

    static int foundGridFactor(const real_number in_cutoff_radius,
                               const std::array<real_number,3>& in_spatial_box_width){
        int idx_factor = 1;
        DEBUG_MSG("in_cutoff_radius is %g, in_spatial_box_width[IDXC_Z] = %g\n",
                  in_cutoff_radius,
                  in_spatial_box_width[IDXC_Z]);
        while(in_cutoff_radius <= in_spatial_box_width[IDXC_Z]/(long double)(idx_factor+1)){
            idx_factor += 1;
        }
        return idx_factor;
    }
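
    // Note on the cell grid: foundGridFactor() returns the largest number of
    // cells per direction such that each cell is still at least as wide as the
    // requested interaction radius; cutoff_radius, the effective cell width,
    // is then spatial_box_width[IDXC_Z]/nb_cells_factor >= in_cutoff_radius.
    // Illustrative numbers: for a box of width 2*pi and in_cutoff_radius = 1.0
    // the loop stops at idx_factor = 6, so the grid is 6x6x6 cells and the
    // effective cell width is 2*pi/6 ~= 1.047.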
public:
    ////////////////////////////////////////////////////////////////////////////

    p2p_distr_mpi(MPI_Comm in_current_com,
                  const std::pair<int,int>& in_current_partitions,
                  const std::array<size_t,3>& in_field_grid_dim,
                  const std::array<real_number,3>& in_spatial_box_width,
                  const std::array<real_number,3>& in_spatial_box_offset,
                  const real_number in_cutoff_radius)
        : current_com(in_current_com),
          my_rank(-1), nb_processes(-1), nb_processes_involved(-1),
          current_partition_interval(in_current_partitions),
          current_partition_size(current_partition_interval.second-current_partition_interval.first),
          field_grid_dim(in_field_grid_dim),
          spatial_box_width(in_spatial_box_width),
          spatial_box_offset(in_spatial_box_offset),
          cutoff_radius_compute(in_cutoff_radius),
          nb_cells_factor(foundGridFactor(in_cutoff_radius, in_spatial_box_width)),
          cutoff_radius(in_spatial_box_width[IDXC_Z]/real_number(nb_cells_factor)){

        AssertMpi(MPI_Comm_rank(current_com, &my_rank));
        AssertMpi(MPI_Comm_size(current_com, &nb_processes));

        partition_interval_size_per_proc.reset(new int[nb_processes]);
        AssertMpi( MPI_Allgather( const_cast<int*>(&current_partition_size), 1, MPI_INT,
                                  partition_interval_size_per_proc.get(), 1, MPI_INT,
                                  current_com) );
        assert(partition_interval_size_per_proc[my_rank] == current_partition_size);

        partition_interval_offset_per_proc.reset(new int[nb_processes+1]);
        partition_interval_offset_per_proc[0] = 0;
        for(int idxProc = 0 ; idxProc < nb_processes ; ++idxProc){
            partition_interval_offset_per_proc[idxProc+1] = partition_interval_offset_per_proc[idxProc]
                                                          + partition_interval_size_per_proc[idxProc];
        }

        current_offset_particles_for_partition.reset(new partsize_t[current_partition_size+1]);

        nb_processes_involved = nb_processes;
        while(nb_processes_involved != 0 && partition_interval_size_per_proc[nb_processes_involved-1] == 0){
            nb_processes_involved -= 1;
        }
        assert(nb_processes_involved != 0);
        for(int idx_proc_involved = 0 ; idx_proc_involved < nb_processes_involved ; ++idx_proc_involved){
            assert(partition_interval_size_per_proc[idx_proc_involved] != 0);
        }

        assert(int(field_grid_dim[IDXC_Z]) == partition_interval_offset_per_proc[nb_processes_involved]);

        nb_cell_levels[IDXC_X] = nb_cells_factor;
        nb_cell_levels[IDXC_Y] = nb_cells_factor;
        nb_cell_levels[IDXC_Z] = nb_cells_factor;
    }

    virtual ~p2p_distr_mpi(){}

    ////////////////////////////////////////////////////////////////////////////

    int getGridFactor() const{
        return nb_cells_factor;
    }

    real_number getGridCutoff() const{
        return cutoff_radius;
    }

    long int get_cell_coord_x_from_index(const long int index) const{
        return index % nb_cell_levels[IDXC_X];
    }

    long int get_cell_coord_y_from_index(const long int index) const{
        return (index % (nb_cell_levels[IDXC_X]*nb_cell_levels[IDXC_Y]))
                    / nb_cell_levels[IDXC_X];
    }

    long int get_cell_coord_z_from_index(const long int index) const{
        return index / (nb_cell_levels[IDXC_X]*nb_cell_levels[IDXC_Y]);
    }

    long int first_cell_level_proc(const int dest_proc) const{
        const real_number field_section_width_z = spatial_box_width[IDXC_Z]/real_number(field_grid_dim[IDXC_Z]);
        return static_cast<long int>((field_section_width_z*real_number(partition_interval_offset_per_proc[dest_proc]))/cutoff_radius);
    }

    long int last_cell_level_proc(const int dest_proc) const{
        const long double field_section_width_z = spatial_box_width[IDXC_Z]/(long double)(field_grid_dim[IDXC_Z]);
        const long int limite = static_cast<long int>((field_section_width_z*(long double)(partition_interval_offset_per_proc[dest_proc+1])
                                                        - std::numeric_limits<long double>::epsilon())/cutoff_radius);
        if(static_cast<long double>(limite)*cutoff_radius == field_section_width_z*(long double)(partition_interval_offset_per_proc[dest_proc+1])
                || limite == nb_cell_levels[IDXC_Z]){
            return limite-1;
        }
        return limite;
    }
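
    // The two helpers above map the slab decomposition onto cell layers: each
    // rank owns the field z-slices [offset[rank], offset[rank+1]), and
    // first/last_cell_level_proc() give the range of cell layers (of width
    // cutoff_radius) that overlap this slab, which is what decides below which
    // ranks have to exchange boundary particles. For example (illustrative
    // values), with 64 z-slices split evenly over 4 ranks and a grid factor of
    // 8, rank 0 covers cell layers 0-1, rank 1 covers layers 2-3, and so on.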
    real_number apply_pbc(real_number pos, IDX_COMPONENT_3D dim) const{
        while( pos < spatial_box_offset[dim] ){
            pos += spatial_box_width[dim];
        }
        while( spatial_box_width[dim]+spatial_box_offset[dim] <= pos){
            pos -= spatial_box_width[dim];
        }
        return pos;
    }

    std::array<long int,3> get_cell_coordinate(const real_number pos_x,
                                               const real_number pos_y,
                                               const real_number pos_z) const {
        const real_number diff_x = apply_pbc(pos_x,IDXC_X) - spatial_box_offset[IDXC_X];
        const real_number diff_y = apply_pbc(pos_y,IDXC_Y) - spatial_box_offset[IDXC_Y];
        const real_number diff_z = apply_pbc(pos_z,IDXC_Z) - spatial_box_offset[IDXC_Z];

        std::array<long int,3> coord;
        coord[IDXC_X] = static_cast<long int>(diff_x/cutoff_radius);
        coord[IDXC_Y] = static_cast<long int>(diff_y/cutoff_radius);
        coord[IDXC_Z] = static_cast<long int>(diff_z/cutoff_radius);

        return coord;
    }

    long int get_cell_idx(const real_number pos_x,
                          const real_number pos_y,
                          const real_number pos_z) const {
        const std::array<long int,3> coord = get_cell_coordinate(pos_x, pos_y, pos_z);
        return ((coord[IDXC_Z]*nb_cell_levels[IDXC_Y])+coord[IDXC_Y])*nb_cell_levels[IDXC_X]+coord[IDXC_X];
    }

    real_number compute_distance_r2(const real_number x1, const real_number y1, const real_number z1,
                                    const real_number x2, const real_number y2, const real_number z2,
                                    const real_number xshift_coef, const real_number yshift_coef, const real_number zshift_coef,
                                    real_number &diff_x, real_number &diff_y, real_number &diff_z) const {
        diff_x = apply_pbc(x1,IDXC_X)-apply_pbc(x2,IDXC_X)+xshift_coef*spatial_box_width[IDXC_X];
        assert(std::abs(diff_x) <= 2*cutoff_radius);

        diff_y = apply_pbc(y1,IDXC_Y)-apply_pbc(y2,IDXC_Y)+yshift_coef*spatial_box_width[IDXC_Y];
        assert(std::abs(diff_y) <= 2*cutoff_radius);

        diff_z = apply_pbc(z1,IDXC_Z)-apply_pbc(z2,IDXC_Z)+zshift_coef*spatial_box_width[IDXC_Z];
        assert(std::abs(diff_z) <= 2*cutoff_radius);

        return (diff_x*diff_x) + (diff_y*diff_y) + (diff_z*diff_z);
    }

    template <class computer_class, int size_particle_positions, int size_particle_rhs>
    void compute_distr(computer_class& in_computer,
                       const partsize_t current_my_nb_particles_per_partition[],
                       real_number particles_positions[],
                       std::unique_ptr<real_number[]> particles_current_rhs[],
                       const int nb_rhs,
                       partsize_t inout_index_particles[]){
        TIMEZONE("p2p_distr_mpi::compute_distr");

        // Some processes might not be involved
        if(nb_processes_involved <= my_rank){
            DEBUG_MSG("warning: nb_processes_involved <= my_rank, and this process is exiting "
                      "p2p_distr_mpi::compute_distr now.\n"
                      "However, there is a check below which calls an MPI_Gather over MPI_COMM_WORLD.\n");
            return;
        }

        const long int my_top_z_cell_level = last_cell_level_proc(my_rank);
        assert(my_top_z_cell_level < nb_cell_levels[IDXC_Z]);
        const long int my_down_z_cell_level = first_cell_level_proc(my_rank);
        assert(0 <= my_down_z_cell_level);
        const long int my_nb_cell_levels = 1+my_top_z_cell_level-my_down_z_cell_level;

        current_offset_particles_for_partition[0] = 0;
        partsize_t myTotalNbParticles = 0;
        for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
            myTotalNbParticles += current_my_nb_particles_per_partition[idxPartition];
            current_offset_particles_for_partition[idxPartition+1] = current_offset_particles_for_partition[idxPartition]
                                                                   + current_my_nb_particles_per_partition[idxPartition];
        }
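
        // The remainder of compute_distr proceeds in stages:
        //   (1) assign every local particle to a cell of the cutoff grid,
        //   (2) sort each z-slab so that particles of one cell are contiguous,
        //   (3) build the p2p_tree of (first index, count) intervals per cell,
        //   (4) count particles per cell layer to locate the boundary layers,
        //   (5) post the sends/receives that exchange those boundary layers
        //       with the neighbouring ranks,
        //   (6) compute interactions for remote particles (as they arrive) and
        //       for the local cells, and
        //   (7) merge the contributions computed remotely for the exchanged
        //       particles back into particles_current_rhs.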
        // Compute box idx for each particle
        std::unique_ptr<long int[]> particles_coord(new long int[current_offset_particles_for_partition[current_partition_size]]);

        {
            for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
                #pragma omp parallel for schedule(static)
                for(partsize_t idxPart = current_offset_particles_for_partition[idxPartition] ;
                        idxPart < current_offset_particles_for_partition[idxPartition+1] ; ++idxPart ){
                    particles_coord[idxPart] = get_cell_idx(particles_positions[(idxPart)*size_particle_positions + IDXC_X],
                                                            particles_positions[(idxPart)*size_particle_positions + IDXC_Y],
                                                            particles_positions[(idxPart)*size_particle_positions + IDXC_Z]);
                    assert(my_down_z_cell_level <= get_cell_coord_z_from_index(particles_coord[idxPart]));
                    assert(get_cell_coord_z_from_index(particles_coord[idxPart]) <= my_top_z_cell_level);
                }
            }

            std::vector<std::pair<long int,partsize_t>> part_to_sort;

            // Sort each partition in cells
            for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
                part_to_sort.clear();
                for(partsize_t idxPart = current_offset_particles_for_partition[idxPartition] ;
                        idxPart < current_offset_particles_for_partition[idxPartition+1] ; ++idxPart ){
                    part_to_sort.emplace_back();
                    part_to_sort.back().first = particles_coord[idxPart];
                    part_to_sort.back().second = idxPart;
                }

                assert(partsize_t(part_to_sort.size()) == (current_my_nb_particles_per_partition[idxPartition]));

                std::sort(part_to_sort.begin(), part_to_sort.end(),
                          [](const std::pair<long int,partsize_t>& p1,
                             const std::pair<long int,partsize_t>& p2){
                    return p1.first < p2.first;
                });

                // Permute array using buffer
                // permute 4th function parameter using buffer, based on information in first 3 parameters
                std::vector<unsigned char> buffer;
                permute_copy<size_particle_positions>(
                        current_offset_particles_for_partition[idxPartition],
                        current_my_nb_particles_per_partition[idxPartition],
                        part_to_sort.data(),
                        particles_positions,
                        &buffer);
                for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs) {
                    permute_copy<size_particle_rhs>(
                            current_offset_particles_for_partition[idxPartition],
                            current_my_nb_particles_per_partition[idxPartition],
                            part_to_sort.data(),
                            particles_current_rhs[idx_rhs].get(),
                            &buffer);
                }
                permute_copy<1>(
                        current_offset_particles_for_partition[idxPartition],
                        current_my_nb_particles_per_partition[idxPartition],
                        part_to_sort.data(),
                        inout_index_particles,
                        &buffer);
                permute_copy<1>(
                        current_offset_particles_for_partition[idxPartition],
                        current_my_nb_particles_per_partition[idxPartition],
                        part_to_sort.data(),
                        particles_coord.get(),
                        &buffer);
            }
        }

        // Build the tree
        p2p_tree<std::vector<std::pair<partsize_t,partsize_t>>> my_tree(nb_cell_levels);

        for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
            long int current_cell_idx = -1;
            partsize_t current_nb_particles_in_cell = 0;
            partsize_t current_cell_offset = 0;

            for(partsize_t idx_part = current_offset_particles_for_partition[idxPartition] ;
                    idx_part != current_offset_particles_for_partition[idxPartition+1]; ++idx_part){
                if(particles_coord[idx_part] != current_cell_idx){
                    if(current_nb_particles_in_cell){
                        my_tree.getCell(current_cell_idx).emplace_back(current_cell_offset,
                                                                       current_nb_particles_in_cell);
                    }
                    current_cell_idx = particles_coord[idx_part];
                    current_nb_particles_in_cell = 1;
                    current_cell_offset = idx_part;
                }
                else{
                    current_nb_particles_in_cell += 1;
                }
            }
            if(current_nb_particles_in_cell){
                my_tree.getCell(current_cell_idx).emplace_back(current_cell_offset,
                                                               current_nb_particles_in_cell);
            }
        }
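
        // After the sort, the particles of a given cell are contiguous in the
        // arrays, so the tree only needs (first index, count) intervals per
        // cell. The per-layer offsets computed next tell, for every z-layer of
        // cells owned by this rank, where its particles start; the boundary
        // exchange below uses them to address the top-most layers directly in
        // particles_positions / particles_current_rhs.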
        // Offset per cell layers
        long int previous_index = 0;
        variable_used_only_in_assert(previous_index);
        std::unique_ptr<partsize_t[]> particles_offset_layers(new partsize_t[my_nb_cell_levels+1]());
        for(int idxPartition = 0 ; idxPartition < current_partition_size ; ++idxPartition){
            for(partsize_t idx_part = current_offset_particles_for_partition[idxPartition] ;
                    idx_part != current_offset_particles_for_partition[idxPartition+1]; ++idx_part){
                const long int part_box_z_index = get_cell_coord_z_from_index(particles_coord[idx_part]);
                assert(my_down_z_cell_level <= part_box_z_index);
                assert(part_box_z_index <= my_top_z_cell_level);
                particles_offset_layers[part_box_z_index+1-my_down_z_cell_level] += 1;
                assert(previous_index <= part_box_z_index);
                previous_index = part_box_z_index;
            }
        }
        for(long int idx_layer = 0 ; idx_layer < my_nb_cell_levels ; ++idx_layer){
            particles_offset_layers[idx_layer+1] += particles_offset_layers[idx_layer];
        }

        // Reset vectors
        assert(whatNext.size() == 0);
        assert(mpiRequests.size() == 0);
        neigDescriptors.clear();

        DEBUG_MSG("my_top_z_cell_level = %d\n", int(my_top_z_cell_level));
        DEBUG_MSG("my_down_z_cell_level = %d\n", int(my_down_z_cell_level));
        DEBUG_MSG("first_cell_level_proc(0) = %d\n", int(first_cell_level_proc(0)));
        DEBUG_MSG("first_cell_level_proc(1) = %d\n", int(first_cell_level_proc(1)));
        DEBUG_MSG("first_cell_level_proc(2) = %d\n", int(first_cell_level_proc(2)));
        DEBUG_MSG("first_cell_level_proc(3) = %d\n", int(first_cell_level_proc(3)));
        DEBUG_MSG("nb_cell_levels[IDXC_Z] = %d\n", int(nb_cell_levels[IDXC_Z]));
        DEBUG_MSG("last_cell_level_proc(0) = %d\n", int(last_cell_level_proc(0)));
        DEBUG_MSG("last_cell_level_proc(1) = %d\n", int(last_cell_level_proc(1)));
        DEBUG_MSG("last_cell_level_proc(2) = %d\n", int(last_cell_level_proc(2)));
        DEBUG_MSG("last_cell_level_proc(3) = %d\n", int(last_cell_level_proc(3)));

        // Find process with at least one neighbor
        {
            int dest_proc = (my_rank+1)%nb_processes_involved;
            while(dest_proc != my_rank
                    && (my_top_z_cell_level == first_cell_level_proc(dest_proc)
                        || (my_top_z_cell_level+1)%nb_cell_levels[IDXC_Z] == first_cell_level_proc(dest_proc))){
                // Find if we have to send 1 or 2 cell levels
                int nb_levels_to_send = 1;
                if(my_nb_cell_levels > 1 // I have more than one level
                        && (my_top_z_cell_level-1+2)%nb_cell_levels[IDXC_Z] <= last_cell_level_proc(dest_proc)){
                    nb_levels_to_send += 1;
                }

                DEBUG_MSG("looking at dest_proc = %d ; nb_levels_to_send = %d\n", dest_proc, nb_levels_to_send);

                NeighborDescriptor descriptor;
                descriptor.destProc = dest_proc;
                descriptor.nbLevelsToExchange = nb_levels_to_send;
                descriptor.nbParticlesToExchange = particles_offset_layers[my_nb_cell_levels]
                                                 - particles_offset_layers[my_nb_cell_levels-nb_levels_to_send];
                descriptor.isRecv = false;
                descriptor.nbReceived = 0;

                neigDescriptors.emplace_back(std::move(descriptor));

                dest_proc = (dest_proc+1)%nb_processes_involved;
            }

            int src_proc = (my_rank-1+nb_processes_involved)%nb_processes_involved;
            while(src_proc != my_rank
                    && (last_cell_level_proc(src_proc) == my_down_z_cell_level
                        || (last_cell_level_proc(src_proc)+1)%nb_cell_levels[IDXC_Z] == my_down_z_cell_level)){
                // Find if we have to send 1 or 2 cell levels
                int nb_levels_to_recv = 1;
                if(my_nb_cell_levels > 1 // I have more than one level
                        && first_cell_level_proc(src_proc) <= (my_down_z_cell_level-1+2)%nb_cell_levels[IDXC_Z]){
                    nb_levels_to_recv += 1;
                }

                DEBUG_MSG("looking at src_proc = %d ; nb_levels_to_recv = %d\n", src_proc, nb_levels_to_recv);

                NeighborDescriptor descriptor;
                descriptor.destProc = src_proc;
                descriptor.nbLevelsToExchange = nb_levels_to_recv;
                descriptor.nbParticlesToExchange = -1;
                descriptor.isRecv = true;
                descriptor.nbReceived = 0;

                neigDescriptors.emplace_back(std::move(descriptor));

                src_proc = (src_proc-1+nb_processes_involved)%nb_processes_involved;
            }
        }

        DEBUG_MSG_WAIT(current_com, "hello, found processes with neighbours.\n");
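
        // Messaging protocol used below (driven by the whatNext/mpiRequests
        // state machine and MPI_Waitany): for every send-side descriptor this
        // rank posts TAG_NB_PARTICLES and, if the count is non-zero,
        // TAG_POSITION_PARTICLES and TAG_INDEX_PARTICLES sends plus a
        // TAG_RESULT_PARTICLES receive for the computed rhs coming back. A
        // receive-side descriptor starts with a TAG_NB_PARTICLES receive and
        // posts the matching receives once the count is known
        // (RECV_PARTICLES -> COMPUTE_PARTICLES -> send the results back).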
        //////////////////////////////////////////////////////////////////////
        /// Exchange the number of particles in each partition
        /// Could involve only the concerned processes, but I do not think
        /// it will be a problem
        //////////////////////////////////////////////////////////////////////
        assert(whatNext.size() == 0);
        assert(mpiRequests.size() == 0);

#ifndef NDEBUG // Just for assertion
        std::vector<int> willsend(nb_processes_involved, 0);
        std::vector<int> willrecv(nb_processes_involved, 0);
#endif

        for(int idxDescr = 0 ; idxDescr < int(neigDescriptors.size()) ; ++idxDescr){
            NeighborDescriptor& descriptor = neigDescriptors[idxDescr];

            if(descriptor.isRecv == false){
                whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
                mpiRequests.emplace_back();
                AssertMpi(MPI_Isend(const_cast<partsize_t*>(&descriptor.nbParticlesToExchange), 1,
                                    particles_utils::GetMpiType(partsize_t()),
                                    descriptor.destProc, TAG_NB_PARTICLES,
                                    current_com, &mpiRequests.back()));
#ifndef NDEBUG // Just for assertion
                willsend[descriptor.destProc] += 1;
#endif

                if(descriptor.nbParticlesToExchange){
                    whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
                    mpiRequests.emplace_back();
                    assert(descriptor.nbParticlesToExchange*size_particle_positions < std::numeric_limits<int>::max());
                    AssertMpi(MPI_Isend(const_cast<real_number*>(&particles_positions[particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]*size_particle_positions]),
                                        int(descriptor.nbParticlesToExchange*size_particle_positions),
                                        particles_utils::GetMpiType(real_number()),
                                        descriptor.destProc, TAG_POSITION_PARTICLES,
                                        current_com, &mpiRequests.back()));

                    whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1});
                    mpiRequests.emplace_back();
                    assert(descriptor.nbParticlesToExchange*size_particle_positions < std::numeric_limits<int>::max());
                    AssertMpi(MPI_Isend(const_cast<partsize_t*>(&inout_index_particles[particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]]),
                                        int(descriptor.nbParticlesToExchange),
                                        particles_utils::GetMpiType(partsize_t()),
                                        descriptor.destProc, TAG_INDEX_PARTICLES,
                                        current_com, &mpiRequests.back()));

                    assert(descriptor.toRecvAndMerge == nullptr);
                    descriptor.toRecvAndMerge.reset(new real_number[descriptor.nbParticlesToExchange*size_particle_rhs]);
                    whatNext.emplace_back(std::pair<Action,int>{MERGE_PARTICLES, idxDescr});
                    mpiRequests.emplace_back();
                    assert(descriptor.nbParticlesToExchange*size_particle_rhs < std::numeric_limits<int>::max());
                    AssertMpi(MPI_Irecv(descriptor.toRecvAndMerge.get(),
                                        int(descriptor.nbParticlesToExchange*size_particle_rhs),
                                        particles_utils::GetMpiType(real_number()),
                                        descriptor.destProc, TAG_RESULT_PARTICLES,
                                        current_com, &mpiRequests.back()));
                }
            }
            else{
#ifndef NDEBUG // Just for assertion
                willrecv[descriptor.destProc] += 1;
#endif
                whatNext.emplace_back(std::pair<Action,int>{RECV_PARTICLES, idxDescr});
                mpiRequests.emplace_back();
                AssertMpi(MPI_Irecv(&descriptor.nbParticlesToExchange, 1,
                                    particles_utils::GetMpiType(partsize_t()),
                                    descriptor.destProc, TAG_NB_PARTICLES,
                                    current_com, &mpiRequests.back()));
            }
        }

#ifndef NDEBUG // Just for assertion
        {
            if(my_rank == 0){
                std::vector<int> willsendall(nb_processes_involved*nb_processes_involved, 0);// TODO debug
                std::vector<int> willrecvall(nb_processes_involved*nb_processes_involved, 0);// TODO debug

                MPI_Gather(willrecv.data(), nb_processes_involved, MPI_INT, willrecvall.data(),
                           nb_processes_involved, MPI_INT, 0, MPI_COMM_WORLD);
                MPI_Gather(willsend.data(), nb_processes_involved, MPI_INT, willsendall.data(),
                           nb_processes_involved, MPI_INT, 0, MPI_COMM_WORLD);

                for(int idxproc = 0 ; idxproc < nb_processes_involved ; ++idxproc){
                    for(int idxtest = 0 ; idxtest < nb_processes_involved ; ++idxtest){
                        DEBUG_MSG("p2p_distr_mpi::compute_distr, comparing willsendall(%d, %d)=%d with willrecvall(%d, %d) = %d\n",
                                  idxproc, idxtest,
                                  willsendall[idxproc*nb_processes_involved + idxtest],
                                  idxtest, idxproc,
                                  willrecvall[idxtest*nb_processes_involved + idxproc]);
                        assert(willsendall[idxproc*nb_processes_involved + idxtest]
                               == willrecvall[idxtest*nb_processes_involved + idxproc]);
                    }
                }
            }
            else{
                MPI_Gather(willrecv.data(), nb_processes_involved, MPI_INT, nullptr, 0, MPI_INT, 0, MPI_COMM_WORLD);
                MPI_Gather(willsend.data(), nb_processes_involved, MPI_INT, nullptr, 0, MPI_INT, 0, MPI_COMM_WORLD);
            }
        }
#endif
        lock_free_bool_array cells_locker(512);

        std::vector<std::unique_ptr<computer_class>> computer_for_all_threads(omp_get_max_threads()-1);
        for(int idxThread = 1 ; idxThread < omp_get_max_threads() ; ++idxThread){
            computer_for_all_threads[idxThread-1].reset(new computer_class(in_computer));
        }

        TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads())
        #pragma omp parallel default(shared)
        {
            computer_class& computer_thread = (omp_get_thread_num() == 0 ? in_computer : *computer_for_all_threads[omp_get_thread_num()-1]);

            #pragma omp master
            {
                while(mpiRequests.size()){
                    TIMEZONE("wait-loop");
                    assert(mpiRequests.size() == whatNext.size());

                    int idxDone = int(mpiRequests.size());
                    {
                        TIMEZONE("wait");
                        AssertMpi(MPI_Waitany(int(mpiRequests.size()), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE));
                    }
                    const std::pair<Action,int> releasedAction = whatNext[idxDone];
                    std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]);
                    std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]);
                    mpiRequests.pop_back();
                    whatNext.pop_back();

                    //////////////////////////////////////////////////////////////////////
                    /// Data to exchange particles
                    //////////////////////////////////////////////////////////////////////
                    if(releasedAction.first == RECV_PARTICLES){
                        TIMEZONE("post-recv-particles");
                        NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
                        assert(descriptor.isRecv);
                        const int destProc = descriptor.destProc;
                        const partsize_t NbParticlesToReceive = descriptor.nbParticlesToExchange;
                        assert(NbParticlesToReceive != -1);
                        assert(descriptor.toCompute == nullptr);
                        assert(descriptor.indexes == nullptr);

                        if(NbParticlesToReceive){
                            descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]);
                            whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second});
                            mpiRequests.emplace_back();
                            assert(NbParticlesToReceive*size_particle_positions < std::numeric_limits<int>::max());
                            AssertMpi(MPI_Irecv(descriptor.toCompute.get(),
                                                int(NbParticlesToReceive*size_particle_positions),
                                                particles_utils::GetMpiType(real_number()),
                                                destProc, TAG_POSITION_PARTICLES,
                                                current_com, &mpiRequests.back()));

                            descriptor.indexes.reset(new partsize_t[NbParticlesToReceive]);
                            whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second});
                            mpiRequests.emplace_back();
                            assert(NbParticlesToReceive*size_particle_positions < std::numeric_limits<int>::max());
                            AssertMpi(MPI_Irecv(descriptor.indexes.get(),
                                                int(NbParticlesToReceive),
                                                particles_utils::GetMpiType(partsize_t()),
                                                destProc, TAG_INDEX_PARTICLES,
                                                current_com, &mpiRequests.back()));
                        }
                    }
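
                    // The two receives posted above (positions and indexes)
                    // both map to COMPUTE_PARTICLES; nbReceived counts their
                    // completions, so the computation only starts once both
                    // buffers are filled. From the call sites in this file the
                    // computer_class is expected to provide roughly the
                    // following interface (a sketch inferred from usage; the
                    // exact signatures live in the p2p computer classes):
                    //   template <int size_pos, int size_rhs>
                    //   void compute_interaction(idx1, pos1[], rhs1[],
                    //                            idx2, pos2[], rhs2[],
                    //                            dist_r2, cutoff, dx, dy, dz);
                    //   template <int size_rhs>
                    //   void reduce_particles_rhs(rhs_dst[], rhs_src[], nb);
                    //   void merge(computer_class&);  // plus copy construction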
                    //////////////////////////////////////////////////////////////////////
                    /// Computation
                    //////////////////////////////////////////////////////////////////////
                    if(releasedAction.first == COMPUTE_PARTICLES){
                        NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
                        descriptor.nbReceived += 1;
                        assert(descriptor.nbReceived <= 2);

                        if(descriptor.nbReceived == 2){
                            TIMEZONE("compute-particles");
                            assert(descriptor.isRecv);
                            const partsize_t NbParticlesToReceive = descriptor.nbParticlesToExchange;
                            assert(descriptor.toCompute != nullptr);
                            assert(descriptor.indexes != nullptr);

                            // allocate rhs buffer
                            descriptor.results.reset(new real_number[NbParticlesToReceive*size_particle_rhs]);
                            // clean up rhs buffer
                            set_particle_data_to_zero<size_particle_rhs>(descriptor.results.get(), NbParticlesToReceive);

                            // Compute
                            partsize_t idxPart = 0;
                            while(idxPart != NbParticlesToReceive){
                                const long int current_cell_idx = get_cell_idx(descriptor.toCompute[idxPart*size_particle_positions + IDXC_X],
                                                                               descriptor.toCompute[idxPart*size_particle_positions + IDXC_Y],
                                                                               descriptor.toCompute[idxPart*size_particle_positions + IDXC_Z]);
                                partsize_t nb_parts_in_cell = 1;
                                while(idxPart+nb_parts_in_cell != NbParticlesToReceive
                                        && current_cell_idx == get_cell_idx(descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_X],
                                                                            descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_Y],
                                                                            descriptor.toCompute[(idxPart+nb_parts_in_cell)*size_particle_positions + IDXC_Z])){
                                    nb_parts_in_cell += 1;
                                }

                                #pragma omp task default(shared) firstprivate(idxPart, nb_parts_in_cell, current_cell_idx)
                                {
                                    computer_class& computer_thread_task = (omp_get_thread_num() == 0 ? in_computer : *computer_for_all_threads[omp_get_thread_num()-1]);
                                    const std::vector<std::pair<partsize_t,partsize_t>>* neighbors[27];
                                    long int neighbors_indexes[27];
                                    std::array<real_number,3> shift[27];
                                    real_number diff_x, diff_y, diff_z;
                                    const int nbNeighbors = my_tree.getNeighbors(current_cell_idx, neighbors, neighbors_indexes, shift, true);

                                    // with other interval
                                    for(int idx_neighbor = 0 ; idx_neighbor < nbNeighbors ; ++idx_neighbor){
                                        cells_locker.lock(neighbors_indexes[idx_neighbor]);

                                        for(size_t idx_2 = 0 ; idx_2 < (*neighbors[idx_neighbor]).size() ; ++idx_2){
                                            for(partsize_t idx_p1 = 0 ; idx_p1 < nb_parts_in_cell ; ++idx_p1){
                                                for(partsize_t idx_p2 = 0 ; idx_p2 < (*neighbors[idx_neighbor])[idx_2].second ; ++idx_p2){
                                                    const real_number dist_r2 = compute_distance_r2(
                                                            descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_X],
                                                            descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_Y],
                                                            descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions + IDXC_Z],
                                                            particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
                                                            particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
                                                            particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
                                                            shift[idx_neighbor][IDXC_X], shift[idx_neighbor][IDXC_Y], shift[idx_neighbor][IDXC_Z],
                                                            diff_x, diff_y, diff_z);
                                                    if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
                                                        computer_thread_task.template compute_interaction<size_particle_positions, size_particle_rhs>(
                                                                descriptor.indexes[(idxPart+idx_p1)],
                                                                &descriptor.toCompute[(idxPart+idx_p1)*size_particle_positions],
                                                                &descriptor.results[(idxPart+idx_p1)*size_particle_rhs],
                                                                inout_index_particles[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)],
                                                                &particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions],
                                                                &particles_current_rhs[0][((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_rhs],
                                                                dist_r2, cutoff_radius_compute,
                                                                diff_x, diff_y, diff_z);
                                                    }
                                                }
                                            }
                                        }

                                        cells_locker.unlock(neighbors_indexes[idx_neighbor]);
                                    }
                                }

                                idxPart += nb_parts_in_cell;
                            }

                            #pragma omp taskwait

                            // Send back
                            const int destProc = descriptor.destProc;
                            whatNext.emplace_back(std::pair<Action,int>{RELEASE_BUFFER_PARTICLES, releasedAction.second});
                            mpiRequests.emplace_back();
                            assert(NbParticlesToReceive*size_particle_rhs < std::numeric_limits<int>::max());
                            AssertMpi(MPI_Isend(descriptor.results.get(),
                                                int(NbParticlesToReceive*size_particle_rhs),
                                                particles_utils::GetMpiType(real_number()),
                                                destProc, TAG_RESULT_PARTICLES,
                                                current_com, &mpiRequests.back()));

                            delete[] descriptor.toCompute.release();
                            delete[] descriptor.indexes.release();
                        }
                    }

                    //////////////////////////////////////////////////////////////////////
                    /// Release memory that was sent back
                    //////////////////////////////////////////////////////////////////////
                    if(releasedAction.first == RELEASE_BUFFER_PARTICLES){
                        NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
                        assert(descriptor.results != nullptr);
                        assert(descriptor.isRecv);
                        delete[] descriptor.results.release();
                    }

                    //////////////////////////////////////////////////////////////////////
                    /// Merge
                    //////////////////////////////////////////////////////////////////////
                    if(releasedAction.first == MERGE_PARTICLES){
                        TIMEZONE("merge");
                        NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second];
                        assert(descriptor.isRecv == false);
                        assert(descriptor.toRecvAndMerge != nullptr);
                        computer_thread.template reduce_particles_rhs<size_particle_rhs>(
                                &particles_current_rhs[0][particles_offset_layers[my_nb_cell_levels-descriptor.nbLevelsToExchange]*size_particle_rhs],
                                descriptor.toRecvAndMerge.get(),
                                descriptor.nbParticlesToExchange);
                        delete[] descriptor.toRecvAndMerge.release();
                    }
                }
            }
        }

        assert(whatNext.size() == 0);
        assert(mpiRequests.size() == 0);

        {
            computer_class& computer_thread = (omp_get_thread_num() == 0 ? in_computer : *computer_for_all_threads[omp_get_thread_num()-1]);

            // Compute self data
            for(const auto& iter_cell : my_tree){
                TIMEZONE("proceed-leaf");
                const long int current_cell_idx = iter_cell.first;
                const std::vector<std::pair<partsize_t,partsize_t>>* intervals_ptr = &iter_cell.second;

                #pragma omp task default(shared) firstprivate(current_cell_idx, intervals_ptr)
                {
                    const std::vector<std::pair<partsize_t,partsize_t>>& intervals = (*intervals_ptr);

                    cells_locker.lock(current_cell_idx);

                    for(size_t idx_1 = 0 ; idx_1 < intervals.size() ; ++idx_1){
                        // self interval
                        for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
                            for(partsize_t idx_p2 = idx_p1+1 ; idx_p2 < intervals[idx_1].second ; ++idx_p2){
                                real_number diff_x, diff_y, diff_z;
                                const real_number dist_r2 = compute_distance_r2(
                                        particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
                                        particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
                                        particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
                                        particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_X],
                                        particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_Y],
                                        particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions + IDXC_Z],
                                        0, 0, 0,
                                        diff_x, diff_y, diff_z);
                                if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
                                    computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
                                            inout_index_particles[(intervals[idx_1].first+idx_p1)],
                                            &particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
                                            &particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
                                            inout_index_particles[(intervals[idx_1].first+idx_p2)],
                                            &particles_positions[(intervals[idx_1].first+idx_p2)*size_particle_positions],
                                            &particles_current_rhs[0][(intervals[idx_1].first+idx_p2)*size_particle_rhs],
                                            dist_r2, cutoff_radius_compute,
                                            diff_x, diff_y, diff_z);
                                }
                            }
                        }
                        // with other interval
                        for(size_t idx_2 = idx_1+1 ; idx_2 < intervals.size() ; ++idx_2){
                            for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
                                for(partsize_t idx_p2 = 0 ; idx_p2 < intervals[idx_2].second ; ++idx_p2){
                                    real_number diff_x, diff_y, diff_z;
                                    const real_number dist_r2 = compute_distance_r2(
                                            particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
                                            particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
                                            particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
                                            particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
                                            particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
                                            particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
                                            0, 0, 0,
                                            diff_x, diff_y, diff_z);
                                    if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
                                        computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
                                                inout_index_particles[(intervals[idx_1].first+idx_p1)],
                                                &particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
                                                &particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
                                                inout_index_particles[(intervals[idx_2].first+idx_p2)],
                                                &particles_positions[(intervals[idx_2].first+idx_p2)*size_particle_positions],
                                                &particles_current_rhs[0][(intervals[idx_2].first+idx_p2)*size_particle_rhs],
                                                dist_r2, cutoff_radius_compute,
                                                diff_x, diff_y, diff_z);
                                    }
                                }
                            }
                        }
                    }

                    const std::vector<std::pair<partsize_t,partsize_t>>* neighbors[27];
                    long int neighbors_indexes[27];
                    std::array<real_number,3> shift[27];
                    const int nbNeighbors = my_tree.getNeighbors(current_cell_idx, neighbors, neighbors_indexes, shift, false);

                    for(size_t idx_1 = 0 ; idx_1 < intervals.size() ; ++idx_1){
                        // with other interval
                        for(int idx_neighbor = 0 ; idx_neighbor < nbNeighbors ; ++idx_neighbor){
                            if(current_cell_idx < neighbors_indexes[idx_neighbor]){
                                cells_locker.lock(neighbors_indexes[idx_neighbor]);

                                for(size_t idx_2 = 0 ; idx_2 < (*neighbors[idx_neighbor]).size() ; ++idx_2){
                                    for(partsize_t idx_p1 = 0 ; idx_p1 < intervals[idx_1].second ; ++idx_p1){
                                        for(partsize_t idx_p2 = 0 ; idx_p2 < (*neighbors[idx_neighbor])[idx_2].second ; ++idx_p2){
                                            real_number diff_x, diff_y, diff_z;
                                            const real_number dist_r2 = compute_distance_r2(
                                                    particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_X],
                                                    particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Y],
                                                    particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions + IDXC_Z],
                                                    particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_X],
                                                    particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Y],
                                                    particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions + IDXC_Z],
                                                    shift[idx_neighbor][IDXC_X], shift[idx_neighbor][IDXC_Y], shift[idx_neighbor][IDXC_Z],
                                                    diff_x, diff_y, diff_z);
                                            if(dist_r2 < cutoff_radius_compute*cutoff_radius_compute){
                                                computer_thread.template compute_interaction<size_particle_positions, size_particle_rhs>(
                                                        inout_index_particles[(intervals[idx_1].first+idx_p1)],
                                                        &particles_positions[(intervals[idx_1].first+idx_p1)*size_particle_positions],
                                                        &particles_current_rhs[0][(intervals[idx_1].first+idx_p1)*size_particle_rhs],
                                                        inout_index_particles[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)],
                                                        &particles_positions[((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_positions],
                                                        &particles_current_rhs[0][((*neighbors[idx_neighbor])[idx_2].first+idx_p2)*size_particle_rhs],
                                                        dist_r2, cutoff_radius_compute,
                                                        diff_x, diff_y, diff_z);
                                            }
                                        }
                                    }
                                }
                                cells_locker.unlock(neighbors_indexes[idx_neighbor]);
                            }
                        }
                    }

                    cells_locker.unlock(current_cell_idx);
                }
            }
        }

        for(int idxThread = 1 ; idxThread < omp_get_max_threads() ; ++idxThread){
            in_computer.merge(*computer_for_all_threads[idxThread-1]);
        }
    }
};

#endif // P2P_DISTR_MPI_HPP