# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Copyright(C) 2019-2020 Max-Planck-Society # Author: Philipp Arras, Philipp Frank, Philipp Haim, Reimar Leike, # Jakob Knollmueller import sys import gc from mpi4py import MPI from functools import reduce from operator import add from time import time import nifty6 as ift import src as vlbi from config import comm, nranks, rank, master from config import doms, eps, min_timestamps_per_bin, nthreads from config import sky_movie_mf as sky from config import startt, dt, npixt def stat_plotting(pos, KL): if master: sc = ift.StatCalculator() sc_mean = ift.StatCalculator() sc_spectral = ift.StatCalculator() skysamples = {'hi': [], 'lo': []} # CAUTION: the loop and the if-clause cannot be interchanged! for ss in KL.samples: if master: samp = sky(pos+ss) skysamples['hi'].append(samp['hi']) skysamples['lo'].append(samp['lo']) sc.add(samp) sc_mean.add(0.5*(samp['hi']+samp['lo']).val) sc_spectral.add(2 * (samp['lo']-samp['hi']).val / (samp['lo']+samp['hi']).val ) def optimization_heuristic(ii, likelihoods): lh_full, lh_full_amp, lh_full_ph, lh_cut, lh_cut_amp, lh_cut_ph = likelihoods N_samples = 10 * (1 + ii // 8) N_iterations = 4 * (4 + ii // 4) if ii<50 else 20 eps = 0.1 clvl = 3 if ii < 20 else 2 ic_newton = ift.AbsDeltaEnergyController(eps, iteration_limit=N_iterations, name=f'it_{ii}' if master else None, convergence_level=clvl) if ii < 50: minimizer = ift.VL_BFGS(ic_newton) else: minimizer = ift.NewtonCG(ic_newton) if ii < 30: if ii % 2 == 0 or ii < 10: lh = lh_cut elif ii % 4 == 1: lh = lh_cut_amp else: lh = lh_cut_ph else: if ii % 2 == 0 or ii > 50: lh = lh_full elif ii % 4 == 1: lh = lh_full_amp else: lh = lh_full_ph return minimizer, N_samples, N_iterations, lh def build_likelihood(rawdd, startt, npixt, dt, mode): lh = [] active_inds = [] for freq in vlbi.data.FREQS: rawd = rawdd[freq] args = {'tmin': startt, 'tmax': npixt*dt, 'delta_t': dt} for ii, dd in enumerate(vlbi.time_binning(rawd, **args)): if len(dd) == 0: continue ind = str(ii) + "_" + freq active_inds.append(ind) nfft = vlbi.RadioResponse(doms, dd['uv'], eps, nthreads) vis = ift.makeField(nfft.target, dd['vis']) llh = [] if mode in ['amp', 'full']: vis2closampl, evalsampl = vlbi.Visibilities2ClosureAmplitudes(dd) llh.append(ift.GaussianEnergy(mean=vis2closampl(vis)) @ vis2closampl) if mode in ['ph', 'full']: vis2closph, evalsph, _ = vlbi.Visibilities2ClosurePhases(dd) llh.append(ift.GaussianEnergy(mean=vis2closph(vis)) @ vis2closph) llh_op = reduce(add, llh) @ nfft.ducktape(ind) if mode == 'full' and freq == vlbi.data.FREQS[0]: ift.extra.check_jacobian_consistency(llh_op, ift.from_random(llh_op.domain), tol=1e-5, ntries=10) lh.append(llh_op) conv = vlbi.DomainTuple2MultiField(sky.target, active_inds) return reduce(add, lh) @ conv def setup(): if len(sys.argv) != 3: raise RuntimeError _, pre_data, fname_input = sys.argv rawd = {} for freq in vlbi.data.FREQS: rawd[freq] = vlbi.combined_data(f'data/{pre_data}', [freq], min_timestamps_per_bin) if master else None if nranks > 1: rawd = comm.bcast(rawd) pre_output = pre_data lh_full = build_likelihood(rawd, startt, npixt, dt, 'full') lh_full_ph = build_likelihood(rawd, startt, npixt, dt, 'ph') lh_full_amp = build_likelihood(rawd, startt, npixt, dt, 'amp') lh_cut = build_likelihood(rawd, startt, npixt//2, dt, 'full') lh_cut_ph = build_likelihood(rawd, startt, npixt//2, dt, 'ph') lh_cut_amp = build_likelihood(rawd, startt, npixt//2, dt, 'amp') pos = vlbi.load_hdf5(fname_input, sky.domain) if master else None if nranks > 1: pos = comm.bcast(pos) ic = ift.AbsDeltaEnergyController(0.5, iteration_limit=200, name=f'Sampling(task {rank})', convergence_level=3) if master: t0 = time() (lh_full @ sky)(pos) print(f'Likelihood call: {1000*(time()-t0):.0f} ms') return pos, sky, ic, pre_output, (lh_full, lh_full_amp, lh_full_ph, lh_cut, lh_cut_amp, lh_cut_ph) # Encapsulate everything in functions, to avoid as many (unintended) global variables as possible def main(): pos, sky, ic, pre_output, likelihoods = setup() for ii in range(60): gc.collect() minimizer, N_samples, N_iterations, lh = optimization_heuristic(ii, likelihoods) if master: print(f'Iter: {ii}, N_samples: {N_samples}, N_iter: {N_iterations}') ll = lh @ sky H = ift.StandardHamiltonian(ll, ic) KL = ift.MetricGaussianKL(pos, H, N_samples, comm=comm, mirror_samples=True) del ll, H gc.collect() KL, _ = minimizer(KL) pos = KL.position vlbi.save_state(sky, KL.position, f'{pre_output}_', ii, samples=KL.samples, master=master) del KL gc.collect() if __name__ == '__main__': with open("initial_random_state.txt", "rb") as f: ift.random.setState(f.read()) main()