!    This file is part of ELPA.
!
!    The ELPA library was originally created by the ELPA consortium,
!    consisting of the following organizations:
!
!    - Rechenzentrum Garching der Max-Planck-Gesellschaft (RZG),
!    - Bergische Universität Wuppertal, Lehrstuhl für angewandte
!      Informatik,
!    - Technische Universität München, Lehrstuhl für Informatik mit
!      Schwerpunkt Wissenschaftliches Rechnen,
!    - Fritz-Haber-Institut, Berlin, Abt. Theorie,
!    - Max-Planck-Institut für Mathematik in den Naturwissenschaften,
!      Leipzig, Abt. Komplexe Strukturen in Biologie und Kognition,
!      and
!    - IBM Deutschland GmbH
!
!
!    More information can be found here:
!    http://elpa.rzg.mpg.de/
!
!    ELPA is free software: you can redistribute it and/or modify
!    it under the terms of the version 3 of the license of the
!    GNU Lesser General Public License as published by the Free
!    Software Foundation.
!
!    ELPA is distributed in the hope that it will be useful,
!    but WITHOUT ANY WARRANTY; without even the implied warranty of
!    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
!    GNU Lesser General Public License for more details.
!
!    You should have received a copy of the GNU Lesser General Public License
!    along with ELPA.  If not, see <http://www.gnu.org/licenses/>
!
!    ELPA reflects a substantial effort on the part of the original
!    ELPA consortium, and we ask you to respect the spirit of the
!    license that we chose: i.e., please contribute any changes you
!    may have back to the original ELPA library distribution, and keep
!    any derivatives of ELPA under the same license that we chose for
!    the original distribution, the GNU Lesser General Public License.
!
!
! ELPA1 -- Faster replacements for ScaLAPACK symmetric eigenvalue routines
!
! Copyright of the original code rests with the authors inside the ELPA
! consortium. The copyright of any additional modifications shall rest
! with their original authors, but shall adhere to the licensing terms
! distributed along with the original code in the file "COPYING".

#include "config-f90.h"

module ELPA1

  use elpa_utilities
#ifdef HAVE_DETAILED_TIMINGS
  use timings
#endif
  implicit none

  PRIVATE ! By default, all routines contained are private

  ! The following routines are public:

  public :: get_elpa_row_col_comms ! Sets MPI row/col communicators
  public :: solve_evp_real         ! Driver routine for real eigenvalue problem
  public :: solve_evp_complex      ! Driver routine for complex eigenvalue problem
  public :: tridiag_real           ! Transform real symmetric matrix to tridiagonal form
  public :: trans_ev_real          ! Transform eigenvectors of a tridiagonal matrix back
  public :: mult_at_b_real         ! Multiply real matrices A**T * B
  public :: tridiag_complex        ! Transform complex hermitian matrix to tridiagonal form
  public :: trans_ev_complex       ! Transform eigenvectors of a tridiagonal matrix back
  public :: mult_ah_b_complex      ! Multiply complex matrices A**H * B
  public :: solve_tridi            ! Solve tridiagonal eigensystem with divide and conquer method
  public :: cholesky_real          ! Cholesky factorization of a real matrix
  public :: invert_trm_real        ! Invert real triangular matrix
  public :: cholesky_complex       ! Cholesky factorization of a complex matrix
  public :: invert_trm_complex     ! Invert complex triangular matrix
  public :: local_index            ! Get local index of a block cyclic distributed matrix
  public :: least_common_multiple  !
Get least common multiple public :: hh_transform_real public :: hh_transform_complex public :: elpa_reduce_add_vectors_complex, elpa_reduce_add_vectors_real public :: elpa_transpose_vectors_complex, elpa_transpose_vectors_real !------------------------------------------------------------------------------- ! Timing results, set by every call to solve_evp_xxx real*8, public :: time_evp_fwd ! forward transformations (to tridiagonal form) real*8, public :: time_evp_solve ! time for solving the tridiagonal system real*8, public :: time_evp_back ! time for back transformations of eigenvectors ! Set elpa_print_times to .true. for explicit timing outputs logical, public :: elpa_print_times = .false. !------------------------------------------------------------------------------- include 'mpif.h' contains !------------------------------------------------------------------------------- function get_elpa_row_col_comms(mpi_comm_global, my_prow, my_pcol, mpi_comm_rows, mpi_comm_cols) result(mpierr) !------------------------------------------------------------------------------- ! get_elpa_row_col_comms: ! All ELPA routines need MPI communicators for communicating within ! rows or columns of processes, these are set here. ! mpi_comm_rows/mpi_comm_cols can be free'd with MPI_Comm_free if not used any more. ! ! Parameters ! ! mpi_comm_global Global communicator for the calculations (in) ! ! my_prow Row coordinate of the calling process in the process grid (in) ! ! my_pcol Column coordinate of the calling process in the process grid (in) ! ! mpi_comm_rows Communicator for communicating within rows of processes (out) ! ! mpi_comm_cols Communicator for communicating within columns of processes (out) ! !------------------------------------------------------------------------------- implicit none integer, intent(in) :: mpi_comm_global, my_prow, my_pcol integer, intent(out) :: mpi_comm_rows, mpi_comm_cols integer :: mpierr ! mpi_comm_rows is used for communicating WITHIN rows, i.e. all processes ! having the same column coordinate share one mpi_comm_rows. ! So the "color" for splitting is my_pcol and the "key" is my row coordinate. ! Analogous for mpi_comm_cols call mpi_comm_split(mpi_comm_global,my_pcol,my_prow,mpi_comm_rows,mpierr) call mpi_comm_split(mpi_comm_global,my_prow,my_pcol,mpi_comm_cols,mpierr) end function get_elpa_row_col_comms !------------------------------------------------------------------------------- function solve_evp_real(na, nev, a, lda, ev, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) result(success) !------------------------------------------------------------------------------- ! solve_evp_real: Solves the real eigenvalue problem ! ! Parameters ! ! na Order of matrix a ! ! nev Number of eigenvalues needed. ! The smallest nev eigenvalues/eigenvectors are calculated. ! ! a(lda,matrixCols) Distributed matrix for which eigenvalues are to be computed. ! Distribution is like in Scalapack. ! The full matrix must be set (not only one half like in scalapack). ! Destroyed on exit (upper and lower half). ! ! lda Leading dimension of a ! ! ev(na) On output: eigenvalues of a, every processor gets the complete set ! ! q(ldq,matrixCols) On output: Eigenvectors of a ! Distribution is like in Scalapack. ! Must be always dimensioned to the full size (corresponding to (na,na)) ! even if only a part of the eigenvalues is needed. ! ! ldq Leading dimension of q ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! 
MPI-Communicators for rows/columns ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer, intent(in) :: na, nev, lda, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols real*8 :: a(lda,matrixCols), ev(na), q(ldq,matrixCols) integer :: my_prow, my_pcol, mpierr real*8, allocatable :: e(:), tau(:) real*8 :: ttt0, ttt1 logical :: success logical, save :: firstCall = .true. logical :: wantDebug #ifdef HAVE_DETAILED_TIMINGS call timer%start("solve_evp_real") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) success = .true. wantDebug = .false. if (firstCall) then ! are debug messages desired? wantDebug = debug_messages_via_environment_variable() firstCall = .false. endif allocate(e(na), tau(na)) ttt0 = MPI_Wtime() call tridiag_real(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, ev, e, tau) ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time tridiag_real :',ttt1-ttt0 time_evp_fwd = ttt1-ttt0 ttt0 = MPI_Wtime() call solve_tridi(na, nev, ev, e, q, ldq, nblk, matrixCols, mpi_comm_rows, & mpi_comm_cols, wantDebug, success) if (.not.(success)) return ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time solve_tridi :',ttt1-ttt0 time_evp_solve = ttt1-ttt0 ttt0 = MPI_Wtime() call trans_ev_real(na, nev, a, lda, tau, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time trans_ev_real:',ttt1-ttt0 time_evp_back = ttt1-ttt0 deallocate(e, tau) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_evp_real") #endif end function solve_evp_real !------------------------------------------------------------------------------- function solve_evp_complex(na, nev, a, lda, ev, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) result(success) !------------------------------------------------------------------------------- ! solve_evp_complex: Solves the complex eigenvalue problem ! ! Parameters ! ! na Order of matrix a ! ! nev Number of eigenvalues needed ! The smallest nev eigenvalues/eigenvectors are calculated. ! ! a(lda,matrixCols) Distributed matrix for which eigenvalues are to be computed. ! Distribution is like in Scalapack. ! The full matrix must be set (not only one half like in scalapack). ! Destroyed on exit (upper and lower half). ! ! lda Leading dimension of a ! ! ev(na) On output: eigenvalues of a, every processor gets the complete set ! ! q(ldq,matrixCols) On output: Eigenvectors of a ! Distribution is like in Scalapack. ! Must be always dimensioned to the full size (corresponding to (na,na)) ! even if only a part of the eigenvalues is needed. ! ! ldq Leading dimension of q ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! 
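! Example (illustrative sketch, not part of the original sources): a typical
! calling sequence from an application that already owns a 2D process grid.
! The variables my_prow, my_pcol, na, nev, lda, ldq, nblk, matrixCols and the
! arrays a, ev, q are assumed to be set up by the caller.
!
!   integer :: mpi_comm_rows, mpi_comm_cols, mpierr
!   logical :: success
!
!   mpierr  = get_elpa_row_col_comms(MPI_COMM_WORLD, my_prow, my_pcol, &
!                                    mpi_comm_rows, mpi_comm_cols)
!   success = solve_evp_complex(na, nev, a, lda, ev, q, ldq, nblk, &
!                               matrixCols, mpi_comm_rows, mpi_comm_cols)
!   if (.not. success) stop 'solve_evp_complex failed'
!   call mpi_comm_free(mpi_comm_rows, mpierr)
!   call mpi_comm_free(mpi_comm_cols, mpierr)
!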
!------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer, intent(in) :: na, nev, lda, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols complex*16 :: a(lda,matrixCols), q(ldq,matrixCols) real*8 :: ev(na) integer :: my_prow, my_pcol, np_rows, np_cols, mpierr integer :: l_rows, l_cols, l_cols_nev real*8, allocatable :: q_real(:,:), e(:) complex*16, allocatable :: tau(:) real*8 ttt0, ttt1 logical :: success logical, save :: firstCall = .true. logical :: wantDebug #ifdef HAVE_DETAILED_TIMINGS call timer%start("solve_evp_complex") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) success = .true. wantDebug = .false. if (firstCall) then ! are debug messages desired? wantDebug = debug_messages_via_environment_variable() firstCall = .false. endif l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a and q l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local columns of q l_cols_nev = local_index(nev, my_pcol, np_cols, nblk, -1) ! Local columns corresponding to nev allocate(e(na), tau(na)) allocate(q_real(l_rows,l_cols)) ttt0 = MPI_Wtime() call tridiag_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, ev, e, tau) ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time tridiag_complex :',ttt1-ttt0 time_evp_fwd = ttt1-ttt0 ttt0 = MPI_Wtime() call solve_tridi(na, nev, ev, e, q_real, l_rows, nblk, matrixCols, mpi_comm_rows, & mpi_comm_cols, wantDebug, success) if (.not.(success)) return ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time solve_tridi :',ttt1-ttt0 time_evp_solve = ttt1-ttt0 ttt0 = MPI_Wtime() q(1:l_rows,1:l_cols_nev) = q_real(1:l_rows,1:l_cols_nev) call trans_ev_complex(na, nev, a, lda, tau, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) ttt1 = MPI_Wtime() if(my_prow==0 .and. my_pcol==0 .and. elpa_print_times) write(error_unit,*) 'Time trans_ev_complex:',ttt1-ttt0 time_evp_back = ttt1-ttt0 deallocate(q_real) deallocate(e, tau) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_evp_complex") #endif end function solve_evp_complex #define DATATYPE REAL #define BYTESIZE 8 #define REALCASE 1 #include "elpa_transpose_vectors.X90" #include "elpa_reduce_add_vectors.X90" #undef DATATYPE #undef BYTESIZE #undef REALCASE !------------------------------------------------------------------------------- subroutine tridiag_real(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, d, e, tau) !------------------------------------------------------------------------------- ! tridiag_real: Reduces a distributed symmetric matrix to tridiagonal form ! (like Scalapack Routine PDSYTRD) ! ! Parameters ! ! na Order of matrix ! ! a(lda,matrixCols) Distributed matrix which should be reduced. ! Distribution is like in Scalapack. ! Opposed to PDSYTRD, a(:,:) must be set completely (upper and lower half) ! a(:,:) is overwritten on exit with the Householder vectors ! ! lda Leading dimension of a ! matrixCols local columns of matrix ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! ! d(na) Diagonal elements (returned), identical on all processors ! ! e(na) Off-Diagonal elements (returned), identical on all processors ! ! 
tau(na) Factors for the Householder vectors (returned), needed for back transformation ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer na, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols real*8 a(lda,matrixCols), d(na), e(na), tau(na) integer, parameter :: max_stored_rows = 32 integer my_prow, my_pcol, np_rows, np_cols, mpierr integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols integer l_cols, l_rows, nstor integer istep, i, j, lcs, lce, lrs, lre integer tile_size, l_rows_tile, l_cols_tile #ifdef WITH_OPENMP integer my_thread, n_threads, max_threads, n_iter integer omp_get_thread_num, omp_get_num_threads, omp_get_max_threads #endif real*8 vav, vnorm2, x, aux(2*max_stored_rows), aux1(2), aux2(2), vrl, xf real*8, allocatable:: tmp(:), vr(:), vc(:), ur(:), uc(:), vur(:,:), uvc(:,:) #ifdef WITH_OPENMP real*8, allocatable:: ur_p(:,:), uc_p(:,:) #endif #ifdef HAVE_DETAILED_TIMINGS call timer%start("tridiag_real") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) ! Matrix is split into tiles; work is done only for tiles on the diagonal or above tile_size = nblk*least_common_multiple(np_rows,np_cols) ! minimum global tile size tile_size = ((128*max(np_rows,np_cols)-1)/tile_size+1)*tile_size ! make local tiles at least 128 wide l_rows_tile = tile_size/np_rows ! local rows of a tile l_cols_tile = tile_size/np_cols ! local cols of a tile totalblocks = (na-1)/nblk + 1 max_blocks_row = (totalblocks-1)/np_rows + 1 max_blocks_col = (totalblocks-1)/np_cols + 1 max_local_rows = max_blocks_row*nblk max_local_cols = max_blocks_col*nblk allocate(tmp(MAX(max_local_rows,max_local_cols))) allocate(vr(max_local_rows+1)) allocate(ur(max_local_rows)) allocate(vc(max_local_cols)) allocate(uc(max_local_cols)) #ifdef WITH_OPENMP max_threads = omp_get_max_threads() allocate(ur_p(max_local_rows,0:max_threads-1)) allocate(uc_p(max_local_cols,0:max_threads-1)) #endif tmp = 0 vr = 0 ur = 0 vc = 0 uc = 0 allocate(vur(max_local_rows,2*max_stored_rows)) allocate(uvc(max_local_cols,2*max_stored_rows)) d(:) = 0 e(:) = 0 tau(:) = 0 nstor = 0 l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local cols of a if(my_prow==prow(na, nblk, np_rows) .and. my_pcol==pcol(na, nblk, np_cols)) d(na) = a(l_rows,l_cols) do istep=na,3,-1 ! Calculate number of local rows and columns of the still remaining matrix ! on the local processor l_rows = local_index(istep-1, my_prow, np_rows, nblk, -1) l_cols = local_index(istep-1, my_pcol, np_cols, nblk, -1) ! Calculate vector for Householder transformation on all procs ! owning column istep if(my_pcol==pcol(istep, nblk, np_cols)) then ! Get vector to be transformed; distribute last element and norm of ! remaining elements to all procs in current column vr(1:l_rows) = a(1:l_rows,l_cols+1) if(nstor>0 .and. l_rows>0) then call DGEMV('N',l_rows,2*nstor,1.d0,vur,ubound(vur,dim=1), & uvc(l_cols+1,1),ubound(uvc,dim=1),1.d0,vr,1) endif if(my_prow==prow(istep-1, nblk, np_rows)) then aux1(1) = dot_product(vr(1:l_rows-1),vr(1:l_rows-1)) aux1(2) = vr(l_rows) else aux1(1) = dot_product(vr(1:l_rows),vr(1:l_rows)) aux1(2) = 0. endif call mpi_allreduce(aux1,aux2,2,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) vnorm2 = aux2(1) vrl = aux2(2) ! 
Householder transformation call hh_transform_real(vrl, vnorm2, xf, tau(istep)) ! Scale vr and store Householder vector for back transformation vr(1:l_rows) = vr(1:l_rows) * xf if(my_prow==prow(istep-1, nblk, np_rows)) then vr(l_rows) = 1. e(istep-1) = vrl endif a(1:l_rows,l_cols+1) = vr(1:l_rows) ! store Householder vector for back transformation endif ! Broadcast the Householder vector (and tau) along columns if(my_pcol==pcol(istep, nblk, np_cols)) vr(l_rows+1) = tau(istep) call MPI_Bcast(vr,l_rows+1,MPI_REAL8,pcol(istep, nblk, np_cols),mpi_comm_cols,mpierr) tau(istep) = vr(l_rows+1) ! Transpose Householder vector vr -> vc call elpa_transpose_vectors_real (vr, ubound(vr,dim=1), mpi_comm_rows, & vc, ubound(vc,dim=1), mpi_comm_cols, & 1, istep-1, 1, nblk) ! Calculate u = (A + VU**T + UV**T)*v ! For cache efficiency, we use only the upper half of the matrix tiles for this, ! thus the result is partly in uc(:) and partly in ur(:) uc(1:l_cols) = 0 ur(1:l_rows) = 0 if(l_rows>0 .and. l_cols>0) then #ifdef WITH_OPENMP #ifdef HAVE_DETAILED_TIMINGS call timer%start("OpenMP parallel") #endif !$OMP PARALLEL PRIVATE(my_thread,n_threads,n_iter,i,lcs,lce,j,lrs,lre) my_thread = omp_get_thread_num() n_threads = omp_get_num_threads() n_iter = 0 uc_p(1:l_cols,my_thread) = 0. ur_p(1:l_rows,my_thread) = 0. #endif do i=0,(istep-2)/tile_size lcs = i*l_cols_tile+1 lce = min(l_cols,(i+1)*l_cols_tile) if(lce0) then call DGEMV('T',l_rows,2*nstor,1.d0,vur,ubound(vur,dim=1),vr,1,0.d0,aux,1) call DGEMV('N',l_cols,2*nstor,1.d0,uvc,ubound(uvc,dim=1),aux,1,1.d0,uc,1) endif endif ! Sum up all ur(:) parts along rows and add them to the uc(:) parts ! on the processors containing the diagonal ! This is only necessary if ur has been calculated, i.e. if the ! global tile size is smaller than the global remaining matrix if(tile_size < istep-1) then call elpa_reduce_add_vectors_REAL (ur, ubound(ur,dim=1), mpi_comm_rows, & uc, ubound(uc,dim=1), mpi_comm_cols, & istep-1, 1, nblk) endif ! Sum up all the uc(:) parts, transpose uc -> ur if(l_cols>0) then tmp(1:l_cols) = uc(1:l_cols) call mpi_allreduce(tmp,uc,l_cols,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) endif call elpa_transpose_vectors_real (uc, ubound(uc,dim=1), mpi_comm_cols, & ur, ubound(ur,dim=1), mpi_comm_rows, & 1, istep-1, 1, nblk) ! calculate u**T * v (same as v**T * (A + VU**T + UV**T) * v ) x = 0 if(l_cols>0) x = dot_product(vc(1:l_cols),uc(1:l_cols)) call mpi_allreduce(x,vav,1,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr) ! store u and v in the matrices U and V ! these matrices are stored combined in one here do j=1,l_rows vur(j,2*nstor+1) = tau(istep)*vr(j) vur(j,2*nstor+2) = 0.5*tau(istep)*vav*vr(j) - ur(j) enddo do j=1,l_cols uvc(j,2*nstor+1) = 0.5*tau(istep)*vav*vc(j) - uc(j) uvc(j,2*nstor+2) = tau(istep)*vc(j) enddo nstor = nstor+1 ! If the limit of max_stored_rows is reached, calculate A + VU**T + UV**T if(nstor==max_stored_rows .or. istep==3) then do i=0,(istep-2)/tile_size lcs = i*l_cols_tile+1 lce = min(l_cols,(i+1)*l_cols_tile) lrs = 1 lre = min(l_rows,(i+1)*l_rows_tile) if(lce0) a(l_rows,l_cols) = a(l_rows,l_cols) & + dot_product(vur(l_rows,1:2*nstor),uvc(l_cols,1:2*nstor)) d(istep-1) = a(l_rows,l_cols) endif enddo ! Store e(1) and d(1) if(my_prow==prow(1, nblk, np_rows) .and. my_pcol==pcol(2, nblk, np_cols)) e(1) = a(1,l_cols) ! use last l_cols value of loop above if(my_prow==prow(1, nblk, np_rows) .and. my_pcol==pcol(1, nblk, np_cols)) d(1) = a(1,1) deallocate(tmp, vr, ur, vc, uc, vur, uvc) ! 
distribute the arrays d and e to all processors allocate(tmp(na)) tmp = d call mpi_allreduce(tmp,d,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) tmp = d call mpi_allreduce(tmp,d,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr) tmp = e call mpi_allreduce(tmp,e,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) tmp = e call mpi_allreduce(tmp,e,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr) deallocate(tmp) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("tridiag_real") #endif end subroutine tridiag_real !------------------------------------------------------------------------------- subroutine trans_ev_real(na, nqc, a, lda, tau, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) !------------------------------------------------------------------------------- ! trans_ev_real: Transforms the eigenvectors of a tridiagonal matrix back ! to the eigenvectors of the original matrix ! (like Scalapack Routine PDORMTR) ! ! Parameters ! ! na Order of matrix a, number of rows of matrix q ! ! nqc Number of columns of matrix q ! ! a(lda,matrixCols) Matrix containing the Householder vectors (i.e. matrix a after tridiag_real) ! Distribution is like in Scalapack. ! ! lda Leading dimension of a ! matrixCols local columns of matrix a and q ! ! tau(na) Factors of the Householder vectors ! ! q On input: Eigenvectors of tridiagonal matrix ! On output: Transformed eigenvectors ! Distribution is like in Scalapack. ! ! ldq Leading dimension of q ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer na, nqc, lda, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols real*8 a(lda,matrixCols), q(ldq,matrixCols), tau(na) integer :: max_stored_rows integer my_prow, my_pcol, np_rows, np_cols, mpierr integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols integer l_cols, l_rows, l_colh, nstor integer istep, i, n, nc, ic, ics, ice, nb, cur_pcol real*8, allocatable:: tmp1(:), tmp2(:), hvb(:), hvm(:,:) real*8, allocatable:: tmat(:,:), h1(:), h2(:) #ifdef HAVE_DETAILED_TIMINGS call timer%start("trans_ev_real") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) totalblocks = (na-1)/nblk + 1 max_blocks_row = (totalblocks-1)/np_rows + 1 max_blocks_col = ((nqc-1)/nblk)/np_cols + 1 ! Columns of q! max_local_rows = max_blocks_row*nblk max_local_cols = max_blocks_col*nblk max_stored_rows = (63/nblk+1)*nblk allocate(tmat(max_stored_rows,max_stored_rows)) allocate(h1(max_stored_rows*max_stored_rows)) allocate(h2(max_stored_rows*max_stored_rows)) allocate(tmp1(max_local_cols*max_stored_rows)) allocate(tmp2(max_local_cols*max_stored_rows)) allocate(hvb(max_local_rows*nblk)) allocate(hvm(max_local_rows,max_stored_rows)) hvm = 0 ! Must be set to 0 !!! hvb = 0 ! Safety only l_cols = local_index(nqc, my_pcol, np_cols, nblk, -1) ! Local columns of q nstor = 0 do istep=1,na,nblk ics = MAX(istep,3) ice = MIN(istep+nblk-1,na) if(ice0) & call MPI_Bcast(hvb,nb,MPI_REAL8,cur_pcol,mpi_comm_cols,mpierr) nb = 0 do ic=ics,ice l_rows = local_index(ic-1, my_prow, np_rows, nblk, -1) ! # rows of Householder vector hvm(1:l_rows,nstor+1) = hvb(nb+1:nb+l_rows) nstor = nstor+1 nb = nb+l_rows enddo ! 
Please note: for smaller matix sizes (na/np_rows<=256), a value of 32 for nstor is enough! if(nstor+nblk>max_stored_rows .or. istep+nblk>na .or. (na/np_rows<=256 .and. nstor>=32)) then ! Calculate scalar products of stored vectors. ! This can be done in different ways, we use dsyrk tmat = 0 if(l_rows>0) & call dsyrk('U','T',nstor,l_rows,1.d0,hvm,ubound(hvm,dim=1),0.d0,tmat,max_stored_rows) nc = 0 do n=1,nstor-1 h1(nc+1:nc+n) = tmat(1:n,n+1) nc = nc+n enddo if(nc>0) call mpi_allreduce(h1,h2,nc,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) ! Calculate triangular matrix T nc = 0 tmat(1,1) = tau(ice-nstor+1) do n=1,nstor-1 call dtrmv('L','T','N',n,tmat,max_stored_rows,h2(nc+1),1) tmat(n+1,1:n) = -h2(nc+1:nc+n)*tau(ice-nstor+n+1) tmat(n+1,n+1) = tau(ice-nstor+n+1) nc = nc+n enddo ! Q = Q - V * T * V**T * Q if(l_rows>0) then call dgemm('T','N',nstor,l_cols,l_rows,1.d0,hvm,ubound(hvm,dim=1), & q,ldq,0.d0,tmp1,nstor) else tmp1(1:l_cols*nstor) = 0 endif call mpi_allreduce(tmp1,tmp2,nstor*l_cols,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) if(l_rows>0) then call dtrmm('L','L','N','N',nstor,l_cols,1.0d0,tmat,max_stored_rows,tmp2,nstor) call dgemm('N','N',l_rows,l_cols,nstor,-1.d0,hvm,ubound(hvm,dim=1), & tmp2,nstor,1.d0,q,ldq) endif nstor = 0 endif enddo deallocate(tmat, h1, h2, tmp1, tmp2, hvb, hvm) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("trans_ev_real") #endif end subroutine trans_ev_real !------------------------------------------------------------------------------- subroutine mult_at_b_real(uplo_a, uplo_c, na, ncb, a, lda, b, ldb, nblk, mpi_comm_rows, mpi_comm_cols, c, ldc) !------------------------------------------------------------------------------- ! mult_at_b_real: Performs C := A**T * B ! ! where: A is a square matrix (na,na) which is optionally upper or lower triangular ! B is a (na,ncb) matrix ! C is a (na,ncb) matrix where optionally only the upper or lower ! triangle may be computed ! ! Parameters ! ! uplo_a 'U' if A is upper triangular ! 'L' if A is lower triangular ! anything else if A is a full matrix ! Please note: This pertains to the original A (as set in the calling program) ! whereas the transpose of A is used for calculations ! If uplo_a is 'U' or 'L', the other triangle is not used at all, ! i.e. it may contain arbitrary numbers ! ! uplo_c 'U' if only the upper diagonal part of C is needed ! 'L' if only the upper diagonal part of C is needed ! anything else if the full matrix C is needed ! Please note: Even when uplo_c is 'U' or 'L', the other triangle may be ! written to a certain extent, i.e. one shouldn't rely on the content there! ! ! na Number of rows/columns of A, number of rows of B and C ! ! ncb Number of columns of B and C ! ! a Matrix A ! ! lda Leading dimension of a ! ! b Matrix B ! ! ldb Leading dimension of b ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! ! c Matrix C ! ! ldc Leading dimension of c ! 
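! Example (illustrative sketch): two ways this routine might be called.
! All matrices, leading dimensions and communicators are assumed to be
! set up by the caller as described above; 'F' simply stands for
! "anything else", i.e. a full matrix.
!
!   ! C := A**T * B with a full A and a full C
!   call mult_at_b_real('F', 'F', na, ncb, a, lda, b, ldb, nblk, &
!                       mpi_comm_rows, mpi_comm_cols, c, ldc)
!
!   ! A is upper triangular, only the upper triangle of C is wanted
!   call mult_at_b_real('U', 'U', na, ncb, a, lda, b, ldb, nblk, &
!                       mpi_comm_rows, mpi_comm_cols, c, ldc)
!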
!------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none character*1 uplo_a, uplo_c integer na, ncb, lda, ldb, nblk, mpi_comm_rows, mpi_comm_cols, ldc real*8 a(lda,*), b(ldb,*), c(ldc,*) integer my_prow, my_pcol, np_rows, np_cols, mpierr integer l_cols, l_rows, l_rows_np integer np, n, nb, nblk_mult, lrs, lre, lcs, lce integer gcol_min, gcol, goff integer nstor, nr_done, noff, np_bc, n_aux_bc, nvals integer, allocatable :: lrs_save(:), lre_save(:) logical a_lower, a_upper, c_lower, c_upper real*8, allocatable:: aux_mat(:,:), aux_bc(:), tmp1(:,:), tmp2(:,:) #ifdef HAVE_DETAILED_TIMINGS call timer%start("mult_at_b_real") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a and b l_cols = local_index(ncb, my_pcol, np_cols, nblk, -1) ! Local cols of b ! Block factor for matrix multiplications, must be a multiple of nblk if(na/np_rows<=256) then nblk_mult = (31/nblk+1)*nblk else nblk_mult = (63/nblk+1)*nblk endif allocate(aux_mat(l_rows,nblk_mult)) allocate(aux_bc(l_rows*nblk)) allocate(lrs_save(nblk)) allocate(lre_save(nblk)) a_lower = .false. a_upper = .false. c_lower = .false. c_upper = .false. if(uplo_a=='u' .or. uplo_a=='U') a_upper = .true. if(uplo_a=='l' .or. uplo_a=='L') a_lower = .true. if(uplo_c=='u' .or. uplo_c=='U') c_upper = .true. if(uplo_c=='l' .or. uplo_c=='L') c_lower = .true. ! Build up the result matrix by processor rows do np = 0, np_rows-1 ! In this turn, procs of row np assemble the result l_rows_np = local_index(na, np, np_rows, nblk, -1) ! local rows on receiving processors nr_done = 0 ! Number of rows done aux_mat = 0 nstor = 0 ! Number of columns stored in aux_mat ! Loop over the blocks on row np do nb=0,(l_rows_np-1)/nblk goff = nb*np_rows + np ! Global offset in blocks corresponding to nb ! Get the processor column which owns this block (A is transposed, so we need the column) ! and the offset in blocks within this column. ! The corresponding block column in A is then broadcast to all for multiplication with B np_bc = MOD(goff,np_cols) noff = goff/np_cols n_aux_bc = 0 ! Gather up the complete block column of A on the owner do n = 1, min(l_rows_np-nb*nblk,nblk) ! Loop over columns to be broadcast gcol = goff*nblk + n ! global column corresponding to n if(nstor==0 .and. n==1) gcol_min = gcol lrs = 1 ! 1st local row number for broadcast lre = l_rows ! last local row number for broadcast if(a_lower) lrs = local_index(gcol, my_prow, np_rows, nblk, +1) if(a_upper) lre = local_index(gcol, my_prow, np_rows, nblk, -1) if(lrs<=lre) then nvals = lre-lrs+1 if(my_pcol == np_bc) aux_bc(n_aux_bc+1:n_aux_bc+nvals) = a(lrs:lre,noff*nblk+n) n_aux_bc = n_aux_bc + nvals endif lrs_save(n) = lrs lre_save(n) = lre enddo ! Broadcast block column call MPI_Bcast(aux_bc,n_aux_bc,MPI_REAL8,np_bc,mpi_comm_cols,mpierr) ! Insert what we got in aux_mat n_aux_bc = 0 do n = 1, min(l_rows_np-nb*nblk,nblk) nstor = nstor+1 lrs = lrs_save(n) lre = lre_save(n) if(lrs<=lre) then nvals = lre-lrs+1 aux_mat(lrs:lre,nstor) = aux_bc(n_aux_bc+1:n_aux_bc+nvals) n_aux_bc = n_aux_bc + nvals endif enddo ! If we got nblk_mult columns in aux_mat or this is the last block ! do the matrix multiplication if(nstor==nblk_mult .or. nb*nblk+nblk >= l_rows_np) then lrs = 1 ! 1st local row number for multiply lre = l_rows ! 
last local row number for multiply if(a_lower) lrs = local_index(gcol_min, my_prow, np_rows, nblk, +1) if(a_upper) lre = local_index(gcol, my_prow, np_rows, nblk, -1) lcs = 1 ! 1st local col number for multiply lce = l_cols ! last local col number for multiply if(c_upper) lcs = local_index(gcol_min, my_pcol, np_cols, nblk, +1) if(c_lower) lce = MIN(local_index(gcol, my_pcol, np_cols, nblk, -1),l_cols) if(lcs<=lce) then allocate(tmp1(nstor,lcs:lce),tmp2(nstor,lcs:lce)) if(lrs<=lre) then call dgemm('T','N',nstor,lce-lcs+1,lre-lrs+1,1.d0,aux_mat(lrs,1),ubound(aux_mat,dim=1), & b(lrs,lcs),ldb,0.d0,tmp1,nstor) else tmp1 = 0 endif ! Sum up the results and send to processor row np call mpi_reduce(tmp1,tmp2,nstor*(lce-lcs+1),MPI_REAL8,MPI_SUM,np,mpi_comm_rows,mpierr) ! Put the result into C if(my_prow==np) c(nr_done+1:nr_done+nstor,lcs:lce) = tmp2(1:nstor,lcs:lce) deallocate(tmp1,tmp2) endif nr_done = nr_done+nstor nstor=0 aux_mat(:,:)=0 endif enddo enddo deallocate(aux_mat, aux_bc, lrs_save, lre_save) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("mult_at_b_real") #endif end subroutine mult_at_b_real !------------------------------------------------------------------------------- #define DATATYPE COMPLEX #define BYTESIZE 16 #define COMPLEXCASE 1 #include "elpa_transpose_vectors.X90" #include "elpa_reduce_add_vectors.X90" #undef DATATYPE #undef BYTESIZE #undef COMPLEXCASE subroutine tridiag_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, d, e, tau) !------------------------------------------------------------------------------- ! tridiag_complex: Reduces a distributed hermitian matrix to tridiagonal form ! (like Scalapack Routine PZHETRD) ! ! Parameters ! ! na Order of matrix ! ! a(lda,matrixCols) Distributed matrix which should be reduced. ! Distribution is like in Scalapack. ! Opposed to PZHETRD, a(:,:) must be set completely (upper and lower half) ! a(:,:) is overwritten on exit with the Householder vectors ! ! lda Leading dimension of a ! matrixCols local columns of matrix a ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! ! d(na) Diagonal elements (returned), identical on all processors ! ! e(na) Off-Diagonal elements (returned), identical on all processors ! ! tau(na) Factors for the Householder vectors (returned), needed for back transformation ! 
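! Example (illustrative sketch): the three-step sequence in which this routine
! is used by solve_evp_complex above; d, e, tau, q_real and the bounds
! l_rows/l_cols_nev follow the conventions of that driver and are caller-owned.
!
!   call tridiag_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, &
!                        mpi_comm_cols, d, e, tau)
!   call solve_tridi(na, nev, d, e, q_real, l_rows, nblk, matrixCols, &
!                    mpi_comm_rows, mpi_comm_cols, wantDebug, success)
!   q(1:l_rows,1:l_cols_nev) = q_real(1:l_rows,1:l_cols_nev)
!   call trans_ev_complex(na, nev, a, lda, tau, q, ldq, nblk, matrixCols, &
!                         mpi_comm_rows, mpi_comm_cols)
!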
!------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer na, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols complex*16 a(lda,matrixCols), tau(na) real*8 d(na), e(na) integer, parameter :: max_stored_rows = 32 complex*16, parameter :: CZERO = (0.d0,0.d0), CONE = (1.d0,0.d0) integer my_prow, my_pcol, np_rows, np_cols, mpierr integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols integer l_cols, l_rows, nstor integer istep, i, j, lcs, lce, lrs, lre integer tile_size, l_rows_tile, l_cols_tile #ifdef WITH_OPENMP integer my_thread, n_threads, max_threads, n_iter integer omp_get_thread_num, omp_get_num_threads, omp_get_max_threads #endif real*8 vnorm2 complex*16 vav, xc, aux(2*max_stored_rows), aux1(2), aux2(2), vrl, xf complex*16, allocatable:: tmp(:), vr(:), vc(:), ur(:), uc(:), vur(:,:), uvc(:,:) #ifdef WITH_OPENMP complex*16, allocatable:: ur_p(:,:), uc_p(:,:) #endif real*8, allocatable:: tmpr(:) #ifdef HAVE_DETAILED_TIMINGS call timer%start("tridiag_complex") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) ! Matrix is split into tiles; work is done only for tiles on the diagonal or above tile_size = nblk*least_common_multiple(np_rows,np_cols) ! minimum global tile size tile_size = ((128*max(np_rows,np_cols)-1)/tile_size+1)*tile_size ! make local tiles at least 128 wide l_rows_tile = tile_size/np_rows ! local rows of a tile l_cols_tile = tile_size/np_cols ! local cols of a tile totalblocks = (na-1)/nblk + 1 max_blocks_row = (totalblocks-1)/np_rows + 1 max_blocks_col = (totalblocks-1)/np_cols + 1 max_local_rows = max_blocks_row*nblk max_local_cols = max_blocks_col*nblk allocate(tmp(MAX(max_local_rows,max_local_cols))) allocate(vr(max_local_rows+1)) allocate(ur(max_local_rows)) allocate(vc(max_local_cols)) allocate(uc(max_local_cols)) #ifdef WITH_OPENMP max_threads = omp_get_max_threads() allocate(ur_p(max_local_rows,0:max_threads-1)) allocate(uc_p(max_local_cols,0:max_threads-1)) #endif tmp = 0 vr = 0 ur = 0 vc = 0 uc = 0 allocate(vur(max_local_rows,2*max_stored_rows)) allocate(uvc(max_local_cols,2*max_stored_rows)) d(:) = 0 e(:) = 0 tau(:) = 0 nstor = 0 l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local cols of a if(my_prow==prow(na, nblk, np_rows) .and. my_pcol==pcol(na, nblk, np_cols)) d(na) = a(l_rows,l_cols) do istep=na,3,-1 ! Calculate number of local rows and columns of the still remaining matrix ! on the local processor l_rows = local_index(istep-1, my_prow, np_rows, nblk, -1) l_cols = local_index(istep-1, my_pcol, np_cols, nblk, -1) ! Calculate vector for Householder transformation on all procs ! owning column istep if(my_pcol==pcol(istep, nblk, np_cols)) then ! Get vector to be transformed; distribute last element and norm of ! remaining elements to all procs in current column vr(1:l_rows) = a(1:l_rows,l_cols+1) if(nstor>0 .and. l_rows>0) then aux(1:2*nstor) = conjg(uvc(l_cols+1,1:2*nstor)) call ZGEMV('N',l_rows,2*nstor,CONE,vur,ubound(vur,dim=1), & aux,1,CONE,vr,1) endif if(my_prow==prow(istep-1, nblk, np_rows)) then aux1(1) = dot_product(vr(1:l_rows-1),vr(1:l_rows-1)) aux1(2) = vr(l_rows) else aux1(1) = dot_product(vr(1:l_rows),vr(1:l_rows)) aux1(2) = 0. 
endif call mpi_allreduce(aux1,aux2,2,MPI_DOUBLE_COMPLEX,MPI_SUM,mpi_comm_rows,mpierr) vnorm2 = aux2(1) vrl = aux2(2) ! Householder transformation call hh_transform_complex(vrl, vnorm2, xf, tau(istep)) ! Scale vr and store Householder vector for back transformation vr(1:l_rows) = vr(1:l_rows) * xf if(my_prow==prow(istep-1, nblk, np_rows)) then vr(l_rows) = 1. e(istep-1) = vrl endif a(1:l_rows,l_cols+1) = vr(1:l_rows) ! store Householder vector for back transformation endif ! Broadcast the Householder vector (and tau) along columns if(my_pcol==pcol(istep, nblk, np_cols)) vr(l_rows+1) = tau(istep) call MPI_Bcast(vr,l_rows+1,MPI_DOUBLE_COMPLEX,pcol(istep, nblk, np_cols),mpi_comm_cols,mpierr) tau(istep) = vr(l_rows+1) ! Transpose Householder vector vr -> vc ! call elpa_transpose_vectors (vr, 2*ubound(vr,dim=1), mpi_comm_rows, & ! vc, 2*ubound(vc,dim=1), mpi_comm_cols, & ! 1, 2*(istep-1), 1, 2*nblk) call elpa_transpose_vectors_complex (vr, ubound(vr,dim=1), mpi_comm_rows, & vc, ubound(vc,dim=1), mpi_comm_cols, & 1, (istep-1), 1, nblk) ! Calculate u = (A + VU**T + UV**T)*v ! For cache efficiency, we use only the upper half of the matrix tiles for this, ! thus the result is partly in uc(:) and partly in ur(:) uc(1:l_cols) = 0 ur(1:l_rows) = 0 if(l_rows>0 .and. l_cols>0) then #ifdef WITH_OPENMP #ifdef HAVE_DETAILED_TIMINGS call timer%start("OpenMP parallel") #endif !$OMP PARALLEL PRIVATE(my_thread,n_threads,n_iter,i,lcs,lce,j,lrs,lre) my_thread = omp_get_thread_num() n_threads = omp_get_num_threads() n_iter = 0 uc_p(1:l_cols,my_thread) = 0. ur_p(1:l_rows,my_thread) = 0. #endif do i=0,(istep-2)/tile_size lcs = i*l_cols_tile+1 lce = min(l_cols,(i+1)*l_cols_tile) if(lce0) then call ZGEMV('C',l_rows,2*nstor,CONE,vur,ubound(vur,dim=1),vr,1,CZERO,aux,1) call ZGEMV('N',l_cols,2*nstor,CONE,uvc,ubound(uvc,dim=1),aux,1,CONE,uc,1) endif endif ! Sum up all ur(:) parts along rows and add them to the uc(:) parts ! on the processors containing the diagonal ! This is only necessary if ur has been calculated, i.e. if the ! global tile size is smaller than the global remaining matrix if(tile_size < istep-1) then call elpa_reduce_add_vectors_COMPLEX (ur, ubound(ur,dim=1), mpi_comm_rows, & uc, ubound(uc,dim=1), mpi_comm_cols, & (istep-1), 1, nblk) endif ! Sum up all the uc(:) parts, transpose uc -> ur if(l_cols>0) then tmp(1:l_cols) = uc(1:l_cols) call mpi_allreduce(tmp,uc,l_cols,MPI_DOUBLE_COMPLEX,MPI_SUM,mpi_comm_rows,mpierr) endif ! call elpa_transpose_vectors (uc, 2*ubound(uc,dim=1), mpi_comm_cols, & ! ur, 2*ubound(ur,dim=1), mpi_comm_rows, & ! 1, 2*(istep-1), 1, 2*nblk) call elpa_transpose_vectors_complex (uc, ubound(uc,dim=1), mpi_comm_cols, & ur, ubound(ur,dim=1), mpi_comm_rows, & 1, (istep-1), 1, nblk) ! calculate u**T * v (same as v**T * (A + VU**T + UV**T) * v ) xc = 0 if(l_cols>0) xc = dot_product(vc(1:l_cols),uc(1:l_cols)) call mpi_allreduce(xc,vav,1,MPI_DOUBLE_COMPLEX,MPI_SUM,mpi_comm_cols,mpierr) ! store u and v in the matrices U and V ! these matrices are stored combined in one here do j=1,l_rows vur(j,2*nstor+1) = conjg(tau(istep))*vr(j) vur(j,2*nstor+2) = 0.5*conjg(tau(istep))*vav*vr(j) - ur(j) enddo do j=1,l_cols uvc(j,2*nstor+1) = 0.5*conjg(tau(istep))*vav*vc(j) - uc(j) uvc(j,2*nstor+2) = conjg(tau(istep))*vc(j) enddo nstor = nstor+1 ! If the limit of max_stored_rows is reached, calculate A + VU**T + UV**T if(nstor==max_stored_rows .or. 
istep==3) then do i=0,(istep-2)/tile_size lcs = i*l_cols_tile+1 lce = min(l_cols,(i+1)*l_cols_tile) lrs = 1 lre = min(l_rows,(i+1)*l_rows_tile) if(lce0) a(l_rows,l_cols) = a(l_rows,l_cols) & + dot_product(vur(l_rows,1:2*nstor),uvc(l_cols,1:2*nstor)) d(istep-1) = a(l_rows,l_cols) endif enddo ! Store e(1) and d(1) if(my_pcol==pcol(2, nblk, np_cols)) then if(my_prow==prow(1, nblk, np_rows)) then ! We use last l_cols value of loop above vrl = a(1,l_cols) call hh_transform_complex(vrl, 0.d0, xf, tau(2)) e(1) = vrl a(1,l_cols) = 1. ! for consistency only endif call mpi_bcast(tau(2),1,MPI_DOUBLE_COMPLEX,prow(1, nblk, np_rows),mpi_comm_rows,mpierr) endif call mpi_bcast(tau(2),1,MPI_DOUBLE_COMPLEX,pcol(2, nblk, np_cols),mpi_comm_cols,mpierr) if(my_prow==prow(1, nblk, np_rows) .and. my_pcol==pcol(1, nblk, np_cols)) d(1) = a(1,1) deallocate(tmp, vr, ur, vc, uc, vur, uvc) ! distribute the arrays d and e to all processors allocate(tmpr(na)) tmpr = d call mpi_allreduce(tmpr,d,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) tmpr = d call mpi_allreduce(tmpr,d,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr) tmpr = e call mpi_allreduce(tmpr,e,na,MPI_REAL8,MPI_SUM,mpi_comm_rows,mpierr) tmpr = e call mpi_allreduce(tmpr,e,na,MPI_REAL8,MPI_SUM,mpi_comm_cols,mpierr) deallocate(tmpr) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("tridiag_complex") #endif end subroutine tridiag_complex !------------------------------------------------------------------------------- subroutine trans_ev_complex(na, nqc, a, lda, tau, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols) !------------------------------------------------------------------------------- ! trans_ev_complex: Transforms the eigenvectors of a tridiagonal matrix back ! to the eigenvectors of the original matrix ! (like Scalapack Routine PZUNMTR) ! ! Parameters ! ! na Order of matrix a, number of rows of matrix q ! ! nqc Number of columns of matrix q ! ! a(lda,matrixCols) Matrix containing the Householder vectors (i.e. matrix a after tridiag_complex) ! Distribution is like in Scalapack. ! ! lda Leading dimension of a ! ! tau(na) Factors of the Householder vectors ! ! q On input: Eigenvectors of tridiagonal matrix ! On output: Transformed eigenvectors ! Distribution is like in Scalapack. ! ! ldq Leading dimension of q ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer na, nqc, lda, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols complex*16 a(lda,matrixCols), q(ldq,matrixCols), tau(na) integer :: max_stored_rows complex*16, parameter :: CZERO = (0.d0,0.d0), CONE = (1.d0,0.d0) integer my_prow, my_pcol, np_rows, np_cols, mpierr integer totalblocks, max_blocks_row, max_blocks_col, max_local_rows, max_local_cols integer l_cols, l_rows, l_colh, nstor integer istep, i, n, nc, ic, ics, ice, nb, cur_pcol complex*16, allocatable:: tmp1(:), tmp2(:), hvb(:), hvm(:,:) complex*16, allocatable:: tmat(:,:), h1(:), h2(:) #ifdef HAVE_DETAILED_TIMINGS call timer%start("trans_ev_complex") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) totalblocks = (na-1)/nblk + 1 max_blocks_row = (totalblocks-1)/np_rows + 1 max_blocks_col = ((nqc-1)/nblk)/np_cols + 1 ! Columns of q! 
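! Sizing note: max_stored_rows below is the smallest multiple of nblk that is
! at least 64, e.g. (63/16+1)*16 = 64 for nblk = 16 and (63/48+1)*48 = 96 for
! nblk = 48; up to that many Householder vectors are accumulated in hvm before
! they are applied in one blocked update further down.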
max_local_rows = max_blocks_row*nblk max_local_cols = max_blocks_col*nblk max_stored_rows = (63/nblk+1)*nblk allocate(tmat(max_stored_rows,max_stored_rows)) allocate(h1(max_stored_rows*max_stored_rows)) allocate(h2(max_stored_rows*max_stored_rows)) allocate(tmp1(max_local_cols*max_stored_rows)) allocate(tmp2(max_local_cols*max_stored_rows)) allocate(hvb(max_local_rows*nblk)) allocate(hvm(max_local_rows,max_stored_rows)) hvm = 0 ! Must be set to 0 !!! hvb = 0 ! Safety only l_cols = local_index(nqc, my_pcol, np_cols, nblk, -1) ! Local columns of q nstor = 0 ! In the complex case tau(2) /= 0 if(my_prow == prow(1, nblk, np_rows)) then q(1,1:l_cols) = q(1,1:l_cols)*((1.d0,0.d0)-tau(2)) endif do istep=1,na,nblk ics = MAX(istep,3) ice = MIN(istep+nblk-1,na) if(ice0) & call MPI_Bcast(hvb,nb,MPI_DOUBLE_COMPLEX,cur_pcol,mpi_comm_cols,mpierr) nb = 0 do ic=ics,ice l_rows = local_index(ic-1, my_prow, np_rows, nblk, -1) ! # rows of Householder vector hvm(1:l_rows,nstor+1) = hvb(nb+1:nb+l_rows) nstor = nstor+1 nb = nb+l_rows enddo ! Please note: for smaller matix sizes (na/np_rows<=256), a value of 32 for nstor is enough! if(nstor+nblk>max_stored_rows .or. istep+nblk>na .or. (na/np_rows<=256 .and. nstor>=32)) then ! Calculate scalar products of stored vectors. ! This can be done in different ways, we use zherk tmat = 0 if(l_rows>0) & call zherk('U','C',nstor,l_rows,CONE,hvm,ubound(hvm,dim=1),CZERO,tmat,max_stored_rows) nc = 0 do n=1,nstor-1 h1(nc+1:nc+n) = tmat(1:n,n+1) nc = nc+n enddo if(nc>0) call mpi_allreduce(h1,h2,nc,MPI_DOUBLE_COMPLEX,MPI_SUM,mpi_comm_rows,mpierr) ! Calculate triangular matrix T nc = 0 tmat(1,1) = tau(ice-nstor+1) do n=1,nstor-1 call ztrmv('L','C','N',n,tmat,max_stored_rows,h2(nc+1),1) tmat(n+1,1:n) = -conjg(h2(nc+1:nc+n))*tau(ice-nstor+n+1) tmat(n+1,n+1) = tau(ice-nstor+n+1) nc = nc+n enddo ! Q = Q - V * T * V**T * Q if(l_rows>0) then call zgemm('C','N',nstor,l_cols,l_rows,CONE,hvm,ubound(hvm,dim=1), & q,ldq,CZERO,tmp1,nstor) else tmp1(1:l_cols*nstor) = 0 endif call mpi_allreduce(tmp1,tmp2,nstor*l_cols,MPI_DOUBLE_COMPLEX,MPI_SUM,mpi_comm_rows,mpierr) if(l_rows>0) then call ztrmm('L','L','N','N',nstor,l_cols,CONE,tmat,max_stored_rows,tmp2,nstor) call zgemm('N','N',l_rows,l_cols,nstor,-CONE,hvm,ubound(hvm,dim=1), & tmp2,nstor,CONE,q,ldq) endif nstor = 0 endif enddo deallocate(tmat, h1, h2, tmp1, tmp2, hvb, hvm) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("trans_ev_complex") #endif end subroutine trans_ev_complex !------------------------------------------------------------------------------- subroutine mult_ah_b_complex(uplo_a, uplo_c, na, ncb, a, lda, b, ldb, nblk, mpi_comm_rows, mpi_comm_cols, c, ldc) !------------------------------------------------------------------------------- ! mult_ah_b_complex: Performs C := A**H * B ! ! where: A is a square matrix (na,na) which is optionally upper or lower triangular ! B is a (na,ncb) matrix ! C is a (na,ncb) matrix where optionally only the upper or lower ! triangle may be computed ! ! Parameters ! ! uplo_a 'U' if A is upper triangular ! 'L' if A is lower triangular ! anything else if A is a full matrix ! Please note: This pertains to the original A (as set in the calling program) ! whereas the transpose of A is used for calculations ! If uplo_a is 'U' or 'L', the other triangle is not used at all, ! i.e. it may contain arbitrary numbers ! ! uplo_c 'U' if only the upper diagonal part of C is needed ! 'L' if only the upper diagonal part of C is needed ! anything else if the full matrix C is needed ! 
Please note: Even when uplo_c is 'U' or 'L', the other triangle may be ! written to a certain extent, i.e. one shouldn't rely on the content there! ! ! na Number of rows/columns of A, number of rows of B and C ! ! ncb Number of columns of B and C ! ! a Matrix A ! ! lda Leading dimension of a ! ! b Matrix B ! ! ldb Leading dimension of b ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! ! c Matrix C ! ! ldc Leading dimension of c ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none character*1 uplo_a, uplo_c integer na, ncb, lda, ldb, nblk, mpi_comm_rows, mpi_comm_cols, ldc complex*16 a(lda,*), b(ldb,*), c(ldc,*) integer my_prow, my_pcol, np_rows, np_cols, mpierr integer l_cols, l_rows, l_rows_np integer np, n, nb, nblk_mult, lrs, lre, lcs, lce integer gcol_min, gcol, goff integer nstor, nr_done, noff, np_bc, n_aux_bc, nvals integer, allocatable :: lrs_save(:), lre_save(:) logical a_lower, a_upper, c_lower, c_upper complex*16, allocatable:: aux_mat(:,:), aux_bc(:), tmp1(:,:), tmp2(:,:) #ifdef HAVE_DETAILED_TIMINGS call timer%start("mult_ah_b_complex") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a and b l_cols = local_index(ncb, my_pcol, np_cols, nblk, -1) ! Local cols of b ! Block factor for matrix multiplications, must be a multiple of nblk if(na/np_rows<=256) then nblk_mult = (31/nblk+1)*nblk else nblk_mult = (63/nblk+1)*nblk endif allocate(aux_mat(l_rows,nblk_mult)) allocate(aux_bc(l_rows*nblk)) allocate(lrs_save(nblk)) allocate(lre_save(nblk)) a_lower = .false. a_upper = .false. c_lower = .false. c_upper = .false. if(uplo_a=='u' .or. uplo_a=='U') a_upper = .true. if(uplo_a=='l' .or. uplo_a=='L') a_lower = .true. if(uplo_c=='u' .or. uplo_c=='U') c_upper = .true. if(uplo_c=='l' .or. uplo_c=='L') c_lower = .true. ! Build up the result matrix by processor rows do np = 0, np_rows-1 ! In this turn, procs of row np assemble the result l_rows_np = local_index(na, np, np_rows, nblk, -1) ! local rows on receiving processors nr_done = 0 ! Number of rows done aux_mat = 0 nstor = 0 ! Number of columns stored in aux_mat ! Loop over the blocks on row np do nb=0,(l_rows_np-1)/nblk goff = nb*np_rows + np ! Global offset in blocks corresponding to nb ! Get the processor column which owns this block (A is transposed, so we need the column) ! and the offset in blocks within this column. ! The corresponding block column in A is then broadcast to all for multiplication with B np_bc = MOD(goff,np_cols) noff = goff/np_cols n_aux_bc = 0 ! Gather up the complete block column of A on the owner do n = 1, min(l_rows_np-nb*nblk,nblk) ! Loop over columns to be broadcast gcol = goff*nblk + n ! global column corresponding to n if(nstor==0 .and. n==1) gcol_min = gcol lrs = 1 ! 1st local row number for broadcast lre = l_rows ! last local row number for broadcast if(a_lower) lrs = local_index(gcol, my_prow, np_rows, nblk, +1) if(a_upper) lre = local_index(gcol, my_prow, np_rows, nblk, -1) if(lrs<=lre) then nvals = lre-lrs+1 if(my_pcol == np_bc) aux_bc(n_aux_bc+1:n_aux_bc+nvals) = a(lrs:lre,noff*nblk+n) n_aux_bc = n_aux_bc + nvals endif lrs_save(n) = lrs lre_save(n) = lre enddo ! 
Broadcast block column call MPI_Bcast(aux_bc,n_aux_bc,MPI_DOUBLE_COMPLEX,np_bc,mpi_comm_cols,mpierr) ! Insert what we got in aux_mat n_aux_bc = 0 do n = 1, min(l_rows_np-nb*nblk,nblk) nstor = nstor+1 lrs = lrs_save(n) lre = lre_save(n) if(lrs<=lre) then nvals = lre-lrs+1 aux_mat(lrs:lre,nstor) = aux_bc(n_aux_bc+1:n_aux_bc+nvals) n_aux_bc = n_aux_bc + nvals endif enddo ! If we got nblk_mult columns in aux_mat or this is the last block ! do the matrix multiplication if(nstor==nblk_mult .or. nb*nblk+nblk >= l_rows_np) then lrs = 1 ! 1st local row number for multiply lre = l_rows ! last local row number for multiply if(a_lower) lrs = local_index(gcol_min, my_prow, np_rows, nblk, +1) if(a_upper) lre = local_index(gcol, my_prow, np_rows, nblk, -1) lcs = 1 ! 1st local col number for multiply lce = l_cols ! last local col number for multiply if(c_upper) lcs = local_index(gcol_min, my_pcol, np_cols, nblk, +1) if(c_lower) lce = MIN(local_index(gcol, my_pcol, np_cols, nblk, -1),l_cols) if(lcs<=lce) then allocate(tmp1(nstor,lcs:lce),tmp2(nstor,lcs:lce)) if(lrs<=lre) then call zgemm('C','N',nstor,lce-lcs+1,lre-lrs+1,(1.d0,0.d0),aux_mat(lrs,1),ubound(aux_mat,dim=1), & b(lrs,lcs),ldb,(0.d0,0.d0),tmp1,nstor) else tmp1 = 0 endif ! Sum up the results and send to processor row np call mpi_reduce(tmp1,tmp2,nstor*(lce-lcs+1),MPI_DOUBLE_COMPLEX,MPI_SUM,np,mpi_comm_rows,mpierr) ! Put the result into C if(my_prow==np) c(nr_done+1:nr_done+nstor,lcs:lce) = tmp2(1:nstor,lcs:lce) deallocate(tmp1,tmp2) endif nr_done = nr_done+nstor nstor=0 aux_mat(:,:)=0 endif enddo enddo deallocate(aux_mat, aux_bc, lrs_save, lre_save) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("mult_ah_b_complex") #endif end subroutine mult_ah_b_complex !------------------------------------------------------------------------------- subroutine solve_tridi( na, nev, d, e, q, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, wantDebug, success ) #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer na, nev, ldq, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols real*8 d(na), e(na), q(ldq,matrixCols) integer i, j, n, np, nc, nev1, l_cols, l_rows integer my_prow, my_pcol, np_rows, np_cols, mpierr integer, allocatable :: limits(:), l_col(:), p_col(:), l_col_bc(:), p_col_bc(:) logical, intent(in) :: wantDebug logical, intent(out) :: success #ifdef HAVE_DETAILED_TIMINGS call timer%start("solve_tridi") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) success = .true. l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a and q l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local columns of q ! Set Q to 0 q(1:l_rows, 1:l_cols) = 0. ! Get the limits of the subdivisons, each subdivison has as many cols ! as fit on the respective processor column allocate(limits(0:np_cols)) limits(0) = 0 do np=0,np_cols-1 nc = local_index(na, np, np_cols, nblk, -1) ! number of columns on proc column np ! Check for the case that a column has have zero width. ! This is not supported! ! Scalapack supports it but delivers no results for these columns, ! which is rather annoying if (nc==0) then #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_tridi") #endif if (wantDebug) write(error_unit,*) 'ELPA1_solve_tridi: ERROR: Problem contains processor column with zero width' success = .false. return endif limits(np+1) = limits(np) + nc enddo ! 
Subdivide matrix by subtracting rank 1 modifications do i=1,np_cols-1 n = limits(i) d(n) = d(n)-abs(e(n)) d(n+1) = d(n+1)-abs(e(n)) enddo ! Solve sub problems on processsor columns nc = limits(my_pcol) ! column after which my problem starts if(np_cols>1) then nev1 = l_cols ! all eigenvectors are needed else nev1 = MIN(nev,l_cols) endif call solve_tridi_col(l_cols, nev1, nc, d(nc+1), e(nc+1), q, ldq, nblk, & matrixCols, mpi_comm_rows, wantDebug, success) if (.not.(success)) then #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_tridi") #endif return endif ! If there is only 1 processor column, we are done if(np_cols==1) then deallocate(limits) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_tridi") #endif return endif ! Set index arrays for Q columns ! Dense distribution scheme: allocate(l_col(na)) allocate(p_col(na)) n = 0 do np=0,np_cols-1 nc = local_index(na, np, np_cols, nblk, -1) do i=1,nc n = n+1 l_col(n) = i p_col(n) = np enddo enddo ! Block cyclic distribution scheme, only nev columns are set: allocate(l_col_bc(na)) allocate(p_col_bc(na)) p_col_bc(:) = -1 l_col_bc(:) = -1 do i = 0, na-1, nblk*np_cols do j = 0, np_cols-1 do n = 1, nblk if(i+j*nblk+n <= MIN(nev,na)) then p_col_bc(i+j*nblk+n) = j l_col_bc(i+j*nblk+n) = i/np_cols + n endif enddo enddo enddo ! Recursively merge sub problems call merge_recursive(0, np_cols, wantDebug, success) if (.not.(success)) then #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_tridi") #endif return endif deallocate(limits,l_col,p_col,l_col_bc,p_col_bc) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_tridi") #endif return contains recursive subroutine merge_recursive(np_off, nprocs, wantDebug, success) implicit none ! noff is always a multiple of nblk_ev ! nlen-noff is always > nblk_ev integer :: np_off, nprocs integer :: np1, np2, noff, nlen, nmid, n, & mpi_status(mpi_status_size) logical, intent(in) :: wantDebug logical, intent(out) :: success success = .true. if (nprocs<=1) then ! Safety check only if (wantDebug) write(error_unit,*) "ELPA1_merge_recursive: INTERNAL error merge_recursive: nprocs=",nprocs success = .false. return endif ! Split problem into 2 subproblems of size np1 / np2 np1 = nprocs/2 np2 = nprocs-np1 if(np1 > 1) call merge_recursive(np_off, np1, wantDebug, success) if (.not.(success)) return if(np2 > 1) call merge_recursive(np_off+np1, np2, wantDebug, success) if (.not.(success)) return noff = limits(np_off) nmid = limits(np_off+np1) - noff nlen = limits(np_off+nprocs) - noff if(my_pcol==np_off) then do n=np_off+np1,np_off+nprocs-1 call mpi_send(d(noff+1),nmid,MPI_REAL8,n,1,mpi_comm_cols,mpierr) enddo endif if(my_pcol>=np_off+np1 .and. my_pcol=np_off .and. my_pcol2*min_submatrix_size) n = ((n+3)/4)*2 ! the bigger one of the two halves, we want EVEN boundaries ndiv = ndiv*2 enddo ! If there is only 1 processor row and not all eigenvectors are needed ! and the matrix size is big enough, then use 2 subdivisions ! so that merge_systems is called once and only the needed ! eigenvectors are calculated for the final problem. if(np_rows==1 .and. nev2*min_submatrix_size) ndiv = 2 allocate(limits(0:ndiv)) limits(0) = 0 limits(ndiv) = na n = ndiv do while(n>1) n = n/2 ! n is always a power of 2 do i=0,ndiv-1,2*n ! We want to have even boundaries (for cache line alignments) limits(i+n) = limits(i) + ((limits(i+2*n)-limits(i)+3)/4)*2 enddo enddo ! Calculate the maximum size of a subproblem max_size = 0 do i=1,ndiv max_size = MAX(max_size,limits(i)-limits(i-1)) enddo ! 
Subdivide matrix by subtracting rank 1 modifications do i=1,ndiv-1 n = limits(i) d(n) = d(n)-abs(e(n)) d(n+1) = d(n+1)-abs(e(n)) enddo if(np_rows==1) then ! For 1 processor row there may be 1 or 2 subdivisions do n=0,ndiv-1 noff = limits(n) ! Start of subproblem nlen = limits(n+1)-noff ! Size of subproblem call solve_tridi_single(nlen,d(noff+1),e(noff+1), & q(nqoff+noff+1,noff+1),ubound(q,dim=1), wantDebug, success) if (.not.(success)) return enddo else ! Solve sub problems in parallel with solve_tridi_single ! There is at maximum 1 subproblem per processor allocate(qmat1(max_size,max_size)) allocate(qmat2(max_size,max_size)) qmat1 = 0 ! Make sure that all elements are defined if(my_prow < ndiv) then noff = limits(my_prow) ! Start of subproblem nlen = limits(my_prow+1)-noff ! Size of subproblem call solve_tridi_single(nlen,d(noff+1),e(noff+1),qmat1, & ubound(qmat1,dim=1), wantDebug, success) if (.not.(success)) return endif ! Fill eigenvectors in qmat1 into global matrix q do np = 0, ndiv-1 noff = limits(np) nlen = limits(np+1)-noff call MPI_Bcast(d(noff+1),nlen,MPI_REAL8,np,mpi_comm_rows,mpierr) qmat2 = qmat1 call MPI_Bcast(qmat2,max_size*max_size,MPI_REAL8,np,mpi_comm_rows,mpierr) do i=1,nlen call distribute_global_column(qmat2(1,i), q(1,noff+i), nqoff+noff, nlen, my_prow, np_rows, nblk) enddo enddo deallocate(qmat1, qmat2) endif ! Allocate and set index arrays l_col and p_col allocate(l_col(na), p_col_i(na), p_col_o(na)) do i=1,na l_col(i) = i p_col_i(i) = 0 p_col_o(i) = 0 enddo ! Merge subproblems n = 1 do while(n 1d-14) then write(error_unit,'(a,i8,2g25.16)') '***WARNING: Monotony error dste**:',i+1,d(i),d(i+1) else write(error_unit,'(a,i8,2g25.16)') 'Info: Monotony error dste{dc,qr}:',i+1,d(i),d(i+1) write(error_unit,'(a)') 'The eigenvalues from a lapack call are not sorted to machine precision.' write(error_unit,'(a)') 'In this extent, this is completely harmless.' write(error_unit,'(a)') 'Still, we keep this info message just in case.' end if allocate(qtmp(nlen)) dtmp = d(i+1) qtmp(1:nlen) = q(1:nlen,i+1) do j=i,1,-1 if(dtmp=npc_0+npc_n) return ! Determine number of "next" and "prev" column for ring sends if(my_pcol == npc_0+npc_n-1) then np_next = npc_0 else np_next = my_pcol + 1 endif if(my_pcol == npc_0) then np_prev = npc_0+npc_n-1 else np_prev = my_pcol - 1 endif call check_monotony(nm,d,'Input1',wantDebug, success) if (.not.(success)) return call check_monotony(na-nm,d(nm+1),'Input2',wantDebug, success) if (.not.(success)) return ! Get global number of processors and my processor number. ! Please note that my_proc does not need to match any real processor number, ! it is just used for load balancing some loops. n_procs = np_rows*npc_n my_proc = my_prow*npc_n + (my_pcol-npc_0) ! Row major ! Local limits of the rows of Q l_rqs = local_index(nqoff+1 , my_prow, np_rows, nblk, +1) ! First row of Q l_rqm = local_index(nqoff+nm, my_prow, np_rows, nblk, -1) ! Last row <= nm l_rqe = local_index(nqoff+na, my_prow, np_rows, nblk, -1) ! Last row of Q l_rnm = l_rqm-l_rqs+1 ! Number of local rows <= nm l_rows = l_rqe-l_rqs+1 ! Total number of local rows ! My number of local columns l_cols = COUNT(p_col(1:na)==my_pcol) ! Get max number of local columns max_local_cols = 0 do np = npc_0, npc_0+npc_n-1 max_local_cols = MAX(max_local_cols,COUNT(p_col(1:na)==np)) enddo ! Calculations start here beta = abs(e) sig = sign(1.d0,e) ! Calculate rank-1 modifier z z(:) = 0 if(MOD((nqoff+nm-1)/nblk,np_rows)==my_prow) then ! 
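!
! What this code computes, as a sketch: with T = diag(T1,T2) + b*v*v**T and
! Q1, Q2 the eigenvector matrices of the two sub-problems,
!
!    Qhat**T * T * Qhat = diag(D1,D2) + rho * z * z**T ,   Qhat = diag(Q1,Q2),
!    z = Qhat**T * v / sqrt(2) ,                           rho = 2*beta,
!
! i.e. z is the last row of Q1 concatenated with sign(e)*(first row of Q2),
! scaled so that norm2(z) = 1 (hence the division by sqrt(2) and rho = 2*beta).
! The two rows of Q needed for z each live on one processor row, which is what
! the MOD(...) tests select; global_gather then sums the pieces over all
! processors.
!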
nm is local on my row do i = 1, na if(p_col(i)==my_pcol) z(i) = q(l_rqm,l_col(i)) enddo endif if(MOD((nqoff+nm)/nblk,np_rows)==my_prow) then ! nm+1 is local on my row do i = 1, na if(p_col(i)==my_pcol) z(i) = z(i) + sig*q(l_rqm+1,l_col(i)) enddo endif call global_gather(z, na) ! Normalize z so that norm(z) = 1. Since z is the concatenation of ! two normalized vectors, norm2(z) = sqrt(2). z = z/sqrt(2.0d0) rho = 2.*beta ! Calculate index for merging both systems by ascending eigenvalues call DLAMRG( nm, na-nm, d, 1, 1, idx ) ! Calculate the allowable deflation tolerance zmax = maxval(abs(z)) dmax = maxval(abs(d)) EPS = DLAMCH( 'Epsilon' ) TOL = 8.*EPS*MAX(dmax,zmax) ! If the rank-1 modifier is small enough, no more needs to be done ! except to reorganize D and Q IF( RHO*zmax <= TOL ) THEN ! Rearrange eigenvalues tmp = d do i=1,na d(i) = tmp(idx(i)) enddo ! Rearrange eigenvectors call resort_ev(idx, na) #ifdef HAVE_DETAILED_TIMINGS call timer%stop("merge_systems") #endif return ENDIF ! Merge and deflate system na1 = 0 na2 = 0 ! COLTYP: ! 1 : non-zero in the upper half only; ! 2 : dense; ! 3 : non-zero in the lower half only; ! 4 : deflated. coltyp(1:nm) = 1 coltyp(nm+1:na) = 3 do i=1,na if(rho*abs(z(idx(i))) <= tol) then ! Deflate due to small z component. na2 = na2+1 d2(na2) = d(idx(i)) idx2(na2) = idx(i) coltyp(idx(i)) = 4 else if(na1>0) then ! Check if eigenvalues are close enough to allow deflation. S = Z(idx(i)) C = Z1(na1) ! Find sqrt(a**2+b**2) without overflow or ! destructive underflow. TAU = DLAPY2( C, S ) T = D1(na1) - D(idx(i)) C = C / TAU S = -S / TAU IF( ABS( T*C*S ) <= TOL ) THEN ! Deflation is possible. na2 = na2+1 Z1(na1) = TAU d2new = D(idx(i))*C**2 + D1(na1)*S**2 d1new = D(idx(i))*S**2 + D1(na1)*C**2 ! D(idx(i)) >= D1(na1) and C**2 + S**2 == 1.0 ! This means that after the above transformation it must be ! D1(na1) <= d1new <= D(idx(i)) ! D1(na1) <= d2new <= D(idx(i)) ! ! D1(na1) may get bigger but it is still smaller than the next D(idx(i+1)) ! so there is no problem with sorting here. ! d2new <= D(idx(i)) which means that it might be smaller than D2(na2-1) ! which makes a check (and possibly a resort) necessary. ! ! The above relations may not hold exactly due to numeric differences ! so they have to be enforced in order not to get troubles with sorting. if(d1newD(idx(i))) d1new = D(idx(i)) if(d2newD(idx(i))) d2new = D(idx(i)) D1(na1) = d1new do j=na2-1,1,-1 if(d2new2) then ! Solve secular equation z(1:na1) = 1 #ifdef WITH_OPENMP z_p(1:na1,:) = 1 #endif dbase(1:na1) = 0 ddiff(1:na1) = 0 info = 0 #ifdef WITH_OPENMP #ifdef HAVE_DETAILED_TIMINGS call timer%start("OpenMP parallel") #endif !$OMP PARALLEL PRIVATE(i,my_thread,delta,s,info,j) my_thread = omp_get_thread_num() !$OMP DO #endif DO i = my_proc+1, na1, n_procs ! work distributed over all processors call DLAED4(na1, i, d1, z1, delta, rho, s, info) ! s is not used! if(info/=0) then ! If DLAED4 fails (may happen especially for LAPACK versions before 3.2) ! use the more stable bisection algorithm in solve_secular_equation ! print *,'ERROR DLAED4 n=',na1,'i=',i,' Using Bisection' call solve_secular_equation(na1, i, d1, z1, delta, rho, s) endif ! Compute updated z #ifdef WITH_OPENMP do j=1,na1 if(i/=j) z_p(j,my_thread) = z_p(j,my_thread)*( delta(j) / (d1(j)-d1(i)) ) enddo z_p(i,my_thread) = z_p(i,my_thread)*delta(i) #else do j=1,na1 if(i/=j) z(j) = z(j)*( delta(j) / (d1(j)-d1(i)) ) enddo z(i) = z(i)*delta(i) #endif ! 
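!
! Background for this loop, as a sketch: for the i-th eigenvalue of the deflated
! system DLAED4 returns delta(j) = d1(j) - lambda_i.  Since lambda_i can lie
! extremely close to one of the poles d1(j), the code does not store lambda_i
! itself but a nearby pole dbase(i) together with the small difference ddiff(i);
! the quantity d1(j) - lambda_i needed for the eigenvectors is later reassembled
! as (d1(j) - dbase(i)) + ddiff(i), which avoids cancellation (see add_tmp and
! the assembly of ev(:,:) further down).  The multiplications into z accumulate,
! for the locally owned values of i, the product formula of Gu and Eisenstat for
! a numerically consistent updated z, which is presumably why the partial
! products still have to be combined across processors (cf. global_product
! below).
!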
store dbase/ddiff if(i1) then if(np_rem==npc_0) then np_rem = npc_0+npc_n-1 else np_rem = np_rem-1 endif call MPI_Sendrecv_replace(qtmp1, l_rows*max_local_cols, MPI_REAL8, & np_next, 1111, np_prev, 1111, & mpi_comm_cols, mpi_status, mpierr) endif ! Gather the parts in d1 and z which are fitting to qtmp1. ! This also delivers nnzu/nnzl for proc np_rem nnzu = 0 nnzl = 0 do i=1,na1 if(p_col(idx1(i))==np_rem) then if(coltyp(idx1(i))==1 .or. coltyp(idx1(i))==2) then nnzu = nnzu+1 d1u(nnzu) = d1(i) zu (nnzu) = z (i) endif if(coltyp(idx1(i))==3 .or. coltyp(idx1(i))==2) then nnzl = nnzl+1 d1l(nnzl) = d1(i) zl (nnzl) = z (i) endif endif enddo ! Set the deflated eigenvectors in Q (comming from proc np_rem) ndef = MAX(nnzu,nnzl) ! Remote counter in input matrix do i = 1, na j = idx(i) if(j>na1) then if(p_col(idx2(j-na1))==np_rem) then ndef = ndef+1 if(p_col_out(i)==my_pcol) & q(l_rqs:l_rqe,l_col_out(i)) = qtmp1(1:l_rows,ndef) endif endif enddo do ns = 0, nqcols1-1, max_strip ! strimining loop ncnt = MIN(max_strip,nqcols1-ns) ! number of columns in this strip ! Get partial result from (output) Q do i = 1, ncnt qtmp2(1:l_rows,i) = q(l_rqs:l_rqe,l_col_out(idxq1(i+ns))) enddo ! Compute eigenvectors of the rank-1 modified matrix. ! Parts for multiplying with upper half of Q: do i = 1, ncnt j = idx(idxq1(i+ns)) ! Calculate the j-th eigenvector of the deflated system ! See above why we are doing it this way! tmp(1:nnzu) = d1u(1:nnzu)-dbase(j) call v_add_s(tmp,nnzu,ddiff(j)) ev(1:nnzu,i) = zu(1:nnzu) / tmp(1:nnzu) * ev_scale(j) enddo ! Multiply old Q with eigenvectors (upper half) if(l_rnm>0 .and. ncnt>0 .and. nnzu>0) & call dgemm('N','N',l_rnm,ncnt,nnzu,1.d0,qtmp1,ubound(qtmp1,dim=1),ev,ubound(ev,dim=1), & 1.d0,qtmp2(1,1),ubound(qtmp2,dim=1)) ! Compute eigenvectors of the rank-1 modified matrix. ! Parts for multiplying with lower half of Q: do i = 1, ncnt j = idx(idxq1(i+ns)) ! Calculate the j-th eigenvector of the deflated system ! See above why we are doing it this way! tmp(1:nnzl) = d1l(1:nnzl)-dbase(j) call v_add_s(tmp,nnzl,ddiff(j)) ev(1:nnzl,i) = zl(1:nnzl) / tmp(1:nnzl) * ev_scale(j) enddo ! Multiply old Q with eigenvectors (lower half) if(l_rows-l_rnm>0 .and. ncnt>0 .and. nnzl>0) & call dgemm('N','N',l_rows-l_rnm,ncnt,nnzl,1.d0,qtmp1(l_rnm+1,1),ubound(qtmp1,dim=1),ev,ubound(ev,dim=1), & 1.d0,qtmp2(l_rnm+1,1),ubound(qtmp2,dim=1)) ! Put partial result into (output) Q do i = 1, ncnt q(l_rqs:l_rqe,l_col_out(idxq1(i+ns))) = qtmp2(1:l_rows,i) enddo enddo enddo deallocate(ev, qtmp1, qtmp2) endif #ifdef HAVE_DETAILED_TIMINGS call timer%stop("merge_systems") #endif return !------------------------------------------------------------------------------- contains subroutine add_tmp(d1, dbase, ddiff, z, ev_scale_value, na1,i) implicit none integer, intent(in) :: na1, i real*8, intent(in) :: d1(:), dbase(:), ddiff(:), z(:) real*8, intent(inout) :: ev_scale_value real*8 :: tmp(1:na1) ! tmp(1:na1) = z(1:na1) / delta(1:na1,i) ! original code ! tmp(1:na1) = z(1:na1) / (d1(1:na1)-d(i))! bad results ! All we want to calculate is tmp = (d1(1:na1)-dbase(i))+ddiff(i) ! 
in exactly this order, but we want to prevent compiler optimization tmp(1:na1) = d1(1:na1) -dbase(i) call v_add_s(tmp(1:na1),na1,ddiff(i)) tmp(1:na1) = z(1:na1) / tmp(1:na1) ev_scale_value = 1.0/sqrt(dot_product(tmp(1:na1),tmp(1:na1))) end subroutine add_tmp subroutine resort_ev(idx_ev, nLength) implicit none integer, intent(in) :: nLength integer :: idx_ev(nLength) integer :: i, nc, pc1, pc2, lc1, lc2, l_cols_out real*8, allocatable :: qtmp(:,:) if(l_rows==0) return ! My processor column has no work to do ! Resorts eigenvectors so that q_new(:,i) = q_old(:,idx_ev(i)) l_cols_out = COUNT(p_col_out(1:na)==my_pcol) allocate(qtmp(l_rows,l_cols_out)) nc = 0 do i=1,na pc1 = p_col(idx_ev(i)) lc1 = l_col(idx_ev(i)) pc2 = p_col_out(i) if(pc2<0) cycle ! This column is not needed in output if(pc2==my_pcol) nc = nc+1 ! Counter for output columns if(pc1==my_pcol) then if(pc2==my_pcol) then ! send and recieve column are local qtmp(1:l_rows,nc) = q(l_rqs:l_rqe,lc1) else call mpi_send(q(l_rqs,lc1),l_rows,MPI_REAL8,pc2,mod(i,4096),mpi_comm_cols,mpierr) endif else if(pc2==my_pcol) then call mpi_recv(qtmp(1,nc),l_rows,MPI_REAL8,pc1,mod(i,4096),mpi_comm_cols,mpi_status,mpierr) endif enddo ! Insert qtmp into (output) q nc = 0 do i=1,na pc2 = p_col_out(i) lc2 = l_col_out(i) if(pc2==my_pcol) then nc = nc+1 q(l_rqs:l_rqe,lc2) = qtmp(1:l_rows,nc) endif enddo deallocate(qtmp) end subroutine resort_ev subroutine transform_columns(col1, col2) implicit none integer col1, col2 integer pc1, pc2, lc1, lc2 if(l_rows==0) return ! My processor column has no work to do pc1 = p_col(col1) lc1 = l_col(col1) pc2 = p_col(col2) lc2 = l_col(col2) if(pc1==my_pcol) then if(pc2==my_pcol) then ! both columns are local tmp(1:l_rows) = q(l_rqs:l_rqe,lc1)*qtrans(1,1) + q(l_rqs:l_rqe,lc2)*qtrans(2,1) q(l_rqs:l_rqe,lc2) = q(l_rqs:l_rqe,lc1)*qtrans(1,2) + q(l_rqs:l_rqe,lc2)*qtrans(2,2) q(l_rqs:l_rqe,lc1) = tmp(1:l_rows) else call mpi_sendrecv(q(l_rqs,lc1),l_rows,MPI_REAL8,pc2,1, & tmp,l_rows,MPI_REAL8,pc2,1, & mpi_comm_cols,mpi_status,mpierr) q(l_rqs:l_rqe,lc1) = q(l_rqs:l_rqe,lc1)*qtrans(1,1) + tmp(1:l_rows)*qtrans(2,1) endif else if(pc2==my_pcol) then call mpi_sendrecv(q(l_rqs,lc2),l_rows,MPI_REAL8,pc1,1, & tmp,l_rows,MPI_REAL8,pc1,1, & mpi_comm_cols,mpi_status,mpierr) q(l_rqs:l_rqe,lc2) = tmp(1:l_rows)*qtrans(1,2) + q(l_rqs:l_rqe,lc2)*qtrans(2,2) endif end subroutine transform_columns subroutine global_gather(z, n) ! This routine sums up z over all processors. ! It should only be used for gathering distributed results, ! i.e. z(i) should be nonzero on exactly 1 processor column, ! otherways the results may be numerically different on different columns implicit none integer n real*8 z(n) real*8 tmp(n) if(npc_n==1 .and. np_rows==1) return ! nothing to do ! Do an mpi_allreduce over processor rows call mpi_allreduce(z, tmp, n, MPI_REAL8, MPI_SUM, mpi_comm_rows, mpierr) ! If only 1 processor column, we are done if(npc_n==1) then z(:) = tmp(:) return endif ! If all processor columns are involved, we can use mpi_allreduce if(npc_n==np_cols) then call mpi_allreduce(tmp, z, n, MPI_REAL8, MPI_SUM, mpi_comm_cols, mpierr) return endif ! Do a ring send over processor columns z(:) = 0 do np = 1, npc_n z(:) = z(:) + tmp(:) call MPI_Sendrecv_replace(z, n, MPI_REAL8, np_next, 1111, np_prev, 1111, & mpi_comm_cols, mpi_status, mpierr) enddo end subroutine global_gather subroutine global_product(z, n) ! This routine calculates the global product of z. implicit none integer n real*8 z(n) real*8 tmp(n) if(npc_n==1 .and. np_rows==1) return ! nothing to do ! 
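!
! Communication pattern used in global_gather above and in this routine, as a
! sketch: contributions are first combined within each process column by an
! mpi_allreduce over mpi_comm_rows.  Across process columns three cases are
! distinguished: a single column needs no further work, the full grid can use a
! second mpi_allreduce over mpi_comm_cols, and a proper subset of columns
! (npc_n < np_cols) must avoid a communicator-wide reduction.  global_gather
! handles the last case with npc_n steps of MPI_Sendrecv_replace around the
! ring np_prev -> me -> np_next, adding one column's contribution per step, so
! that after the last step every column of the group holds the complete sum.
! global_product instead collects everything on column npc_0, multiplies there,
! and sends the result back.
!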
Do an mpi_allreduce over processor rows call mpi_allreduce(z, tmp, n, MPI_REAL8, MPI_PROD, mpi_comm_rows, mpierr) ! If only 1 processor column, we are done if(npc_n==1) then z(:) = tmp(:) return endif ! If all processor columns are involved, we can use mpi_allreduce if(npc_n==np_cols) then call mpi_allreduce(tmp, z, n, MPI_REAL8, MPI_PROD, mpi_comm_cols, mpierr) return endif ! We send all vectors to the first proc, do the product there ! and redistribute the result. if(my_pcol == npc_0) then z(1:n) = tmp(1:n) do np = npc_0+1, npc_0+npc_n-1 call mpi_recv(tmp,n,MPI_REAL8,np,1111,mpi_comm_cols,mpi_status,mpierr) z(1:n) = z(1:n)*tmp(1:n) enddo do np = npc_0+1, npc_0+npc_n-1 call mpi_send(z,n,MPI_REAL8,np,1111,mpi_comm_cols,mpierr) enddo else call mpi_send(tmp,n,MPI_REAL8,npc_0,1111,mpi_comm_cols,mpierr) call mpi_recv(z ,n,MPI_REAL8,npc_0,1111,mpi_comm_cols,mpi_status,mpierr) endif end subroutine global_product subroutine check_monotony(n,d,text, wantDebug, success) ! This is a test routine for checking if the eigenvalues are monotonically increasing. ! It is for debug purposes only, an error should never be triggered! implicit none integer :: n real*8 :: d(n) character*(*) :: text integer :: i logical, intent(in) :: wantDebug logical, intent(out) :: success success = .true. do i=1,n-1 if(d(i+1) 0 and d(i+1) > d(i) ! ! but this routine will not terminate with error if these are not satisfied ! (it will normally converge to a pole in this case). ! ! The output in DELTA(j) is always (D(j) - lambda_I), even for the cases ! N=1 and N=2 which is not compatible with DLAED4. ! Thus this routine shouldn't be used for these cases as a simple replacement ! of DLAED4. ! ! The arguments are the same as in DLAED4 (with the exception of the INFO argument): ! ! ! N (input) INTEGER ! The length of all arrays. ! ! I (input) INTEGER ! The index of the eigenvalue to be computed. 1 <= I <= N. ! ! D (input) DOUBLE PRECISION array, dimension (N) ! The original eigenvalues. It is assumed that they are in ! order, D(I) < D(J) for I < J. ! ! Z (input) DOUBLE PRECISION array, dimension (N) ! The components of the updating vector. ! ! DELTA (output) DOUBLE PRECISION array, dimension (N) ! DELTA contains (D(j) - lambda_I) in its j-th component. ! See remark above about DLAED4 compatibility! ! ! RHO (input) DOUBLE PRECISION ! The scalar in the symmetric updating formula. ! ! DLAM (output) DOUBLE PRECISION ! The computed lambda_I, the I-th updated eigenvalue. !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer n, i real*8 d(n), z(n), delta(n), rho, dlam integer iter real*8 a, b, x, y, dshift ! In order to obtain sufficient numerical accuracy we have to shift the problem ! either by d(i) or d(i+1), whichever is closer to the solution ! Upper and lower bound of the shifted solution interval are a and b #ifdef HAVE_DETAILED_TIMINGS call timer%start("solve_secular_equation") #endif if(i==n) then ! Special case: Last eigenvalue ! We shift always by d(n), lower bound is d(n), ! upper bound is determined by a guess: dshift = d(n) delta(:) = d(:) - dshift a = 0. ! delta(n) b = rho*SUM(z(:)**2) + 1. ! rho*SUM(z(:)**2) is the lower bound for the guess else ! Other eigenvalues: lower bound is d(i), upper bound is d(i+1) ! We check the sign of the function in the midpoint of the interval ! in order to determine if eigenvalue is more close to d(i) or d(i+1) x = 0.5*(d(i)+d(i+1)) y = 1. + rho*SUM(z(:)**2/(d(:)-x)) if(y>0) then ! 
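!
! A small worked example of this sign test (illustrative numbers only):
! for d = (1, 2), z**2 = (0.5, 0.5) and rho = 2 the secular function is
!
!    f(x) = 1 + 1/(1-x) + 1/(2-x)
!
! with root x = (5-sqrt(5))/2 ~ 1.382 in (1,2).  At the midpoint x = 1.5 we get
! f(1.5) = 1 - 2 + 2 = 1 > 0, correctly indicating that the eigenvalue lies in
! the left half of the interval, so the problem is shifted by d(i); otherwise it
! would be shifted by d(i+1).  Shifting by the nearer pole keeps delta(i) or
! delta(i+1) small and the bisection below well conditioned.
!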
solution is next to d(i) dshift = d(i) else ! solution is next to d(i+1) dshift = d(i+1) endif delta(:) = d(:) - dshift a = delta(i) b = delta(i+1) endif ! Bisection: do iter=1,200 ! Interval subdivision x = 0.5*(a+b) if(x==a .or. x==b) exit ! No further interval subdivisions possible if(abs(x) < 1.d-200) exit ! x next to pole ! evaluate value at x y = 1. + rho*SUM(z(:)**2/(delta(:)-x)) if(y==0) then ! found exact solution exit elseif(y>0) then b = x else a = x endif enddo ! Solution: dlam = x + dshift delta(:) = delta(:) - x #ifdef HAVE_DETAILED_TIMINGS call timer%stop("solve_secular_equation") #endif end subroutine solve_secular_equation !------------------------------------------------------------------------------- integer function local_index(idx, my_proc, num_procs, nblk, iflag) !------------------------------------------------------------------------------- ! local_index: returns the local index for a given global index ! If the global index has no local index on the ! processor my_proc behaviour is defined by iflag ! ! Parameters ! ! idx Global index ! ! my_proc Processor row/column for which to calculate the local index ! ! num_procs Total number of processors along row/column ! ! nblk Blocksize ! ! iflag Controls the behaviour if idx is not on local processor ! iflag< 0 : Return last local index before that row/col ! iflag==0 : Return 0 ! iflag> 0 : Return next local index after that row/col !------------------------------------------------------------------------------- implicit none integer idx, my_proc, num_procs, nblk, iflag integer iblk iblk = (idx-1)/nblk ! global block number, 0 based if(mod(iblk,num_procs) == my_proc) then ! block is local, always return local row/col number local_index = (iblk/num_procs)*nblk + mod(idx-1,nblk) + 1 else ! non local block if(iflag == 0) then local_index = 0 else local_index = (iblk/num_procs)*nblk if(mod(iblk,num_procs) > my_proc) local_index = local_index + nblk if(iflag>0) local_index = local_index + 1 endif endif end function local_index !------------------------------------------------------------------------------- subroutine cholesky_real(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, wantDebug, success) !------------------------------------------------------------------------------- ! cholesky_real: Cholesky factorization of a real symmetric matrix ! ! Parameters ! ! na Order of matrix ! ! a(lda,matrixCols) Distributed matrix which should be factorized. ! Distribution is like in Scalapack. ! Only upper triangle is needs to be set. ! On return, the upper triangle contains the Cholesky factor ! and the lower triangle is set to 0. ! ! lda Leading dimension of a ! matrixCols local columns of matrix a ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! 
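!
! A minimal calling sketch (hypothetical driver variables, assuming the two
! communicators were obtained from get_elpa_row_col_comms and that a holds the
! upper triangle of a block-cyclically distributed symmetric positive definite
! matrix):
!
!    logical :: success
!    call cholesky_real(na, a, lda, nblk, matrixCols, &
!                       mpi_comm_rows, mpi_comm_cols, .true., success)
!    if (.not. success) stop 'cholesky_real failed'
!
! On success the upper triangle of a holds the Cholesky factor U (A = U**T * U)
! and the lower triangle has been set to zero.
!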
!------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer :: na, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols real*8 :: a(lda,matrixCols) integer :: my_prow, my_pcol, np_rows, np_cols, mpierr integer :: l_cols, l_rows, l_col1, l_row1, l_colx, l_rowx integer :: n, nc, i, info integer :: lcs, lce, lrs, lre integer :: tile_size, l_rows_tile, l_cols_tile real*8, allocatable :: tmp1(:), tmp2(:,:), tmatr(:,:), tmatc(:,:) logical, intent(in) :: wantDebug logical, intent(out) :: success #ifdef HAVE_DETAILED_TIMINGS call timer%start("cholesky_real") #endif call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) success = .true. ! Matrix is split into tiles; work is done only for tiles on the diagonal or above tile_size = nblk*least_common_multiple(np_rows,np_cols) ! minimum global tile size tile_size = ((128*max(np_rows,np_cols)-1)/tile_size+1)*tile_size ! make local tiles at least 128 wide l_rows_tile = tile_size/np_rows ! local rows of a tile l_cols_tile = tile_size/np_cols ! local cols of a tile l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! Local rows of a l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local cols of a allocate(tmp1(nblk*nblk)) allocate(tmp2(nblk,nblk)) tmp1 = 0 tmp2 = 0 allocate(tmatr(l_rows,nblk)) allocate(tmatc(l_cols,nblk)) tmatr = 0 tmatc = 0 do n = 1, na, nblk ! Calculate first local row and column of the still remaining matrix ! on the local processor l_row1 = local_index(n, my_prow, np_rows, nblk, +1) l_col1 = local_index(n, my_pcol, np_cols, nblk, +1) l_rowx = local_index(n+nblk, my_prow, np_rows, nblk, +1) l_colx = local_index(n+nblk, my_pcol, np_cols, nblk, +1) if(n+nblk > na) then ! This is the last step, just do a Cholesky-Factorization ! of the remaining block if(my_prow==prow(n, nblk, np_rows) .and. my_pcol==pcol(n, nblk, np_cols)) then call dpotrf('U',na-n+1,a(l_row1,l_col1),lda,info) if(info/=0) then if (wantDebug) write(error_unit,*) "ELPA1_cholesky_real: Error in dpotrf" success = .false. return endif endif exit ! Loop endif if(my_prow==prow(n, nblk, np_rows)) then if(my_pcol==pcol(n, nblk, np_cols)) then ! The process owning the upper left remaining block does the ! Cholesky-Factorization of this block call dpotrf('U',nblk,a(l_row1,l_col1),lda,info) if(info/=0) then if (wantDebug) write(error_unit,*) "ELPA1_cholesky_real: Error in dpotrf" success = .false. return endif nc = 0 do i=1,nblk tmp1(nc+1:nc+i) = a(l_row1:l_row1+i-1,l_col1+i-1) nc = nc+i enddo endif call MPI_Bcast(tmp1,nblk*(nblk+1)/2,MPI_REAL8,pcol(n, nblk, np_cols),mpi_comm_cols,mpierr) nc = 0 do i=1,nblk tmp2(1:i,i) = tmp1(nc+1:nc+i) nc = nc+i enddo if(l_cols-l_colx+1>0) & call dtrsm('L','U','T','N',nblk,l_cols-l_colx+1,1.d0,tmp2,ubound(tmp2,dim=1),a(l_row1,l_colx),lda) endif do i=1,nblk if(my_prow==prow(n, nblk, np_rows)) tmatc(l_colx:l_cols,i) = a(l_row1+i-1,l_colx:l_cols) if(l_cols-l_colx+1>0) & call MPI_Bcast(tmatc(l_colx,i),l_cols-l_colx+1,MPI_REAL8,prow(n, nblk, np_rows),mpi_comm_rows,mpierr) enddo ! 
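!
! What tmatc/tmatr are for, as a sketch (ignoring the tiling and the 2D
! distribution): after the dpotrf/dtrsm step the current nblk rows of the
! Cholesky factor, U12 = U(n:n+nblk-1, n+nblk:na), are stored with their
! columns as first index in tmatc; elpa_transpose_vectors_real below provides
! the same data row-distributed in tmatr.  The remaining trailing matrix is
! then updated by the usual rank-nblk modification
!
!    A(n+nblk:na, n+nblk:na) = A(n+nblk:na, n+nblk:na) - U12**T * U12
!
! which locally amounts to a dgemm of the form
!
!    a(lrs:lre,lcs:lce) = a(lrs:lre,lcs:lce)
!                         - matmul( tmatr(lrs:lre,1:nblk), transpose(tmatc(lcs:lce,1:nblk)) )
!
! done tile by tile so that only tiles on or above the diagonal are touched.
!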
this has to be checked since it was changed substantially when doing type safe call elpa_transpose_vectors_real (tmatc, ubound(tmatc,dim=1), mpi_comm_cols, & tmatr, ubound(tmatr,dim=1), mpi_comm_rows, & n, na, nblk, nblk) do i=0,(na-1)/tile_size lcs = max(l_colx,i*l_cols_tile+1) lce = min(l_cols,(i+1)*l_cols_tile) lrs = l_rowx lre = min(l_rows,(i+1)*l_rows_tile) if(lce0) & call DTRMM('L','U','N','N',nb,l_cols-l_colx+1,1.d0,tmp2,ubound(tmp2,dim=1),a(l_row1,l_colx),lda) if(l_colx<=l_cols) tmat2(1:nb,l_colx:l_cols) = a(l_row1:l_row1+nb-1,l_colx:l_cols) if(my_pcol==pcol(n, nblk, np_cols)) tmat2(1:nb,l_col1:l_col1+nb-1) = tmp2(1:nb,1:nb) ! tmp2 has the lower left triangle 0 endif if(l_row1>1) then if(my_pcol==pcol(n, nblk, np_cols)) then tmat1(1:l_row1-1,1:nb) = a(1:l_row1-1,l_col1:l_col1+nb-1) a(1:l_row1-1,l_col1:l_col1+nb-1) = 0 endif do i=1,nb call MPI_Bcast(tmat1(1,i),l_row1-1,MPI_REAL8,pcol(n, nblk, np_cols),mpi_comm_cols,mpierr) enddo endif if(l_cols-l_col1+1>0) & call MPI_Bcast(tmat2(1,l_col1),(l_cols-l_col1+1)*nblk,MPI_REAL8,prow(n, nblk, np_rows),mpi_comm_rows,mpierr) if(l_row1>1 .and. l_cols-l_col1+1>0) & call dgemm('N','N',l_row1-1,l_cols-l_col1+1,nb, -1.d0, & tmat1,ubound(tmat1,dim=1),tmat2(1,l_col1),ubound(tmat2,dim=1), & 1.d0, a(1,l_col1),lda) enddo deallocate(tmp1, tmp2, tmat1, tmat2) end subroutine invert_trm_real !------------------------------------------------------------------------------- subroutine cholesky_complex(na, a, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols, wantDebug, success) !------------------------------------------------------------------------------- ! cholesky_complex: Cholesky factorization of a complex hermitian matrix ! ! Parameters ! ! na Order of matrix ! ! a(lda,matriCols) Distributed matrix which should be factorized. ! Distribution is like in Scalapack. ! Only upper triangle is needs to be set. ! On return, the upper triangle contains the Cholesky factor ! and the lower triangle is set to 0. ! ! lda Leading dimension of a ! matrixCols local columns of matrix a ! ! nblk blocksize of cyclic distribution, must be the same in both directions! ! ! mpi_comm_rows ! mpi_comm_cols ! MPI-Communicators for rows/columns ! !------------------------------------------------------------------------------- #ifdef HAVE_DETAILED_TIMINGS use timings #endif implicit none integer :: na, lda, nblk, matrixCols, mpi_comm_rows, mpi_comm_cols complex*16 :: a(lda,matrixCols) integer :: my_prow, my_pcol, np_rows, np_cols, mpierr integer :: l_cols, l_rows, l_col1, l_row1, l_colx, l_rowx integer :: n, nc, i, info integer :: lcs, lce, lrs, lre integer :: tile_size, l_rows_tile, l_cols_tile complex*16, allocatable :: tmp1(:), tmp2(:,:), tmatr(:,:), tmatc(:,:) logical, intent(in) :: wantDebug logical, intent(out) :: success #ifdef HAVE_DETAILED_TIMINGS call timer%start("cholesky_complex") #endif success = .true. call mpi_comm_rank(mpi_comm_rows,my_prow,mpierr) call mpi_comm_size(mpi_comm_rows,np_rows,mpierr) call mpi_comm_rank(mpi_comm_cols,my_pcol,mpierr) call mpi_comm_size(mpi_comm_cols,np_cols,mpierr) ! Matrix is split into tiles; work is done only for tiles on the diagonal or above tile_size = nblk*least_common_multiple(np_rows,np_cols) ! minimum global tile size tile_size = ((128*max(np_rows,np_cols)-1)/tile_size+1)*tile_size ! make local tiles at least 128 wide l_rows_tile = tile_size/np_rows ! local rows of a tile l_cols_tile = tile_size/np_cols ! local cols of a tile l_rows = local_index(na, my_prow, np_rows, nblk, -1) ! 
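!
! Worked example for the tile size chosen above (illustrative numbers only):
! for np_rows = 4, np_cols = 6 and nblk = 16 the least common multiple of the
! grid dimensions is 12, so the minimum global tile size is 16*12 = 192.
! Rounding 128*max(4,6) = 768 up to a multiple of 192 leaves 768, giving
! l_rows_tile = 768/4 = 192 and l_cols_tile = 768/6 = 128, i.e. every local
! tile is at least 128 rows/columns wide, as intended.
!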
Local rows of a l_cols = local_index(na, my_pcol, np_cols, nblk, -1) ! Local cols of a allocate(tmp1(nblk*nblk)) allocate(tmp2(nblk,nblk)) tmp1 = 0 tmp2 = 0 allocate(tmatr(l_rows,nblk)) allocate(tmatc(l_cols,nblk)) tmatr = 0 tmatc = 0 do n = 1, na, nblk ! Calculate first local row and column of the still remaining matrix ! on the local processor l_row1 = local_index(n, my_prow, np_rows, nblk, +1) l_col1 = local_index(n, my_pcol, np_cols, nblk, +1) l_rowx = local_index(n+nblk, my_prow, np_rows, nblk, +1) l_colx = local_index(n+nblk, my_pcol, np_cols, nblk, +1) if(n+nblk > na) then ! This is the last step, just do a Cholesky-Factorization ! of the remaining block if(my_prow==prow(n, nblk, np_rows) .and. my_pcol==pcol(n, nblk, np_cols)) then call zpotrf('U',na-n+1,a(l_row1,l_col1),lda,info) if(info/=0) then if (wantDebug) write(error_unit,*) "ELPA1_cholesky_complex: Error in zpotrf" success = .false. return endif endif exit ! Loop endif if(my_prow==prow(n, nblk, np_rows)) then if(my_pcol==pcol(n, nblk, np_cols)) then ! The process owning the upper left remaining block does the ! Cholesky-Factorization of this block call zpotrf('U',nblk,a(l_row1,l_col1),lda,info) if(info/=0) then if (wantDebug) write(error_unit,*) "ELPA1_cholesky_complex: Error in zpotrf" success = .false. return endif nc = 0 do i=1,nblk tmp1(nc+1:nc+i) = a(l_row1:l_row1+i-1,l_col1+i-1) nc = nc+i enddo endif call MPI_Bcast(tmp1,nblk*(nblk+1)/2,MPI_DOUBLE_COMPLEX,pcol(n, nblk, np_cols),mpi_comm_cols,mpierr) nc = 0 do i=1,nblk tmp2(1:i,i) = tmp1(nc+1:nc+i) nc = nc+i enddo if(l_cols-l_colx+1>0) & call ztrsm('L','U','C','N',nblk,l_cols-l_colx+1,(1.d0,0.d0),tmp2,ubound(tmp2,dim=1),a(l_row1,l_colx),lda) endif do i=1,nblk if(my_prow==prow(n, nblk, np_rows)) tmatc(l_colx:l_cols,i) = conjg(a(l_row1+i-1,l_colx:l_cols)) if(l_cols-l_colx+1>0) & call MPI_Bcast(tmatc(l_colx,i),l_cols-l_colx+1,MPI_DOUBLE_COMPLEX,prow(n, nblk, np_rows),mpi_comm_rows,mpierr) enddo ! this has to be checked since it was changed substantially when doing type safe call elpa_transpose_vectors_complex (tmatc, ubound(tmatc,dim=1), mpi_comm_cols, & tmatr, ubound(tmatr,dim=1), mpi_comm_rows, & n, na, nblk, nblk) do i=0,(na-1)/tile_size lcs = max(l_colx,i*l_cols_tile+1) lce = min(l_cols,(i+1)*l_cols_tile) lrs = l_rowx lre = min(l_rows,(i+1)*l_rows_tile) if(lce0) & call ZTRMM('L','U','N','N',nb,l_cols-l_colx+1,(1.d0,0.d0),tmp2,ubound(tmp2,dim=1),a(l_row1,l_colx),lda) if(l_colx<=l_cols) tmat2(1:nb,l_colx:l_cols) = a(l_row1:l_row1+nb-1,l_colx:l_cols) if(my_pcol==pcol(n, nblk, np_cols)) tmat2(1:nb,l_col1:l_col1+nb-1) = tmp2(1:nb,1:nb) ! tmp2 has the lower left triangle 0 endif if(l_row1>1) then if(my_pcol==pcol(n, nblk, np_cols)) then tmat1(1:l_row1-1,1:nb) = a(1:l_row1-1,l_col1:l_col1+nb-1) a(1:l_row1-1,l_col1:l_col1+nb-1) = 0 endif do i=1,nb call MPI_Bcast(tmat1(1,i),l_row1-1,MPI_DOUBLE_COMPLEX,pcol(n, nblk, np_cols),mpi_comm_cols,mpierr) enddo endif if(l_cols-l_col1+1>0) & call MPI_Bcast(tmat2(1,l_col1),(l_cols-l_col1+1)*nblk,MPI_DOUBLE_COMPLEX,prow(n, nblk, np_rows),mpi_comm_rows,mpierr) if(l_row1>1 .and. l_cols-l_col1+1>0) & call ZGEMM('N','N',l_row1-1,l_cols-l_col1+1,nb, (-1.d0,0.d0), & tmat1,ubound(tmat1,dim=1),tmat2(1,l_col1),ubound(tmat2,dim=1), & (1.d0,0.d0), a(1,l_col1),lda) enddo deallocate(tmp1, tmp2, tmat1, tmat2) end subroutine invert_trm_complex ! -------------------------------------------------------------------------------------------------- integer function least_common_multiple(a, b) ! Returns the least common multiple of a and b ! 
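!
! For example, for a = 4 and b = 6 the loop below tests 4, 8, 12 and exits at
! 12.  Only when no multiple of a below a*b is divisible by b does the loop run
! to completion, in which case the loop variable ends at a*b itself.
!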
! There may be more efficient ways to do this; we use the simplest approach.
   implicit none
   integer, intent(in) :: a, b

   do least_common_multiple = a, a*(b-1), a
      if(mod(least_common_multiple,b)==0) exit
   enddo
   ! if the loop is left regularly, least_common_multiple = a*b

end function

! --------------------------------------------------------------------------------------------------

subroutine hh_transform_real(alpha, xnorm_sq, xf, tau)

   ! Similar to LAPACK routine DLARFP, but uses ||x||**2 instead of x(:)
   ! and returns the factor xf by which x has to be scaled.
   ! It also does not have the special handling for numbers < 1.d-300 or > 1.d150
   ! since this would be expensive for the parallel implementation.

   implicit none
   real*8, intent(inout) :: alpha
   real*8, intent(in)    :: xnorm_sq
   real*8, intent(out)   :: xf, tau

   real*8 BETA

   if( XNORM_SQ==0. ) then

      if( ALPHA>=0. ) then
         TAU = 0.
      else
         TAU = 2.
         ALPHA = -ALPHA
      endif
      XF = 0.

   else

      BETA = SIGN( SQRT( ALPHA**2 + XNORM_SQ ), ALPHA )
      ALPHA = ALPHA + BETA
      IF( BETA<0 ) THEN
         BETA = -BETA
         TAU = -ALPHA / BETA
      ELSE
         ALPHA = XNORM_SQ / ALPHA
         TAU = ALPHA / BETA
         ALPHA = -ALPHA
      END IF
      XF = 1./ALPHA
      ALPHA = BETA

   endif

end subroutine

! --------------------------------------------------------------------------------------------------

subroutine hh_transform_complex(alpha, xnorm_sq, xf, tau)

   ! Similar to LAPACK routine ZLARFP, but uses ||x||**2 instead of x(:)
   ! and returns the factor xf by which x has to be scaled.
   ! It also does not have the special handling for numbers < 1.d-300 or > 1.d150
   ! since this would be expensive for the parallel implementation.

   implicit none
   complex*16, intent(inout) :: alpha
   real*8, intent(in)        :: xnorm_sq
   complex*16, intent(out)   :: xf, tau

   real*8 ALPHR, ALPHI, BETA

   ALPHR = DBLE( ALPHA )
   ALPHI = DIMAG( ALPHA )

   if( XNORM_SQ==0. .AND. ALPHI==0. ) then

      if( ALPHR>=0. ) then
         TAU = 0.
      else
         TAU = 2.
         ALPHA = -ALPHA
      endif
      XF = 0.

   else

      BETA = SIGN( SQRT( ALPHR**2 + ALPHI**2 + XNORM_SQ ), ALPHR )
      ALPHA = ALPHA + BETA
      IF( BETA<0 ) THEN
         BETA = -BETA
         TAU = -ALPHA / BETA
      ELSE
         ALPHR = ALPHI * (ALPHI/DBLE( ALPHA ))
         ALPHR = ALPHR + XNORM_SQ/DBLE( ALPHA )
         TAU = DCMPLX( ALPHR/BETA, -ALPHI/BETA )
         ALPHA = DCMPLX( -ALPHR, ALPHI )
      END IF
      XF = 1./ALPHA
      ALPHA = BETA

   endif

end subroutine

! --------------------------------------------------------------------------------------------------

end module ELPA1

! --------------------------------------------------------------------------------------------------
! Please note that the following routines are outside of the module ELPA1
! so that they can be used with real or complex data
! --------------------------------------------------------------------------------------------------
! to do: write interface and put them into module

!-------------------------------------------------------------------------------

!-------------------------------------------------------------------------------
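! --------------------------------------------------------------------------------------------------
! Usage sketch for hh_transform_real above (hypothetical caller, not part of ELPA):
! given a pivot element alpha and the squared norm of the trailing vector x,
!
!    real*8 :: alpha, xf, tau, x(3)
!    x     = (/ 3.d0, 0.d0, 4.d0 /)
!    alpha = 2.d0
!    call hh_transform_real(alpha, sum(x**2), xf, tau)
!
! returns tau and xf such that v = (1, xf*x) defines the Householder reflection
! H = I - tau*v*v**T which maps (2, x) onto (alpha, 0, 0, 0); on exit alpha has
! been overwritten with the non-negative norm sqrt(2**2 + ||x||**2) of the
! original vector, analogous to DLARFP.
! --------------------------------------------------------------------------------------------------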