-
Cristian Lalescu authored
We now initialize the particle file layout as required.
Cristian Lalescu authoredWe now initialize the particle file layout as required.
abstract_particles_output.hpp 15.71 KiB
/******************************************************************************
* *
* Copyright 2019 Max Planck Institute for Dynamics and Self-Organization *
* *
* This file is part of TurTLE. *
* *
* TurTLE is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published *
* by the Free Software Foundation, either version 3 of the License, *
* or (at your option) any later version. *
* *
* TurTLE is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with TurTLE. If not, see <http://www.gnu.org/licenses/> *
* *
* Contact: Cristian.Lalescu@ds.mpg.de *
* *
******************************************************************************/
#ifndef ABSTRACT_PARTICLES_OUTPUT
#define ABSTRACT_PARTICLES_OUTPUT
#include <memory>
#include <vector>
#include <cassert>
#include <algorithm>
#include <cstddef>
#include "base.hpp"
#include "particles_utils.hpp"
#include "alltoall_exchanger.hpp"
#include "scope_timer.hpp"
#include "env_utils.hpp"
template <class partsize_t, class real_number, int size_particle_positions>
class abstract_particles_output {
MPI_Comm mpi_com;
MPI_Comm mpi_com_writer;
int my_rank;
int nb_processes;
const partsize_t total_nb_particles;
const int nb_rhs;
std::unique_ptr<std::pair<partsize_t,partsize_t>[]> buffer_indexes_send;
std::unique_ptr<real_number[]> buffer_particles_positions_send;
std::vector<std::unique_ptr<real_number[]>> buffer_particles_rhs_send;
partsize_t size_buffers_send;
int buffers_size_particle_rhs_send;
std::unique_ptr<real_number[]> buffer_particles_positions_recv;
std::vector<std::unique_ptr<real_number[]>> buffer_particles_rhs_recv;
std::unique_ptr<partsize_t[]> buffer_indexes_recv;
partsize_t size_buffers_recv;
int buffers_size_particle_rhs_recv;
int nb_processes_involved;
bool current_is_involved;
partsize_t particles_chunk_per_process;
partsize_t particles_chunk_current_size;
partsize_t particles_chunk_current_offset;
protected:
MPI_Comm& getCom(){
return mpi_com;
}
MPI_Comm& getComWriter(){
return mpi_com_writer;
}
int getNbRhs() const {
return nb_rhs;
}
int getMyRank(){
return this->my_rank;
}
bool isInvolved() const{
return current_is_involved;
}
public:
abstract_particles_output(MPI_Comm in_mpi_com, const partsize_t inTotalNbParticles, const int in_nb_rhs) throw()
: mpi_com(in_mpi_com), my_rank(-1), nb_processes(-1),
total_nb_particles(inTotalNbParticles), nb_rhs(in_nb_rhs),
buffer_particles_rhs_send(in_nb_rhs), size_buffers_send(0),
buffers_size_particle_rhs_send(0),
buffer_particles_rhs_recv(in_nb_rhs), size_buffers_recv(0),
buffers_size_particle_rhs_recv(0),
nb_processes_involved(0), current_is_involved(true), particles_chunk_per_process(0),
particles_chunk_current_size(0), particles_chunk_current_offset(0) {
AssertMpi(MPI_Comm_rank(mpi_com, &my_rank));
AssertMpi(MPI_Comm_size(mpi_com, &nb_processes));
const size_t MinBytesPerProcess = env_utils::GetValue<size_t>("BFPS_PO_MIN_BYTES", 32 * 1024 * 1024); // Default 32MB
const size_t ChunkBytes = env_utils::GetValue<size_t>("BFPS_PO_CHUNK_BYTES", 8 * 1024 * 1024); // Default 8MB
const int MaxProcessesInvolved = std::min(nb_processes, env_utils::GetValue<int>("BFPS_PO_MAX_PROCESSES", 128));
// We split the processes using positions size only
const size_t totalBytesForPositions = total_nb_particles*size_particle_positions*sizeof(real_number);
if(MinBytesPerProcess*MaxProcessesInvolved < totalBytesForPositions){
size_t extraChunkBytes = 1;
while((MinBytesPerProcess+extraChunkBytes*ChunkBytes)*MaxProcessesInvolved < totalBytesForPositions){
extraChunkBytes += 1;
}
const size_t bytesPerProcess = (MinBytesPerProcess+extraChunkBytes*ChunkBytes);
particles_chunk_per_process = partsize_t((bytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions));
nb_processes_involved = int((total_nb_particles+particles_chunk_per_process-1)/particles_chunk_per_process);
}
// else limit based on minBytesPerProcess
else{
nb_processes_involved = std::max(1,std::min(MaxProcessesInvolved,int((totalBytesForPositions+MinBytesPerProcess-1)/MinBytesPerProcess)));
particles_chunk_per_process = partsize_t((MinBytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions));
}
// Print out
if(my_rank == 0){
DEBUG_MSG("[INFO] Limit of processes involved in the particles ouput = %d (BFPS_PO_MAX_PROCESSES)\n", MaxProcessesInvolved);
DEBUG_MSG("[INFO] Minimum bytes per process to write = %llu (BFPS_PO_MIN_BYTES) for a complete output of = %llu for positions\n", MinBytesPerProcess, totalBytesForPositions);
DEBUG_MSG("[INFO] Consequently, there are %d processes that actually write data (%d particles per process)\n", nb_processes_involved, particles_chunk_per_process);
}
if(my_rank < nb_processes_involved){
current_is_involved = true;
particles_chunk_current_offset = my_rank*particles_chunk_per_process;
assert(particles_chunk_current_offset < total_nb_particles);
particles_chunk_current_size = std::min(particles_chunk_per_process, total_nb_particles-particles_chunk_current_offset);
assert(particles_chunk_current_offset + particles_chunk_current_size <= total_nb_particles);
assert(my_rank != nb_processes_involved-1 || particles_chunk_current_offset + particles_chunk_current_size == total_nb_particles);
}
else{
current_is_involved = false;
particles_chunk_current_size = 0;
particles_chunk_current_offset = total_nb_particles;
}
AssertMpi( MPI_Comm_split(mpi_com,
(current_is_involved ? 1 : MPI_UNDEFINED),
my_rank, &mpi_com_writer) );
}
virtual ~abstract_particles_output() noexcept(false){
if(current_is_involved){
AssertMpi( MPI_Comm_free(&mpi_com_writer) );
}
}
partsize_t getTotalNbParticles() const {
return total_nb_particles;
}
void releaseMemory(){
delete[] buffer_indexes_send.release();
delete[] buffer_particles_positions_send.release();
size_buffers_send = 0;
delete[] buffer_indexes_recv.release();
delete[] buffer_particles_positions_recv.release();
size_buffers_recv = 0;
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
delete[] buffer_particles_rhs_send[idx_rhs].release();
delete[] buffer_particles_rhs_recv[idx_rhs].release();
}
buffers_size_particle_rhs_send = 0;
buffers_size_particle_rhs_recv = 0;
}
template <int size_particle_rhs>
void save(
const real_number input_particles_positions[],
const std::unique_ptr<real_number[]> input_particles_rhs[],
const partsize_t index_particles[],
const partsize_t nb_particles,
const int idx_time_step){
TIMEZONE("abstract_particles_output::save");
assert(total_nb_particles != -1);
{
TIMEZONE("sort-to-distribute");
if(size_buffers_send < nb_particles){
size_buffers_send = nb_particles;
buffer_indexes_send.reset(new std::pair<partsize_t,partsize_t>[size_buffers_send]);
buffer_particles_positions_send.reset(new real_number[size_buffers_send*size_particle_positions]);
if(buffers_size_particle_rhs_send < size_particle_rhs){
buffers_size_particle_rhs_send = size_particle_rhs;
}
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
}
}
else if(buffers_size_particle_rhs_send < size_particle_rhs){
buffers_size_particle_rhs_send = size_particle_rhs;
if(size_buffers_send > 0){
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
}
}
}
for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
buffer_indexes_send[idx_part].first = idx_part;
buffer_indexes_send[idx_part].second = index_particles[idx_part];
}
std::sort(&buffer_indexes_send[0], &buffer_indexes_send[nb_particles], [](const std::pair<partsize_t,partsize_t>& p1,
const std::pair<partsize_t,partsize_t>& p2){
return p1.second < p2.second;
});
for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
const partsize_t src_idx = buffer_indexes_send[idx_part].first;
const partsize_t dst_idx = idx_part;
for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
= input_particles_positions[src_idx*size_particle_positions + idx_val];
}
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
for(int idx_val = 0 ; idx_val < int(size_particle_rhs) ; ++idx_val){
buffer_particles_rhs_send[idx_rhs][dst_idx*size_particle_rhs + idx_val]
= input_particles_rhs[idx_rhs][src_idx*size_particle_rhs + idx_val];
}
}
}
}
partsize_t* buffer_indexes_send_tmp = reinterpret_cast<partsize_t*>(buffer_indexes_send.get());// trick re-use buffer_indexes_send memory
std::vector<partsize_t> nb_particles_to_send(nb_processes, 0);
for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
const int dest_proc = int(buffer_indexes_send[idx_part].second/particles_chunk_per_process);
assert(dest_proc < nb_processes_involved);
nb_particles_to_send[dest_proc] += 1;
buffer_indexes_send_tmp[idx_part] = buffer_indexes_send[idx_part].second;
}
alltoall_exchanger exchanger(mpi_com, std::move(nb_particles_to_send));
// nb_particles_to_send is invalid after here
const int nb_to_receive = exchanger.getTotalToRecv();
assert(nb_to_receive == particles_chunk_current_size);
if(size_buffers_recv < nb_to_receive){
size_buffers_recv = nb_to_receive;
buffer_indexes_recv.reset(new partsize_t[size_buffers_recv]);
buffer_particles_positions_recv.reset(new real_number[size_buffers_recv*size_particle_positions]);
buffers_size_particle_rhs_recv = size_particle_rhs;
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
buffer_particles_rhs_recv[idx_rhs].reset(new real_number[size_buffers_recv*buffers_size_particle_rhs_recv]);
}
}
else if(buffers_size_particle_rhs_recv < size_particle_rhs){
buffers_size_particle_rhs_recv = size_particle_rhs;
if(size_buffers_recv > 0){
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
buffer_particles_rhs_recv[idx_rhs].reset(new real_number[size_buffers_recv*buffers_size_particle_rhs_recv]);
}
}
}
{
TIMEZONE("exchange");
// Could be done with multiple asynchronous coms
exchanger.alltoallv<partsize_t>(buffer_indexes_send_tmp, buffer_indexes_recv.get());
exchanger.alltoallv<real_number>(buffer_particles_positions_send.get(), buffer_particles_positions_recv.get(), size_particle_positions);
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
exchanger.alltoallv<real_number>(buffer_particles_rhs_send[idx_rhs].get(), buffer_particles_rhs_recv[idx_rhs].get(), size_particle_rhs);
}
}
// Stop here if not involved
if(current_is_involved == false){
assert(nb_to_receive == 0);
return;
}
if(size_buffers_send < nb_to_receive){
size_buffers_send = nb_to_receive;
buffer_indexes_send.reset(new std::pair<partsize_t,partsize_t>[size_buffers_send]);
buffer_particles_positions_send.reset(new real_number[size_buffers_send*size_particle_positions]);
buffers_size_particle_rhs_send = size_particle_rhs;
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
}
}
{
TIMEZONE("copy-local-order");
for(partsize_t idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
const partsize_t src_idx = idx_part;
const partsize_t dst_idx = buffer_indexes_recv[idx_part]-particles_chunk_current_offset;
assert(0 <= dst_idx);
assert(dst_idx < particles_chunk_current_size);
for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
= buffer_particles_positions_recv[src_idx*size_particle_positions + idx_val];
}
for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
for(int idx_val = 0 ; idx_val < int(size_particle_rhs) ; ++idx_val){
buffer_particles_rhs_send[idx_rhs][dst_idx*size_particle_rhs + idx_val]
= buffer_particles_rhs_recv[idx_rhs][src_idx*size_particle_rhs + idx_val];
}
}
}
}
write(idx_time_step, buffer_particles_positions_send.get(), buffer_particles_rhs_send.data(),
nb_to_receive, particles_chunk_current_offset, size_particle_rhs);
}
virtual void write(const int idx_time_step, const real_number* positions, const std::unique_ptr<real_number[]>* rhs,
const partsize_t nb_particles, const partsize_t particles_idx_offset, const int size_particle_rhs) = 0;
};
#endif