diff --git a/bfps/cpp/particles/abstract_particles_distr.hpp b/bfps/cpp/particles/abstract_particles_distr.hpp index d6ca43a2fef4db3ed58c1e058d70c8133ca223fa..65c770c6b1a7f4a65660863f218190ebf1132b89 100644 --- a/bfps/cpp/particles/abstract_particles_distr.hpp +++ b/bfps/cpp/particles/abstract_particles_distr.hpp @@ -8,6 +8,7 @@ #include <cassert> #include <type_traits> +#include <omp.h> #include "scope_timer.hpp" #include "particles_utils.hpp" @@ -271,115 +272,189 @@ public: } } - while(mpiRequests.size()){ - assert(mpiRequests.size() == whatNext.size()); + const bool more_than_one_thread = (omp_get_max_threads() > 1); - int idxDone = mpiRequests.size(); + TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads()) + #pragma omp parallel default(shared) + { + #pragma omp master { - TIMEZONE("wait"); - AssertMpi(MPI_Waitany(mpiRequests.size(), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE)); - } - const std::pair<Action, int> releasedAction = whatNext[idxDone]; - std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]); - std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]); - mpiRequests.pop_back(); - whatNext.pop_back(); - - ////////////////////////////////////////////////////////////////////// - /// Data to exchange particles - ////////////////////////////////////////////////////////////////////// - if(releasedAction.first == RECV_PARTICLES){ - NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; - - if(descriptor.isLower){ - //const int idxLower = descriptor.idxLowerUpper; - const int destProc = descriptor.destProc; - //const int nbPartitionsToRecv = descriptor.nbPartitionsToRecv; - const int NbParticlesToReceive = descriptor.nbParticlesToRecv; - assert(NbParticlesToReceive != -1); - assert(descriptor.toCompute == nullptr); - if(NbParticlesToReceive){ - descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]); - whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(descriptor.toCompute.get(), NbParticlesToReceive*size_particle_positions, particles_utils::GetMpiType(real_number()), destProc, TAG_UP_LOW_PARTICLES, - current_com, &mpiRequests.back())); + while(mpiRequests.size()){ + assert(mpiRequests.size() == whatNext.size()); + + int idxDone = mpiRequests.size(); + { + TIMEZONE("wait"); + AssertMpi(MPI_Waitany(mpiRequests.size(), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE)); } - } - else{ - //const int idxUpper = descriptor.idxLowerUpper; - const int destProc = descriptor.destProc; - //const int nbPartitionsToRecv = descriptor.nbPartitionsToRecv; - const int NbParticlesToReceive = descriptor.nbParticlesToRecv; - assert(NbParticlesToReceive != -1); - assert(descriptor.toCompute == nullptr); - if(NbParticlesToReceive){ - descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]); - whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second}); + const std::pair<Action, int> releasedAction = whatNext[idxDone]; + std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]); + std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]); + mpiRequests.pop_back(); + whatNext.pop_back(); + + ////////////////////////////////////////////////////////////////////// + /// Data to exchange particles + ////////////////////////////////////////////////////////////////////// + if(releasedAction.first == RECV_PARTICLES){ + NeighborDescriptor& descriptor = 
neigDescriptors[releasedAction.second]; + + if(descriptor.isLower){ + //const int idxLower = descriptor.idxLowerUpper; + const int destProc = descriptor.destProc; + //const int nbPartitionsToRecv = descriptor.nbPartitionsToRecv; + const int NbParticlesToReceive = descriptor.nbParticlesToRecv; + assert(NbParticlesToReceive != -1); + assert(descriptor.toCompute == nullptr); + if(NbParticlesToReceive){ + descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]); + whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(descriptor.toCompute.get(), NbParticlesToReceive*size_particle_positions, particles_utils::GetMpiType(real_number()), destProc, TAG_UP_LOW_PARTICLES, + current_com, &mpiRequests.back())); + } + } + else{ + //const int idxUpper = descriptor.idxLowerUpper; + const int destProc = descriptor.destProc; + //const int nbPartitionsToRecv = descriptor.nbPartitionsToRecv; + const int NbParticlesToReceive = descriptor.nbParticlesToRecv; + assert(NbParticlesToReceive != -1); + assert(descriptor.toCompute == nullptr); + if(NbParticlesToReceive){ + descriptor.toCompute.reset(new real_number[NbParticlesToReceive*size_particle_positions]); + whatNext.emplace_back(std::pair<Action,int>{COMPUTE_PARTICLES, releasedAction.second}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(descriptor.toCompute.get(), NbParticlesToReceive*size_particle_positions, particles_utils::GetMpiType(real_number()), destProc, TAG_LOW_UP_PARTICLES, + current_com, &mpiRequests.back())); + } + } + } + + ////////////////////////////////////////////////////////////////////// + /// Computation + ////////////////////////////////////////////////////////////////////// + if(releasedAction.first == COMPUTE_PARTICLES){ + NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; + const int NbParticlesToReceive = descriptor.nbParticlesToRecv; + + assert(descriptor.toCompute != nullptr); + descriptor.results.reset(new real_number[NbParticlesToReceive*size_particle_rhs]); + init_result_array(descriptor.results.get(), NbParticlesToReceive); + + if(more_than_one_thread == false){ + apply_computation(descriptor.toCompute.get(), descriptor.results.get(), NbParticlesToReceive); + } + else{ + TIMEZONE_OMP_INIT_PRETASK(timeZoneTaskKey) + NeighborDescriptor* ptr_descriptor = &descriptor; + #pragma omp taskgroup + { + for(int idxPart = 0 ; idxPart < NbParticlesToReceive ; idxPart += 300){ + const int sizeToDo = std::min(300, NbParticlesToReceive-idxPart); + #pragma omp task default(shared) firstprivate(ptr_descriptor, idxPart, sizeToDo) priority(10) \ + TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey) + { + TIMEZONE_OMP_TASK("apply_computation", timeZoneTaskKey); + apply_computation(&ptr_descriptor->toCompute[idxPart*size_particle_positions], + &ptr_descriptor->results[idxPart*size_particle_rhs], sizeToDo); + } + } + } + } + + const int destProc = descriptor.destProc; + whatNext.emplace_back(std::pair<Action,int>{RELEASE_BUFFER_PARTICLES, releasedAction.second}); mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(descriptor.toCompute.get(), NbParticlesToReceive*size_particle_positions, particles_utils::GetMpiType(real_number()), destProc, TAG_LOW_UP_PARTICLES, + const int tag = descriptor.isLower? 
TAG_LOW_UP_RESULTS : TAG_UP_LOW_RESULTS; + AssertMpi(MPI_Isend(descriptor.results.get(), NbParticlesToReceive*size_particle_rhs, particles_utils::GetMpiType(real_number()), destProc, tag, current_com, &mpiRequests.back())); } + ////////////////////////////////////////////////////////////////////// + /// Computation + ////////////////////////////////////////////////////////////////////// + if(releasedAction.first == RELEASE_BUFFER_PARTICLES){ + NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; + assert(descriptor.toCompute != nullptr); + descriptor.toCompute.release(); + } + ////////////////////////////////////////////////////////////////////// + /// Merge + ////////////////////////////////////////////////////////////////////// + if(releasedAction.first == MERGE_PARTICLES){ + NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; + + // We can merge safely on first and last partitions since no one is working on it + if(descriptor.isLower){ + TIMEZONE("reduce"); + assert(descriptor.toRecvAndMerge != nullptr); + reduce_particles_rhs(&particles_current_rhs[0], descriptor.toRecvAndMerge.get(), descriptor.nbParticlesToSend); + descriptor.toRecvAndMerge.release(); + } + else { + TIMEZONE("reduce"); + assert(descriptor.toRecvAndMerge != nullptr); + reduce_particles_rhs(&particles_current_rhs[(current_offset_particles_for_partition[current_partition_size]-descriptor.nbParticlesToSend)*size_particle_rhs], + descriptor.toRecvAndMerge.get(), descriptor.nbParticlesToSend); + descriptor.toRecvAndMerge.release(); + } + } } - } - - ////////////////////////////////////////////////////////////////////// - /// Computation - ////////////////////////////////////////////////////////////////////// - if(releasedAction.first == COMPUTE_PARTICLES){ - NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; - const int NbParticlesToReceive = descriptor.nbParticlesToRecv; - - assert(descriptor.toCompute != nullptr); - descriptor.results.reset(new real_number[NbParticlesToReceive*size_particle_rhs]); - init_result_array(descriptor.results.get(), NbParticlesToReceive); - apply_computation(descriptor.toCompute.get(), descriptor.results.get(), NbParticlesToReceive); - const int destProc = descriptor.destProc; - whatNext.emplace_back(std::pair<Action,int>{RELEASE_BUFFER_PARTICLES, releasedAction.second}); - mpiRequests.emplace_back(); - const int tag = descriptor.isLower? 
TAG_LOW_UP_RESULTS : TAG_UP_LOW_RESULTS; - AssertMpi(MPI_Isend(descriptor.results.get(), NbParticlesToReceive*size_particle_rhs, particles_utils::GetMpiType(real_number()), destProc, tag, - current_com, &mpiRequests.back())); - } - ////////////////////////////////////////////////////////////////////// - /// Computation - ////////////////////////////////////////////////////////////////////// - if(releasedAction.first == RELEASE_BUFFER_PARTICLES){ - NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; - assert(descriptor.toCompute != nullptr); - descriptor.toCompute.release(); - } - ////////////////////////////////////////////////////////////////////// - /// Merge - ////////////////////////////////////////////////////////////////////// - if(releasedAction.first == MERGE_PARTICLES){ - NeighborDescriptor& descriptor = neigDescriptors[releasedAction.second]; - - if(descriptor.isLower){ - TIMEZONE("reduce"); - assert(descriptor.toRecvAndMerge != nullptr); - reduce_particles(&particles_positions[0], &particles_current_rhs[0], descriptor.toRecvAndMerge.get(), descriptor.nbParticlesToSend); - descriptor.toRecvAndMerge.release(); + if(more_than_one_thread){ + TIMEZONE_OMP_INIT_PRETASK(timeZoneTaskKey) + #pragma omp taskgroup + { + // Do for first and last partition (or the single partition if current_partition_size is one) + for(int idxPartition = 0 ; idxPartition < current_partition_size ; idxPartition = std::max(idxPartition+1,current_partition_size-1)){ + for(int idxPart = current_offset_particles_for_partition[idxPartition] ; + idxPart < current_offset_particles_for_partition[idxPartition+1] ; idxPart += 300){ + + const int sizeToDo = std::min(300, current_offset_particles_for_partition[idxPartition+1]-idxPart); + + #pragma omp task default(shared) firstprivate(idxPart, sizeToDo) priority(0) TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey) + { + TIMEZONE_OMP_TASK("apply_computation_border", timeZoneTaskKey); + apply_computation(&particles_positions[idxPart*size_particle_positions], + &particles_current_rhs[idxPart*size_particle_rhs], + sizeToDo); + } + } + } + } } - else { - TIMEZONE("reduce"); - assert(descriptor.toRecvAndMerge != nullptr); - reduce_particles(&particles_positions[(current_offset_particles_for_partition[current_partition_size]-descriptor.nbParticlesToSend)*size_particle_positions], - &particles_current_rhs[(current_offset_particles_for_partition[current_partition_size]-descriptor.nbParticlesToSend)*size_particle_rhs], - descriptor.toRecvAndMerge.get(), descriptor.nbParticlesToSend); - descriptor.toRecvAndMerge.release(); + } + if(more_than_one_thread && omp_get_thread_num() == 1){ + TIMEZONE_OMP_INIT_PRETASK(timeZoneTaskKey) + #pragma omp taskgroup + { + // Do for all partitions except the first and last one + for(int idxPartition = 1 ; idxPartition < current_partition_size-1 ; ++idxPartition){ + for(int idxPart = current_offset_particles_for_partition[idxPartition] ; + idxPart < current_offset_particles_for_partition[idxPartition+1] ; idxPart += 300){ + + const int sizeToDo = std::min(300, current_offset_particles_for_partition[idxPartition+1]-idxPart); + + // Low priority to help master thread when possible + #pragma omp task default(shared) firstprivate(idxPart, sizeToDo) priority(0) TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey) + { + TIMEZONE_OMP_TASK("apply_computation", timeZoneTaskKey); + apply_computation(&particles_positions[idxPart*size_particle_positions], + &particles_current_rhs[idxPart*size_particle_rhs], + sizeToDo); + } + } + } } } } - { - // 
TODO compute only border sections and do the reste in parallel - TIMEZONE("compute"); + // Do my own computation + if(more_than_one_thread == false){ + TIMEZONE("compute-my_compute"); // Compute my particles if(myTotalNbParticles){ - TIMEZONE("my_compute"); apply_computation(particles_positions, particles_current_rhs, myTotalNbParticles); } } @@ -391,12 +466,11 @@ public: //////////////////////////////////////////////////////////////////////////// virtual void init_result_array(real_number particles_current_rhs[], - const int nb_particles) = 0; + const int nb_particles) const = 0; virtual void apply_computation(const real_number particles_positions[], real_number particles_current_rhs[], const int nb_particles) const = 0; - virtual void reduce_particles(const real_number particles_positions[], - real_number particles_current_rhs[], + virtual void reduce_particles_rhs(real_number particles_current_rhs[], const real_number extra_particles_current_rhs[], const int nb_particles) const = 0; @@ -476,153 +550,200 @@ public: std::vector<std::unique_ptr<real_number[]>> newParticlesLowRhs(in_nb_rhs); std::vector<std::unique_ptr<real_number[]>> newParticlesUpRhs(in_nb_rhs); - { - assert(whatNext.size() == 0); - assert(mpiRequests.size() == 0); - - whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_NB_LOW, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&nbNewFromLow, 1, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_NB_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); - eventsBeforeWaitall += 1; + const bool more_than_one_thread = (omp_get_max_threads() > 1); - whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(const_cast<int*>(&nbOutLower), 1, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_NB_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); + TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads()) + #pragma omp parallel default(shared) + { + #pragma omp master + { + assert(whatNext.size() == 0); + assert(mpiRequests.size() == 0); - if(nbOutLower){ - whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_NB_LOW, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&(*inout_positions_particles)[0], nbOutLower*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES, + AssertMpi(MPI_Irecv(&nbNewFromLow, 1, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_NB_PARTICLES, MPI_COMM_WORLD, &mpiRequests.back())); + eventsBeforeWaitall += 1; + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&(*inout_index_particles)[0], nbOutLower, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_INDEXES, + AssertMpi(MPI_Isend(const_cast<int*>(&nbOutLower), 1, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_NB_PARTICLES, MPI_COMM_WORLD, &mpiRequests.back())); - for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ + if(nbOutLower){ whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&inout_rhs_particles[idx_rhs][0], nbOutLower*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, 
TAG_LOW_UP_MOVED_PARTICLES_RHS+idx_rhs, + AssertMpi(MPI_Isend(&(*inout_positions_particles)[0], nbOutLower*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES, + MPI_COMM_WORLD, &mpiRequests.back())); + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Isend(&(*inout_index_particles)[0], nbOutLower, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_INDEXES, MPI_COMM_WORLD, &mpiRequests.back())); - } - } - - whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_NB_UP, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&nbNewFromUp, 1, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_NB_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); - eventsBeforeWaitall += 1; - whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(const_cast<int*>(&nbOutUpper), 1, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_NB_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); + for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Isend(&inout_rhs_particles[idx_rhs][0], nbOutLower*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_RHS+idx_rhs, + MPI_COMM_WORLD, &mpiRequests.back())); + } + } - if(nbOutUpper){ - whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_NB_UP, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&(*inout_positions_particles)[(myTotalNbParticles-nbOutUpper)*size_particle_positions], nbOutUpper*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES, + AssertMpi(MPI_Irecv(&nbNewFromUp, 1, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_NB_PARTICLES, MPI_COMM_WORLD, &mpiRequests.back())); + eventsBeforeWaitall += 1; + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&(*inout_index_particles)[(myTotalNbParticles-nbOutUpper)], nbOutUpper, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_INDEXES, + AssertMpi(MPI_Isend(const_cast<int*>(&nbOutUpper), 1, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_NB_PARTICLES, MPI_COMM_WORLD, &mpiRequests.back())); - - for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ + if(nbOutUpper){ whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Isend(&inout_rhs_particles[idx_rhs][(myTotalNbParticles-nbOutUpper)*size_particle_rhs], nbOutUpper*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_RHS+idx_rhs, + AssertMpi(MPI_Isend(&(*inout_positions_particles)[(myTotalNbParticles-nbOutUpper)*size_particle_positions], nbOutUpper*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES, + MPI_COMM_WORLD, &mpiRequests.back())); + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Isend(&(*inout_index_particles)[(myTotalNbParticles-nbOutUpper)], nbOutUpper, 
MPI_INT, (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_INDEXES, MPI_COMM_WORLD, &mpiRequests.back())); - } - } - while(mpiRequests.size() && eventsBeforeWaitall){ - int idxDone = mpiRequests.size(); - { - TIMEZONE("waitany_move"); - AssertMpi(MPI_Waitany(mpiRequests.size(), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE)); - } - const std::pair<Action, int> releasedAction = whatNext[idxDone]; - std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]); - std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]); - mpiRequests.pop_back(); - whatNext.pop_back(); - - if(releasedAction.first == RECV_MOVE_NB_LOW){ - if(nbNewFromLow){ - assert(newParticlesLow == nullptr); - newParticlesLow.reset(new real_number[nbNewFromLow*size_particle_positions]); - whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_LOW, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesLow[0], nbNewFromLow*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); - newParticlesLowIndexes.reset(new int[nbNewFromLow]); + for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesLowIndexes[0], nbNewFromLow, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_INDEXES, + AssertMpi(MPI_Isend(&inout_rhs_particles[idx_rhs][(myTotalNbParticles-nbOutUpper)*size_particle_rhs], nbOutUpper*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_RHS+idx_rhs, MPI_COMM_WORLD, &mpiRequests.back())); + } + } - for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ - newParticlesLowRhs[idx_rhs].reset(new real_number[nbNewFromLow*size_particle_rhs]); + while(mpiRequests.size() && eventsBeforeWaitall){ + int idxDone = mpiRequests.size(); + { + TIMEZONE("waitany_move"); + AssertMpi(MPI_Waitany(mpiRequests.size(), mpiRequests.data(), &idxDone, MPI_STATUSES_IGNORE)); + } + const std::pair<Action, int> releasedAction = whatNext[idxDone]; + std::swap(mpiRequests[idxDone], mpiRequests[mpiRequests.size()-1]); + std::swap(whatNext[idxDone], whatNext[mpiRequests.size()-1]); + mpiRequests.pop_back(); + whatNext.pop_back(); + + if(releasedAction.first == RECV_MOVE_NB_LOW){ + if(nbNewFromLow){ + assert(newParticlesLow == nullptr); + newParticlesLow.reset(new real_number[nbNewFromLow*size_particle_positions]); + whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_LOW, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(&newParticlesLow[0], nbNewFromLow*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES, + MPI_COMM_WORLD, &mpiRequests.back())); + + newParticlesLowIndexes.reset(new int[nbNewFromLow]); whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesLowRhs[idx_rhs][0], nbNewFromLow*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_RHS+idx_rhs, + AssertMpi(MPI_Irecv(&newParticlesLowIndexes[0], nbNewFromLow, MPI_INT, (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_INDEXES, MPI_COMM_WORLD, &mpiRequests.back())); + + for(int 
idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ + newParticlesLowRhs[idx_rhs].reset(new real_number[nbNewFromLow*size_particle_rhs]); + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(&newParticlesLowRhs[idx_rhs][0], nbNewFromLow*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank-1+nb_processes_involved)%nb_processes_involved, TAG_UP_LOW_MOVED_PARTICLES_RHS+idx_rhs, + MPI_COMM_WORLD, &mpiRequests.back())); + } } + eventsBeforeWaitall -= 1; } - eventsBeforeWaitall -= 1; - } - else if(releasedAction.first == RECV_MOVE_NB_UP){ - if(nbNewFromUp){ - assert(newParticlesUp == nullptr); - newParticlesUp.reset(new real_number[nbNewFromUp*size_particle_positions]); - whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_UP, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesUp[0], nbNewFromUp*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES, - MPI_COMM_WORLD, &mpiRequests.back())); - - newParticlesUpIndexes.reset(new int[nbNewFromUp]); - whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); - mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesUpIndexes[0], nbNewFromUp, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_INDEXES, - MPI_COMM_WORLD, &mpiRequests.back())); + else if(releasedAction.first == RECV_MOVE_NB_UP){ + if(nbNewFromUp){ + assert(newParticlesUp == nullptr); + newParticlesUp.reset(new real_number[nbNewFromUp*size_particle_positions]); + whatNext.emplace_back(std::pair<Action,int>{RECV_MOVE_UP, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(&newParticlesUp[0], nbNewFromUp*size_particle_positions, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES, + MPI_COMM_WORLD, &mpiRequests.back())); - for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ - newParticlesUpRhs[idx_rhs].reset(new real_number[nbNewFromUp*size_particle_rhs]); + newParticlesUpIndexes.reset(new int[nbNewFromUp]); whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); mpiRequests.emplace_back(); - AssertMpi(MPI_Irecv(&newParticlesUpRhs[idx_rhs][0], nbNewFromUp*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_RHS+idx_rhs, + AssertMpi(MPI_Irecv(&newParticlesUpIndexes[0], nbNewFromUp, MPI_INT, (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_INDEXES, MPI_COMM_WORLD, &mpiRequests.back())); + + for(int idx_rhs = 0 ; idx_rhs < in_nb_rhs ; ++idx_rhs){ + newParticlesUpRhs[idx_rhs].reset(new real_number[nbNewFromUp*size_particle_rhs]); + whatNext.emplace_back(std::pair<Action,int>{NOTHING_TODO, -1}); + mpiRequests.emplace_back(); + AssertMpi(MPI_Irecv(&newParticlesUpRhs[idx_rhs][0], nbNewFromUp*size_particle_rhs, particles_utils::GetMpiType(real_number()), (my_rank+1)%nb_processes_involved, TAG_LOW_UP_MOVED_PARTICLES_RHS+idx_rhs, + MPI_COMM_WORLD, &mpiRequests.back())); + } } + eventsBeforeWaitall -= 1; } - eventsBeforeWaitall -= 1; } - } - } - if(mpiRequests.size()){ - // TODO Proceed when received - TIMEZONE("waitall-move"); - AssertMpi(MPI_Waitall(mpiRequests.size(), mpiRequests.data(), MPI_STATUSES_IGNORE)); - mpiRequests.clear(); - whatNext.clear(); + if(mpiRequests.size()){ + // TODO Proceed when received + TIMEZONE("waitall-move"); + AssertMpi(MPI_Waitall(mpiRequests.size(), mpiRequests.data(), MPI_STATUSES_IGNORE)); + 
mpiRequests.clear(); + whatNext.clear(); + } + + // If we use thread, insert task to proceed received data + if(more_than_one_thread == true){ + TIMEZONE_OMP_INIT_PRETASK(timeZoneTaskKey) + #pragma omp taskgroup + { + if(nbNewFromLow){ + assert(newParticlesLow.get() != nullptr); + #pragma omp task TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey) + { + TIMEZONE_OMP_TASK("task-pbc", timeZoneTaskKey); + apply_pbc_z_new_particles(newParticlesLow.get(), nbNewFromLow); + apply_pbc_xy(newParticlesLow.get(), nbNewFromLow); + } + } + if(nbNewFromUp){ + assert(newParticlesUp.get() != nullptr); + #pragma omp task TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey) + { + TIMEZONE_OMP_TASK("task-pbc", timeZoneTaskKey); + apply_pbc_z_new_particles(newParticlesUp.get(), nbNewFromUp); + apply_pbc_xy(newParticlesUp.get(), nbNewFromUp); + } + } + } + } + } + // if we use threads and we are not master thread than proceed local data (not send/recv) + if(more_than_one_thread == true && omp_get_thread_num() > 0){ + TIMEZONE("apply_pbc_xy"); + const int nbOldParticles = myTotalNbParticles - nbOutLower - nbOutUpper; + particles_utils::IntervalSplitter<int> interval(nbOldParticles, + omp_get_num_threads()-1, + omp_get_thread_num()-1); + + apply_pbc_xy(&(*inout_positions_particles)[(nbOutLower+interval.getMyOffset())*size_particle_positions], interval.getMySize()); + } } - // Exchange particles - { + // If we do not use thread, process all data sequentially + if(more_than_one_thread == false){ TIMEZONE("apply_pbc_z_new_particles"); if(nbNewFromLow){ assert(newParticlesLow.get() != nullptr); apply_pbc_z_new_particles(newParticlesLow.get(), nbNewFromLow); + apply_pbc_xy(newParticlesLow.get(), nbNewFromLow); } if(nbNewFromUp){ assert(newParticlesUp.get() != nullptr); apply_pbc_z_new_particles(newParticlesUp.get(), nbNewFromUp); + apply_pbc_xy(newParticlesUp.get(), nbNewFromUp); } + + apply_pbc_xy(&(*inout_positions_particles)[nbOutLower*size_particle_positions], myTotalNbParticles - nbOutLower - nbOutUpper); } // Realloc an merge @@ -677,11 +798,6 @@ public: myTotalNbParticles = myTotalNewNbParticles; } - { - TIMEZONE("apply_pbc_xy"); - apply_pbc_xy((*inout_positions_particles).get(), myTotalNbParticles); - } - // Partitions all particles { TIMEZONE("repartition"); diff --git a/bfps/cpp/particles/particles_adams_bashforth.hpp b/bfps/cpp/particles/particles_adams_bashforth.hpp index e769335820deae729325f31c4dbf9f148a48c5bd..aaa81e03515c4dad9e7468d3435a3bee0adc8487 100644 --- a/bfps/cpp/particles/particles_adams_bashforth.hpp +++ b/bfps/cpp/particles/particles_adams_bashforth.hpp @@ -2,7 +2,10 @@ #define PARTICLES_ADAMS_BASHFORTH_HPP #include <stdexcept> +#include <omp.h> + #include "scope_timer.hpp" +#include "particles_utils.hpp" template <class real_number, int size_particle_positions = 3, int size_particle_rhs = 3> class particles_adams_bashforth { @@ -14,81 +17,93 @@ public: const std::unique_ptr<real_number[]> particles_rhs[], const int nb_rhs, const real_number dt) const{ TIMEZONE("particles_adams_bashforth::move_particles"); - // TODO full unroll + blocking - switch (nb_rhs){ - case 1: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × [0] - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * particles_rhs[0][idx_part*size_particle_rhs + idx_dim]; + + if(Max_steps < nb_rhs){ + throw std::runtime_error("Error, in bfps particles_adams_bashforth.\n" + "Step in particles_adams_bashforth is too large," + "you 
must add formulation up this number or limit the number of steps."); + } + + // Not needed: TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads()) + #pragma omp parallel default(shared) + { + particles_utils::IntervalSplitter<int> interval(nb_particles, + omp_get_num_threads(), + omp_get_thread_num()); + const int last_idx = interval.getMyOffset()+interval.getMySize(); + + // TODO full unroll + blocking + switch (nb_rhs){ + case 1: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × [0] + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * particles_rhs[0][idx_part*size_particle_rhs + idx_dim]; + } } - } - break; - case 2: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × (3[0] - [1])/2 - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * (3.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] - - particles_rhs[1][idx_part*size_particle_rhs + idx_dim])/2.; + break; + case 2: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × (3[0] - [1])/2 + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * (3.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] + - particles_rhs[1][idx_part*size_particle_rhs + idx_dim])/2.; + } } - } - break; - case 3: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × (23[0] - 16[1] + 5[2])/12 - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * (23.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] - - 16.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] - + 5.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim])/12.; + break; + case 3: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × (23[0] - 16[1] + 5[2])/12 + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * (23.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] + - 16.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] + + 5.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim])/12.; + } } - } - break; - case 4: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × (55[0] - 59[1] + 37[2] - 9[3])/24 - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * (55.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] - - 59.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] - + 37.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] - - 9.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim])/24.; + break; + case 4: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × (55[0] - 59[1] + 37[2] - 9[3])/24 + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * (55.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] + - 59.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] + + 37.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] + - 9.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim])/24.; + } } - 
} - break; - case 5: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × (1901[0] - 2774[1] + 2616[2] - 1274[3] + 251[4])/720 - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * (1901.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] - - 2774.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] - + 2616.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] - - 1274.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim] - + 251.*particles_rhs[4][idx_part*size_particle_rhs + idx_dim])/720.; + break; + case 5: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × (1901[0] - 2774[1] + 2616[2] - 1274[3] + 251[4])/720 + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * (1901.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] + - 2774.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] + + 2616.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] + - 1274.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim] + + 251.*particles_rhs[4][idx_part*size_particle_rhs + idx_dim])/720.; + } } - } - break; - case 6: - for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){ - for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ - // dt × (4277[0] - 7923[1] + 9982[2] - 7298[3] + 2877[4] - 475[5])/1440 - particles_positions[idx_part*size_particle_positions + idx_dim] - += dt * (4277.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] - - 7923.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] - + 9982.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] - - 7298.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim] - + 2877.*particles_rhs[4][idx_part*size_particle_rhs + idx_dim] - - 475.*particles_rhs[5][idx_part*size_particle_rhs + idx_dim])/1440.; + break; + case 6: + for(int idx_part = interval.getMyOffset() ; idx_part < last_idx ; ++idx_part){ + for(int idx_dim = 0 ; idx_dim < size_particle_positions ; ++idx_dim){ + // dt × (4277[0] - 7923[1] + 9982[2] - 7298[3] + 2877[4] - 475[5])/1440 + particles_positions[idx_part*size_particle_positions + idx_dim] + += dt * (4277.*particles_rhs[0][idx_part*size_particle_rhs + idx_dim] + - 7923.*particles_rhs[1][idx_part*size_particle_rhs + idx_dim] + + 9982.*particles_rhs[2][idx_part*size_particle_rhs + idx_dim] + - 7298.*particles_rhs[3][idx_part*size_particle_rhs + idx_dim] + + 2877.*particles_rhs[4][idx_part*size_particle_rhs + idx_dim] + - 475.*particles_rhs[5][idx_part*size_particle_rhs + idx_dim])/1440.; + } } + break; } - break; - default: - throw std::runtime_error("Error, in bfps particles_adams_bashforth.\n" - "Step in particles_adams_bashforth is too large," - "you must add formulation up this number or limit the number of steps."); } } }; diff --git a/bfps/cpp/particles/particles_field_computer.hpp b/bfps/cpp/particles/particles_field_computer.hpp index e912f99f9e49a790efacb5d5e0ab3ed119ad68b0..80f4745070953509efee5af587230d6941287d14 100644 --- a/bfps/cpp/particles/particles_field_computer.hpp +++ b/bfps/cpp/particles/particles_field_computer.hpp @@ -32,7 +32,7 @@ class particles_field_computer : public abstract_particles_distr<real_number, 3, //////////////////////////////////////////////////////////////////////// virtual void init_result_array(real_number particles_current_rhs[], - const int nb_particles) final{ + const int nb_particles) const 
final{ // Set values to zero initialy std::fill(particles_current_rhs, particles_current_rhs+nb_particles*3, 0); } @@ -132,8 +132,7 @@ class particles_field_computer : public abstract_particles_distr<real_number, 3, } } - virtual void reduce_particles(const real_number /*particles_positions*/[], - real_number particles_current_rhs[], + virtual void reduce_particles_rhs(real_number particles_current_rhs[], const real_number extra_particles_current_rhs[], const int nb_particles) const final{ TIMEZONE("particles_field_computer::reduce_particles"); diff --git a/bfps/cpp/scope_timer.cpp b/bfps/cpp/scope_timer.cpp index 969b94432d75946aa680d75322b9ddfb782bbebc..61ddd89583fe8d53cee328c4267df603e128d417 100644 --- a/bfps/cpp/scope_timer.cpp +++ b/bfps/cpp/scope_timer.cpp @@ -4,5 +4,5 @@ #ifdef USE_TIMINGOUTPUT -scope_timer_manager global_timer_manager("BFPS", std::cout); +EventManager global_timer_manager("BFPS", std::cout); #endif diff --git a/bfps/cpp/scope_timer.hpp b/bfps/cpp/scope_timer.hpp index 5fd6578c9e9f593191a44049c4f425448a53b21a..88fc76e8c7950f535f3e7cfd4459992984cef6cf 100644 --- a/bfps/cpp/scope_timer.hpp +++ b/bfps/cpp/scope_timer.hpp @@ -37,145 +37,226 @@ #include <mpi.h> #include <cstring> #include <stdexcept> +#include <omp.h> #include "base.hpp" #include "bfps_timer.hpp" -//< To add it as friend of scope_timer_manager -class scope_timer; +//< To add it as friend of EventManager +class ScopeEvent; -class scope_timer_manager { +class EventManager { protected: class CoreEvent { - protected: - //< Name of the event (from the user) - const std::string name; - //< Previous events (stack of parents) - std::stack<std::shared_ptr<CoreEvent>> parentStack; - //< Current event children - std::vector<std::shared_ptr<CoreEvent>> children; - - //< Total execution time - double totalTime; - //< Minimum execution time - double minTime; - //< Maximum execution time - double maxTime; - //< Number of occurrence for this event - int occurrence; - - public: - /** Create a core-event from the name and the current stack */ - CoreEvent(const std::string& inName, - const std::stack<std::shared_ptr<CoreEvent>>& inParentStack) - : name(inName), - parentStack(inParentStack), - totalTime(0), - minTime(std::numeric_limits<double>::max()), - maxTime(std::numeric_limits<double>::min()), - occurrence(0) {} - - /** Add a record */ - void addRecord(const double inDuration) { - totalTime += inDuration; - occurrence += 1; - minTime = std::min(minTime, inDuration); - maxTime = std::max(maxTime, inDuration); + protected: + //< Name of the event (from the user) + const std::string m_name; + //< Previous events (stack of parents) + std::stack<CoreEvent*> m_parentStack; + //< Current event children + std::vector<CoreEvent*> m_children; + + //< Total execution time + double m_totalTime; + //< Minimum execution time + double m_minTime; + //< Maximum execution time + double m_maxTime; + //< Number of occurrence for this event + int m_occurrence; + //< Number of occurrence that are tasks for this event + int m_nbTasks; + //< Children lock + omp_lock_t m_childrenLock; + //< Children lock + omp_lock_t m_updateLock; + + public: + /** Create a core-event from the name and the current stack */ + CoreEvent(const std::string& inName, + const std::stack<CoreEvent*>& inParentStack) + : m_name(inName), + m_parentStack(inParentStack), + m_totalTime(0), + m_minTime(std::numeric_limits<double>::max()), + m_maxTime(std::numeric_limits<double>::min()), + m_occurrence(0), + m_nbTasks(0) { + omp_init_lock(&m_childrenLock); + 
omp_init_lock(&m_updateLock); + } + + ~CoreEvent() { + omp_destroy_lock(&m_childrenLock); + omp_destroy_lock(&m_updateLock); + } + + /** Add a record */ + void addRecord(const double inDuration, const bool isTask) { + #pragma omp atomic update + m_totalTime += inDuration; + #pragma omp atomic update + m_occurrence += 1; + #pragma omp flush // (m_minTime, m_maxTime) + if (inDuration < m_minTime || m_maxTime < inDuration) { + omp_set_lock(&m_updateLock); + m_minTime = std::min(m_minTime, inDuration); + m_maxTime = std::max(m_maxTime, inDuration); + omp_unset_lock(&m_updateLock); } - - const std::stack<std::shared_ptr<CoreEvent>>& getParents() const { - return parentStack; + if (isTask) { + #pragma omp atomic update + m_nbTasks += 1; } + } - void addChild(std::shared_ptr<CoreEvent>& inChild) { - children.push_back(inChild); - } + const std::stack<CoreEvent*>& getParents() const { return m_parentStack; } - const std::vector<std::shared_ptr<CoreEvent>>& getChildren() const { - return children; - } + std::stack<CoreEvent*>& getParents() { return m_parentStack; } - const std::string& getName() const { return name; } + void addChild(CoreEvent* inChild) { + omp_set_lock(&m_childrenLock); + m_children.push_back(inChild); + omp_unset_lock(&m_childrenLock); + } - double getMin() const { return minTime; } + //! Must not be called during a paralle execution + const std::vector<CoreEvent*>& getChildren() const { + assert(omp_in_parallel() == 0); + return m_children; + } - double getMax() const { return maxTime; } + const std::string& getName() const { return m_name; } - int getOccurrence() const { return occurrence; } + double getMin() const { return m_minTime; } - double getAverage() const { - return totalTime / static_cast<double>(occurrence); - } + double getMax() const { return m_maxTime; } + + int getOccurrence() const { return m_occurrence; } + + double getAverage() const { + return m_totalTime / static_cast<double>(m_occurrence); + } + + double getDuration() const { return m_totalTime; } - double getDuration() const { return totalTime; } + int getNbTasks() const { return m_nbTasks; } }; /////////////////////////////////////////////////////////////// - //< First event (root of all stacks) - std::shared_ptr<CoreEvent> root; + //< The main node + std::unique_ptr<CoreEvent> m_root; //< Output stream to print out - std::ostream& outputStream; + std::ostream& m_outputStream; - //< Current stack - std::stack<std::shared_ptr<CoreEvent>> currentEventsStack; - //< All recorded events - std::unordered_multimap<std::string, std::shared_ptr<CoreEvent>> records; + //< Current stack, there are one stack of stack per thread + std::vector<std::stack<std::stack<CoreEvent*>>> m_currentEventsStackPerThread; + //< All recorded events (that will then be delete at the end) + std::unordered_multimap<std::string, CoreEvent*> m_records; + //< Lock for m_records + omp_lock_t m_recordsLock; /** Find a event from its name. If such even does not exist * the function creates one. If an event with the same name exists * but with a different stack, a new one is created. * It pushes the returned event in the stack. 
*/ - CoreEvent& getEvent(const std::string& inName, + CoreEvent* getEvent(const std::string& inName, const std::string& inUniqueKey) { const std::string completeName = inName + inUniqueKey; - std::shared_ptr<CoreEvent> foundEvent; + CoreEvent* foundEvent = nullptr; - auto range = records.equal_range(completeName); + omp_set_lock(&m_recordsLock); + // find all events with this name + auto range = m_records.equal_range(completeName); for (auto iter = range.first; iter != range.second; ++iter) { - if ((*iter).second->getParents() == currentEventsStack) { - foundEvent = (*iter).second; - break; - } + // events are equal if same name and same parents + if ((*iter).second->getParents() == + m_currentEventsStackPerThread[omp_get_thread_num()].top()) { + foundEvent = (*iter).second; + break; + } } + // Keep the lock to ensure that not two threads create the same event + if (!foundEvent) { - foundEvent.reset(new CoreEvent(inName, currentEventsStack)); - currentEventsStack.top()->addChild(foundEvent); - records.insert({completeName, foundEvent}); + // create this event + foundEvent = new CoreEvent( + inName, m_currentEventsStackPerThread[omp_get_thread_num()].top()); + m_currentEventsStackPerThread[omp_get_thread_num()].top().top()->addChild( + foundEvent); + m_records.insert({completeName, foundEvent}); } + omp_unset_lock(&m_recordsLock); + + m_currentEventsStackPerThread[omp_get_thread_num()].top().push(foundEvent); + return foundEvent; + } - currentEventsStack.push(foundEvent); - return (*foundEvent); + CoreEvent* getEventFromContext(const std::string& inName, + const std::string& inUniqueKey, + const std::stack<CoreEvent*>& inParentStack) { + m_currentEventsStackPerThread[omp_get_thread_num()].push(inParentStack); + return getEvent(inName, inUniqueKey); } /** Pop current event */ - void popEvent(const CoreEvent& eventToRemove) { - // Poped to many events, root event cannot be poped - assert(currentEventsStack.size() > 1); + void popEvent(const CoreEvent* eventToRemove) { + assert(m_currentEventsStackPerThread[omp_get_thread_num()].top().size() > 1); // Comparing address is cheaper - if (currentEventsStack.top().get() != &eventToRemove) { - throw std::runtime_error( - "You must end events (scope_timer/TIMEZONE) in order.\n" - "Please make sure that you only ask to the last event to finish."); + if (m_currentEventsStackPerThread[omp_get_thread_num()].top().top() != + eventToRemove) { + throw std::runtime_error( + "You must end events (ScopeEvent/TIMEZONE) in order.\n" + "Please make sure that you only ask to the last event to finish."); } - currentEventsStack.pop(); + m_currentEventsStackPerThread[omp_get_thread_num()].top().pop(); + } + + /** Pop current context */ + void popContext(const CoreEvent* eventToRemove) { + assert(m_currentEventsStackPerThread[omp_get_thread_num()].size() > 1); + assert(m_currentEventsStackPerThread[omp_get_thread_num()].top().size() > 1); + // Comparing address is cheaper + if (m_currentEventsStackPerThread[omp_get_thread_num()].top().top() != + eventToRemove) { + throw std::runtime_error( + "You must end events (ScopeEvent/TIMEZONE) in order.\n" + "Please make sure that you only ask to the last event to finish."); + } + m_currentEventsStackPerThread[omp_get_thread_num()].pop(); } public: /** Create an event manager */ - scope_timer_manager(const std::string& inAppName, std::ostream& inOutputStream) - : root( - new CoreEvent(inAppName, std::stack<std::shared_ptr<CoreEvent>>())), - outputStream(inOutputStream) { - currentEventsStack.push(root); + EventManager(const 
std::string& inAppName, std::ostream& inOutputStream) + : m_root(new CoreEvent(inAppName, std::stack<CoreEvent*>())), + m_outputStream(inOutputStream), + m_currentEventsStackPerThread(1) { + m_currentEventsStackPerThread[0].emplace(); + m_currentEventsStackPerThread[0].top().push(m_root.get()); + omp_init_lock(&m_recordsLock); + } + + ~EventManager() throw() { + assert(m_currentEventsStackPerThread[0].size() == 1); + + assert(m_currentEventsStackPerThread[0].top().size() == 1); + + omp_destroy_lock(&m_recordsLock); + + for (auto event : m_records) { + delete event.second; + } } - ~scope_timer_manager() throw() { - // Oups, the event-stack is corrupted, should be 1 - assert(currentEventsStack.size() == 1); + void startParallelRegion(const int inNbThreads) { + m_currentEventsStackPerThread.resize(1); + m_currentEventsStackPerThread.resize(inNbThreads, + m_currentEventsStackPerThread[0]); } void showDistributed(const MPI_Comm inComm) const { @@ -185,42 +266,42 @@ public: retMpi = MPI_Comm_size( inComm, &nbProcess); assert(retMpi == MPI_SUCCESS); - if((&outputStream == &std::cout || &outputStream == &std::clog) && myrank != nbProcess-1){ + if((&m_outputStream == &std::cout || &m_outputStream == &std::clog) && myrank != nbProcess-1){ // Print in reverse order char tmp; retMpi = MPI_Recv(&tmp, 1, MPI_BYTE, myrank+1, 99, inComm, MPI_STATUS_IGNORE); assert(retMpi == MPI_SUCCESS); } - outputStream.flush(); + m_outputStream.flush(); - std::stack<std::pair<int, const std::shared_ptr<CoreEvent>>> events; + std::stack<std::pair<int, const CoreEvent*>> events; - for (int idx = static_cast<int>(root->getChildren().size()) - 1; idx >= 0; --idx) { - events.push({0, root->getChildren()[idx]}); + for (int idx = static_cast<int>(m_root->getChildren().size()) - 1; idx >= 0; --idx) { + events.push({0, m_root->getChildren()[idx]}); } - outputStream << "[TIMING-" << myRank<< "] Local times.\n"; - outputStream << "[TIMING-" << myRank<< "] :" << root->getName() << "\n"; + m_outputStream << "[TIMING-" << myRank<< "] Local times.\n"; + m_outputStream << "[TIMING-" << myRank<< "] :" << m_root->getName() << "\n"; while (events.size()) { - const std::pair<int, const std::shared_ptr<CoreEvent>> eventToShow = + const std::pair<int, const CoreEvent*> eventToShow = events.top(); events.pop(); - outputStream << "[TIMING-" << myRank<< "] "; + m_outputStream << "[TIMING-" << myRank<< "] "; int offsetTab = eventToShow.first; while (offsetTab--) { - outputStream << "\t"; + m_outputStream << "\t"; } - outputStream << "@" << eventToShow.second->getName() << " = " << eventToShow.second->getDuration() << "s"; + m_outputStream << "@" << eventToShow.second->getName() << " = " << eventToShow.second->getDuration() << "s"; if (eventToShow.second->getOccurrence() != 1) { - outputStream << " (Min = " << eventToShow.second->getMin() << "s ; Max = " << eventToShow.second->getMax() + m_outputStream << " (Min = " << eventToShow.second->getMin() << "s ; Max = " << eventToShow.second->getMax() << "s ; Average = " << eventToShow.second->getAverage() << "s ; Occurrence = " << eventToShow.second->getOccurrence() << ")"; } - outputStream << "\n"; + m_outputStream << "\n"; for (int idx = static_cast<int>(eventToShow.second->getChildren().size()) - 1; idx >= 0; --idx) { @@ -228,9 +309,9 @@ public: {eventToShow.first + 1, eventToShow.second->getChildren()[idx]}); } } - outputStream.flush(); + m_outputStream.flush(); - if((&outputStream == &std::cout || &outputStream == &std::clog) && myrank != 0){ + if((&m_outputStream == &std::cout || &m_outputStream 
== &std::clog) && myrank != 0){ // Print in reverse order char tmp; retMpi = MPI_Send(&tmp, 1, MPI_BYTE, myrank-1, 99, inComm); @@ -238,26 +319,30 @@ public: } } - void show(const MPI_Comm inComm) const { + void show(const MPI_Comm inComm, const bool onlyP0 = true) const { int myRank, nbProcess; int retMpi = MPI_Comm_rank( inComm, &myRank); assert(retMpi == MPI_SUCCESS); retMpi = MPI_Comm_size( inComm, &nbProcess); assert(retMpi == MPI_SUCCESS); + if(onlyP0 && myRank != 0){ + return; + } + std::stringstream myResults; - std::stack<std::pair<int, const std::shared_ptr<CoreEvent>>> events; + std::stack<std::pair<int, const CoreEvent*>> events; - for (int idx = static_cast<int>(root->getChildren().size()) - 1; idx >= 0; --idx) { - events.push({0, root->getChildren()[idx]}); + for (int idx = static_cast<int>(m_root->getChildren().size()) - 1; idx >= 0; --idx) { + events.push({0, m_root->getChildren()[idx]}); } myResults << "[TIMING-" << myRank<< "] Local times.\n"; - myResults << "[TIMING-" << myRank<< "] :" << root->getName() << "\n"; + myResults << "[TIMING-" << myRank<< "] :" << m_root->getName() << "\n"; while (events.size()) { - const std::pair<int, const std::shared_ptr<CoreEvent>> eventToShow = + const std::pair<int, const CoreEvent*> eventToShow = events.top(); events.pop(); @@ -292,19 +377,21 @@ public: assert(retMpi == MPI_SUCCESS); } else{ - std::vector<char> buffer; - for(int idxProc = nbProcess-1 ; idxProc > 0 ; --idxProc){ - int sizeRecv; - retMpi = MPI_Recv(&sizeRecv, 1, MPI_INT, idxProc, 99, inComm, MPI_STATUS_IGNORE); - assert(retMpi == MPI_SUCCESS); - buffer.resize(sizeRecv+1); - retMpi = MPI_Recv(buffer.data(), sizeRecv, MPI_CHAR, idxProc, 100, inComm, MPI_STATUS_IGNORE); - assert(retMpi == MPI_SUCCESS); - buffer[sizeRecv]='\0'; - outputStream << buffer.data(); - } - outputStream << myResults.str(); - outputStream.flush(); + if(onlyP0 == false){ + std::vector<char> buffer; + for(int idxProc = nbProcess-1 ; idxProc > 0 ; --idxProc){ + int sizeRecv; + retMpi = MPI_Recv(&sizeRecv, 1, MPI_INT, idxProc, 99, inComm, MPI_STATUS_IGNORE); + assert(retMpi == MPI_SUCCESS); + buffer.resize(sizeRecv+1); + retMpi = MPI_Recv(buffer.data(), sizeRecv, MPI_CHAR, idxProc, 100, inComm, MPI_STATUS_IGNORE); + assert(retMpi == MPI_SUCCESS); + buffer[sizeRecv]='\0'; + m_outputStream << buffer.data(); + } + } + m_outputStream << myResults.str(); + m_outputStream.flush(); } } @@ -321,9 +408,9 @@ public: // Convert my events into sendable object std::vector<SerializedEvent> myEvents; - myEvents.reserve(records.size()); + myEvents.reserve(m_records.size()); - for(const std::pair<std::string, std::shared_ptr<CoreEvent>>& event : records){ + for(const std::pair<std::string, const CoreEvent*>& event : m_records){ myEvents.emplace_back(); SerializedEvent& current_event = myEvents.back(); @@ -334,7 +421,7 @@ public: strncpy(current_event.name, event.second->getName().c_str(), 128); std::stringstream path; - std::stack<std::shared_ptr<CoreEvent>> parents = event.second->getParents(); + std::stack<CoreEvent*> parents = event.second->getParents(); while(parents.size()){ path << parents.top()->getName() << " << "; parents.pop(); @@ -426,25 +513,29 @@ public: } } - outputStream << "[MPI-TIMING] Mpi times.\n"; + m_outputStream << "[MPI-TIMING] Mpi times.\n"; for(const auto& iter : mapEvents){ const GlobalEvent& gevent = iter.second; - outputStream << "[MPI-TIMING] @" << gevent.name << "\n"; - outputStream << "[MPI-TIMING] Stack => " << gevent.path << "\n"; - outputStream << "[MPI-TIMING] \t Done by " << 
gevent.nbProcess << " processes\n"; - outputStream << "[MPI-TIMING] \t Total time for all " << gevent.totalTime + m_outputStream << "[MPI-TIMING] @" << gevent.name << "\n"; + m_outputStream << "[MPI-TIMING] Stack => " << gevent.path << "\n"; + m_outputStream << "[MPI-TIMING] \t Done by " << gevent.nbProcess << " processes\n"; + m_outputStream << "[MPI-TIMING] \t Total time for all " << gevent.totalTime << "s (average per process " << gevent.totalTime/gevent.nbProcess << "s)\n"; - outputStream << "[MPI-TIMING] \t Min time for a process " << gevent.minTimeProcess + m_outputStream << "[MPI-TIMING] \t Min time for a process " << gevent.minTimeProcess << "s Max time for a process " << gevent.maxTimeProcess << "s\n"; - outputStream << "[MPI-TIMING] \t The same call has been done " << gevent.occurrence + m_outputStream << "[MPI-TIMING] \t The same call has been done " << gevent.occurrence << " times by all process (duration min " << gevent.minTime << "s max " << gevent.maxTime << "s avg " << gevent.totalTime/gevent.occurrence << "s)\n"; } } - outputStream.flush(); + m_outputStream.flush(); + } + + std::stack<CoreEvent*> getCurrentThreadEvent() const { + return m_currentEventsStackPerThread[omp_get_thread_num()].top(); } - friend scope_timer; + friend ScopeEvent; }; /////////////////////////////////////////////////////////////// @@ -457,31 +548,49 @@ public: * The object cannot be copied/moved to ensure coherency in the * events hierarchy. */ -class scope_timer { +class ScopeEvent { protected: //< The manager to refer to - scope_timer_manager& manager; + EventManager& m_manager; //< The core event - scope_timer_manager::CoreEvent& event; + EventManager::CoreEvent* m_event; //< Time to get elapsed time - bfps_timer timer; + bfps_timer m_timer; + //< Is true if it has been created for task + bool m_isTask; public: - scope_timer(const std::string& inName, scope_timer_manager& inManager, - const std::string& inUniqueKey) - : manager(inManager), event(inManager.getEvent(inName, inUniqueKey)) { - timer.start(); + ScopeEvent(const std::string& inName, EventManager& inManager, + const std::string& inUniqueKey) + : m_manager(inManager), + m_event(inManager.getEvent(inName, inUniqueKey)), + m_isTask(false) { + m_timer.start(); } - ~scope_timer() { - event.addRecord(timer.stopAndGetElapsed()); - manager.popEvent(event); + ScopeEvent(const std::string& inName, EventManager& inManager, + const std::string& inUniqueKey, + const std::stack<EventManager::CoreEvent*>& inParentStack) + : m_manager(inManager), + m_event( + inManager.getEventFromContext(inName, inUniqueKey, inParentStack)), + m_isTask(true) { + m_timer.start(); } - scope_timer(const scope_timer&) = delete; - scope_timer& operator=(const scope_timer&) = delete; - scope_timer(scope_timer&&) = delete; - scope_timer& operator=(scope_timer&&) = delete; + ~ScopeEvent() { + m_event->addRecord(m_timer.stopAndGetElapsed(), m_isTask); + if (m_isTask == false) { + m_manager.popEvent(m_event); + } else { + m_manager.popContext(m_event); + } + } + + ScopeEvent(const ScopeEvent&) = delete; + ScopeEvent& operator=(const ScopeEvent&) = delete; + ScopeEvent(ScopeEvent&&) = delete; + ScopeEvent& operator=(ScopeEvent&&) = delete; }; #define ScopeEventUniqueKey_Core_To_Str_Ext(X) #X @@ -493,22 +602,38 @@ public: #ifdef USE_TIMINGOUTPUT -extern scope_timer_manager global_timer_manager; +extern EventManager global_timer_manager; #define TIMEZONE_Core_Merge(x, y) x##y #define TIMEZONE_Core_Pre_Merge(x, y) TIMEZONE_Core_Merge(x, y) #define TIMEZONE(NAME) \ - scope_timer 
TIMEZONE_Core_Pre_Merge(____TIMEZONE_AUTO_ID, __LINE__)( \ - NAME, global_timer_manager, ScopeEventUniqueKey); + ScopeEvent TIMEZONE_Core_Pre_Merge(____TIMEZONE_AUTO_ID, __LINE__)( \ + NAME, global_timer_manager, ScopeEventUniqueKey); #define TIMEZONE_MULTI_REF(NAME) \ - scope_timer TIMEZONE_Core_Pre_Merge(____TIMEZONE_AUTO_ID, __LINE__)( \ - NAME, global_timer_manager, ScopeEventMultiRefKey); + ScopeEvent TIMEZONE_Core_Pre_Merge(____TIMEZONE_AUTO_ID, __LINE__)( \ + NAME, global_timer_manager, ScopeEventMultiRefKey); + +#define TIMEZONE_OMP_INIT_PRETASK(VARNAME) \ + auto VARNAME##core = global_timer_manager.getCurrentThreadEvent(); \ + auto VARNAME = &VARNAME##core; +#define TIMEZONE_OMP_TASK(NAME, VARNAME) \ + ScopeEvent TIMEZONE_Core_Pre_Merge(____TIMEZONE_AUTO_ID, __LINE__)( \ + NAME, global_timer_manager, ScopeEventUniqueKey, *VARNAME); +#define TIMEZONE_OMP_PRAGMA_TASK_KEY(VARNAME) \ + shared(global_timer_manager) firstprivate(VARNAME) + +#define TIMEZONE_OMP_INIT_PREPARALLEL(NBTHREADS) \ + global_timer_manager.startParallelRegion(NBTHREADS); #else #define TIMEZONE(NAME) #define TIMEZONE_MULTI_REF(NAME) +#define TIMEZONE_OMP_INIT_PRETASK(VARNAME) +#define TIMEZONE_OMP_TASK(NAME, VARNAME) +#define TIMEZONE_OMP_PRAGMA_TASK_KEY(VARNAME) +#define TIMEZONE_OMP_INIT_PREPARALLEL(NBTHREADS) #endif
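
Usage note (not part of the patch): the OpenMP timing macros added in scope_timer.hpp are meant to be combined as in the minimal sketch below, which mirrors the task pattern used in abstract_particles_distr.hpp — the parallel region is announced to the timer manager, the master thread captures its current event stack before spawning tasks, and each task reopens that context through TIMEZONE_OMP_TASK. The function name example_task_region, its arguments, and the chunk size of 300 are illustrative placeholders only; the macros expand to nothing unless USE_TIMINGOUTPUT is defined.

// Illustrative sketch only (not part of the patch above): how the new timing
// macros compose with an OpenMP task loop, following abstract_particles_distr.hpp.
#include <algorithm>
#include <omp.h>
#include "scope_timer.hpp"

// 'example_task_region', 'data' and 'nb_items' are hypothetical placeholders.
inline void example_task_region(double data[], const int nb_items){
    // Tell the timer manager how many per-thread event stacks to prepare.
    TIMEZONE_OMP_INIT_PREPARALLEL(omp_get_max_threads())
    #pragma omp parallel default(shared)
    {
        #pragma omp master
        {
            // Capture the master thread's current event stack so that tasks
            // can attach their timings under the right parent event.
            TIMEZONE_OMP_INIT_PRETASK(timeZoneTaskKey)
            #pragma omp taskgroup
            {
                for(int idx = 0 ; idx < nb_items ; idx += 300){
                    const int sizeToDo = std::min(300, nb_items-idx);
                    #pragma omp task default(shared) firstprivate(idx, sizeToDo) \
                            TIMEZONE_OMP_PRAGMA_TASK_KEY(timeZoneTaskKey)
                    {
                        // Re-open the captured timing context from inside the task.
                        TIMEZONE_OMP_TASK("process_chunk", timeZoneTaskKey);
                        for(int k = idx ; k < idx+sizeToDo ; ++k){
                            data[k] *= 2.0; // placeholder work
                        }
                    }
                }
            }
        }
    }
}

In the patch itself the same pattern is used with explicit priorities: tasks that feed pending MPI exchanges are created with priority(10), while background computation over interior partitions uses priority(0), so communication progress on the master thread is not starved by local work.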