Skip to content
Snippets Groups Projects

Feature/limited particles output

Merged Berenger Bramas requested to merge feature/limited-particles-output into develop
6 files
+ 301
124
Compare changes
  • Side-by-side
  • Inline
Files
6
@@ -11,11 +11,12 @@
@@ -11,11 +11,12 @@
#include "particles_utils.hpp"
#include "particles_utils.hpp"
#include "alltoall_exchanger.hpp"
#include "alltoall_exchanger.hpp"
#include "scope_timer.hpp"
#include "scope_timer.hpp"
#include "env_utils.hpp"
template <class real_number, int size_particle_positions, int size_particle_rhs>
template <class real_number, int size_particle_positions, int size_particle_rhs>
class abstract_particles_output {
class abstract_particles_output {
MPI_Comm mpi_com;
MPI_Comm mpi_com;
 
MPI_Comm mpi_com_writer;
int my_rank;
int my_rank;
int nb_processes;
int nb_processes;
@@ -33,10 +34,15 @@ class abstract_particles_output {
@@ -33,10 +34,15 @@ class abstract_particles_output {
std::unique_ptr<int[]> buffer_indexes_recv;
std::unique_ptr<int[]> buffer_indexes_recv;
int size_buffers_recv;
int size_buffers_recv;
 
int nb_processes_involved;
 
bool current_is_involved;
 
int particles_chunk_per_process;
 
int particles_chunk_current_size;
 
int particles_chunk_current_offset;
protected:
protected:
/** Communicator handed to the constructor (stored in mpi_com). */
MPI_Comm& getCom(){
    return mpi_com;
}

/**
 * Communicator produced by the MPI_Comm_split call of the constructor
 * (stored in mpi_com_writer); it groups the ranks for which isInvolved()
 * is true.
 */
MPI_Comm& getComWriter(){
    return mpi_com_writer;
}
int getTotalNbParticles() const {
int getTotalNbParticles() const {
@@ -51,18 +57,75 @@ protected:
@@ -51,18 +57,75 @@ protected:
return this->my_rank;
return this->my_rank;
}
}
 
bool isInvolved() const{
 
return current_is_involved;
 
}
 
public:
public:
/**
 * Set up the output helper and select the ranks that write particles.
 *
 * The particle indexes are cut into contiguous chunks of
 * particles_chunk_per_process indexes; rank r (r < nb_processes_involved)
 * owns chunk r. The chunk size is derived from the byte size of the
 * positions only, using three environment variables:
 *   - BFPS_PO_MIN_BYTES     minimum number of bytes per writer (default 32MB),
 *   - BFPS_PO_CHUNK_BYTES   increment used to grow that amount (default 8MB),
 *   - BFPS_PO_MAX_PROCESSES upper bound on the number of writers (default 128).
 * The writers are then grouped in mpi_com_writer with MPI_Comm_split, which
 * is a collective call on in_mpi_com.
 *
 * @param in_mpi_com          communicator of all the ranks building this object
 * @param inTotalNbParticles  total number of particles (must not be negative)
 * @param in_nb_rhs           number of rhs, also used to size the rhs buffer containers
 */
abstract_particles_output(MPI_Comm in_mpi_com, const int inTotalNbParticles, const int in_nb_rhs)
    : mpi_com(in_mpi_com), my_rank(-1), nb_processes(-1),
      total_nb_particles(inTotalNbParticles), nb_rhs(in_nb_rhs),
      buffer_particles_rhs_send(in_nb_rhs), size_buffers_send(-1),
      buffer_particles_rhs_recv(in_nb_rhs), size_buffers_recv(-1),
      nb_processes_involved(0), current_is_involved(true), particles_chunk_per_process(0),
      particles_chunk_current_size(0), particles_chunk_current_offset(0){
    AssertMpi(MPI_Comm_rank(mpi_com, &my_rank));
    AssertMpi(MPI_Comm_size(mpi_com, &nb_processes));
    assert(0 <= total_nb_particles);

    // Tuning values are clamped to 1: a zero coming from the environment would
    // otherwise divide by zero or leave no writer at all.
    const size_t MinBytesPerProcess = std::max<size_t>(1, env_utils::GetValue<size_t>("BFPS_PO_MIN_BYTES", 32 * 1024 * 1024)); // Default 32MB
    const size_t ChunkBytes = std::max<size_t>(1, env_utils::GetValue<size_t>("BFPS_PO_CHUNK_BYTES", 8 * 1024 * 1024)); // Default 8MB
    const int MaxProcessesInvolved = std::max(1, std::min(nb_processes, env_utils::GetValue<int>("BFPS_PO_MAX_PROCESSES", 128)));

    // We split the processes using positions size only.
    // Everything is widened to size_t before multiplying to avoid an int overflow.
    const size_t bytesPerParticle = sizeof(real_number)*static_cast<size_t>(size_particle_positions);
    const size_t nbParticles = static_cast<size_t>(total_nb_particles);
    const size_t totalBytesForPositions = nbParticles*bytesPerParticle;
    const size_t maxWriters = static_cast<size_t>(MaxProcessesInvolved);

    // Bytes each writer must take if all the allowed writers are used
    const size_t neededPerProcess = (totalBytesForPositions + maxWriters - 1)/maxWriters;

    // Start from the minimum and, when it is not enough, grow it by the
    // smallest number of ChunkBytes increments that covers the need.
    size_t bytesPerProcess = MinBytesPerProcess;
    if(bytesPerProcess < neededPerProcess){
        const size_t extraChunks = (neededPerProcess - bytesPerProcess + ChunkBytes - 1)/ChunkBytes;
        bytesPerProcess += extraChunks*ChunkBytes;
    }

    // Convert to a number of particles (rounded up). A chunk larger than the
    // whole data set is cut down to it: the distribution is the same (a single
    // writer) and the value is guaranteed to fit in an int.
    const size_t chunkParticles = std::min((bytesPerProcess + bytesPerParticle - 1)/bytesPerParticle,
                                           std::max<size_t>(1, nbParticles));
    particles_chunk_per_process = static_cast<int>(chunkParticles);

    // The number of writers is derived from the chunk size itself, so that
    // every writer owns a valid range [offset, offset+size) of particles and
    // index/particles_chunk_per_process always designates a writer.
    nb_processes_involved = static_cast<int>(std::max<size_t>(1, (nbParticles + chunkParticles - 1)/chunkParticles));
    assert(nb_processes_involved <= MaxProcessesInvolved);

    // Print out
    if(my_rank == 0){
        DEBUG_MSG("[INFO] Limit of processes involved in the particles ouput = %d (BFPS_PO_MAX_PROCESSES)\n", MaxProcessesInvolved);
        DEBUG_MSG("[INFO] Minimum bytes per process to write = %zu (BFPS_PO_MIN_BYTES) for a complete output of = %zu for positions\n", MinBytesPerProcess, totalBytesForPositions);
        DEBUG_MSG("[INFO] Consequently, there are %d processes that actually write data (%d particles per process)\n", nb_processes_involved, particles_chunk_per_process);
    }

    if(my_rank < nb_processes_involved){
        current_is_involved = true;
        particles_chunk_current_offset = my_rank*particles_chunk_per_process;
        // With no particle at all, rank 0 remains the single (empty) writer
        assert(total_nb_particles == 0 || particles_chunk_current_offset < total_nb_particles);
        particles_chunk_current_size = std::min(particles_chunk_per_process, total_nb_particles-particles_chunk_current_offset);
        assert(particles_chunk_current_offset + particles_chunk_current_size <= total_nb_particles);
        assert(my_rank != nb_processes_involved-1 || particles_chunk_current_offset + particles_chunk_current_size == total_nb_particles);
    }
    else{
        current_is_involved = false;
        particles_chunk_current_size = 0;
        particles_chunk_current_offset = total_nb_particles;
    }

    // Only the writers get a communicator, the other ranks pass MPI_UNDEFINED
    AssertMpi( MPI_Comm_split(mpi_com,
                              (current_is_involved ? 1 : MPI_UNDEFINED),
                              my_rank, &mpi_com_writer) );
}
/**
 * Free the writer communicator.
 * Only the ranks flagged as involved requested a communicator in the
 * constructor (the others passed MPI_UNDEFINED to MPI_Comm_split), so only
 * those ranks call MPI_Comm_free.
 */
virtual ~abstract_particles_output(){
    if(current_is_involved){
        AssertMpi( MPI_Comm_free(&mpi_com_writer) );
    }
}
void releaseMemory(){
void releaseMemory(){
@@ -125,12 +188,12 @@ public:
@@ -125,12 +188,12 @@ public:
}
}
}
}
const particles_utils::IntervalSplitter<int> particles_splitter(total_nb_particles, nb_processes, my_rank);
int* buffer_indexes_send_tmp = reinterpret_cast<int*>(buffer_indexes_send.get());// trick re-use buffer_indexes_send memory
int* buffer_indexes_send_tmp = reinterpret_cast<int*>(buffer_indexes_send.get());// trick re-use buffer_indexes_send memory
std::vector<int> nb_particles_to_send(nb_processes, 0);
std::vector<int> nb_particles_to_send(nb_processes, 0);
for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
nb_particles_to_send[particles_splitter.getOwner(buffer_indexes_send[idx_part].second)] += 1;
const int dest_proc = buffer_indexes_send[idx_part].second/particles_chunk_per_process;
 
assert(dest_proc < nb_processes_involved);
 
nb_particles_to_send[dest_proc] += 1;
buffer_indexes_send_tmp[idx_part] = buffer_indexes_send[idx_part].second;
buffer_indexes_send_tmp[idx_part] = buffer_indexes_send[idx_part].second;
}
}
@@ -138,7 +201,7 @@ public:
@@ -138,7 +201,7 @@ public:
// nb_particles_to_send is invalid after here
// nb_particles_to_send is invalid after here
const int nb_to_receive = exchanger.getTotalToRecv();
const int nb_to_receive = exchanger.getTotalToRecv();
assert(nb_to_receive == particles_splitter.getMySize());
assert(nb_to_receive == particles_chunk_current_size);
if(size_buffers_recv < nb_to_receive && nb_to_receive){
if(size_buffers_recv < nb_to_receive && nb_to_receive){
buffer_indexes_recv.reset(new int[nb_to_receive]);
buffer_indexes_recv.reset(new int[nb_to_receive]);
@@ -159,6 +222,12 @@ public:
@@ -159,6 +222,12 @@ public:
}
}
}
}
 
// Stop here if not involved
 
if(current_is_involved == false){
 
assert(nb_to_receive == 0);
 
return;
 
}
 
if(size_buffers_send < nb_to_receive && nb_to_receive){
if(size_buffers_send < nb_to_receive && nb_to_receive){
buffer_indexes_send.reset(new std::pair<int,int>[nb_to_receive]);
buffer_indexes_send.reset(new std::pair<int,int>[nb_to_receive]);
buffer_particles_positions_send.reset(new real_number[nb_to_receive*size_particle_positions]);
buffer_particles_positions_send.reset(new real_number[nb_to_receive*size_particle_positions]);
@@ -172,9 +241,9 @@ public:
@@ -172,9 +241,9 @@ public:
TIMEZONE("copy-local-order");
TIMEZONE("copy-local-order");
for(int idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
for(int idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
const int src_idx = idx_part;
const int src_idx = idx_part;
const int dst_idx = buffer_indexes_recv[idx_part]-particles_splitter.getMyOffset();
const int dst_idx = buffer_indexes_recv[idx_part]-particles_chunk_current_offset;
assert(0 <= dst_idx);
assert(0 <= dst_idx);
assert(dst_idx < particles_splitter.getMySize());
assert(dst_idx < particles_chunk_current_size);
for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
@@ -190,7 +259,7 @@ public:
@@ -190,7 +259,7 @@ public:
}
}
write(idx_time_step, buffer_particles_positions_send.get(), buffer_particles_rhs_send.data(),
write(idx_time_step, buffer_particles_positions_send.get(), buffer_particles_rhs_send.data(),
nb_to_receive, particles_splitter.getMyOffset());
nb_to_receive, particles_chunk_current_offset);
}
}
virtual void write(const int idx_time_step, const real_number* positions, const std::unique_ptr<real_number[]>* rhs,
virtual void write(const int idx_time_step, const real_number* positions, const std::unique_ptr<real_number[]>* rhs,
Loading