Commit 9710bf08 authored by Andreas Marek

Split file elpa1.F90 into elpa1.F90 and elpa1_compute.F90

For automatic generation of documentation, the file elpa1.F90
has been split into two files, in order to have a lean,
easy-to-understand user interface:

elpa1.F90
the visible user functions, which provide the library calls.
The usage is the same as always.

elpa1_compute.F90
all internal routines, which are used by ELPA1 and ELPA2 but
are never called by the user. These routines are now "hidden"
in the module elpa1_compute, which is used by both ELPA1 and ELPA2.

The procedures in elpa1_compute.F90 are identical to the ones in
elpa1.F90 before this split commit. The only -- though numerous --
changes are in indentation.
parent 8f82627a
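To illustrate the lean interface that remains in elpa1.F90, here is a minimal, hypothetical caller sketch (not part of this commit). Matrix setup, the block-cyclic distribution and the creation of the row/column communicators are elided, and the argument list of solve_evp_real is assumed to mirror the solve_evp_complex interface visible in the diff below and to return a logical success flag.

program elpa1_usage_sketch
   use elpa1                      ! lean user interface; elpa1_compute is used internally only
   implicit none
   integer :: na, nev, lda, ldq, nblk, matrixCols
   integer :: mpi_comm_rows, mpi_comm_cols, mpierr
   real*8, allocatable :: a(:,:), ev(:), q(:,:)
   logical :: success             ! return value of solve_evp_real (assumed)

   call mpi_init(mpierr)
   ! ... choose na, nev, nblk, set up the 2D process grid, create
   !     mpi_comm_rows / mpi_comm_cols, and allocate and fill the
   !     block-cyclically distributed matrix a(lda,matrixCols)
   !     exactly as before this commit ...
   success = solve_evp_real(na, nev, a, lda, ev, q, ldq, nblk, matrixCols, &
                            mpi_comm_rows, mpi_comm_cols)
   ! on success, ev(:) holds the eigenvalues and q(:,:) the eigenvectors
   call mpi_finalize(mpierr)
end program elpa1_usage_sketch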
Makefile.am
@@ -10,6 +10,7 @@ lib_LTLIBRARIES = libelpa@SUFFIX@.la
libelpa@SUFFIX@_la_LINK = $(FCLINK) $(AM_LDFLAGS) -version-info $(ELPA_SO_VERSION) -lstdc++
libelpa@SUFFIX@_la_SOURCES = src/elpa_utilities.F90 \
src/elpa1_compute.F90 \
src/elpa1.F90 \
src/elpa2_utilities.F90 \
src/elpa2.F90 \
src/elpa1.F90
@@ -54,6 +54,7 @@
module ELPA1
use elpa_utilities
use elpa1_compute
#ifdef HAVE_DETAILED_TIMINGS
use timings
@@ -69,33 +70,6 @@ module ELPA1
public :: solve_evp_real ! Driver routine for real eigenvalue problem
public :: solve_evp_complex ! Driver routine for complex eigenvalue problem
public :: tridiag_real ! Transform real symmetric matrix to tridiagonal form
public :: trans_ev_real ! Transform eigenvectors of a tridiagonal matrix back
public :: mult_at_b_real ! Multiply real matrices A**T * B
public :: tridiag_complex ! Transform complex hermitian matrix to tridiagonal form
public :: trans_ev_complex ! Transform eigenvectors of a tridiagonal matrix back
public :: mult_ah_b_complex ! Multiply complex matrices A**H * B
public :: solve_tridi ! Solve tridiagonal eigensystem with divide and conquer method
public :: cholesky_real ! Cholesky factorization of a real matrix
public :: invert_trm_real ! Invert real triangular matrix
public :: cholesky_complex ! Cholesky factorization of a complex matrix
public :: invert_trm_complex ! Invert complex triangular matrix
public :: local_index ! Get local index of a block cyclic distributed matrix
public :: least_common_multiple ! Get least common multiple
public :: hh_transform_real
public :: hh_transform_complex
public :: elpa_reduce_add_vectors_complex, elpa_reduce_add_vectors_real
public :: elpa_transpose_vectors_complex, elpa_transpose_vectors_real
!-------------------------------------------------------------------------------
! Timing results, set by every call to solve_evp_xxx
real*8, public :: time_evp_fwd ! forward transformations (to tridiagonal form)
@@ -106,8 +80,6 @@ module ELPA1
logical, public :: elpa_print_times = .false.
!-------------------------------------------------------------------------------
include 'mpif.h'
contains
@@ -369,3921 +341,5 @@ function solve_evp_complex(na, nev, a, lda, ev, q, ldq, nblk, matrixCols, mpi_co
end function solve_evp_complex
#define DATATYPE REAL
#define BYTESIZE 8
#define REALCASE 1
#include "elpa_transpose_vectors.X90"
#include "elpa_reduce_add_vectors.X90"
#undef DATATYPE
#undef BYTESIZE
#undef REALCASE
!-------------------------------------------------------------------------------
subroutine tridiag_real(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, d, e, tau)
!-------------------------------------------------------------------------------
! tridiag_real: Reduces a distributed symmetric matrix to tridiagonal form
! (like Scalapack Routine PDSYTRD)
!
! Parameters
!
! na Order of matrix
!
! a(lda,matrixCols) Distributed matrix which should be reduced.
! Distribution is like in Scalapack.
! In contrast to PDSYTRD, a(:,:) must be set completely (upper and lower half)
! a(:,:) is overwritten on exit with the Householder vectors
!
! lda Leading dimension of a
! matrixCols local columns of matrix
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
! mpi_comm_rows
! mpi_comm_cols
! MPI-Communicators for rows/columns
!
! d(na) Diagonal elements (returned), identical on all processors
!
! e(na) Off-Diagonal elements (returned), identical on all processors
!
! tau(na) Factors for the Householder vectors (returned), needed for back transformation
!
!-------------------------------------------------------------------------------
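! Note: in matrix terms, the routine computes an orthogonal similarity
! transformation A = Q * T * Q**T, where T is the tridiagonal matrix returned
! in d(:) and e(:) and Q is the product of the Householder reflectors encoded
! in a(:,:) and tau(:) (applied to the eigenvectors of T later by trans_ev_real).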
#ifdef HAVE_DETAILED_TIMINGS
use timings
#endif
implicit none
integer na, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols
real*8 a(lda,matrixCols), d(na), e(na), tau(na)
integer, parameter :: max_stored_rows = 32
integer my_prow, my_pcol, np_rows, np_cols, mpierr
integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols
integer l_cols, l_rows, nstor
integer istep, i, j, lcs, lce, lrs, lre
integer tile_size, l_rows_tile, l_cols_tile
#ifdef WITH_OPENMP
integer my_thread, n_threads, max_threads, n_iter
integer omp_get_thread_num, omp_get_num_threads, omp_get_max_threads
#endif
real*8 vav, vnorm2, x, aux(2*max_stored_rows), aux1(2), aux2(2), vrl, xf
real*8, allocatable:: tmp(:), vr(:), vc(:), ur(:), uc(:), vur(:,:), uvc(:,:)
#ifdef WITH_OPENMP
real*8, allocatable:: ur_p(:,:), uc_p(:,:)
#endif
#ifdef HAVE_DETAILED_TIMINGS
call timer%start("tridiag_real")
#endif
call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr)
call mpi_comm_size(mpi_comm_rows,np_rows,mpierr)
call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr)
call mpi_comm_size(mpi_comm_cols,np_cols,mpierr)
! Matrix is split into tiles; work is done only for tiles on the diagonal or above
tile_size = nblk*least_common_multiple(np_rows,np_cols) ! minimum global tile size
tile_size = ((128*max(np_rows,np_cols)-1)/tile_size+1)*tile_size ! make local tiles at least 128 wide
l_rows_tile = tile_size/np_rows ! local rows of a tile
l_cols_tile = tile_size/np_cols ! local cols of a tile
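! Illustrative example (values not taken from the source): with nblk=16,
! np_rows=4, np_cols=3 the least common multiple is 12, so the minimum global
! tile size is 16*12 = 192; enforcing at least 128 local rows/columns then gives
! ((128*4-1)/192+1)*192 = 3*192 = 576, i.e. l_rows_tile = 576/4 = 144 and
! l_cols_tile = 576/3 = 192.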
totalblocks = (na-1)/nblk + 1
max_blocks_row = (totalblocks-1)/np_rows + 1
max_blocks_col = (totalblocks-1)/np_cols + 1
max_local_rows = max_blocks_row*nblk
max_local_cols = max_blocks_col*nblk
allocate(tmp(MAX(max_local_rows,max_local_cols)))
allocate(vr(max_local_rows+1))
allocate(ur(max_local_rows))
allocate(vc(max_local_cols))
allocate(uc(max_local_cols))
#ifdef WITH_OPENMP
max_threads = omp_get_max_threads()
allocate(ur_p(max_local_rows,0:max_threads-1))
allocate(uc_p(max_local_cols,0:max_threads-1))
#endif
tmp = 0
vr = 0
ur = 0
vc = 0
uc = 0
allocate(vur(max_local_rows,2*max_stored_rows))
allocate(uvc(max_local_cols,2*max_stored_rows))
d(:) = 0
e(:) = 0
tau(:) = 0
nstor = 0
l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a
l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local cols of a
if(my_prow==prow(na, nblk, np_rows) .and. my_pcol==pcol(na, nblk, np_cols)) d(na) = a(l_rows,l_cols)
do istep=na,3,-1
! Calculate number of local rows and columns of the still remaining matrix
! on the local processor
l_rows = local_index(istep-1, my_prow, np_rows, nblk, -1)
l_cols = local_index(istep-1, my_pcol, np_cols, nblk, -1)
! Calculate vector for Householder transformation on all procs
! owning column istep
if(my_pcol==pcol(istep, nblk, np_cols)) then
! Get vector to be transformed; distribute last element and norm of
! remaining elements to all procs in current column
vr(1:l_rows) = a(1:l_rows,l_cols+1)
if(nstor>0 .and. l_rows>0) then
call DGEMV('N',l_rows,2*nstor,1.d0,vur,ubound(vur,dim=1), &
uvc(l_cols+1,1),ubound(uvc,dim=1),1.d0,vr,1)
endif
if(my_prow==prow(istep-1, nblk, np_rows)) then
aux1(1) = dot_product(vr(1:l_rows-1),vr(1:l_rows-1))
aux1(2) = vr(l_rows)
else
aux1(1) = dot_product(vr(1:l_rows),vr(1:l_rows))
aux1(2) = 0.
endif
call mpi_allreduce(aux1,aux2,2,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
vnorm2 = aux2(1)
vrl = aux2(2)
! Householder transformation
call hh_transform_real(vrl, vnorm2, xf, tau(istep))
! Scale vr and store Householder vector for back transformation
vr(1:l_rows) = vr(1:l_rows) * xf
if(my_prow==prow(istep-1, nblk, np_rows)) then
vr(l_rows) = 1.
e(istep-1) = vrl
endif
a(1:l_rows,l_cols+1) = vr(1:l_rows) ! store Householder vector for back transformation
endif
! Broadcast the Householder vector (and tau) along columns
if(my_pcol==pcol(istep, nblk, np_cols)) vr(l_rows+1) = tau(istep)
call MPI_Bcast(vr,l_rows+1,MPI_REAL8,pcol(istep, nblk, np_cols),mpi_comm_cols,mpierr)
tau(istep) = vr(l_rows+1)
! Transpose Householder vector vr -> vc
call elpa_transpose_vectors_real (vr, ubound(vr,dim=1), mpi_comm_rows, &
vc, ubound(vc,dim=1), mpi_comm_cols, &
1, istep-1, 1, nblk)
! Calculate u = (A + VU**T + UV**T)*v
! For cache efficiency, we use only the upper half of the matrix tiles for this,
! thus the result is partly in uc(:) and partly in ur(:)
uc(1:l_cols) = 0
ur(1:l_rows) = 0
if(l_rows>0 .and. l_cols>0) then
#ifdef WITH_OPENMP
#ifdef HAVE_DETAILED_TIMINGS
call timer%start("OpenMP parallel")
#endif
!$OMP PARALLEL PRIVATE(my_thread,n_threads,n_iter,i,lcs,lce,j,lrs,lre)
my_thread = omp_get_thread_num()
n_threads = omp_get_num_threads()
n_iter = 0
uc_p(1:l_cols,my_thread) = 0.
ur_p(1:l_rows,my_thread) = 0.
#endif
do i=0,(istep-2)/tile_size
lcs = i*l_cols_tile+1
lce = min(l_cols,(i+1)*l_cols_tile)
if(lce<lcs) cycle
do j=0,i
lrs = j*l_rows_tile+1
lre = min(l_rows,(j+1)*l_rows_tile)
if(lre<lrs) cycle
#ifdef WITH_OPENMP
if(mod(n_iter,n_threads) == my_thread) then
call DGEMV('T',lre-lrs+1,lce-lcs+1,1.d0,a(lrs,lcs),lda,vr(lrs),1,1.d0,uc_p(lcs,my_thread),1)
if(i/=j) call DGEMV('N',lre-lrs+1,lce-lcs+1,1.d0,a(lrs,lcs),lda,vc(lcs),1,1.d0,ur_p(lrs,my_thread),1)
endif
n_iter = n_iter+1
#else
call DGEMV('T',lre-lrs+1,lce-lcs+1,1.d0,a(lrs,lcs),lda,vr(lrs),1,1.d0,uc(lcs),1)
if(i/=j) call DGEMV('N',lre-lrs+1,lce-lcs+1,1.d0,a(lrs,lcs),lda,vc(lcs),1,1.d0,ur(lrs),1)
#endif
enddo
enddo
#ifdef WITH_OPENMP
!$OMP END PARALLEL
#ifdef HAVE_DETAILED_TIMINGS
call timer%stop("OpenMP parallel")
#endif
do i=0,max_threads-1
uc(1:l_cols) = uc(1:l_cols) + uc_p(1:l_cols,i)
ur(1:l_rows) = ur(1:l_rows) + ur_p(1:l_rows,i)
enddo
#endif
if(nstor>0) then
call DGEMV('T',l_rows,2*nstor,1.d0,vur,ubound(vur,dim=1),vr,1,0.d0,aux,1)
call DGEMV('N',l_cols,2*nstor,1.d0,uvc,ubound(uvc,dim=1),aux,1,1.d0,uc,1)
endif
endif
! Sum up all ur(:) parts along rows and add them to the uc(:) parts
! on the processors containing the diagonal
! This is only necessary if ur has been calculated, i.e. if the
! global tile size is smaller than the global remaining matrix
if(tile_size < istep-1) then
call elpa_reduce_add_vectors_REAL (ur, ubound(ur,dim=1), mpi_comm_rows, &
uc, ubound(uc,dim=1), mpi_comm_cols, &
istep-1, 1, nblk)
endif
! Sum up all the uc(:) parts, transpose uc -> ur
if(l_cols>0) then
tmp(1:l_cols) = uc(1:l_cols)
call mpi_allreduce(tmp,uc,l_cols,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
endif
call elpa_transpose_vectors_real (uc, ubound(uc,dim=1), mpi_comm_cols, &
ur, ubound(ur,dim=1), mpi_comm_rows, &
1, istep-1, 1, nblk)
! calculate u**T * v (same as v**T * (A + VU**T + UV**T) * v )
x = 0
if(l_cols>0) x = dot_product(vc(1:l_cols),uc(1:l_cols))
call mpi_allreduce(x,vav,1,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr)
! store u and v in the matrices U and V
! these matrices are stored combined in one here
do j=1,l_rows
vur(j,2*nstor+1) = tau(istep)*vr(j)
vur(j,2*nstor+2) = 0.5*tau(istep)*vav*vr(j) - ur(j)
enddo
do j=1,l_cols
uvc(j,2*nstor+1) = 0.5*tau(istep)*vav*vc(j) - uc(j)
uvc(j,2*nstor+2) = tau(istep)*vc(j)
enddo
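! Written out, the stored column pair realizes the standard symmetric rank-2
! update A := A - v*w**T - w*v**T with w = tau*u - 0.5*tau**2*(v**T*u)*v
! (here vav = v**T*u): vur holds tau*v and 0.5*tau*vav*v - u in row-distributed
! form, uvc the same two vectors column-distributed and in swapped order, so the
! DGEMM below applies all accumulated updates in one call.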
nstor = nstor+1
! If the limit of max_stored_rows is reached, calculate A + VU**T + UV**T
if(nstor==max_stored_rows .or. istep==3) then
do i=0,(istep-2)/tile_size
lcs = i*l_cols_tile+1
lce = min(l_cols,(i+1)*l_cols_tile)
lrs = 1
lre = min(l_rows,(i+1)*l_rows_tile)
if(lce<lcs .or. lre<lrs) cycle
call dgemm('N','T',lre-lrs+1,lce-lcs+1,2*nstor,1.d0, &
vur(lrs,1),ubound(vur,dim=1),uvc(lcs,1),ubound(uvc,dim=1), &
1.d0,a(lrs,lcs),lda)
enddo
nstor = 0
endif
if(my_prow==prow(istep-1, nblk, np_rows) .and. my_pcol==pcol(istep-1, nblk, np_cols)) then
if(nstor>0) a(l_rows,l_cols) = a(l_rows,l_cols) &
+ dot_product(vur(l_rows,1:2*nstor),uvc(l_cols,1:2*nstor))
d(istep-1) = a(l_rows,l_cols)
endif
enddo
! Store e(1) and d(1)
if(my_prow==prow(1, nblk, np_rows) .and. my_pcol==pcol(2, nblk, np_cols)) e(1) = a(1,l_cols) ! use last l_cols value of loop above
if(my_prow==prow(1, nblk, np_rows) .and. my_pcol==pcol(1, nblk, np_cols)) d(1) = a(1,1)
deallocate(tmp, vr, ur, vc, uc, vur, uvc)
! distribute the arrays d and e to all processors
allocate(tmp(na))
tmp = d
call mpi_allreduce(tmp,d,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
tmp = d
call mpi_allreduce(tmp,d,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr)
tmp = e
call mpi_allreduce(tmp,e,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
tmp = e
call mpi_allreduce(tmp,e,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr)
deallocate(tmp)
#ifdef HAVE_DETAILED_TIMINGS
call timer%stop("tridiag_real")
#endif
end subroutine tridiag_real
!-------------------------------------------------------------------------------
subroutine trans_ev_real(na, nqc, a, lda, tau, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols)
!-------------------------------------------------------------------------------
! trans_ev_real: Transforms the eigenvectors of a tridiagonal matrix back
! to the eigenvectors of the original matrix
! (like Scalapack Routine PDORMTR)
!
! Parameters
!
! na Order of matrix a, number of rows of matrix q
!
! nqc Number of columns of matrix q
!
! a(lda,matrixCols) Matrix containing the Householder vectors (i.e. matrix a after tridiag_real)
! Distribution is like in Scalapack.
!
! lda Leading dimension of a
! matrixCols local columns of matrix a and q
!
! tau(na) Factors of the Householder vectors
!
! q On input: Eigenvectors of tridiagonal matrix
! On output: Transformed eigenvectors
! Distribution is like in Scalapack.
!
! ldq Leading dimension of q
!
! nblk blocksize of cyclic distribution, must be the same in both directions!
!
! mpi_comm_rows
! mpi_comm_cols
! MPI-Communicators for rows/columns
!
!-------------------------------------------------------------------------------
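! Note: in matrix terms, this routine multiplies q from the left with the
! orthogonal matrix accumulated by tridiag_real: blocks of up to max_stored_rows
! Householder vectors are collected into hvm together with a triangular factor
! tmat (compact WY representation) and applied as q := q - V * T * V**T * q,
! as indicated in the comment further below.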
#ifdef HAVE_DETAILED_TIMINGS
use timings
#endif
implicit none
integer na, nqc, lda, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols
real*8 a(lda,matrixCols), q(ldq,matrixCols), tau(na)
integer :: max_stored_rows
integer my_prow, my_pcol, np_rows, np_cols, mpierr
integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols
integer l_cols, l_rows, l_colh, nstor
integer istep, i, n, nc, ic, ics, ice, nb, cur_pcol
real*8, allocatable:: tmp1(:), tmp2(:), hvb(:), hvm(:,:)
real*8, allocatable:: tmat(:,:), h1(:), h2(:)
#ifdef HAVE_DETAILED_TIMINGS
call timer%start("trans_ev_real")
#endif
call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr)
call mpi_comm_size(mpi_comm_rows,np_rows,mpierr)
call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr)
call mpi_comm_size(mpi_comm_cols,np_cols,mpierr)
totalblocks = (na-1)/nblk + 1
max_blocks_row = (totalblocks-1)/np_rows + 1
max_blocks_col = ((nqc-1)/nblk)/np_cols + 1 ! Columns of q!
max_local_rows = max_blocks_row*nblk
max_local_cols = max_blocks_col*nblk
max_stored_rows = (63/nblk+1)*nblk
allocate(tmat(max_stored_rows,max_stored_rows))
allocate(h1(max_stored_rows*max_stored_rows))
allocate(h2(max_stored_rows*max_stored_rows))
allocate(tmp1(max_local_cols*max_stored_rows))
allocate(tmp2(max_local_cols*max_stored_rows))
allocate(hvb(max_local_rows*nblk))
allocate(hvm(max_local_rows,max_stored_rows))
hvm = 0 ! Must be set to 0 !!!
hvb = 0 ! Safety only
l_cols = local_index(nqc, my_pcol, np_cols, nblk, -1) ! Local columns of q
nstor = 0
do istep=1,na,nblk
ics = MAX(istep,3)
ice = MIN(istep+nblk-1,na)
if(ice<ics) cycle
cur_pcol = pcol(istep, nblk, np_cols)
nb = 0
do ic=ics,ice
l_colh = local_index(ic , my_pcol, np_cols, nblk, -1) ! Column of Householder vector
l_rows = local_index(ic-1, my_prow, np_rows, nblk, -1) ! # rows of Householder vector
if(my_pcol==cur_pcol) then
hvb(nb+1:nb+l_rows) = a(1:l_rows,l_colh)
if(my_prow==prow(ic-1, nblk, np_rows)) then
hvb(nb+l_rows) = 1.
endif
endif
nb = nb+l_rows
enddo
if(nb>0) &
call MPI_Bcast(hvb,nb,MPI_REAL8,cur_pcol,mpi_comm_cols,mpierr)
nb = 0
do ic=ics,ice
l_rows = local_index(ic-1, my_prow, np_rows, nblk, -1) ! # rows of Householder vector
hvm(1:l_rows,nstor+1) = hvb(nb+1:nb+l_rows)
nstor = nstor+1
nb = nb+l_rows
enddo
! Please note: for smaller matrix sizes (na/np_rows<=256), a value of 32 for nstor is enough!
if(nstor+nblk>max_stored_rows .or. istep+nblk>na .or. (na/np_rows<=256 .and. nstor>=32)) then
! Calculate scalar products of stored vectors.
! This can be done in different ways, we use dsyrk
tmat = 0
if(l_rows>0) &
call dsyrk('U','T',nstor,l_rows,1.d0,hvm,ubound(hvm,dim=1),0.d0,tmat,max_stored_rows)
nc = 0
do n=1,nstor-1
h1(nc+1:nc+n) = tmat(1:n,n+1)
nc = nc+n
enddo
if(nc>0) call mpi_allreduce(h1,h2,nc,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
! Calculate triangular matrix T
nc = 0
tmat(1,1) = tau(ice-nstor+1)
do n=1,nstor-1
call dtrmv('L','T','N',n,tmat,max_stored_rows,h2(nc+1),1)
tmat(n+1,1:n) = -h2(nc+1:nc+n)*tau(ice-nstor+n+1)
tmat(n+1,n+1) = tau(ice-nstor+n+1)
nc = nc+n
enddo
! Q = Q - V * T * V**T * Q
if(l_rows>0) then
call dgemm('T','N',nstor,l_cols,l_rows,1.d0,hvm,ubound(hvm,dim=1), &
q,ldq,0.d0,tmp1,nstor)
else
tmp1(1:l_cols*nstor) = 0
endif
call mpi_allreduce(tmp1,tmp2,nstor*l_cols,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr)
if(l_rows>0) then
call dtrmm('L','L','N','N',nstor,l_cols,1.0d0,tmat,max_stored_rows,tmp2,nstor)
call dgemm('N','N',l_rows,l_cols,nstor,-1.d0,hvm,ubound(hvm,dim=1), &
tmp2,nstor,1.d0,q,ldq)
endif
nstor = 0
endif
enddo
deallocate(tmat, h1, h2, tmp1, tmp2, hvb, hvm)
#ifdef HAVE_DETAILED_TIMINGS
call timer%stop("trans_ev_real")
#endif
end subroutine trans_ev_real
!-------------------------------------------------------------------------------
subroutine mult_at_b_real(uplo_a, uplo_c, na, ncb, a, lda, b, ldb, nblk, mpi_comm_rows, mpi_comm_cols, c, ldc)
!-------------------------------------------------------------------------------
! mult_at_b_real: Performs C := A**T * B
!
! where: A is a square matrix (na,na) which is optionally upper or lower triangular