Skip to content
Snippets Groups Projects
abstract_particles_output.hpp 6.62 KiB
#ifndef ABSTRACT_PARTICLES_OUTPUT
#define ABSTRACT_PARTICLES_OUTPUT

#include <memory>
#include <vector>
#include <cassert>
#include <algorithm>
#include <cstddef>

#include "base.hpp"
#include "particles_utils.hpp"
#include "alltoall_exchanger.hpp"
#include "scope_timer.hpp"

#ifndef AssertMpi
#define AssertMpi(X) if(MPI_SUCCESS != (X)) { printf("MPI Error at line %d\n",__LINE__); fflush(stdout) ; throw std::runtime_error("Stop from from mpi erro"); }
#endif


template <int size_particle_positions, int size_particle_rhs>
class abstract_particles_output {
    struct movable_particle{
        int global_idx;
        double positions[size_particle_positions];
        double rhs[size_particle_rhs];
    };

    void create_movable_mpi_data_type(){
        /** Type in order in the struct */
        MPI_Datatype type[3] = { MPI_INT, MPI_DOUBLE, MPI_DOUBLE };
        /** Number of occurence of each type */
        int blocklen[3] = { 1, size_particle_positions, size_particle_rhs };
        /** Position offset from struct starting address */
        MPI_Aint disp[3];
        disp[0] = offsetof(movable_particle,global_idx);
        disp[1] = offsetof(movable_particle,positions);
        disp[2] = offsetof(movable_particle,rhs);
        /** Create the type */
        AssertMpi( MPI_Type_create_struct(3, blocklen, disp, type, &mpi_movable_particle_type) );
        /** Commit it*/
        AssertMpi( MPI_Type_commit(&mpi_movable_particle_type) );
    }

    MPI_Datatype mpi_movable_particle_type;

    MPI_Comm mpi_com;

    int my_rank;
    int nb_processes;

    std::unique_ptr<movable_particle[]> buffer_particles_send;
    int nb_particles_allocated_send;
    const int total_nb_particles;

    std::unique_ptr<movable_particle[]> buffer_particles_recv;
    int nb_particles_allocated_recv;

protected:
    MPI_Comm& getCom(){
        return mpi_com;
    }

    int getTotalNbParticles() const {
        return total_nb_particles;
    }

public:
    abstract_particles_output(MPI_Comm in_mpi_com, const int inTotalNbParticles)
            : mpi_com(in_mpi_com), my_rank(-1), nb_processes(-1),
                nb_particles_allocated_send(-1), total_nb_particles(inTotalNbParticles),
                nb_particles_allocated_recv(-1){

        AssertMpi(MPI_Comm_rank(mpi_com, &my_rank));
        AssertMpi(MPI_Comm_size(mpi_com, &nb_processes));

        create_movable_mpi_data_type();
    }

    virtual ~abstract_particles_output(){
        MPI_Type_free(&mpi_movable_particle_type);
    }

    void releaseMemory(){
        buffer_particles_send.release();
        nb_particles_allocated_send = -1;
        buffer_particles_recv.release();
        nb_particles_allocated_recv = -1;
    }

    void save(const double input_particles_positions[], const double input_particles_rhs[],
              const int index_particles[], const int nb_particles, const int idx_time_step){
        TIMEZONE("abstract_particles_output::save");
        assert(total_nb_particles != -1);
        DEBUG_MSG("[%d] total_nb_particles %d \n", my_rank, total_nb_particles);
        DEBUG_MSG("[%d] nb_particles %d to distribute for saving \n", my_rank, nb_particles);

        {
            TIMEZONE("sort");

            if(nb_particles_allocated_send < nb_particles){
                buffer_particles_send.reset(new movable_particle[nb_particles]);
                nb_particles_allocated_send = nb_particles;
            }

            for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
                buffer_particles_send[idx_part].global_idx = index_particles[idx_part];
                for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
                    buffer_particles_send[idx_part].positions[idx_val] = input_particles_positions[idx_part*size_particle_positions + idx_val];
                }
                for(int idx_val = 0 ; idx_val < size_particle_rhs ; ++idx_val){
                    buffer_particles_send[idx_part].rhs[idx_val] = input_particles_rhs[idx_part*size_particle_rhs + idx_val];
                }
            }

            std::sort(&buffer_particles_send[0], &buffer_particles_send[nb_particles], [](const movable_particle& p1, const movable_particle& p2){
                return p1.global_idx < p2.global_idx;
            });
        }

        const particles_utils::IntervalSplitter<int> particles_splitter(total_nb_particles, nb_processes, my_rank);
        DEBUG_MSG("[%d] nb_particles_per_proc %d for saving\n", my_rank, particles_splitter.getMySize());

        std::vector<int> nb_particles_to_send(nb_processes, 0);
        for(int idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
            nb_particles_to_send[particles_splitter.getOwner(buffer_particles_send[idx_part].global_idx)] += 1;
        }

        alltoall_exchanger exchanger(mpi_com, std::move(nb_particles_to_send));
        // nb_particles_to_send is invalid after here

        const int nb_to_receive = exchanger.getTotalToRecv();

        if(nb_particles_allocated_recv < nb_to_receive){
            buffer_particles_recv.reset(new movable_particle[nb_to_receive]);
            nb_particles_allocated_recv = nb_to_receive;
        }

        exchanger.alltoallv(buffer_particles_send.get(), buffer_particles_recv.get(), mpi_movable_particle_type);

        // Trick re-use the buffer to have only double

        if(nb_particles_allocated_send < nb_to_receive){
            buffer_particles_send.reset(new movable_particle[nb_to_receive]);
            nb_particles_allocated_send = nb_to_receive;
        }

        double* buffer_positions_dptr = reinterpret_cast<double*>(buffer_particles_recv.get());
        double* buffer_rhs_dptr = reinterpret_cast<double*>(buffer_particles_send.get());
        {
            TIMEZONE("copy");
            for(int idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
                for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
                    buffer_positions_dptr[idx_part*size_particle_positions + idx_val]
                            = buffer_particles_recv[idx_part].positions[idx_val];
                }
                // Can be done here or before "positions" copy
                for(int idx_val = 0 ; idx_val < size_particle_rhs ; ++idx_val){
                    buffer_rhs_dptr[idx_part*size_particle_rhs + idx_val]
                            = buffer_particles_recv[idx_part].rhs[idx_val];
                }
            }
        }

        write(idx_time_step, buffer_positions_dptr, buffer_rhs_dptr, nb_to_receive, particles_splitter.getMyOffset());
    }

    virtual void write(const int idx_time_step, const double* positions, const double* rhs,
                       const int nb_particles, const int particles_idx_offset) = 0;
};

#endif