Commit ccc7d1b9 authored by Andreas Marek

Move redist_band into a separate file; remove the anti-pattern for the real
and complex cases

Automatically create two independent routines for real- and complex-valued
matrices
parent f059c6e3
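For context, the anti-pattern being removed: a single merged routine takes an l_real flag plus dummy arguments of both numeric types, so every caller must declare never-used dummies of the other type (the c_a/r_a size-(1,1) variables commented out below). A minimal sketch of that pattern, with a hypothetical routine name, not ELPA code:

! Sketch of the removed anti-pattern (hypothetical routine "work")
subroutine work(l_real, r_x, c_x, n)
   implicit none
   logical, intent(in)       :: l_real
   integer, intent(in)       :: n
   real*8, intent(inout)     :: r_x(*)
   complex*16, intent(inout) :: c_x(*)
   if (l_real) then
      r_x(1:n) = 2.d0*r_x(1:n)           ! real branch
   else
      c_x(1:n) = (2.d0,0.d0)*c_x(1:n)    ! complex branch
   endif
end subroutine
! A real-valued caller still had to supply a complex dummy:
!    complex*16 :: c_dummy(1)            ! never referenced
!    call work(.true., x, c_dummy, n)

After this commit the runtime branch is replaced by two compile-time instantiations of a single template file (see the redist_band.X90 includes below).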
......@@ -3722,7 +3722,7 @@ subroutine invert_trm_real(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm
!
! na Order of matrix
!
! a(lda,*) Distributed matrix which should be inverted.
! a(lda,matrixCols) Distributed matrix which should be inverted.
! Distribution is like in Scalapack.
! Only the upper triangle needs to be set.
! The lower triangle is not referenced.
......@@ -3857,13 +3857,14 @@ subroutine cholesky_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_com
!
! na Order of matrix
!
! a(lda,*) Distributed matrix which should be factorized.
! a(lda,matrixCols) Distributed matrix which should be factorized.
! Distribution is like in Scalapack.
! Only the upper triangle needs to be set.
! On return, the upper triangle contains the Cholesky factor
! and the lower triangle is set to 0.
!
! lda Leading dimension of a
! matrixCols local columns of matrix a
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
......@@ -4044,12 +4045,13 @@ subroutine invert_trm_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_c
!
! na Order of matrix
!
! a(lda,*) Distributed matrix which should be inverted.
! a(lda,matrixCols) Distributed matrix which should be inverted.
! Distribution is like in Scalapack.
! Only the upper triangle needs to be set.
! The lower triangle is not referenced.
!
! lda Leading dimension of a
! matrixCols local columns of matrix a
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
......
......@@ -265,7 +265,7 @@ function solve_evp_real_2stage(na, nev, a, lda, ev, q, ldq, nblk, &
ttt0 = MPI_Wtime()
ttts = ttt0
call bandred_real(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, &
call bandred_real(na, a, lda, nblk, nbw, matrixCols, num_blocks, mpi_comm_rows, mpi_comm_cols, &
tmat, wantDebug, success, useQRActual)
if (.not.(success)) return
ttt1 = MPI_Wtime()
......@@ -320,7 +320,7 @@ function solve_evp_real_2stage(na, nev, a, lda, ev, q, ldq, nblk, &
! Backtransform stage 2
ttt0 = MPI_Wtime()
call trans_ev_band_to_full_real(na, nev, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, mpi_comm_rows, &
call trans_ev_band_to_full_real(na, nev, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, num_blocks, mpi_comm_rows, &
mpi_comm_cols, useQRActual)
ttt1 = MPI_Wtime()
if (my_prow==0 .and. my_pcol==0 .and. elpa_print_times) &
......@@ -460,7 +460,7 @@ function solve_evp_complex_2stage(na, nev, a, lda, ev, q, ldq, nblk, &
ttt0 = MPI_Wtime()
ttts = ttt0
call bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, &
call bandred_complex(na, a, lda, nblk, nbw, matrixCols, num_blocks, mpi_comm_rows, mpi_comm_cols, &
tmat, wantDebug, success)
if (.not.(success)) then
#ifdef HAVE_DETAILED_TIMINGS
......@@ -528,7 +528,7 @@ function solve_evp_complex_2stage(na, nev, a, lda, ev, q, ldq, nblk, &
! Backtransform stage 2
ttt0 = MPI_Wtime()
call trans_ev_band_to_full_complex(na, nev, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, mpi_comm_rows, mpi_comm_cols)
call trans_ev_band_to_full_complex(na, nev, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, num_blocks, mpi_comm_rows, mpi_comm_cols)
ttt1 = MPI_Wtime()
if (my_prow==0 .and. my_pcol==0 .and. elpa_print_times) &
write(error_unit,*) 'Time trans_ev_band_to_full_complex :',ttt1-ttt0
......@@ -545,7 +545,7 @@ end function solve_evp_complex_2stage
!-------------------------------------------------------------------------------
subroutine bandred_real(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, &
subroutine bandred_real(na, a, lda, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols, &
tmat, wantDebug, success, useQR)
!-------------------------------------------------------------------------------
......@@ -572,7 +572,7 @@ subroutine bandred_real(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_co
! mpi_comm_cols
! MPI-Communicators for rows/columns
!
! tmat(nbw,nbw,num_blocks) where num_blocks = (na-1)/nbw + 1
! tmat(nbw,nbw,numBlocks) where numBlocks = (na-1)/nbw + 1
! Factors for the Householder vectors (returned), needed for back transformation
!
!-------------------------------------------------------------------------------
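With tmat now declared explicit-shape instead of assumed-size, the caller must compute and pass numBlocks itself. A minimal call-site sketch consistent with the formula above (declarations and allocation assumed, not part of this diff):

num_blocks = (na-1)/nbw + 1    ! integer division, per the documentation above
allocate(tmat(nbw,nbw,num_blocks))
call bandred_real(na, a, lda, nblk, nbw, matrixCols, num_blocks, &
                  mpi_comm_rows, mpi_comm_cols, tmat, wantDebug, success, useQRActual)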
......@@ -581,8 +581,8 @@ subroutine bandred_real(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_co
#endif
implicit none
integer :: na, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols
real*8 :: a(lda,matrixCols), tmat(nbw,nbw,*)
integer :: na, lda, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols
real*8 :: a(lda,matrixCols), tmat(nbw,nbw,numBlocks)
integer :: my_prow, my_pcol, np_rows, np_cols, mpierr
integer :: l_cols, l_rows
......@@ -935,7 +935,7 @@ end subroutine symm_matrix_allreduce
!-------------------------------------------------------------------------------
subroutine trans_ev_band_to_full_real(na, nqc, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, mpi_comm_rows, &
subroutine trans_ev_band_to_full_real(na, nqc, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, numBlocks, mpi_comm_rows, &
mpi_comm_cols, useQR)
......@@ -959,7 +959,7 @@ subroutine trans_ev_band_to_full_real(na, nqc, nblk, nbw, a, lda, tmat, q, ldq,
! lda Leading dimension of a
! matrixCols local columns of matrix a and q
!
! tmat(nbw,nbw,.) Factors returned by bandred_real
! tmat(nbw,nbw,numBlocks) Factors returned by bandred_real
!
! q On input: Eigenvectors of band matrix
! On output: Transformed eigenvectors
......@@ -977,8 +977,8 @@ subroutine trans_ev_band_to_full_real(na, nqc, nblk, nbw, a, lda, tmat, q, ldq,
#endif
implicit none
integer :: na, nqc, lda, ldq, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols
real*8 :: a(lda,matrixCols), q(ldq,matrixCols), tmat(nbw, nbw, *)
integer :: na, nqc, lda, ldq, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols
real*8 :: a(lda,matrixCols), q(ldq,matrixCols), tmat(nbw, nbw, numBlocks)
integer :: my_prow, my_pcol, np_rows, np_cols, mpierr
integer :: max_blocks_row, max_blocks_col, max_local_rows, &
......@@ -1194,9 +1194,10 @@ subroutine tridiag_band_real(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm_ro
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
! a(lda,*) Distributed system matrix reduced to banded form in the upper diagonal
! a(lda,matrixCols) Distributed system matrix reduced to banded form in the upper diagonal
!
! lda Leading dimension of a
! matrixCols local columns of matrix a
!
! d(na) Diagonal of tridiagonal matrix, set only on PE 0 (output)
!
......@@ -1237,8 +1238,8 @@ subroutine tridiag_band_real(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm_ro
integer, allocatable :: limits(:), snd_limits(:,:)
integer, allocatable :: block_limits(:)
real*8, allocatable :: ab(:,:), hh_gath(:,:,:), hh_send(:,:,:)
! dummies for calling redist_band
complex*16 :: c_a(1,1), c_ab(1,1)
! ! dummies for calling redist_band
! complex*16 :: c_a(1,1), c_ab(1,1)
#ifdef WITH_OPENMP
integer :: omp_get_max_threads
......@@ -1294,7 +1295,7 @@ subroutine tridiag_band_real(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm_ro
n_off = block_limits(my_pe)*nb
! Redistribute band in a to ab
call redist_band(.true., a, c_a, lda, na, nblk, nb, mpi_comm_rows, mpi_comm_cols, mpi_comm, ab, c_ab)
call redist_band_real(a, lda, na, nblk, nb, matrixCols, mpi_comm_rows, mpi_comm_cols, mpi_comm, ab)
! Calculate the workload for each sweep in the back transformation
! and the space requirements to hold the HH vectors
......@@ -3216,7 +3217,7 @@ end subroutine
!-------------------------------------------------------------------------------
subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, tmat, wantDebug, success)
subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols, tmat, wantDebug, success)
!-------------------------------------------------------------------------------
! bandred_complex: Reduces a distributed hermitian matrix to band form
......@@ -3242,7 +3243,7 @@ subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi
! mpi_comm_cols
! MPI-Communicators for rows/columns
!
! tmat(nbw,nbw,num_blocks) where num_blocks = (na-1)/nbw + 1
! tmat(nbw,nbw,numBlocks) where numBlocks = (na-1)/nbw + 1
! Factors for the Householder vectors (returned), needed for back transformation
!
!-------------------------------------------------------------------------------
......@@ -3251,8 +3252,8 @@ subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi
#endif
implicit none
integer :: na, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols
complex*16 :: a(lda,matrixCols), tmat(nbw,nbw,*)
integer :: na, lda, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols
complex*16 :: a(lda,matrixCols), tmat(nbw,nbw,numBlocks)
complex*16, parameter :: CZERO = (0.d0,0.d0), CONE = (1.d0,0.d0)
......@@ -3417,7 +3418,7 @@ subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi
vav = 0
if (l_rows>0) &
call zherk('U','C',n_cols,l_rows,CONE,vmr,ubound(vmr,dim=1),CZERO,vav,ubound(vav,dim=1))
call herm_matrix_allreduce(n_cols,vav,ubound(vav,dim=1),mpi_comm_rows)
call herm_matrix_allreduce(n_cols,vav, nbw,nbw,mpi_comm_rows)
! Calculate triangular matrix T for block Householder Transformation
......@@ -3488,7 +3489,7 @@ subroutine bandred_complex(na, a, lda, nblk, nbw, matrixCols, mpi_comm_rows, mpi
ubound(umc,dim=1),CZERO,vav,ubound(vav,dim=1))
call ztrmm('Right','Upper','C','Nonunit',n_cols,n_cols,CONE,tmat(1,1,istep),ubound(tmat,dim=1),vav,ubound(vav,dim=1))
call herm_matrix_allreduce(n_cols,vav,ubound(vav,dim=1),mpi_comm_cols)
call herm_matrix_allreduce(n_cols,vav, nbw,nbw,mpi_comm_cols)
! U = U - 0.5 * V * VAV
call zgemm('N','N',l_cols,n_cols,n_cols,(-0.5d0,0.d0),umc(1,n_cols+1),ubound(umc,dim=1),vav,ubound(vav,dim=1), &
......@@ -3523,7 +3524,7 @@ end subroutine bandred_complex
!-------------------------------------------------------------------------------
subroutine herm_matrix_allreduce(n,a,lda,comm)
subroutine herm_matrix_allreduce(n,a,lda,ldb,comm)
!-------------------------------------------------------------------------------
! herm_matrix_allreduce: Does an mpi_allreduce for a hermitian matrix A.
......@@ -3534,8 +3535,8 @@ subroutine herm_matrix_allreduce(n,a,lda,comm)
use timings
#endif
implicit none
integer :: n, lda, comm
complex*16 :: a(lda,*)
integer :: n, lda, ldb, comm
complex*16 :: a(lda,ldb)
integer :: i, nc, mpierr
complex*16 :: h1(n*n), h2(n*n)
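The routine's body is truncated in this diff; judging from the h1(n*n)/h2(n*n) buffers, it presumably packs the upper triangle of a into a linear buffer, reduces it, and unpacks. A minimal sketch of that packing step under this assumption (not the verbatim ELPA body):

! Pack the columns of the upper triangle into h1, then allreduce nc elements
nc = 0
do i = 1, n
   h1(nc+1:nc+i) = a(1:i,i)
   nc = nc + i
enddo
call mpi_allreduce(h1, h2, nc, MPI_COMPLEX16, MPI_SUM, comm, mpierr)
! ... h2 is then unpacked into the upper triangle (and its conjugate transpose)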
......@@ -3567,7 +3568,7 @@ end subroutine herm_matrix_allreduce
!-------------------------------------------------------------------------------
subroutine trans_ev_band_to_full_complex(na, nqc, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, mpi_comm_rows, mpi_comm_cols)
subroutine trans_ev_band_to_full_complex(na, nqc, nblk, nbw, a, lda, tmat, q, ldq, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols)
!-------------------------------------------------------------------------------
! trans_ev_band_to_full_complex:
......@@ -3589,7 +3590,7 @@ subroutine trans_ev_band_to_full_complex(na, nqc, nblk, nbw, a, lda, tmat, q, ld
! lda Leading dimension of a
! matrixCols local columns of matrix a and q
!
! tmat(nbw,nbw,.) Factors returned by bandred_complex
! tmat(nbw,nbw,numBlocks) Factors returned by bandred_complex
!
! q On input: Eigenvectors of band matrix
! On output: Transformed eigenvectors
......@@ -3607,8 +3608,8 @@ subroutine trans_ev_band_to_full_complex(na, nqc, nblk, nbw, a, lda, tmat, q, ld
#endif
implicit none
integer :: na, nqc, lda, ldq, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols
complex*16 :: a(lda,matrixCols), q(ldq,matrixCols), tmat(nbw, nbw, *)
integer :: na, nqc, lda, ldq, nblk, nbw, matrixCols, numBlocks, mpi_comm_rows, mpi_comm_cols
complex*16 :: a(lda,matrixCols), q(ldq,matrixCols), tmat(nbw, nbw, numBlocks)
complex*16, parameter :: CZERO = (0.d0,0.d0), CONE = (1.d0,0.d0)
......@@ -3725,9 +3726,10 @@ subroutine tridiag_band_complex(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
! a(lda,*) Distributed system matrix reduced to banded form in the upper diagonal
! a(lda,matrixCols) Distributed system matrix reduced to banded form in the upper diagonal
!
! lda Leading dimension of a
! matrixCols local columns of matrix a
!
! d(na) Diagonal of tridiagonal matrix, set only on PE 0 (output)
!
......@@ -3770,8 +3772,8 @@ subroutine tridiag_band_complex(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm
integer, allocatable :: limits(:), snd_limits(:,:)
integer, allocatable :: block_limits(:)
complex*16, allocatable :: ab(:,:), hh_gath(:,:,:), hh_send(:,:,:)
! dummies for calling redist_band
real*8 :: r_a(1,1), r_ab(1,1)
! ! dummies for calling redist_band
! real*8 :: r_a(1,1), r_ab(1,1)
#ifdef HAVE_DETAILED_TIMINGS
call timer%start("tridiag_band_complex")
......@@ -3814,7 +3816,7 @@ subroutine tridiag_band_complex(na, nb, nblk, a, lda, d, e, matrixCols, mpi_comm
n_off = block_limits(my_pe)*nb
! Redistribute band in a to ab
call redist_band(.false., r_a, a, lda, na, nblk, nb, mpi_comm_rows, mpi_comm_cols, mpi_comm, r_ab, ab)
call redist_band_complex(a, lda, na, nblk, nb, matrixCols, mpi_comm_rows, mpi_comm_cols, mpi_comm, ab)
! Calculate the workload for each sweep in the back transformation
! and the space requirements to hold the HH vectors
......@@ -5566,233 +5568,21 @@ contains
end subroutine
! --------------------------------------------------------------------------------------------------
! redist_band: redistributes band from 2D block cyclic form to 1D band
subroutine redist_band(l_real, r_a, c_a, lda, na, nblk, nbw, mpi_comm_rows, mpi_comm_cols, mpi_comm, r_ab, c_ab)
#ifdef HAVE_DETAILED_TIMINGS
use timings
#endif
implicit none
logical, intent(in) :: l_real
real*8, intent(in) :: r_a(lda, *)
complex*16, intent(in) :: c_a(lda, *)
integer, intent(in) :: lda, na, nblk, nbw, mpi_comm_rows, mpi_comm_cols, mpi_comm
real*8, intent(out) :: r_ab(:,:)
complex*16, intent(out) :: c_ab(:,:)
integer, allocatable :: ncnt_s(:), nstart_s(:), ncnt_r(:), nstart_r(:), &
global_id(:,:), global_id_tmp(:,:), block_limits(:)
real*8, allocatable :: r_sbuf(:,:,:), r_rbuf(:,:,:), r_buf(:,:)
complex*16, allocatable :: c_sbuf(:,:,:), c_rbuf(:,:,:), c_buf(:,:)
integer :: i, j, my_pe, n_pes, my_prow, np_rows, my_pcol, np_cols, &
nfact, np, npr, npc, mpierr, is, js
integer :: nblocks_total, il, jl, l_rows, l_cols, n_off
#ifdef HAVE_DETAILED_TIMINGS
call timer%start("redist_band")
#endif
call mpi_comm_rank(mpi_comm,my_pe,mpierr)
call mpi_comm_size(mpi_comm,n_pes,mpierr)
call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr)
call mpi_comm_size(mpi_comm_rows,np_rows,mpierr)
call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr)
call mpi_comm_size(mpi_comm_cols,np_cols,mpierr)
! Get global_id mapping 2D processor coordinates to global id
allocate(global_id(0:np_rows-1,0:np_cols-1))
#ifdef WITH_OPENMP
allocate(global_id_tmp(0:np_rows-1,0:np_cols-1))
#endif
global_id(:,:) = 0
global_id(my_prow, my_pcol) = my_pe
#ifdef WITH_OPENMP
global_id_tmp(:,:) = global_id(:,:)
call mpi_allreduce(global_id_tmp, global_id, np_rows*np_cols, mpi_integer, mpi_sum, mpi_comm, mpierr)
deallocate(global_id_tmp)
#else
call mpi_allreduce(mpi_in_place, global_id, np_rows*np_cols, mpi_integer, mpi_sum, mpi_comm, mpierr)
#endif
! Set work distribution
nblocks_total = (na-1)/nbw + 1
allocate(block_limits(0:n_pes))
call divide_band(nblocks_total, n_pes, block_limits)
allocate(ncnt_s(0:n_pes-1))
allocate(nstart_s(0:n_pes-1))
allocate(ncnt_r(0:n_pes-1))
allocate(nstart_r(0:n_pes-1))
nfact = nbw/nblk
! Count how many blocks go to which PE
ncnt_s(:) = 0
np = 0 ! receiver PE number
do j=0,(na-1)/nblk ! loop over rows of blocks
if (j/nfact==block_limits(np+1)) np = np+1
if (mod(j,np_rows) == my_prow) then
do i=0,nfact
if (mod(i+j,np_cols) == my_pcol) then
ncnt_s(np) = ncnt_s(np) + 1
endif
enddo
endif
enddo
! Allocate send buffer
if (l_real) then
allocate(r_sbuf(nblk,nblk,sum(ncnt_s)))
r_sbuf(:,:,:) = 0.
else
allocate(c_sbuf(nblk,nblk,sum(ncnt_s)))
c_sbuf(:,:,:) = 0.
endif
! Determine start offsets in send buffer
nstart_s(0) = 0
do i=1,n_pes-1
nstart_s(i) = nstart_s(i-1) + ncnt_s(i-1)
enddo
! Fill send buffer
l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a
l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local columns of a
np = 0
do j=0,(na-1)/nblk ! loop over rows of blocks
if (j/nfact==block_limits(np+1)) np = np+1
if (mod(j,np_rows) == my_prow) then
do i=0,nfact
if (mod(i+j,np_cols) == my_pcol) then
nstart_s(np) = nstart_s(np) + 1
js = (j/np_rows)*nblk
is = ((i+j)/np_cols)*nblk
jl = MIN(nblk,l_rows-js)
il = MIN(nblk,l_cols-is)
if (l_real) then
r_sbuf(1:jl,1:il,nstart_s(np)) = r_a(js+1:js+jl,is+1:is+il)
else
c_sbuf(1:jl,1:il,nstart_s(np)) = c_a(js+1:js+jl,is+1:is+il)
endif
endif
enddo
endif
enddo
! Count how many blocks we get from which PE
ncnt_r(:) = 0
do j=block_limits(my_pe)*nfact,min(block_limits(my_pe+1)*nfact-1,(na-1)/nblk)
npr = mod(j,np_rows)
do i=0,nfact
npc = mod(i+j,np_cols)
np = global_id(npr,npc)
ncnt_r(np) = ncnt_r(np) + 1
enddo
enddo
! Allocate receive buffer
if (l_real) then
allocate(r_rbuf(nblk,nblk,sum(ncnt_r)))
else
allocate(c_rbuf(nblk,nblk,sum(ncnt_r)))
endif
! Set send counts/send offsets, receive counts/receive offsets
! now actually in variables, not in blocks
ncnt_s(:) = ncnt_s(:)*nblk*nblk
nstart_s(0) = 0
do i=1,n_pes-1
nstart_s(i) = nstart_s(i-1) + ncnt_s(i-1)
enddo
ncnt_r(:) = ncnt_r(:)*nblk*nblk
nstart_r(0) = 0
do i=1,n_pes-1
nstart_r(i) = nstart_r(i-1) + ncnt_r(i-1)
enddo
! Exchange all data with MPI_Alltoallv
if (l_real) then
call MPI_Alltoallv(r_sbuf,ncnt_s,nstart_s,MPI_REAL8,r_rbuf,ncnt_r,nstart_r,MPI_REAL8,mpi_comm,mpierr)
else
call MPI_Alltoallv(c_sbuf,ncnt_s,nstart_s,MPI_COMPLEX16,c_rbuf,ncnt_r,nstart_r,MPI_COMPLEX16,mpi_comm,mpierr)
endif
! set band from receive buffer
ncnt_r(:) = ncnt_r(:)/(nblk*nblk)
nstart_r(0) = 0
do i=1,n_pes-1
nstart_r(i) = nstart_r(i-1) + ncnt_r(i-1)
enddo
if (l_real) then
allocate(r_buf((nfact+1)*nblk,nblk))
else
allocate(c_buf((nfact+1)*nblk,nblk))
endif
! n_off: Offset of ab within band
n_off = block_limits(my_pe)*nbw
do j=block_limits(my_pe)*nfact,min(block_limits(my_pe+1)*nfact-1,(na-1)/nblk)
npr = mod(j,np_rows)
do i=0,nfact
npc = mod(i+j,np_cols)
np = global_id(npr,npc)
nstart_r(np) = nstart_r(np) + 1
if (l_real) then
r_buf(i*nblk+1:i*nblk+nblk,:) = transpose(r_rbuf(:,:,nstart_r(np)))
else
c_buf(i*nblk+1:i*nblk+nblk,:) = conjg(transpose(c_rbuf(:,:,nstart_r(np))))
endif
enddo
do i=1,MIN(nblk,na-j*nblk)
if (l_real) then
r_ab(1:nbw+1,i+j*nblk-n_off) = r_buf(i:i+nbw,i)
else
c_ab(1:nbw+1,i+j*nblk-n_off) = c_buf(i:i+nbw,i)
endif
enddo
enddo
deallocate(ncnt_s, nstart_s)
deallocate(ncnt_r, nstart_r)
deallocate(global_id)
deallocate(block_limits)
if (l_real) then
deallocate(r_sbuf, r_rbuf, r_buf)
else
deallocate(c_sbuf, c_rbuf, c_buf)
endif
#ifdef HAVE_DETAILED_TIMINGS
call timer%stop("redist_band")
#endif
end subroutine
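A note on the band layout the final loop above produces (my reading of the assignment into r_ab/c_ab, not stated in the source): column k of ab holds band row A(k,k:k+nbw), so ab(1,k) is the diagonal and ab(1+d,k) the d-th superdiagonal. A hypothetical serial analogue, assuming a full array a_full:

! Serial equivalent of the band layout filled above (n_off = 0)
do k = 1, na
   do d = 0, min(nbw, na-k)
      ab(1+d, k) = a_full(k, k+d)
   enddo
enddo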
#define DATATYPE REAL
#define BYTESIZE 8
#define REALCASE 1
#include "redist_band.X90"
#undef DATATYPE
#undef BYTESIZE
#undef REALCASE
#define DATATYPE COMPLEX
#define BYTESIZE 16
#define COMPLEXCASE 1
#include "redist_band.X90"
#undef DATATYPE
#undef BYTESIZE
#undef COMPLEXCASE
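These includes are single-source template instantiation: redist_band.X90 holds one routine body guarded by REALCASE/COMPLEXCASE (the new file below), and each #include stamps out a type-specific routine, so the runtime l_real flag and the cross-type dummy arguments of the deleted routine above disappear. A minimal sketch of the technique with a hypothetical template file scale.X90 (for illustration only):

! --- scale.X90 (hypothetical template) ---
#if REALCASE==1
subroutine scale_real(x, n)
   implicit none
   integer, intent(in)       :: n
   real*8, intent(inout)     :: x(n)
#endif
#if COMPLEXCASE==1
subroutine scale_complex(x, n)
   implicit none
   integer, intent(in)       :: n
   complex*16, intent(inout) :: x(n)
#endif
   x(:) = 2*x(:)   ! shared body, compiled once per type
#if REALCASE==1
end subroutine scale_real
#endif
#if COMPLEXCASE==1
end subroutine scale_complex
#endif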
!---------------------------------------------------------------------------------------------------
! divide_band: sets the work distribution in band
......
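divide_band's body is elided here; from the call sites above, it fills block_limits(0:n_pes) with a non-decreasing partition of 0..nblocks_total, so that PE p handles band blocks block_limits(p)..block_limits(p+1)-1. A minimal even-split sketch consistent with that contract (ELPA's actual distribution strategy may differ):

! Hypothetical even partition of nblocks_total blocks over n_pes PEs
subroutine divide_band_sketch(nblocks_total, n_pes, block_limits)
   implicit none
   integer, intent(in)  :: nblocks_total, n_pes
   integer, intent(out) :: block_limits(0:n_pes)
   integer :: p
   do p = 0, n_pes
      block_limits(p) = (p*nblocks_total)/n_pes   ! block_limits(0)=0, block_limits(n_pes)=nblocks_total
   enddo
end subroutine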
! --------------------------------------------------------------------------------------------------
! redist_band: redistributes band from 2D block cyclic form to 1D band
#if REALCASE==1
subroutine redist_band_real(r_a, lda, na, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, mpi_comm, r_ab)
#endif
#if COMPLEXCASE==1
subroutine redist_band_complex(c_a, lda, na, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, mpi_comm, c_ab)
#endif
#ifdef HAVE_DETAILED_TIMINGS
use timings
#endif
implicit none
integer, intent(in) :: lda, na, nblk, nbw, matrixCols, mpi_comm_rows, mpi_comm_cols, mpi_comm
#if REALCASE==1
real*8, intent(in) :: r_a(lda, matrixCols)
#endif
#if COMPLEXCASE==1
complex*16, intent(in) :: c_a(lda, matrixCols)
#endif
#if REALCASE==1
real*8, intent(out) :: r_ab(:,:)
#endif
#if COMPLEXCASE==1
complex*16, intent(out) :: c_ab(:,:)
#endif
integer, allocatable :: ncnt_s(:), nstart_s(:), ncnt_r(:), nstart_r(:), &
global_id(:,:), global_id_tmp(:,:), block_limits(:)
#if REALCASE==1
real*8, allocatable :: r_sbuf(:,:,:), r_rbuf(:,:,:), r_buf(:,:)
#endif
#if COMPLEXCASE==1
complex*16, allocatable :: c_sbuf(:,:,:), c_rbuf(:,:,:), c_buf(:,:)
#endif
integer :: i, j, my_pe, n_pes, my_prow, np_rows, my_pcol, np_cols, &
nfact, np, npr, npc, mpierr, is, js
integer :: nblocks_total, il, jl, l_rows, l_cols, n_off
#ifdef HAVE_DETAILED_TIMINGS
#if REALCASE==1
call timer%start("redist_band_real")
#endif
#if COMPLEXCASE==1
call timer%start("redist_band_complex")
#endif
#endif
call mpi_comm_rank(mpi_comm,my_pe,mpierr)
call mpi_comm_size(mpi_comm,n_pes,mpierr)
call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr)
call mpi_comm_size(mpi_comm_rows,np_rows,mpierr)
call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr)
call mpi_comm_size(mpi_comm_cols,np_cols,mpierr)
! Get global_id mapping 2D processor coordinates to global id
allocate(global_id(0:np_rows-1,0:np_cols-1))
#ifdef WITH_OPENMP
allocate(global_id_tmp(0:np_rows-1,0:np_cols-1))
#endif