// distributed_particles.cpp
/**********************************************************************
*                                                                     *
*  Copyright 2015 Max Planck Institute                                *
*                 for Dynamics and Self-Organization                  *
*                                                                     *
*  This file is part of bfps.                                         *
*                                                                     *
*  bfps is free software: you can redistribute it and/or modify       *
*  it under the terms of the GNU General Public License as published  *
*  by the Free Software Foundation, either version 3 of the License,  *
*  or (at your option) any later version.                             *
*                                                                     *
*  bfps is distributed in the hope that it will be useful,            *
*  but WITHOUT ANY WARRANTY; without even the implied warranty of     *
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the      *
*  GNU General Public License for more details.                       *
*                                                                     *
*  You should have received a copy of the GNU General Public License  *
*  along with bfps.  If not, see <http://www.gnu.org/licenses/>       *
*                                                                     *
* Contact: Cristian.Lalescu@ds.mpg.de                                 *
*                                                                     *
**********************************************************************/



//#define NDEBUG

#include <cmath>
#include <cassert>
#include <cstring>
#include <string>
#include <sstream>

#include "base.hpp"
#include "distributed_particles.hpp"
#include "fftw_tools.hpp"


extern int myrank, nprocs;

template <int particle_type, class rnumber, int interp_neighbours>
distributed_particles<particle_type, rnumber, interp_neighbours>::distributed_particles(
        const char *NAME,
        const hid_t data_file_id,
        interpolator<rnumber, interp_neighbours> *FIELD,
        const int NPARTICLES,
        const int TRAJ_SKIP,
        const int INTEGRATION_STEPS) : particles_io_base<particle_type>(
            NAME,
            TRAJ_SKIP,
            data_file_id,
            FIELD->descriptor->comm)
{
    /* Adams-Bashforth schemes are implemented up to order 6 only. */
    assert((INTEGRATION_STEPS <= 6) &&
           (INTEGRATION_STEPS >= 1));
    this->integration_steps = INTEGRATION_STEPS;
    this->vel = FIELD;
    /* one right-hand-side history slot per integration step */
    this->rhs.resize(INTEGRATION_STEPS);
    /* this->nparticles is filled in by the base class (presumably from
     * the particle data file); the caller-supplied count must agree */
    assert(this->nparticles == NPARTICLES);
}

template <int particle_type, class rnumber, int interp_neighbours>
distributed_particles<particle_type, rnumber, interp_neighbours>::~distributed_particles()
{
    // no owned resources: members are containers / non-owning pointers
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::sample(
        interpolator<rnumber, interp_neighbours> *field,
        std::unordered_map<int, single_particle_state<POINT3D>> &y)
{
    /** Sample `field` at the positions of all locally held particles.
     *  On exit, `y` maps each local particle id to the interpolated
     *  3-component value.  `y` is cleared first, so it contains entries
     *  only for particles owned by this rank. */
    // fixed-size scratch on the stack: POINT3D has exactly 3 components;
    // the original heap allocation was unnecessary (and leaked if the
    // interpolation call threw)
    double yy[3];
    y.clear();
    for (const auto &pp: this->state)
    {
        (*field)(pp.second.data, yy);
        y[pp.first] = yy;   // single_particle_state copies from the buffer
    }
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::get_rhs(
        const std::unordered_map<int, single_particle_state<particle_type>> &x,
        std::unordered_map<int, single_particle_state<particle_type>> &y)
{
    /** Evaluate the right-hand side (the interpolated velocity field) at
     *  the positions in `x`; results go into `y`, keyed by particle id.
     *  `y` is cleared first. */
    // RAII scratch buffer instead of raw new[]/delete[]: exception-safe,
    // and ncomponents is only known at run time
    std::vector<double> yy(this->ncomponents);
    y.clear();
    for (const auto &pp: x)
    {
        (*this->vel)(pp.second.data, yy.data());
        y[pp.first] = yy.data();   // state copies ncomponents doubles
    }
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::sample(
        interpolator<rnumber, interp_neighbours> *field,
        const hid_t data_file_id,
        const char *dset_name)
{
    /* Convenience overload: sample the field at the particle positions,
     * then dump the result into the named dataset of the HDF5 file. */
    std::unordered_map<int, single_particle_state<POINT3D>> sampled_values;
    this->sample(field, sampled_values);
    this->write(data_file_id, dset_name, sampled_values);
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::roll_rhs()
{
    /* Shift the right-hand-side history one slot towards older entries:
     * rhs[k] receives the previous rhs[k-1].  rhs[0] keeps a duplicate
     * of its old content until the next step's evaluation overwrites it
     * (checkpoint writes may still read it, so a copy — not a move —
     * is required). */
    for (int k = this->integration_steps - 1; k > 0; k--)
        rhs[k] = rhs[k-1];
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::redistribute(
        std::unordered_map<int, single_particle_state<particle_type>> &x,
        std::vector<std::unordered_map<int, single_particle_state<particle_type>>> &vals)
{
    /** Move particles (and their companion records in `vals`, e.g. the
     *  rhs history) to the MPI rank that owns their z location.
     *  Particles are assumed to cross at most one rank boundary per
     *  call, so only the two neighbouring ranks take part in the
     *  exchange.  On exit every particle in `x` is local (checked in
     *  debug builds). */
    /* neighbouring rank offsets */
    int ro[2];
    ro[0] = -1;
    ro[1] = 1;
    /* neighbouring ranks (periodic in rank space) */
    int nr[2];
    nr[0] = MOD(this->myrank+ro[0], this->nprocs);
    nr[1] = MOD(this->myrank+ro[1], this->nprocs);
    /* particles to send, particles to receive */
    std::vector<int> ps[2], pr[2];
    /* number of particles to send, number of particles to receive */
    int nps[2], npr[2];
    int rsrc, rdst;
    /* get list of id-s to send */
    for (auto &pp: x)
        for (int i=0; i<2; i++)
            if (this->vel->get_rank(pp.second.data[2]) == nr[i])
                ps[i].push_back(pp.first);
    /* exchange particle counts with both neighbours.
     * NOTE: MPI_INT is the correct datatype for C `int` buffers;
     * the previous MPI_INTEGER names the Fortran INTEGER type and is
     * erroneous in C code per the MPI standard. */
    for (int i=0; i<2; i++)
        nps[i] = ps[i].size();
    for (rsrc = 0; rsrc<this->nprocs; rsrc++)
        for (int i=0; i<2; i++)
        {
            rdst = MOD(rsrc+ro[i], this->nprocs);
            if (this->myrank == rsrc)
                MPI_Send(
                        nps+i,
                        1,
                        MPI_INT,
                        rdst,
                        2*(rsrc*this->nprocs + rdst)+i,
                        this->comm);
            if (this->myrank == rdst)
                MPI_Recv(
                        npr+1-i,
                        1,
                        MPI_INT,
                        rsrc,
                        2*(rsrc*this->nprocs + rdst)+i,
                        this->comm,
                        MPI_STATUS_IGNORE);
        }
    for (int i=0; i<2; i++)
        pr[i].resize(npr[i]);

    /* one scratch buffer, sized for the largest message; each particle
     * carries its state plus vals.size() companion records */
    int buffer_size = (nps[0] > nps[1]) ? nps[0] : nps[1];
    buffer_size = (buffer_size > npr[0])? buffer_size : npr[0];
    buffer_size = (buffer_size > npr[1])? buffer_size : npr[1];
    double *buffer = new double[buffer_size*this->ncomponents*(1+vals.size())];
    for (rsrc = 0; rsrc<this->nprocs; rsrc++)
        for (int i=0; i<2; i++)
        {
            rdst = MOD(rsrc+ro[i], this->nprocs);
            if (this->myrank == rsrc && nps[i] > 0)
            {
                /* send the id list first, then the packed payload */
                MPI_Send(
                        &ps[i].front(),
                        nps[i],
                        MPI_INT,
                        rdst,
                        2*(rsrc*this->nprocs + rdst),
                        this->comm);
                int pcounter = 0;
                for (int p: ps[i])
                {
                    /* pack state followed by the vals records, then drop
                     * the particle from the local containers */
                    std::copy(x[p].data,
                              x[p].data + this->ncomponents,
                              buffer + pcounter*(1+vals.size())*this->ncomponents);
                    x.erase(p);
                    for (size_t tindex=0; tindex<vals.size(); tindex++)
                    {
                        std::copy(vals[tindex][p].data,
                                  vals[tindex][p].data + this->ncomponents,
                                  buffer + (pcounter*(1+vals.size()) + tindex+1)*this->ncomponents);
                        vals[tindex].erase(p);
                    }
                    pcounter++;
                }
                MPI_Send(
                        buffer,
                        nps[i]*(1+vals.size())*this->ncomponents,
                        MPI_DOUBLE,
                        rdst,
                        2*(rsrc*this->nprocs + rdst)+1,
                        this->comm);
            }
            if (this->myrank == rdst && npr[1-i] > 0)
            {
                MPI_Recv(
                        &pr[1-i].front(),
                        npr[1-i],
                        MPI_INT,
                        rsrc,
                        2*(rsrc*this->nprocs + rdst),
                        this->comm,
                        MPI_STATUS_IGNORE);
                MPI_Recv(
                        buffer,
                        npr[1-i]*(1+vals.size())*this->ncomponents,
                        MPI_DOUBLE,
                        rsrc,
                        2*(rsrc*this->nprocs + rdst)+1,
                        this->comm,
                        MPI_STATUS_IGNORE);
                /* unpack received particles into the local containers */
                int pcounter = 0;
                for (int p: pr[1-i])
                {
                    x[p] = buffer + (pcounter*(1+vals.size()))*this->ncomponents;
                    for (size_t tindex=0; tindex<vals.size(); tindex++)
                    {
                        vals[tindex][p] = buffer + (pcounter*(1+vals.size()) + tindex+1)*this->ncomponents;
                    }
                    pcounter++;
                }
            }
        }
    delete[] buffer;


#ifndef NDEBUG
    /* check that all particles at x are local */
    for (auto &pp: x)
        if (this->vel->get_rank(pp.second.data[2]) != this->myrank)
        {
            DEBUG_MSG("found particle %d with rank %d\n",
                    pp.first,
                    this->vel->get_rank(pp.second.data[2]));
            assert(false);
        }
#endif
}



template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::AdamsBashforth(
        const int nsteps)
{
    /** Advance all local particles by one time step with the
     *  Adams-Bashforth scheme of order `nsteps` (1..6), then move
     *  particles that crossed a rank boundary and age the rhs history.
     *
     *  Coefficients are kept as integer numerators with a single final
     *  division so that the accumulation order and rounding reproduce
     *  the classic formulas (e.g. dt*(23*r0 - 16*r1 + 5*r2)/12)
     *  bit-for-bit. */
    static const double AB_num[6][6] = {
        {   1},
        {   3,    -1},
        {  23,   -16,    5},
        {  55,   -59,   37,    -9},
        {1901, -2774, 2616, -1274,  251},
        {4277, -7923, 9982, -7298, 2877, -475}};
    static const double AB_den[6] = {1, 2, 12, 24, 720, 1440};
    assert((nsteps >= 1) && (nsteps <= 6));
    this->get_rhs(this->state, this->rhs[0]);
    for (auto &pp: this->state)
    {
        /* hoist the per-particle history lookups out of the component
         * loop: the original performed up to nsteps hash-map lookups
         * for every component of every particle */
        const double *rdata[6];
        for (int s = 0; s < nsteps; s++)
            rdata[s] = this->rhs[s][pp.first].data;
        for (int i=0; i<this->ncomponents; i++)
        {
            double acc = 0;
            for (int s = 0; s < nsteps; s++)
                acc += AB_num[nsteps-1][s]*rdata[s][i];
            pp.second[i] += this->dt*acc/AB_den[nsteps-1];
        }
    }
    /* particles may now belong to a neighbouring rank: ship them (and
     * their rhs history) to the new owner, then age the history */
    this->redistribute(this->state, this->rhs);
    this->roll_rhs();
}


template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::step()
{
    /* During start-up fewer than integration_steps right-hand sides are
     * available, so the Adams-Bashforth order ramps up with the
     * iteration count until it reaches the configured maximum. */
    const int available_steps = this->iteration + 1;
    const int order = (available_steps < this->integration_steps) ?
            available_steps :
            this->integration_steps;
    this->AdamsBashforth(order);
    this->iteration++;
}


template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::read(
        const hid_t data_file_id)
{
    /** Load particle state (and, when restarting, the rhs history) from
     *  the checkpoint file.  Rank 0 reads each chunk and broadcasts it;
     *  every rank then keeps only the particles whose z position it
     *  owns according to the interpolator's domain decomposition. */
    double *temp = new double[this->chunk_size*this->ncomponents];
    for (int cindex=0; cindex<this->get_number_of_chunks(); cindex++)
    {
        //read state
        if (this->myrank == 0)
            this->read_state_chunk(cindex, temp);
        MPI_Bcast(
                temp,
                this->chunk_size*this->ncomponents,
                MPI_DOUBLE,
                0,
                this->comm);
        for (int p=0; p<this->chunk_size; p++)
        {
            // component 2 is the z coordinate used for rank ownership;
            // keys are global particle ids, sources are chunk-local
            if (this->vel->get_rank(temp[this->ncomponents*p+2]) == this->myrank)
                this->state[p+cindex*this->chunk_size] = temp + this->ncomponents*p;
        }
        //read rhs
        // history is only meaningful when restarting (iteration > 0)
        if (this->iteration > 0)
            for (int i=0; i<this->integration_steps; i++)
            {
                if (this->myrank == 0)
                    this->read_rhs_chunk(cindex, i, temp);
                MPI_Bcast(
                        temp,
                        this->chunk_size*this->ncomponents,
                        MPI_DOUBLE,
                        0,
                        this->comm);
                for (int p=0; p<this->chunk_size; p++)
                {
                    // keep rhs entries only for particles this rank owns
                    auto pp = this->state.find(p+cindex*this->chunk_size);
                    if (pp != this->state.end())
                        this->rhs[i][p+cindex*this->chunk_size] = temp + this->ncomponents*p;
                }
            }
    }
    DEBUG_MSG("%s->state.size = %ld\n", this->name.c_str(), this->state.size());
    delete[] temp;
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::write(
        const hid_t data_file_id,
        const char *dset_name,
        std::unordered_map<int, single_particle_state<POINT3D>> &y)
{
    /** Assemble the per-particle 3-component values in `y` across all
     *  ranks and write them chunk by chunk into dataset `dset_name`.
     *  Since each particle lives on exactly one rank and the other
     *  ranks contribute zeros, a SUM reduction produces the global
     *  array.  Only rank 0 performs the actual write. */
    // RAII buffers instead of raw new[]/delete[] (the originals leaked
    // if the chunk writer threw)
    std::vector<double> data(this->nparticles*3);
    std::vector<double> yy(this->nparticles*3);
    for (int cindex=0; cindex<this->get_number_of_chunks(); cindex++)
    {
        std::fill(yy.begin(), yy.end(), 0);
        for (int p=0; p<this->chunk_size; p++)
        {
            auto pp = y.find(p+cindex*this->chunk_size);
            if (pp != y.end())
                // pp->first is the global particle id; the buffer spans
                // all particles, so global indexing is in range
                std::copy(pp->second.data,
                          pp->second.data + 3,
                          yy.begin() + pp->first*3);
        }
        MPI_Allreduce(
                yy.data(),
                data.data(),
                3*this->nparticles,
                MPI_DOUBLE,
                MPI_SUM,
                this->comm);
        if (this->myrank == 0)
            this->write_point3D_chunk(dset_name, cindex, data.data());
    }
}

template <int particle_type, class rnumber, int interp_neighbours>
void distributed_particles<particle_type, rnumber, interp_neighbours>::write(
        const hid_t data_file_id,
        const bool write_rhs)
{
    /** Write the particle state — and, if `write_rhs` is set, the full
     *  rhs history — to the checkpoint file, one chunk at a time.
     *  Ranks fill a zeroed chunk buffer with the particles they own; a
     *  SUM reduction assembles the chunk, and rank 0 writes it.
     *
     *  BUG FIX: the buffers are chunk-sized, so the destination must be
     *  indexed by the chunk-local index p.  The previous code used the
     *  global id pp->first, which writes out of bounds for every chunk
     *  after the first (cindex > 0).  This also matches read(), which
     *  uses chunk-local offsets. */
    std::vector<double> temp0(this->chunk_size*this->ncomponents);
    std::vector<double> temp1(this->chunk_size*this->ncomponents);
    for (int cindex=0; cindex<this->get_number_of_chunks(); cindex++)
    {
        //write state
        std::fill(temp0.begin(), temp0.end(), 0);
        for (int p=0; p<this->chunk_size; p++)
        {
            auto pp = this->state.find(p + cindex*this->chunk_size);
            if (pp != this->state.end())
                std::copy(pp->second.data,
                          pp->second.data + this->ncomponents,
                          temp0.begin() + p*this->ncomponents);
        }
        MPI_Allreduce(
                temp0.data(),
                temp1.data(),
                this->ncomponents*this->chunk_size,
                MPI_DOUBLE,
                MPI_SUM,
                this->comm);
        if (this->myrank == 0)
            this->write_state_chunk(cindex, temp1.data());
        //write rhs
        if (write_rhs)
            for (int i=0; i<this->integration_steps; i++)
            {
                std::fill(temp0.begin(), temp0.end(), 0);
                for (int p=0; p<this->chunk_size; p++)
                {
                    auto pp = this->rhs[i].find(p + cindex*this->chunk_size);
                    if (pp != this->rhs[i].end())
                        std::copy(pp->second.data,
                                  pp->second.data + this->ncomponents,
                                  temp0.begin() + p*this->ncomponents);
                }
                MPI_Allreduce(
                        temp0.data(),
                        temp1.data(),
                        this->ncomponents*this->chunk_size,
                        MPI_DOUBLE,
                        MPI_SUM,
                        this->comm);
                if (this->myrank == 0)
                    this->write_rhs_chunk(cindex, i, temp1.data());
            }
    }
}


/*****************************************************************************/
/* Explicit template instantiations: velocity tracers over float and double */
/* fields, with interpolation kernels using 1 through 6 neighbour points.   */
template class distributed_particles<VELOCITY_TRACER, float, 1>;
template class distributed_particles<VELOCITY_TRACER, float, 2>;
template class distributed_particles<VELOCITY_TRACER, float, 3>;
template class distributed_particles<VELOCITY_TRACER, float, 4>;
template class distributed_particles<VELOCITY_TRACER, float, 5>;
template class distributed_particles<VELOCITY_TRACER, float, 6>;
template class distributed_particles<VELOCITY_TRACER, double, 1>;
template class distributed_particles<VELOCITY_TRACER, double, 2>;
template class distributed_particles<VELOCITY_TRACER, double, 3>;
template class distributed_particles<VELOCITY_TRACER, double, 4>;
template class distributed_particles<VELOCITY_TRACER, double, 5>;
template class distributed_particles<VELOCITY_TRACER, double, 6>;
/*****************************************************************************/