Commit 8305457c authored by Berenger Bramas's avatar Berenger Bramas
Browse files

Merge branch 'feature/limited-particles-output' into 'develop'

Feature/limited particles output

See merge request !9
parents a2e3236f 4b9289fa
......@@ -11,11 +11,12 @@
#include "particles_utils.hpp"
#include "alltoall_exchanger.hpp"
#include "scope_timer.hpp"
#include "env_utils.hpp"
template <class real_number, int size_particle_positions, int size_particle_rhs>
class abstract_particles_output {
MPI_Comm mpi_com;
MPI_Comm mpi_com_writer;
int my_rank;
int nb_processes;
......@@ -33,10 +34,15 @@ class abstract_particles_output {
std::unique_ptr<int[]> buffer_indexes_recv;
int size_buffers_recv;
int nb_processes_involved;
bool current_is_involved;
int particles_chunk_per_process;
int particles_chunk_current_size;
int particles_chunk_current_offset;
protected:
MPI_Comm& getCom(){
return mpi_com;
MPI_Comm& getComWriter(){
return mpi_com_writer;
}
int getTotalNbParticles() const {
......@@ -51,18 +57,75 @@ protected:
return this->my_rank;
}
bool isInvolved() const{
return current_is_involved;
}
public:
abstract_particles_output(MPI_Comm in_mpi_com, const int inTotalNbParticles, const int in_nb_rhs)
: mpi_com(in_mpi_com), my_rank(-1), nb_processes(-1),
total_nb_particles(inTotalNbParticles), nb_rhs(in_nb_rhs),
buffer_particles_rhs_send(in_nb_rhs), size_buffers_send(-1),
buffer_particles_rhs_recv(in_nb_rhs), size_buffers_recv(-1){
buffer_particles_rhs_recv(in_nb_rhs), size_buffers_recv(-1),
nb_processes_involved(0), current_is_involved(true), particles_chunk_per_process(0),
particles_chunk_current_size(0), particles_chunk_current_offset(0){
AssertMpi(MPI_Comm_rank(mpi_com, &my_rank));
AssertMpi(MPI_Comm_size(mpi_com, &nb_processes));
const size_t MinBytesPerProcess = env_utils::GetValue<size_t>("BFPS_PO_MIN_BYTES", 32 * 1024 * 1024); // Default 32MB
const size_t ChunkBytes = env_utils::GetValue<size_t>("BFPS_PO_CHUNK_BYTES", 8 * 1024 * 1024); // Default 8MB
const int MaxProcessesInvolved = std::min(nb_processes, env_utils::GetValue<int>("BFPS_PO_MAX_PROCESSES", 128));
// We split the processes using positions size only
const size_t totalBytesForPositions = total_nb_particles*size_particle_positions*sizeof(real_number);
if(MinBytesPerProcess*MaxProcessesInvolved < totalBytesForPositions){
size_t extraChunkBytes = 1;
while((MinBytesPerProcess+extraChunkBytes*ChunkBytes)*MaxProcessesInvolved < totalBytesForPositions){
extraChunkBytes += 1;
}
const size_t bytesPerProcess = (MinBytesPerProcess+extraChunkBytes*ChunkBytes);
particles_chunk_per_process = (bytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions);
nb_processes_involved = (total_nb_particles+particles_chunk_per_process-1)/particles_chunk_per_process;
}
// else limit based on minBytesPerProcess
else{
nb_processes_involved = std::max(1,std::min(MaxProcessesInvolved,int((totalBytesForPositions+MinBytesPerProcess-1)/MinBytesPerProcess)));
particles_chunk_per_process = (MinBytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions);
}
// Print out
if(my_rank == 0){
DEBUG_MSG("[INFO] Limit of processes involved in the particles ouput = %d (BFPS_PO_MAX_PROCESSES)\n", MaxProcessesInvolved);
DEBUG_MSG("[INFO] Minimum bytes per process to write = %llu (BFPS_PO_MIN_BYTES) for a complete output of = %llu for positions\n", MinBytesPerProcess, totalBytesForPositions);
DEBUG_MSG("[INFO] Consequently, there are %d processes that actually write data (%d particles per process)\n", nb_processes_involved, particles_chunk_per_process);
}
if(my_rank < nb_processes_involved){
current_is_involved = true;
particles_chunk_current_offset = my_rank*particles_chunk_per_process;
assert(particles_chunk_current_offset < total_nb_particles);
particles_chunk_current_size = std::min(particles_chunk_per_process, total_nb_particles-particles_chunk_current_offset);
assert(particles_chunk_current_offset + particles_chunk_current_size <= total_nb_particles);
assert(my_rank != nb_processes_involved-1 || particles_chunk_current_offset + particles_chunk_current_size == total_nb_particles);
}
else{
current_is_involved = false;
particles_chunk_current_size = 0;
particles_chunk_current_offset = total_nb_particles;
}
AssertMpi( MPI_Comm_split(mpi_com,
(current_is_involved ? 1 : MPI_UNDEFINED),
my_rank, &mpi_com_writer) );
}
virtual ~abstract_particles_output(){
if(current_is_involved){
AssertMpi( MPI_Comm_free(&mpi_com_writer) );
}
}
void releaseMemory(){
......@@ -125,12 +188,12 @@ public:
}
}
const particles_utils::IntervalSplitter<int> particles_splitter(total_nb_particles, nb_processes, my_rank);
int* buffer_indexes_send_tmp = reinterpret_cast<int*>(buffer_indexes_send.get());// trick re-use buffer_indexes_send memory
std::vector<int> nb_particles_to_send(nb_processes, 0);
for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
nb_particles_to_send[particles_splitter.getOwner(buffer_indexes_send[idx_part].second)] += 1;
const int dest_proc = buffer_indexes_send[idx_part].second/particles_chunk_per_process;
assert(dest_proc < nb_processes_involved);
nb_particles_to_send[dest_proc] += 1;
buffer_indexes_send_tmp[idx_part] = buffer_indexes_send[idx_part].second;
}
......@@ -138,7 +201,7 @@ public:
// nb_particles_to_send is invalid after here
const int nb_to_receive = exchanger.getTotalToRecv();
assert(nb_to_receive == particles_splitter.getMySize());
assert(nb_to_receive == particles_chunk_current_size);
if(size_buffers_recv < nb_to_receive && nb_to_receive){
buffer_indexes_recv.reset(new int[nb_to_receive]);
......@@ -159,6 +222,12 @@ public:
}
}
// Stop here if not involved
if(current_is_involved == false){
assert(nb_to_receive == 0);
return;
}
if(size_buffers_send < nb_to_receive && nb_to_receive){
buffer_indexes_send.reset(new std::pair<int,int>[nb_to_receive]);
buffer_particles_positions_send.reset(new real_number[nb_to_receive*size_particle_positions]);
......@@ -172,9 +241,9 @@ public:
TIMEZONE("copy-local-order");
for(int idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
const int src_idx = idx_part;
const int dst_idx = buffer_indexes_recv[idx_part]-particles_splitter.getMyOffset();
const int dst_idx = buffer_indexes_recv[idx_part]-particles_chunk_current_offset;
assert(0 <= dst_idx);
assert(dst_idx < particles_splitter.getMySize());
assert(dst_idx < particles_chunk_current_size);
for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
......@@ -190,7 +259,7 @@ public:
}
write(idx_time_step, buffer_particles_positions_send.get(), buffer_particles_rhs_send.data(),
nb_to_receive, particles_splitter.getMyOffset());
nb_to_receive, particles_chunk_current_offset);
}
virtual void write(const int idx_time_step, const real_number* positions, const std::unique_ptr<real_number[]>* rhs,
......
#ifndef ENV_UTILS_HPP
#define ENV_UTILS_HPP
#include <cstdlib>
#include <sstream>
#include <iostream>
#include <cstring>
#include <cstring>
#include <array>
/**
 * Static helpers that read configuration values from environment variables.
 *
 * Every accessor takes the variable name and a fallback; the fallback is
 * returned whenever the variable is not set or (for typed accessors) its
 * content cannot be converted to the requested type.
 */
class env_utils {
    /**
     * Try to convert the whole of @p str into a VariableType using stream
     * extraction.
     *
     * @param str      NUL-terminated text to convert.
     * @param outValue Receives the converted value; left untouched on failure.
     * @return true only if the extraction succeeded and nothing but trailing
     *         whitespace remains after the value.
     */
    template <class VariableType>
    static bool TryStrToOther(const char* const str, VariableType& outValue){
        std::istringstream iss(str, std::istringstream::in);
        VariableType parsed{};
        // A failed extraction on an empty string also raises eofbit, so the
        // fail state has to be tested before looking at eof().
        if(!(iss >> parsed)){
            return false;
        }
        // Accept "123 " but reject "123abc": skip trailing blanks and require
        // that the end of the text has been reached.
        if(!iss.eof()){
            iss >> std::ws;
        }
        if(!iss.eof()){
            return false;
        }
        outValue = parsed;
        return true;
    }

    /**
     * Convert @p str to VariableType.
     *
     * @return the converted value, or @p defaultValue when @p str is empty,
     *         malformed, or followed by unexpected characters.
     */
    template <class VariableType>
    static const VariableType StrToOther(const char* const str, const VariableType& defaultValue = VariableType()){
        VariableType value{};
        if(TryStrToOther(str, value)){
            return value;
        }
        return defaultValue;
    }

public:
    /** @return true if the environment variable @p inVarName is set. */
    static bool VariableIsDefine(const char inVarName[]){
        return std::getenv(inVarName) != nullptr;
    }

    /**
     * Read the environment variable @p inVarName as a VariableType.
     *
     * @return the converted value, or @p defaultValue if the variable is not
     *         set or cannot be converted.
     */
    template <class VariableType>
    static const VariableType GetValue(const char inVarName[], const VariableType defaultValue = VariableType()){
        const char* const value = std::getenv(inVarName);
        if(!value){
            return defaultValue;
        }
        return StrToOther(value, defaultValue);
    }

    /**
     * Read the environment variable @p inVarName as a boolean.
     *
     * @return @p defaultValue if the variable is not set; otherwise true only
     *         when its content is exactly "TRUE", "true" or "1".
     */
    static bool GetBool(const char inVarName[], const bool defaultValue = false){
        const char* const value = std::getenv(inVarName);
        if(!value){
            return defaultValue;
        }
        return (std::strcmp(value,"TRUE") == 0) || (std::strcmp(value,"true") == 0) || (std::strcmp(value,"1") == 0);
    }

    /**
     * Read the environment variable @p inVarName as a C string.
     *
     * @return the pointer given by getenv, or @p defaultValue if the variable
     *         is not set.
     */
    static const char* GetStr(const char inVarName[], const char* const defaultValue = nullptr){
        const char* const value = std::getenv(inVarName);
        if(!value){
            return defaultValue;
        }
        return value;
    }

    /**
     * Convert the environment variable @p inVarName to VariableType and look
     * it up in @p possibleValues.
     *
     * @param possibleValues   Indexable collection of candidate values.
     * @param nbPossibleValues Number of candidates to inspect.
     * @return index of the first candidate equal to the converted value, or
     *         @p defaultIndex if the variable is not set, cannot be converted,
     *         or matches no candidate.
     */
    template <class VariableType, class ArrayType>
    static int GetValueInArray(const char inVarName[], const ArrayType& possibleValues, const int nbPossibleValues, const int defaultIndex = -1){
        const char* const value = std::getenv(inVarName);
        if(!value){
            return defaultIndex;
        }
        // Convert once; an unparsable value must not be mistaken for a
        // default-constructed VariableType that happens to be a candidate.
        VariableType parsed{};
        if(!TryStrToOther(value, parsed)){
            return defaultIndex;
        }
        for(int idxPossible = 0 ; idxPossible < nbPossibleValues ; ++idxPossible){
            if( parsed == possibleValues[idxPossible] ){
                return idxPossible;
            }
        }
        return defaultIndex;
    }

    /**
     * Look up the raw content of the environment variable @p inVarName in
     * @p possibleValues using strcmp.
     *
     * @param possibleValues   Indexable collection of C strings.
     * @param nbPossibleValues Number of candidates to inspect.
     * @return index of the first identical candidate, or @p defaultIndex if
     *         the variable is not set or matches no candidate.
     */
    template <class ArrayType>
    static int GetStrInArray(const char inVarName[], const ArrayType& possibleValues, const int nbPossibleValues, const int defaultIndex = -1){
        const char* const value = std::getenv(inVarName);
        if(!value){
            return defaultIndex;
        }
        for(int idxPossible = 0 ; idxPossible < nbPossibleValues ; ++idxPossible){
            if( std::strcmp(value, possibleValues[idxPossible]) == 0 ){
                return idxPossible;
            }
        }
        return defaultIndex;
    }
};
#endif
......@@ -18,19 +18,22 @@ class particles_output_hdf5 : public abstract_particles_output<real_number,
size_particle_positions,
size_particle_rhs>;
const std::string particle_species_name;
hid_t file_id;
const int total_nb_particles;
const std::string particle_species_name;
hid_t dset_id_state;
hid_t dset_id_rhs;
bool use_collective_io;
public:
particles_output_hdf5(MPI_Comm in_mpi_com,
const std::string ps_name,
const int inTotalNbParticles,
const int in_nb_rhs)
const int in_nb_rhs,
const bool in_use_collective_io = false)
: abstract_particles_output<real_number,
size_particle_positions,
size_particle_rhs>(
......@@ -41,120 +44,128 @@ public:
file_id(0),
total_nb_particles(inTotalNbParticles),
dset_id_state(0),
dset_id_rhs(0){}
dset_id_rhs(0),
use_collective_io(in_use_collective_io){}
int open_file(std::string filename){
if(Parent::isInvolved()){
TIMEZONE("particles_output_hdf5::open_file");
TIMEZONE("particles_output_hdf5::open_file");
this->require_checkpoint_groups(filename);
hid_t plist_id_par = H5Pcreate(H5P_FILE_ACCESS);
assert(plist_id_par >= 0);
int retTest = H5Pset_fapl_mpio(
plist_id_par,
Parent::getCom(),
MPI_INFO_NULL);
assert(retTest >= 0);
// Parallel HDF5 write
file_id = H5Fopen(
filename.c_str(),
H5F_ACC_RDWR | H5F_ACC_DEBUG,
plist_id_par);
// file_id = H5Fcreate(filename.c_str(), H5F_ACC_TRUNC | H5F_ACC_DEBUG/*H5F_ACC_EXCL*/, H5P_DEFAULT/*H5F_ACC_RDWR*/, plist_id_par);
assert(file_id >= 0);
H5Pclose(plist_id_par);
dset_id_state = H5Gopen(
file_id,
(this->particle_species_name + std::string("/state")).c_str(),
H5P_DEFAULT);
assert(dset_id_state >= 0);
dset_id_rhs = H5Gopen(
file_id,
(this->particle_species_name + std::string("/rhs")).c_str(),
H5P_DEFAULT);
assert(dset_id_rhs >= 0);
this->require_checkpoint_groups(filename);
hid_t plist_id_par = H5Pcreate(H5P_FILE_ACCESS);
assert(plist_id_par >= 0);
int retTest = H5Pset_fapl_mpio(
plist_id_par,
Parent::getComWriter(),
MPI_INFO_NULL);
assert(retTest >= 0);
// Parallel HDF5 write
file_id = H5Fopen(
filename.c_str(),
H5F_ACC_RDWR | H5F_ACC_DEBUG,
plist_id_par);
// file_id = H5Fcreate(filename.c_str(), H5F_ACC_TRUNC | H5F_ACC_DEBUG/*H5F_ACC_EXCL*/, H5P_DEFAULT/*H5F_ACC_RDWR*/, plist_id_par);
assert(file_id >= 0);
H5Pclose(plist_id_par);
dset_id_state = H5Gopen(
file_id,
(this->particle_species_name + std::string("/state")).c_str(),
H5P_DEFAULT);
assert(dset_id_state >= 0);
dset_id_rhs = H5Gopen(
file_id,
(this->particle_species_name + std::string("/rhs")).c_str(),
H5P_DEFAULT);
assert(dset_id_rhs >= 0);
}
return EXIT_SUCCESS;
}
~particles_output_hdf5(){}
int close_file(void){
TIMEZONE("particles_output_hdf5::close_file");
if(Parent::isInvolved()){
TIMEZONE("particles_output_hdf5::close_file");
int rethdf = H5Gclose(dset_id_state);
assert(rethdf >= 0);
int rethdf = H5Gclose(dset_id_state);
assert(rethdf >= 0);
rethdf = H5Gclose(dset_id_rhs);
assert(rethdf >= 0);
rethdf = H5Gclose(dset_id_rhs);
assert(rethdf >= 0);
rethdf = H5Fclose(file_id);
assert(rethdf >= 0);
rethdf = H5Fclose(file_id);
assert(rethdf >= 0);
}
return EXIT_SUCCESS;
}
void require_checkpoint_groups(std::string filename){
if (Parent::getMyRank() == 0)
{
hid_t file_id = H5Fopen(
filename.c_str(),
H5F_ACC_RDWR | H5F_ACC_DEBUG,
H5P_DEFAULT);
assert(file_id >= 0);
bool group_exists = H5Lexists(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT);
if (!group_exists)
if(Parent::isInvolved()){
if (Parent::getMyRank() == 0)
{
hid_t gg = H5Gcreate(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
hid_t file_id = H5Fopen(
filename.c_str(),
H5F_ACC_RDWR | H5F_ACC_DEBUG,
H5P_DEFAULT);
assert(file_id >= 0);
bool group_exists = H5Lexists(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT);
if (!group_exists)
{
hid_t gg = H5Gcreate(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
assert(gg >= 0);
H5Gclose(gg);
}
hid_t gg = H5Gopen(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT);
assert(gg >= 0);
group_exists = H5Lexists(
gg,
"state",
H5P_DEFAULT);
if (!group_exists)
{
hid_t ggg = H5Gcreate(
gg,
"state",
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
assert(ggg >= 0);
H5Gclose(ggg);
}
group_exists = H5Lexists(
gg,
"rhs",
H5P_DEFAULT);
if (!group_exists)
{
hid_t ggg = H5Gcreate(
gg,
"rhs",
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
assert(ggg >= 0);
H5Gclose(ggg);
}
H5Gclose(gg);
H5Fclose(file_id);
}
hid_t gg = H5Gopen(
file_id,
this->particle_species_name.c_str(),
H5P_DEFAULT);
assert(gg >= 0);
group_exists = H5Lexists(
gg,
"state",
H5P_DEFAULT);
if (!group_exists)
{
hid_t ggg = H5Gcreate(
gg,
"state",
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
assert(ggg >= 0);
H5Gclose(ggg);
}
group_exists = H5Lexists(
gg,
"rhs",
H5P_DEFAULT);
if (!group_exists)
{
hid_t ggg = H5Gcreate(
gg,
"rhs",
H5P_DEFAULT,
H5P_DEFAULT,
H5P_DEFAULT);
assert(ggg >= 0);
H5Gclose(ggg);
}
H5Gclose(gg);
H5Fclose(file_id);
MPI_Barrier(Parent::getComWriter());
}
MPI_Barrier(Parent::getCom());
}
void write(
......@@ -163,6 +174,8 @@ public:
const std::unique_ptr<real_number[]>* particles_rhs,
const int nb_particles,
const int particles_idx_offset) final{
assert(Parent::isInvolved());
TIMEZONE("particles_output_hdf5::write");
assert(particles_idx_offset < Parent::getTotalNbParticles() || (particles_idx_offset == Parent::getTotalNbParticles() && nb_particles == 0));
......@@ -176,7 +189,7 @@ public:
hid_t plist_id = H5Pcreate(H5P_DATASET_XFER);
assert(plist_id >= 0);
{
int rethdf = H5Pset_dxpl_mpio(plist_id, H5FD_MPIO_INDEPENDENT);
int rethdf = H5Pset_dxpl_mpio(plist_id, use_collective_io ? H5FD_MPIO_COLLECTIVE : H5FD_MPIO_INDEPENDENT);
assert(rethdf >= 0);
}
......
......@@ -26,25 +26,31 @@ public:
const int in_nb_rhs, const int in_nb_step_prealloc = -1)
: abstract_particles_output<real_number, size_particle_positions, size_particle_rhs>(in_mpi_com, inTotalNbParticles, in_nb_rhs),
filename(in_filename), nb_step_prealloc(in_nb_step_prealloc), current_step_in_file(0){
{
TIMEZONE("particles_output_mpiio::MPI_File_open");
AssertMpi(MPI_File_open(Parent::getCom(), const_cast<char*>(filename.c_str()),
MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &mpi_file));
}
if(nb_step_prealloc != -1){
TIMEZONE("particles_output_mpiio::MPI_File_set_size");
AssertMpi(MPI_File_set_size(mpi_file,
nb_step_prealloc*Parent::getTotalNbParticles()*sizeof(real_number)*(size_particle_positions+size_particle_rhs*Parent::getNbRhs())));
if(Parent::isInvolved()){
{
TIMEZONE("particles_output_mpiio::MPI_File_open");
AssertMpi(MPI_File_open(Parent::getComWriter(), const_cast<char*>(filename.c_str()),
MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &mpi_file));
}
if(nb_step_prealloc != -1){
TIMEZONE("particles_output_mpiio::MPI_File_set_size");
AssertMpi(MPI_File_set_size(mpi_file,
nb_step_prealloc*Parent::getTotalNbParticles()*sizeof(real_number)*(size_particle_positions+size_particle_rhs*Parent::getNbRhs())));
}
}
}
~particles_output_mpiio(){
TIMEZONE("particles_output_mpiio::MPI_File_close");
AssertMpi(MPI_File_close(&mpi_file));
if(Parent::isInvolved()){
TIMEZONE("particles_output_mpiio::MPI_File_close");
AssertMpi(MPI_File_close(&mpi_file));
}
}
void write(const int /*time_step*/, const real_number* particles_positions, const std::unique_ptr<real_number[]>* particles_rhs,
const int nb_particles, const int particles_idx_offset) final{
assert(Parent::isInvolved());
TIMEZONE("particles_output_mpiio::write");
assert(nb_step_prealloc == -1 || current_step_in_file < nb_step_prealloc);
......
......@@ -199,8 +199,14 @@ public:
}
else{
step_split = double(nb_items)/double(nb_intervals);
offset_mine = NumType(step_split*double(my_idx));
size_mine = (my_idx != nb_intervals-1 ? NumType(step_split*double(my_idx+1)) : nb_items) -offset_mine;
if(nb_intervals <= my_idx){
offset_mine = nb_items;
size_mine = 0;
}
else{
offset_mine = NumType(step_split*double(my_idx));
size_mine = (my_idx != nb_intervals-1 ? NumType(step_split*double(my_idx+1)) : nb_items) -offset_mine;
}
}
}
......
......@@ -128,7 +128,8 @@ particle_headers = [
'cpp/particles/particles_output_mpiio.hpp',
'cpp/particles/particles_system_builder.hpp',
'cpp/particles/particles_system.hpp',
'cpp/particles/particles_utils.hpp']
'cpp/particles/particles_utils.hpp',
'cpp/particles/env_utils.hpp']
header_list = (['cpp/base.hpp'] +
['cpp/fftw_interface.hpp'] +
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment