elpa_transpose_vectors.X90 4.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#if REALCASE==1
subroutine elpa_transpose_vectors_real(vmat_s,ld_s,comm_s,vmat_t,ld_t,comm_t,nvs,nvr,nvc,nblk)
#endif
#if COMPLEXCASE==1
subroutine elpa_transpose_vectors_complex(vmat_s,ld_s,comm_s,vmat_t,ld_t,comm_t,nvs,nvr,nvc,nblk)
#endif

!-------------------------------------------------------------------------------
! This routine transposes an array of vectors which are distributed in
! communicator comm_s into its transposed form distributed in communicator comm_t.
! There must be an identical copy of vmat_s in every communicator comm_s.
! After this routine, there is an identical copy of vmat_t in every communicator comm_t.
!
! vmat_s    original array of vectors
! ld_s      leading dimension of vmat_s
! comm_s    communicator over which vmat_s is distributed
! vmat_t    array of vectors in transposed form
! ld_t      leading dimension of vmat_t
! comm_t    communicator over which vmat_t is distributed
! nvs       global index where to start in vmat_s/vmat_t
!           Please note: this is kind of a hint, some values before nvs will be
!           accessed in vmat_s/put into vmat_t
! nvr       global length of vmat_s/vmat_t
! nvc       number of columns in vmat_s/vmat_t
! nblk      block size of block cyclic distribution
!
!-------------------------------------------------------------------------------

!   use ELPA1 ! for least_common_multiple
Andreas Marek's avatar
Andreas Marek committed
30
31
32
#ifdef WITH_OPENMP
   use omp_lib
#endif
33
34
35
36
37
38
39
40
41
42
43
44
45

   implicit none

   include 'mpif.h'

   integer, intent(in)              :: ld_s, comm_s, ld_t, comm_t, nvs, nvr, nvc, nblk
   DATATYPE*BYTESIZE, intent(in)    :: vmat_s(ld_s,nvc)
   DATATYPE*BYTESIZE, intent(inout) :: vmat_t(ld_t,nvc)

   DATATYPE*BYTESIZE, allocatable   :: aux(:)
   integer                          :: myps, mypt, nps, npt
   integer                          :: n, lc, k, i, ips, ipt, ns, nl, mpierr
   integer                          :: lcm_s_t, nblks_tot, nblks_comm, nblks_skip
Andreas Marek's avatar
Andreas Marek committed
46
   integer                          :: auxstride
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

   call mpi_comm_rank(comm_s,myps,mpierr)
   call mpi_comm_size(comm_s,nps ,mpierr)
   call mpi_comm_rank(comm_t,mypt,mpierr)
   call mpi_comm_size(comm_t,npt ,mpierr)

   ! The basic idea of this routine is that for every block (in the block cyclic
   ! distribution), the processor within comm_t which owns the diagonal
   ! broadcasts its values of vmat_s to all processors within comm_t.
   ! Of course this has not to be done for every block separately, since
   ! the communictation pattern repeats in the global matrix after
   ! the least common multiple of (nps,npt) blocks

   lcm_s_t   = least_common_multiple(nps,npt) ! least common multiple of nps, npt

   nblks_tot = (nvr+nblk-1)/nblk ! number of blocks corresponding to nvr

   ! Get the number of blocks to be skipped at the begin.
   ! This must be a multiple of lcm_s_t (else it is getting complicated),
   ! thus some elements before nvs will be accessed/set.

   nblks_skip = ((nvs-1)/(nblk*lcm_s_t))*lcm_s_t

   allocate(aux( ((nblks_tot-nblks_skip+lcm_s_t-1)/lcm_s_t) * nblk * nvc ))
Andreas Marek's avatar
Andreas Marek committed
71
#ifdef WITH_OPENMP
Andreas Marek's avatar
Andreas Marek committed
72
73
   !$omp parallel private(lc, i, k, ns, nl, nblks_comm, auxstride, ips, ipt, n)
#endif
74
75
76
77
78
79
80
81
   do n = 0, lcm_s_t-1

      ips = mod(n,nps)
      ipt = mod(n,npt)

      if(mypt == ipt) then

         nblks_comm = (nblks_tot-nblks_skip-n+lcm_s_t-1)/lcm_s_t
Andreas Marek's avatar
Andreas Marek committed
82
83
84
         auxstride = nblk * nblks_comm
!         if(nblks_comm==0) cycle
         if (nblks_comm .ne. 0) then
85
         if(myps == ips) then
Andreas Marek's avatar
Andreas Marek committed
86
87
88
89
!            k = 0
#ifdef WITH_OPENMP
            !$omp do
#endif
90
91
            do lc=1,nvc
               do i = nblks_skip+n, nblks_tot-1, lcm_s_t
Andreas Marek's avatar
Andreas Marek committed
92
                  k = (i - nblks_skip - n)/lcm_s_t * nblk + (lc - 1) * auxstride
93
94
95
                  ns = (i/nps)*nblk ! local start of block i
                  nl = min(nvr-i*nblk,nblk) ! length
                  aux(k+1:k+nl) = vmat_s(ns+1:ns+nl,lc)
Andreas Marek's avatar
Andreas Marek committed
96
!                  k = k+nblk
97
98
99
100
               enddo
            enddo
         endif

Andreas Marek's avatar
Andreas Marek committed
101
102
103
104
#ifdef WITH_OPENMP
         !$omp barrier
         !$omp master
#endif
105
106
107
108
109
110
111
#if COMPLEXCASE==1
         call MPI_Bcast(aux,nblks_comm*nblk*nvc,MPI_DOUBLE_COMPLEX,ips,comm_s,mpierr)
#endif

#if REALCASE==1
         call MPI_Bcast(aux,nblks_comm*nblk*nvc,MPI_REAL8,ips,comm_s,mpierr)
#endif
Andreas Marek's avatar
Andreas Marek committed
112
113
114
115
116
117
118
#ifdef WITH_OPENMP
         !$omp end master
         !$omp barrier

         !$omp do
#endif
!         k = 0
119
120
         do lc=1,nvc
            do i = nblks_skip+n, nblks_tot-1, lcm_s_t
Andreas Marek's avatar
Andreas Marek committed
121
               k = (i - nblks_skip - n)/lcm_s_t * nblk + (lc - 1) * auxstride
122
123
124
               ns = (i/npt)*nblk ! local start of block i
               nl = min(nvr-i*nblk,nblk) ! length
               vmat_t(ns+1:ns+nl,lc) = aux(k+1:k+nl)
Andreas Marek's avatar
Andreas Marek committed
125
!               k = k+nblk
126
127
            enddo
         enddo
Andreas Marek's avatar
Andreas Marek committed
128
         endif
129
130
131
      endif

   enddo
Andreas Marek's avatar
Andreas Marek committed
132
133
134
#ifdef WITH_OPENMP
   !$omp end parallel
#endif
135
136
137
138
   deallocate(aux)

end subroutine