Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • feature/add-fft-interface
  • feature/expose-rnumber-from-simulations
  • feature/particle_state_generation_with_variable_box_size
  • feature/forcing-unit-test
  • feature/dealias-check2
  • bugfix/check_field_exists
  • feature/dealias-check
  • v3.x
  • feature/particles-vectorization
  • 6.2.4
  • 6.2.3
  • 6.2.2
  • 6.2.1
  • 6.2.0
  • 6.1.0
  • 6.0.0
  • 5.8.1
  • 5.8.0
  • 5.7.2
  • 5.7.1
  • 5.7.0
  • 5.6.0
  • 5.5.1
  • 5.5.0
  • 5.4.7
  • 5.4.6
  • 5.4.5
  • 5.4.4
  • 5.4.3
30 results

abstract_particles_output.hpp

Blame
  • Cristian C Lalescu's avatar
    Cristian Lalescu authored
    this gets rid of a whole bunch of warnings
    1c73f150
    History
    abstract_particles_output.hpp 15.72 KiB
    /******************************************************************************
    *                                                                             *
    *  Copyright 2019 Max Planck Institute for Dynamics and Self-Organization     *
    *                                                                             *
    *  This file is part of bfps.                                                 *
    *                                                                             *
    *  bfps is free software: you can redistribute it and/or modify               *
    *  it under the terms of the GNU General Public License as published          *
    *  by the Free Software Foundation, either version 3 of the License,          *
    *  or (at your option) any later version.                                     *
    *                                                                             *
    *  bfps is distributed in the hope that it will be useful,                    *
    *  but WITHOUT ANY WARRANTY; without even the implied warranty of             *
    *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
    *  GNU General Public License for more details.                               *
    *                                                                             *
    *  You should have received a copy of the GNU General Public License          *
    *  along with bfps.  If not, see <http://www.gnu.org/licenses/>               *
    *                                                                             *
    * Contact: Cristian.Lalescu@ds.mpg.de                                         *
    *                                                                             *
    ******************************************************************************/
    
    
    
    #ifndef ABSTRACT_PARTICLES_OUTPUT
    #define ABSTRACT_PARTICLES_OUTPUT
    
    #include <memory>
    #include <vector>
    #include <cassert>
    #include <algorithm>
    #include <cstddef>
    
    #include "base.hpp"
    #include "particles_utils.hpp"
    #include "alltoall_exchanger.hpp"
    #include "scope_timer.hpp"
    #include "env_utils.hpp"
    
    template <class partsize_t, class real_number, int size_particle_positions>
    class abstract_particles_output {
        MPI_Comm mpi_com;
        MPI_Comm mpi_com_writer;
    
        int my_rank;
        int nb_processes;
    
        const partsize_t total_nb_particles;
        const int nb_rhs;
    
        std::unique_ptr<std::pair<partsize_t,partsize_t>[]> buffer_indexes_send;
        std::unique_ptr<real_number[]> buffer_particles_positions_send;
        std::vector<std::unique_ptr<real_number[]>> buffer_particles_rhs_send;
        partsize_t size_buffers_send;
        int buffers_size_particle_rhs_send;
    
        std::unique_ptr<real_number[]> buffer_particles_positions_recv;
        std::vector<std::unique_ptr<real_number[]>> buffer_particles_rhs_recv;
        std::unique_ptr<partsize_t[]> buffer_indexes_recv;
        partsize_t size_buffers_recv;    
        int buffers_size_particle_rhs_recv;
    
        int nb_processes_involved;
        bool current_is_involved;
        partsize_t particles_chunk_per_process;
        partsize_t particles_chunk_current_size;
        partsize_t particles_chunk_current_offset;
    
    protected:
        MPI_Comm& getCom(){
            return mpi_com;
        }
    
        MPI_Comm& getComWriter(){
            return mpi_com_writer;
        }
    
        int getNbRhs() const {
            return nb_rhs;
        }
    
        int getMyRank(){
            return this->my_rank;
        }
    
        bool isInvolved() const{
            return current_is_involved;
        }
    
    public:
        abstract_particles_output(MPI_Comm in_mpi_com, const partsize_t inTotalNbParticles, const int in_nb_rhs) throw()
                : mpi_com(in_mpi_com), my_rank(-1), nb_processes(-1),
                    total_nb_particles(inTotalNbParticles), nb_rhs(in_nb_rhs),
                    buffer_particles_rhs_send(in_nb_rhs), size_buffers_send(0),
                    buffers_size_particle_rhs_send(0),
                    buffer_particles_rhs_recv(in_nb_rhs), size_buffers_recv(0),
                    buffers_size_particle_rhs_recv(0),
                    nb_processes_involved(0), current_is_involved(true), particles_chunk_per_process(0),
                    particles_chunk_current_size(0), particles_chunk_current_offset(0) {
    
            AssertMpi(MPI_Comm_rank(mpi_com, &my_rank));
            AssertMpi(MPI_Comm_size(mpi_com, &nb_processes));
    
            const size_t MinBytesPerProcess = env_utils::GetValue<size_t>("BFPS_PO_MIN_BYTES", 32 * 1024 * 1024); // Default 32MB
            const size_t ChunkBytes = env_utils::GetValue<size_t>("BFPS_PO_CHUNK_BYTES", 8 * 1024 * 1024); // Default 8MB
            const int MaxProcessesInvolved = std::min(nb_processes, env_utils::GetValue<int>("BFPS_PO_MAX_PROCESSES", 128));
    
            // We split the processes using positions size only
            const size_t totalBytesForPositions = total_nb_particles*size_particle_positions*sizeof(real_number);
    
    
            if(MinBytesPerProcess*MaxProcessesInvolved < totalBytesForPositions){
                size_t extraChunkBytes = 1;
                while((MinBytesPerProcess+extraChunkBytes*ChunkBytes)*MaxProcessesInvolved < totalBytesForPositions){
                    extraChunkBytes += 1;
                }
                const size_t bytesPerProcess = (MinBytesPerProcess+extraChunkBytes*ChunkBytes);
                particles_chunk_per_process = partsize_t((bytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions));
                nb_processes_involved = int((total_nb_particles+particles_chunk_per_process-1)/particles_chunk_per_process);
            }
            // else limit based on minBytesPerProcess
            else{
                nb_processes_involved = std::max(1,std::min(MaxProcessesInvolved,int((totalBytesForPositions+MinBytesPerProcess-1)/MinBytesPerProcess)));
                particles_chunk_per_process = partsize_t((MinBytesPerProcess+sizeof(real_number)*size_particle_positions-1)/(sizeof(real_number)*size_particle_positions));
            }
    
            // Print out
            if(my_rank == 0){
                DEBUG_MSG("[INFO] Limit of processes involved in the particles ouput = %d (BFPS_PO_MAX_PROCESSES)\n", MaxProcessesInvolved);
                DEBUG_MSG("[INFO] Minimum bytes per process to write = %llu (BFPS_PO_MIN_BYTES) for a complete output of = %llu for positions\n", MinBytesPerProcess, totalBytesForPositions);
                DEBUG_MSG("[INFO] Consequently, there are %d processes that actually write data (%d particles per process)\n", nb_processes_involved, particles_chunk_per_process);
            }
    
            if(my_rank < nb_processes_involved){
                current_is_involved = true;
                particles_chunk_current_offset = my_rank*particles_chunk_per_process;
                assert(particles_chunk_current_offset < total_nb_particles);
                particles_chunk_current_size = std::min(particles_chunk_per_process, total_nb_particles-particles_chunk_current_offset);
                assert(particles_chunk_current_offset + particles_chunk_current_size <= total_nb_particles);
                assert(my_rank != nb_processes_involved-1 || particles_chunk_current_offset + particles_chunk_current_size == total_nb_particles);
            }
            else{
                current_is_involved = false;
                particles_chunk_current_size = 0;
                particles_chunk_current_offset = total_nb_particles;
            }
    
            AssertMpi( MPI_Comm_split(mpi_com,
                           (current_is_involved ? 1 : MPI_UNDEFINED),
                           my_rank, &mpi_com_writer) );
        }
    
        virtual ~abstract_particles_output() noexcept(false){
            if(current_is_involved){
                AssertMpi( MPI_Comm_free(&mpi_com_writer) );
            }
        }   
    
        partsize_t getTotalNbParticles() const {
            return total_nb_particles;
        }
    
        void releaseMemory(){
            delete[] buffer_indexes_send.release();
            delete[] buffer_particles_positions_send.release();
            size_buffers_send = 0;
            delete[] buffer_indexes_recv.release();
            delete[] buffer_particles_positions_recv.release();
            size_buffers_recv = 0;
            for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                delete[] buffer_particles_rhs_send[idx_rhs].release();
                delete[] buffer_particles_rhs_recv[idx_rhs].release();
            }
            buffers_size_particle_rhs_send = 0;
            buffers_size_particle_rhs_recv = 0;
        }
    
        template <int size_particle_rhs>
        void save(
                const real_number input_particles_positions[],
                const std::unique_ptr<real_number[]> input_particles_rhs[],
                const partsize_t index_particles[],
                const partsize_t nb_particles,
                const int idx_time_step){
            TIMEZONE("abstract_particles_output::save");
            assert(total_nb_particles != -1);
    
            {
                TIMEZONE("sort-to-distribute");
    
                if(size_buffers_send < nb_particles){
                    size_buffers_send = nb_particles;
                    buffer_indexes_send.reset(new std::pair<partsize_t,partsize_t>[size_buffers_send]);
                    buffer_particles_positions_send.reset(new real_number[size_buffers_send*size_particle_positions]);
                    
                    if(buffers_size_particle_rhs_send < size_particle_rhs){
                        buffers_size_particle_rhs_send = size_particle_rhs;
                    }
                    for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                        buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
                    }
                }
                else if(buffers_size_particle_rhs_send < size_particle_rhs){
                    buffers_size_particle_rhs_send = size_particle_rhs;
                    if(size_buffers_send > 0){
                        for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                            buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
                        }
                    }
                }
    
                for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
                    buffer_indexes_send[idx_part].first = idx_part;
                    buffer_indexes_send[idx_part].second = index_particles[idx_part];
                }
    
                std::sort(&buffer_indexes_send[0], &buffer_indexes_send[nb_particles], [](const std::pair<partsize_t,partsize_t>& p1,
                                                                                          const std::pair<partsize_t,partsize_t>& p2){
                    return p1.second < p2.second;
                });
    
                for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
                    const partsize_t src_idx = buffer_indexes_send[idx_part].first;
                    const partsize_t dst_idx = idx_part;
    
                    for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
                        buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
                                = input_particles_positions[src_idx*size_particle_positions + idx_val];
                    }
                    for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                        for(int idx_val = 0 ; idx_val < int(size_particle_rhs) ; ++idx_val){
                            buffer_particles_rhs_send[idx_rhs][dst_idx*size_particle_rhs + idx_val]
                                    = input_particles_rhs[idx_rhs][src_idx*size_particle_rhs + idx_val];
                        }
                    }
                }
            }
    
            partsize_t* buffer_indexes_send_tmp = reinterpret_cast<partsize_t*>(buffer_indexes_send.get());// trick re-use buffer_indexes_send memory
            std::vector<partsize_t> nb_particles_to_send(nb_processes, 0);
            for(partsize_t idx_part = 0 ; idx_part < nb_particles ; ++idx_part){
                const int dest_proc = int(buffer_indexes_send[idx_part].second/particles_chunk_per_process);
                assert(dest_proc < nb_processes_involved);
                nb_particles_to_send[dest_proc] += 1;
                buffer_indexes_send_tmp[idx_part] = buffer_indexes_send[idx_part].second;
            }
    
            alltoall_exchanger exchanger(mpi_com, std::move(nb_particles_to_send));
            // nb_particles_to_send is invalid after here
    
            const int nb_to_receive = exchanger.getTotalToRecv();
            assert(nb_to_receive == particles_chunk_current_size);
    
            if(size_buffers_recv < nb_to_receive){
                size_buffers_recv = nb_to_receive;
                buffer_indexes_recv.reset(new partsize_t[size_buffers_recv]);
                buffer_particles_positions_recv.reset(new real_number[size_buffers_recv*size_particle_positions]);
    
                buffers_size_particle_rhs_recv = size_particle_rhs;
                for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                    buffer_particles_rhs_recv[idx_rhs].reset(new real_number[size_buffers_recv*buffers_size_particle_rhs_recv]);
                }
            }
            else if(buffers_size_particle_rhs_recv < size_particle_rhs){
                buffers_size_particle_rhs_recv = size_particle_rhs;
                if(size_buffers_recv > 0){
                    for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                        buffer_particles_rhs_recv[idx_rhs].reset(new real_number[size_buffers_recv*buffers_size_particle_rhs_recv]);
                    }
                }
            }
    
            {
                TIMEZONE("exchange");
                // Could be done with multiple asynchronous coms
                exchanger.alltoallv<partsize_t>(buffer_indexes_send_tmp, buffer_indexes_recv.get());
                exchanger.alltoallv<real_number>(buffer_particles_positions_send.get(), buffer_particles_positions_recv.get(), size_particle_positions);
                for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                    exchanger.alltoallv<real_number>(buffer_particles_rhs_send[idx_rhs].get(), buffer_particles_rhs_recv[idx_rhs].get(), size_particle_rhs);
                }
            }
    
            // Stop here if not involved
            if(current_is_involved == false){
                assert(nb_to_receive == 0);
                return;
            }
    
            if(size_buffers_send < nb_to_receive){
                size_buffers_send = nb_to_receive;
                buffer_indexes_send.reset(new std::pair<partsize_t,partsize_t>[size_buffers_send]);
                buffer_particles_positions_send.reset(new real_number[size_buffers_send*size_particle_positions]);
                buffers_size_particle_rhs_send = size_particle_rhs;
                for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                    buffer_particles_rhs_send[idx_rhs].reset(new real_number[size_buffers_send*buffers_size_particle_rhs_send]);
                }
            }
    
            {
                TIMEZONE("copy-local-order");
                for(partsize_t idx_part = 0 ; idx_part < nb_to_receive ; ++idx_part){
                    const partsize_t src_idx = idx_part;
                    const partsize_t dst_idx = buffer_indexes_recv[idx_part]-particles_chunk_current_offset;
                    assert(0 <= dst_idx);
                    assert(dst_idx < particles_chunk_current_size);
    
                    for(int idx_val = 0 ; idx_val < size_particle_positions ; ++idx_val){
                        buffer_particles_positions_send[dst_idx*size_particle_positions + idx_val]
                                = buffer_particles_positions_recv[src_idx*size_particle_positions + idx_val];
                    }
                    for(int idx_rhs = 0 ; idx_rhs < nb_rhs ; ++idx_rhs){
                        for(int idx_val = 0 ; idx_val < int(size_particle_rhs) ; ++idx_val){
                            buffer_particles_rhs_send[idx_rhs][dst_idx*size_particle_rhs + idx_val]
                                    = buffer_particles_rhs_recv[idx_rhs][src_idx*size_particle_rhs + idx_val];
                        }
                    }
                }
            }
    
            write(idx_time_step, buffer_particles_positions_send.get(), buffer_particles_rhs_send.data(),
                  nb_to_receive, particles_chunk_current_offset, size_particle_rhs);
        }
    
        virtual void write(const int idx_time_step, const real_number* positions, const std::unique_ptr<real_number[]>* rhs,
                           const partsize_t nb_particles, const partsize_t particles_idx_offset, const int size_particle_rhs) = 0;
    };
    
    #endif