Commit 21ab7bf6 authored by Martin Reinecke's avatar Martin Reinecke
Browse files

Merge branch 'new_data_structure' into 'ducc0'

New data structure

See merge request !39
parents 7634e14d 392889d3
Pipeline #82315 passed with stages
in 12 minutes and 48 seconds
0.6.0:
- general:
- multi-threading improvements contributed by Peter Bell
- wgridder:
- new, smaller internal data structure
0.5.0:
- wgridder:
- internally used grid size is now chosen automatically, and the parameters
"nu" and "nv" are ignored; they will be removed in ducc1.
0.3.0:
- general:
- The package should now be installable from PyPI via pip even on MacOS.
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2019-2020 Max-Planck-Society
from time import time
import ducc0.wgridder as wgridder
import matplotlib.pyplot as plt
import numpy as np
def get_indices(name):
from os.path import join
from casacore.tables import table
with table(join(name, 'POLARIZATION'), readonly=True, ack=False) as t:
pol = list(t.getcol('CORR_TYPE')[0])
if set(pol) <= set([5, 6, 7, 8]):
ind = [pol.index(5), pol.index(8)]
else:
ind = [pol.index(9), pol.index(12)]
return ind
def determine_weighting(t):
fullwgt = False
weightcol = "WEIGHT"
try:
t.getcol("WEIGHT_SPECTRUM", startrow=0, nrow=1)
weightcol = "WEIGHT_SPECTRUM"
fullwgt = True
except:
pass
return fullwgt, weightcol
def extra_checks(t):
if len(set(t.getcol('FIELD_ID'))) != 1:
raise RuntimeError
if len(set(t.getcol('DATA_DESC_ID'))) != 1:
raise RuntimeError
def read_ms_i(name):
# Assumptions:
# - Only one field
# - Only one spectral window
# - Flag both LL and RR if one is flagged
from os.path import join
from casacore.tables import table
with table(join(name, 'SPECTRAL_WINDOW'), readonly=True, ack=False) as t:
freq = t.getcol('CHAN_FREQ')[0]
nchan = freq.shape[0]
ind = get_indices(name)
with table(name, readonly=True, ack=False) as t:
fullwgt, weightcol = determine_weighting(t)
extra_checks(t)
nrow = t.nrows()
active_rows = np.ones(nrow, dtype=np.bool)
active_channels = np.zeros(nchan, dtype=np.bool)
step = max(1, nrow//100) # how many rows to read in every step
# determine which subset of rows/channels we need to input
start = 0
while start < nrow:
stop = min(nrow, start+step)
tflags = t.getcol('FLAG', startrow=start, nrow=stop-start)
ncorr = tflags.shape[2]
tflags = tflags[..., ind]
tflags = np.any(tflags.astype(np.bool), axis=-1)
twgt = t.getcol(weightcol, startrow=start, nrow=stop-start)[..., ind]
twgt = 1/np.sum(1/twgt, axis=-1)
tflags[twgt==0] = True
active_rows[start:stop] = np.invert(np.all(tflags, axis=-1))
active_channels = np.logical_or(active_channels, np.invert(np.all(tflags, axis=0)))
start = stop
nrealrows, nrealchan = np.sum(active_rows), np.sum(active_channels)
start, realstart = 0, 0
vis = np.empty((nrealrows, nrealchan), dtype=np.complex64)
wgtshp = (nrealrows, nrealchan) if fullwgt else (nrealrows,)
wgt = np.empty(wgtshp, dtype=np.float32)
flags = np.empty((nrealrows, nrealchan), dtype=np.bool)
while start < nrow:
stop = min(nrow, start+step)
realstop = realstart+np.sum(active_rows[start:stop])
if realstop > realstart:
allrows = stop-start == realstop-realstart
tvis = t.getcol("DATA", startrow=start, nrow=stop-start)[..., ind]
tvis = np.sum(tvis, axis=-1)
if not allrows:
tvis = tvis[active_rows[start:stop]]
tvis = tvis[:, active_channels]
tflags = t.getcol('FLAG', startrow=start, nrow=stop-start)[..., ind]
tflags = np.any(tflags.astype(np.bool), axis=-1)
if not allrows:
tflags = tflags[active_rows[start:stop]]
tflags = tflags[:, active_channels]
twgt = t.getcol(weightcol, startrow=start, nrow=stop-start)[..., ind]
twgt = 1/np.sum(1/twgt, axis=-1)
if not allrows:
twgt = twgt[active_rows[start:stop]]
if fullwgt:
twgt = twgt[:, active_channels]
tflags[twgt==0] = True
vis[realstart:realstop] = tvis
wgt[realstart:realstop] = twgt
flags[realstart:realstop] = tflags
start, realstart = stop, realstop
uvw = t.getcol("UVW")[active_rows]
print('# Rows: {} ({} fully flagged)'.format(nrow, nrow-vis.shape[0]))
print('# Channels: {} ({} fully flagged)'.format(nchan, nchan-vis.shape[1]))
print('# Correlations: {}'.format(ncorr))
print('Full weights' if fullwgt else 'Row-only weights')
nflagged = np.sum(flags) + (nrow-nrealrows)*nchan + (nchan-nrealchan)*nrow
print("{} % flagged".format(nflagged/(nrow*nchan)*100))
freq = freq[active_channels]
# blow up wgt to the right dimensions if necessary
if not fullwgt:
wgt = np.broadcast_to(wgt.reshape((-1,1)), vis.shape)
return (np.ascontiguousarray(uvw),
np.ascontiguousarray(freq),
np.ascontiguousarray(vis),
np.ascontiguousarray(wgt) if fullwgt else wgt,
1-flags.astype(np.uint8))
def main():
# ms, fov_deg = '/home/martin/ms/supernovashell.55.7+3.4.spw0.ms', 2.
# ms, fov_deg = '/home/martin/ms/1052736496-averaged.ms', 45.
ms, fov_deg = '/home/martin/ms/1052735056.ms', 45.
# ms, fov_deg = '/home/martin/ms/cleaned_G330.89-0.36.ms', 2.
uvw, freq, vis, wgt, flags = read_ms_i(ms)
npixdirty = 1200
DEG2RAD = np.pi/180
pixsize = fov_deg/npixdirty*DEG2RAD
nthreads = 2
epsilon = 1e-4
print('Start gridding...')
do_wstacking = True
t0 = time()
dirty = wgridder.ms2dirty(uvw, freq, vis, wgt, npixdirty, npixdirty, pixsize,
pixsize, 0, 0, epsilon, do_wstacking, nthreads, verbosity=1, mask=flags)
print('Done')
t = time() - t0
print("{} s".format(t))
t0 = time()
_ = wgridder.dirty2ms(uvw, freq, dirty, wgt, pixsize,
pixsize, 0, 0, epsilon, do_wstacking, nthreads, verbosity=1, mask=flags)
print('Done')
t = time() - t0
print("{} s".format(t))
print("{} visibilities/thread/s".format(np.sum(wgt != 0)/nthreads/t))
plt.imshow(dirty.T, origin='lower')
plt.show()
if __name__ == "__main__":
main()
#ifndef GRIDDER_CXX_H
#define GRIDDER_CXX_H
/*
* This file is part of nifty_gridder.
*
......@@ -22,6 +19,9 @@
/* Copyright (C) 2019-2020 Max-Planck-Society
Author: Martin Reinecke */
#ifndef GRIDDER_CXX_H
#define GRIDDER_CXX_H
#include <iostream>
#include <algorithm>
#include <cstdlib>
......@@ -46,6 +46,19 @@ namespace detail_gridder {
using namespace std;
template<typename T> complex<T> hsum_cmplx(native_simd<T> vr, native_simd<T> vi)
{ return complex<T>(reduce(vr, std::plus<>()), reduce(vi, std::plus<>())); }
#if (defined(__AVX__) && (!defined(__AVX512F__)))
inline complex<float> hsum_cmplx(native_simd<float> vr, native_simd<float> vi)
{
auto t1 = _mm256_hadd_ps(vr, vi);
auto t2 = _mm_hadd_ps(_mm256_extractf128_ps(t1, 0), _mm256_extractf128_ps(t1, 1));
t2 += _mm_shuffle_ps(t2, t2, _MM_SHUFFLE(1,0,3,2));
return complex<float>(t2[0], t2[1]);
}
#endif
template<size_t ndim> void checkShape
(const array<size_t, ndim> &shp1, const array<size_t, ndim> &shp2)
{ MR_assert(shp1==shp2, "shape mismatch"); }
......@@ -63,9 +76,9 @@ template<typename T> void complex2hartley
MR_assert(grid.conformable(grid2), "shape mismatch");
size_t nu=grid.shape(0), nv=grid.shape(1);
execStatic(nu, nthreads, 0, [&](Scheduler &sched)
execParallel(nu, nthreads, [&](size_t lo, size_t hi)
{
while (auto rng=sched.getNext()) for(auto u=rng.lo; u<rng.hi; ++u)
for(auto u=lo; u<hi; ++u)
{
size_t xu = (u==0) ? 0 : nu-u;
for (size_t v=0; v<nv; ++v)
......@@ -84,9 +97,9 @@ template<typename T> void hartley2complex
MR_assert(grid.conformable(grid2), "shape mismatch");
size_t nu=grid.shape(0), nv=grid.shape(1);
execStatic(nu, nthreads, 0, [&](Scheduler &sched)
execParallel(nu, nthreads, [&](size_t lo, size_t hi)
{
while (auto rng=sched.getNext()) for(auto u=rng.lo; u<rng.hi; ++u)
for(auto u=lo; u<hi; ++u)
{
size_t xu = (u==0) ? 0 : nu-u;
for (size_t v=0; v<nv; ++v)
......@@ -118,9 +131,10 @@ template<typename T> void hartley2_2D(mav<T,2> &arr, size_t vlim,
}
else
r2r_separable_hartley(farr, farr, {0,1}, T(1), nthreads);
execStatic((nu+1)/2-1, nthreads, 0, [&](Scheduler &sched)
execParallel((nu+1)/2-1, nthreads, [&](size_t lo, size_t hi)
{
while (auto rng=sched.getNext()) for(auto i=rng.lo+1; i<rng.hi+1; ++i)
for(auto i=lo+1; i<hi+1; ++i)
for(size_t j=1; j<(nv+1)/2; ++j)
{
T a = arr(i,j);
......@@ -135,13 +149,35 @@ template<typename T> void hartley2_2D(mav<T,2> &arr, size_t vlim,
});
}
using idx_t = uint32_t;
class Uvwidx
{
public:
uint16_t tile_u, tile_v, minplane;
Uvwidx() {}
Uvwidx(uint16_t tile_u_, uint16_t tile_v_, uint16_t minplane_)
: tile_u(tile_u_), tile_v(tile_v_), minplane(minplane_) {}
uint64_t idx() const
{ return (uint64_t(tile_u)<<32) + (uint64_t(tile_v)<<16) + minplane; }
bool operator!=(const Uvwidx &other) const
{ return idx()!=other.idx(); }
bool operator<(const Uvwidx &other) const
{ return idx()<other.idx(); }
};
struct RowChan
class RowchanRange
{
idx_t row, chan;
public:
uint32_t row;
uint16_t ch_begin, ch_end;
RowchanRange(uint32_t row_, uint16_t ch_begin_, uint16_t ch_end_)
: row(row_), ch_begin(ch_begin_), ch_end(ch_end_) {}
};
using VVR = vector<pair<Uvwidx, vector<RowchanRange>>>;
struct UVW
{
double u, v, w;
......@@ -163,26 +199,18 @@ class Baselines
protected:
vector<UVW> coord;
vector<double> f_over_c;
idx_t nrows, nchan;
idx_t shift, mask;
size_t nrows, nchan;
double umax, vmax;
public:
Baselines() = default;
template<typename T> Baselines(const mav<T,2> &coord_,
const mav<T,1> &freq, bool negate_v=false)
{
constexpr double speedOfLight = 299792458.;
MR_assert(coord_.shape(1)==3, "dimension mismatch");
auto hugeval = size_t(~(idx_t(0)));
MR_assert(coord_.shape(0)<hugeval, "too many entries in MS");
MR_assert(coord_.shape(1)<hugeval, "too many entries in MS");
MR_assert(coord_.size()<hugeval, "too many entries in MS");
nrows = coord_.shape(0);
nchan = freq.shape(0);
shift=0;
while((idx_t(1)<<shift)<nchan) ++shift;
mask=(idx_t(1)<<shift)-1;
MR_assert(nrows*(mask+1)<hugeval, "too many entries in MS");
f_over_c.resize(nchan);
double fcmax = 0;
for (size_t i=0; i<nchan; ++i)
......@@ -204,144 +232,111 @@ class Baselines
vmax *= fcmax;
}
RowChan getRowChan(idx_t index) const
{ return RowChan{index>>shift, index&mask}; }
UVW effectiveCoord(const RowChan &rc) const
{ return coord[rc.row]*f_over_c[rc.chan]; }
UVW effectiveCoord(idx_t index) const
{ return effectiveCoord(getRowChan(index)); }
UVW effectiveCoord(size_t row, size_t chan) const
{ return coord[row]*f_over_c[chan]; }
size_t Nrows() const { return nrows; }
size_t Nchannels() const { return nchan; }
idx_t getIdx(idx_t irow, idx_t ichan) const
{ return ichan+(irow<<shift); }
double Umax() const { return umax; }
double Vmax() const { return vmax; }
};
template<typename T> class GridderConfig
constexpr int logsquare=4;
template<typename T> class Params
{
protected:
size_t nx_dirty, ny_dirty, nu, nv;
double epsilon, ofactor;
private:
bool gridding;
TimerHierarchy timers;
const mav<complex<T>,2> &ms_in;
mav<complex<T>,2> &ms_out;
const mav<T,2> &dirty_in;
mav<T,2> &dirty_out;
const mav<T,2> &wgt;
const mav<uint8_t,2> &mask;
double pixsize_x, pixsize_y;
size_t nxdirty, nydirty;
double epsilon;
bool do_wgridding;
size_t nthreads;
size_t verbosity;
bool negate_v, divide_by_n;
Baselines bl;
VVR ranges;
double wmin_d, wmax_d;
size_t nvis;
double wmin, dw;
size_t nplanes;
double nm1min;
size_t nu, nv;
double ofactor;
// FIXME: this should probably be done more cleanly
public:
TimerHierarchy &timers;
shared_ptr<HornerKernel<T>> krn;
protected:
double psx, psy;
size_t supp, nsafe;
size_t nthreads;
double ushift, vshift;
int maxiu0, maxiv0;
size_t vlim;
bool uv_side_fast;
T phase (T x, T y, T w, bool adjoint) const
static T phase (T x, T y, T w, bool adjoint)
{
constexpr T pi = T(3.141592653589793238462643383279502884197);
T tmp = 1-x-y;
if (tmp<=0) return 1; // no phase factor beyond the horizon
if (tmp<=0) return 0; // no phase factor beyond the horizon
T nm1 = (-x-y)/(sqrt(tmp)+1); // more accurate form of sqrt(1-x-y)-1
T phs = 2*pi*w*nm1;
if (adjoint) phs *= -1;
return phs;
}
public:
GridderConfig(size_t nxdirty, size_t nydirty, size_t nu_, size_t nv_,
size_t kidx, double epsilon_, double pixsize_x, double pixsize_y,
const Baselines &baselines, size_t nthreads_, TimerHierarchy &timers_)
: nx_dirty(nxdirty), ny_dirty(nydirty), nu(nu_), nv(nv_),
epsilon(epsilon_),
ofactor(min(double(nu)/nxdirty, double(nv)/nydirty)),
timers(timers_),
krn(selectKernel<T>(ofactor, epsilon,kidx)),
psx(pixsize_x), psy(pixsize_y),
supp(krn->support()), nsafe((supp+1)/2),
nthreads(nthreads_),
ushift(supp*(-0.5)+1+nu), vshift(supp*(-0.5)+1+nv),
maxiu0((nu+nsafe)-supp), maxiv0((nv+nsafe)-supp),
vlim(min(nv/2, size_t(nv*baselines.Vmax()*psy+0.5*supp+1))),
uv_side_fast(true)
void grid2dirty_post(mav<T,2> &tmav, mav<T,2> &dirty) const
{
size_t vlim2 = (nydirty+1)/2+(supp+1)/2;
if (vlim2<vlim)
checkShape(dirty.shape(), {nxdirty, nydirty});
auto cfu = krn->corfunc(nxdirty/2+1, 1./nu, nthreads);
auto cfv = krn->corfunc(nydirty/2+1, 1./nv, nthreads);
execParallel(nxdirty, nthreads, [&](size_t lo, size_t hi)
{
vlim = vlim2;
uv_side_fast = false;
}
MR_assert(nu>=2*nsafe, "nu too small");
MR_assert(nv>=2*nsafe, "nv too small");
MR_assert((nx_dirty&1)==0, "nx_dirty must be even");
MR_assert((ny_dirty&1)==0, "ny_dirty must be even");
MR_assert((nu&1)==0, "nu must be even");
MR_assert((nv&1)==0, "nv must be even");
MR_assert(epsilon>0, "epsilon must be positive");
MR_assert(pixsize_x>0, "pixsize_x must be positive");
MR_assert(pixsize_y>0, "pixsize_y must be positive");
}
size_t Nxdirty() const { return nx_dirty; }
size_t Nydirty() const { return ny_dirty; }
double Pixsize_x() const { return psx; }
double Pixsize_y() const { return psy; }
size_t Nu() const { return nu; }
size_t Nv() const { return nv; }
size_t Supp() const { return supp; }
size_t Nsafe() const { return nsafe; }
size_t Nthreads() const { return nthreads; }
double Epsilon() const { return epsilon; }
double Ofactor() const { return ofactor; }
void grid2dirty_post(mav<T,2> &tmav,
mav<T,2> &dirty) const
{
checkShape(dirty.shape(), {nx_dirty, ny_dirty});
auto cfu = krn->corfunc(nx_dirty/2+1, 1./nu, nthreads);
auto cfv = krn->corfunc(ny_dirty/2+1, 1./nv, nthreads);
execStatic(nx_dirty, nthreads, 0, [&](Scheduler &sched)
{
while (auto rng=sched.getNext()) for(auto i=rng.lo; i<rng.hi; ++i)
for (auto i=lo; i<hi; ++i)
{
int icfu = abs(int(nx_dirty/2)-int(i));
for (size_t j=0; j<ny_dirty; ++j)
int icfu = abs(int(nxdirty/2)-int(i));
for (size_t j=0; j<nydirty; ++j)
{
int icfv = abs(int(ny_dirty/2)-int(j));
size_t i2 = nu-nx_dirty/2+i;
int icfv = abs(int(nydirty/2)-int(j));
size_t i2 = nu-nxdirty/2+i;
if (i2>=nu) i2-=nu;
size_t j2 = nv-ny_dirty/2+j;
size_t j2 = nv-nydirty/2+j;
if (j2>=nv) j2-=nv;
dirty.v(i,j) = tmav(i2,j2)*T(cfu[icfu]*cfv[icfv]);
}
}
});
}
void grid2dirty_post2(
mav<complex<T>,2> &tmav, mav<T,2> &dirty, T w) const
void grid2dirty_post2(mav<complex<T>,2> &tmav, mav<T,2> &dirty, T w) const
{
checkShape(dirty.shape(), {nx_dirty,ny_dirty});
double x0 = -0.5*nx_dirty*psx,
y0 = -0.5*ny_dirty*psy;
execStatic(nx_dirty/2+1, nthreads, 0, [&](Scheduler &sched)
checkShape(dirty.shape(), {nxdirty,nydirty});
double x0 = -0.5*nxdirty*pixsize_x,
y0 = -0.5*nydirty*pixsize_y;
execParallel(nxdirty/2+1, nthreads, [&](size_t lo, size_t hi)
{
using vtype = native_simd<T>;
constexpr size_t vlen=vtype::size();
size_t nvec = (ny_dirty/2+1+(vlen-1))/vlen;
size_t nvec = (nydirty/2+1+(vlen-1))/vlen;
vector<vtype> ph(nvec), sp(nvec), cp(nvec);
while (auto rng=sched.getNext()) for(auto i=rng.lo; i<rng.hi; ++i)
for (auto i=lo; i<hi; ++i)
{
T fx = T(x0+i*psx);
T fx = T(x0+i*pixsize_x);
fx *= fx;
size_t ix = nu-nx_dirty/2+i;
size_t ix = nu-nxdirty/2+i;
if (ix>=nu) ix-=nu;
size_t i2 = nx_dirty-i;
size_t ix2 = nu-nx_dirty/2+i2;
size_t i2 = nxdirty-i;
size_t ix2 = nu-nxdirty/2+i2;
if (ix2>=nu) ix2-=nu;
for (size_t j=0; j<=ny_dirty/2; ++j)
for (size_t j=0; j<=nydirty/2; ++j)
{
T fy = T(y0+j*psy);
T fy = T(y0+j*pixsize_y);
ph[j/vlen][j%vlen] = phase(fx, fy*fy, w, true);
}
for (size_t j=0; j<nvec; ++j)
......@@ -351,17 +346,17 @@ template<typename T> class GridderConfig
for (size_t k=0; k<vlen; ++k)
cp[j][k]=cos(ph[j][k]);
if ((i>0)&&(i<i2))
for (size_t j=0, jx=nv-ny_dirty/2; j<ny_dirty; ++j, jx=(jx+1>=nv)? jx+1-nv : jx+1)
for (size_t j=0, jx=nv-nydirty/2; j<nydirty; ++j, jx=(jx+1>=nv)? jx+1-nv : jx+1)
{
size_t j2 = min(j, ny_dirty-j);
size_t j2 = min(j, nydirty-j);
T re = cp[j2/vlen][j2%vlen], im = sp[j2/vlen][j2%vlen];
dirty.v(i,j) += tmav(ix,jx).real()*re - tmav(ix,jx).imag()*im;
dirty.v(i2,j) += tmav(ix2,jx).real()*re - tmav(ix2,jx).imag()*im;
}
else
for (size_t j=0, jx=nv-ny_dirty/2; j<ny_dirty; ++j, jx=(jx+1>=nv)? jx+1-nv : jx+1)
for (size_t j=0, jx=nv-nydirty/2; j<nydirty; ++j, jx=(jx+1>=nv)? jx+1-nv : jx+1)
{
size_t j2 = min(j, ny_dirty-j);
size_t j2 = min(j, nydirty-j);
T re = cp[j2/vlen][j2%vlen], im = sp[j2/vlen][j2%vlen];
dirty.v(i,j) += tmav(ix,jx).real()*re - tmav(ix,jx).imag()*im; // lower left
}
......@@ -369,7 +364,7 @@ template<typename T> class GridderConfig
});
}
void grid2dirty_overwrite(mav<T,2> &grid, mav<T,2> &dirty) const
void grid2dirty_overwrite(mav<T,2> &grid, mav<T,2> &dirty)
{
timers.push("FFT");
checkShape(grid.shape(), {nu,nv});
......@@ -380,7 +375,7 @@ template<typename T> class GridderConfig
}
void grid2dirty_c_overwrite_wscreen_add
(mav<complex<T>,2> &grid, mav<T,2> &dirty, T w) const
(mav<complex<T>,2> &grid, mav<T,2> &dirty, T w)
{
timers.push("FFT");
checkShape(grid.shape(), {nu,nv});
......@@ -403,60 +398,87 @@ template<typename T> class GridderConfig
timers.pop();
}
void dirty2grid_pre(const mav<T,2> &dirty,
mav<T,2> &grid) const
void dirty2grid_pre(const mav<T,2> &dirty, mav<T,2> &grid)