/*******************************************************************************
 * \copyright   This file is part of the GADGET4 N-body/SPH code developed
 * \copyright   by Volker Springel. Copyright (C) 2014-2020 by Volker Springel
 * \copyright   (vspringel@mpa-garching.mpg.de) and all contributing authors.
 *******************************************************************************/

/*! \file  mpi_utils.h
 *  \brief declares the numerical tag values used to label MPI messages, as well as prototypes of various MPI helper functions
 */

#ifndef MPI_UTILS_H
#define MPI_UTILS_H

#include <mpi.h>
#include "../data/dtypes.h"
#include "../data/mymalloc.h"

/*! Various tags used for labeling MPI messages */
#define TAG_TOPNODE_FREE 4
#define TAG_TOPNODE_OFFSET 5
#define TAG_TOPNODE_ALLOC 6
#define TAG_EWALD_ALLOC 7
#define TAG_TABLE_ALLOC 8
#define TAG_TABLE_FREE 9
#define TAG_N 10
#define TAG_HEADER 11
#define TAG_PDATA 12
#define TAG_SPHDATA 13
#define TAG_KEY 14
#define TAG_DMOM 15
#define TAG_NODELEN 16
#define TAG_HMAX 17
#define TAG_GRAV_A 18
#define TAG_GRAV_B 19
#define TAG_DIRECT_A 20
#define TAG_DIRECT_B 21
#define TAG_HYDRO_A 22
#define TAG_HYDRO_B 23
#define TAG_NFORTHISTASK 24
#define TAG_PERIODIC_A 25
#define TAG_PERIODIC_B 26
#define TAG_PERIODIC_C 27
#define TAG_PERIODIC_D 28
#define TAG_NONPERIOD_A 29
#define TAG_NONPERIOD_B 30
#define TAG_NONPERIOD_C 31
#define TAG_NONPERIOD_D 32
#define TAG_POTENTIAL_A 33
#define TAG_POTENTIAL_B 34
#define TAG_DENS_A 35
#define TAG_DENS_B 36
#define TAG_LOCALN 37
#define TAG_BH_A 38
#define TAG_BH_B 39
#define TAG_SMOOTH_A 40
#define TAG_SMOOTH_B 41
#define TAG_ENRICH_A 42
#define TAG_CONDUCT_A 43
#define TAG_CONDUCT_B 44
#define TAG_FOF_A 45
#define TAG_FOF_B 46
#define TAG_FOF_C 47
#define TAG_FOF_D 48
#define TAG_FOF_E 49
#define TAG_FOF_F 50
#define TAG_FOF_G 51
#define TAG_HOTNGB_A 52
#define TAG_HOTNGB_B 53
#define TAG_GRAD_A 54
#define TAG_GRAD_B 55

#define TAG_SE 56

#define TAG_SEARCH_A 58
#define TAG_SEARCH_B 59

#define TAG_INJECT_A 61

#define TAG_PDATA_SPH 70
#define TAG_KEY_SPH 71

#define TAG_PDATA_STAR 72
#define TAG_STARDATA 73
#define TAG_KEY_STAR 74

#define TAG_PDATA_BH 75
#define TAG_BHDATA 76
#define TAG_KEY_BH 77

#define TAG_GRAVCOST_A 79
#define TAG_GRAVCOST_B 80

#define TAG_PM_FOLD 81

#define TAG_BARRIER 82
#define TAG_PART_DATA 83
#define TAG_NODE_DATA 84
#define TAG_RESULTS 85
#define TAG_DRIFT_INIT 86
#define TAG_ALL_UPDATE 87
#define TAG_METDATA 500
#define TAG_FETCH_GRAVTREE 1000
#define TAG_FETCH_SPH_DENSITY 2000
#define TAG_FETCH_SPH_HYDRO 3000
#define TAG_FETCH_SPH_TREETIMESTEP 4000
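
/* my_mpi_types_init() presumably creates the custom MPI datatype
 * MPI_MyIntPosType and the min/max reduction operators that are declared as
 * extern further below. */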

void my_mpi_types_init(void);
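
/* myMPI_Sendrecv() is a drop-in replacement for MPI_Sendrecv with size_t
 * counts; the wider count type suggests it is also meant to handle transfers
 * that exceed the 'int' count limit of the plain MPI call. */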

int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount,
                   MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);
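
/* All-to-all variant with a selectable communication strategy ('method');
 * judging from its arguments, the prep routine exchanges the send counts and
 * fills in recvcnt/rdispls before myMPI_Alltoallv_new() moves the data. */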

int myMPI_Alltoallv_new_prep(int *sendcnt, int *recvcnt, int *rdispls, MPI_Comm comm, int method);

void myMPI_Alltoallv_new(void *sendb, int *sendcounts, int *sdispls, MPI_Datatype sendtype, void *recvb, int *recvcounts, int *rdispls,
                         MPI_Datatype recvtype, MPI_Comm comm, int method);
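
/* All-to-all with size_t counts and displacements, measured in items of
 * 'len' bytes; 'big_flag' presumably selects a code path that remains safe
 * for message sizes beyond the int-based limits of plain MPI_Alltoallv. */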

void myMPI_Alltoallv(void *sendbuf, size_t *sendcounts, size_t *sdispls, void *recvbuf, size_t *recvcounts, size_t *rdispls, int len,
                     int big_flag, MPI_Comm comm);

void my_int_MPI_Alltoallv(void *sendb, int *sendcounts, int *sdispls, void *recvb, int *recvcounts, int *rdispls, int len,
                          int big_flag, MPI_Comm comm);

int myMPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
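
/* Allgatherv built from pairwise exchanges along a hypercube pattern,
 * presumably the same recvTask = thistask ^ ngrp loop that allreduce_sum()
 * uses further down in this file. */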

int MPI_hypercube_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcount, int *displs,
                             MPI_Datatype recvtype, MPI_Comm comm);
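
/* Element-wise global sum of a sparsely populated double array: 'loc' holds
 * the local contributions and 'glob' receives the result on every task;
 * presumably the non-template counterpart of allreduce_sum() below, which
 * operates in place on its argument. */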

void allreduce_sparse_double_sum(double *loc, double *glob, int N, MPI_Comm comm);
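
/* Minimum and sum reductions of n-element arrays into long long results;
 * sumup_large_ints() takes int inputs and widens them, so that global sums
 * of per-task counters cannot overflow. */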

void minimum_large_ints(int n, long long *src, long long *res, MPI_Comm comm);
void sumup_longs(int n, long long *src, long long *res, MPI_Comm comm);
void sumup_large_ints(int n, int *src, long long *res, MPI_Comm comm);

extern MPI_Datatype MPI_MyIntPosType;

extern MPI_Op MPI_MIN_MyIntPosType;
extern MPI_Op MPI_MAX_MyIntPosType;
extern MPI_Op MPI_MIN_MySignedIntPosType;
extern MPI_Op MPI_MAX_MySignedIntPosType;
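
/* allreduce_sum() forms the element-wise sum of glob[0..N-1] over all tasks
 * of the communicator, optimized for sparsely populated arrays: the index
 * range is split into one contiguous block per task, each task ships only
 * its non-zero entries to the owner of the corresponding block, the owners
 * add them up, and the summed blocks are finally redistributed to everybody
 * through pairwise hypercube exchanges.  On return, glob[] holds the global
 * sum on every task.  (An illustrative usage sketch follows the function.) */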

template <typename T>
void allreduce_sum(T *glob, int N, MPI_Comm Communicator)
{
  int ntask, thistask, ptask;
  MPI_Comm_size(Communicator, &ntask);
  MPI_Comm_rank(Communicator, &thistask);
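
  /* determine ptask as the smallest exponent with (1 << ptask) >= ntask; it
   * sets the number of stages of the pairwise hypercube exchanges below */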

  for(ptask = 0; ntask > (1 << ptask); ptask++)
    ;

  // we are responsible for a certain stretch of the result, namely the one starting at loc_first_n, of length blocksize[thistask]

  int *blocksize  = (int *)Mem.mymalloc("blocksize", sizeof(int) * ntask);
  int *blockstart = (int *)Mem.mymalloc("blockstart", sizeof(int) * ntask);
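
  /* split the index range into one contiguous block per task: the first rmd
   * tasks get blk+1 entries, the remaining ones blk entries; pivot_n is the
   * first global index that falls into a block of size blk */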

  int blk     = N / ntask;
  int rmd     = N - blk * ntask; /* remainder */
  int pivot_n = rmd * (blk + 1);

  int loc_first_n = 0;
  blockstart[0]   = 0;

  for(int task = 0; task < ntask; task++)
    {
      if(task < rmd)
        blocksize[task] = blk + 1;
      else
        blocksize[task] = blk;

      if(task < thistask)
        loc_first_n += blocksize[task];

      if(task > 0)
        blockstart[task] = blockstart[task - 1] + blocksize[task - 1];
    }

  /* here we store the local result */
  T *loc_data = (T *)Mem.mymalloc_clear("loc_data", blocksize[thistask] * sizeof(T));

  int *send_count = (int *)Mem.mymalloc("send_count", sizeof(int) * ntask);
  int *recv_count = (int *)Mem.mymalloc("recv_count", sizeof(int) * ntask);

  int *send_offset = (int *)Mem.mymalloc("send_offset", sizeof(int) * ntask);

  struct ind_data
  {
    int n;
    T val;
  };

  ind_data *export_data = NULL;
  int nexport           = 0;
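
  /* two passes over the local data: rep == 0 only counts how many non-zero
   * entries go to each task, rep == 1 fills the export buffer and carries
   * out the actual exchange and summation */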

  for(int rep = 0; rep < 2; rep++)
    {
      for(int j = 0; j < ntask; j++)
        send_count[j] = 0;

      /* find for each non-zero element the processor where it should go for being summed */
      for(int n = 0; n < N; n++)
        {
          if(glob[n] != 0)
            {
              int task;
              if(n < pivot_n)
                task = n / (blk + 1);
              else
                task = rmd + (n - pivot_n) / blk; /* note: if blk == 0, this branch cannot be reached, since then pivot_n == N */

              if(rep == 0)
                send_count[task]++;
              else
                {
                  int index              = send_offset[task] + send_count[task]++;
                  export_data[index].n   = n;
                  export_data[index].val = glob[n];
                }
            }
        }

      if(rep == 0)
        {
          MPI_Alltoall(send_count, 1, MPI_INT, recv_count, 1, MPI_INT, Communicator);

          send_offset[0] = 0;

          for(int j = 0; j < ntask; j++)
            {
              nexport += send_count[j];

              if(j > 0)
                send_offset[j] = send_offset[j - 1] + send_count[j - 1];
            }

          export_data = (ind_data *)Mem.mymalloc("export_data", nexport * sizeof(ind_data));
        }
      else
        {
          for(int ngrp = 0; ngrp < (1 << ptask); ngrp++) /* note: here we also have a transfer from each task to itself (for ngrp=0) */
            {
              int recvTask = thistask ^ ngrp;
              if(recvTask < ntask)
                if(send_count[recvTask] > 0 || recv_count[recvTask] > 0)
                  {
                    int nimport = recv_count[recvTask];

                    ind_data *import_data = (ind_data *)Mem.mymalloc("import_data", nimport * sizeof(ind_data));

                    MPI_Sendrecv(&export_data[send_offset[recvTask]], send_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask,
                                 TAG_DENS_B, import_data, recv_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask, TAG_DENS_B,
                                 Communicator, MPI_STATUS_IGNORE);

                    for(int i = 0; i < nimport; i++)
                      {
                        int j = import_data[i].n - loc_first_n;

                        if(j < 0 || j >= blocksize[thistask])
                          Terminate("j=%d < 0 || j>= blocksize[thistask]=%d", j, blocksize[thistask]);

                        loc_data[j] += import_data[i].val;
                      }

                    Mem.myfree(import_data);
                  }
            }

          Mem.myfree(export_data);
        }
    }

  Mem.myfree(send_offset);
  Mem.myfree(recv_count);
  Mem.myfree(send_count);

  /* now share the result across all processors */
  for(int ngrp = 0; ngrp < (1 << ptask); ngrp++) /* note: here we also have a transfer from each task to itself (for ngrp=0) */
    {
      int recvTask = thistask ^ ngrp;
      if(recvTask < ntask)
        if(blocksize[thistask] > 0 || blocksize[recvTask] > 0)
          MPI_Sendrecv(loc_data, blocksize[thistask] * sizeof(T), MPI_BYTE, recvTask, TAG_DENS_A, &glob[blockstart[recvTask]],
                       blocksize[recvTask] * sizeof(T), MPI_BYTE, recvTask, TAG_DENS_A, Communicator, MPI_STATUS_IGNORE);
    }

  Mem.myfree(loc_data);
  Mem.myfree(blockstart);
  Mem.myfree(blocksize);
}
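
/* Illustrative usage sketch: summing a sparsely filled histogram across all
 * tasks (the names 'count' and 'Nbins' are hypothetical placeholders).
 *
 *   double *count = (double *)Mem.mymalloc_clear("count", Nbins * sizeof(double));
 *   // ... every task adds its contributions to a few entries of count ...
 *   allreduce_sum<double>(count, Nbins, MPI_COMM_WORLD);
 *   // now every task holds the complete element-wise global sum
 *   Mem.myfree(count);
 */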

#endif