From d2403a84a66da0d6b0c5acddbe8b0eff217c4178 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 5 Aug 2021 15:38:27 +0200 Subject: [PATCH 01/35] Data import: potentially ignore flags --- misc/ms2npz.py | 2 ++ resolve/data/ms_import.py | 24 +++++++++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index a9e6ee77..598344a4 100644 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -18,6 +18,7 @@ if __name__ == "__main__": parser.add_argument("--ch-begin", type=int) parser.add_argument("--ch-end", type=int) parser.add_argument("--ch-jump", type=int) + parser.add_argument("--ignore-flags", action="store_true") parser.add_argument("ms") parser.add_argument( "polarization_mode", @@ -35,6 +36,7 @@ if __name__ == "__main__": args.spectral_window, args.polarization_mode, slice(args.ch_begin, args.ch_end, args.ch_jump), + args.ignore_flags ) for ifield, oo in enumerate(obs): fname = join(args.output_folder, f"{name}field{ifield}.npz") diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 34105deb..bef17c8b 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -24,6 +24,7 @@ def ms2observations( spectral_window, polarizations="all", channel_slice=slice(None), + ignore_flags=False, ): """Read and convert a given measurement set into an array of :class:`Observation` @@ -50,6 +51,9 @@ def ms2observations( channel_slice : slice Slice of selected channels. Default: select all channels FIXME Select channels by indices + ignore_flags : bool + If True, the whole measurement set is imported irrespective of the + flags. Default is false. Returns ------- @@ -131,6 +135,7 @@ def ms2observations( pol_summation, with_calib_info, channel_slice, + ignore_flags, ) vis[wgt == 0] = 0.0 vis = _ms2resolve_transpose(vis) @@ -167,6 +172,7 @@ def read_ms_i( pol_summation, with_calib_info, channel_slice, + ignore_flags, ): freq = np.array(freq) my_asserteq(freq.ndim, 1) @@ -197,9 +203,7 @@ def read_ms_i( while start < nrow: print("First pass:", f"{(start/nrow*100):.1f}%", end="\r") stop = min(nrow, start + step) - tflags = t.getcol("FLAG", startrow=start, nrow=stop - start)[ - ..., pol_indices - ] + tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[ ..., pol_indices ] @@ -273,16 +277,15 @@ def read_ms_i( tvis = tvis[active_rows[start:stop]] tvis = tvis[:, active_channels] - tflags = t.getcol("FLAG", startrow=start, nrow=stop - start)[ - ..., pol_indices - ] + tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) if not allrows: tflags = tflags[active_rows[start:stop]] tflags = tflags[:, active_channels] assert twgt.ndim == tflags.ndim == 3 assert tflags.dtype == np.bool - twgt = twgt * (~tflags) + if not ignore_flags: + twgt = twgt * (~tflags) if pol_summation: assert twgt.shape[2] == 2 # Noise-weighted average @@ -322,3 +325,10 @@ def ms_n_spectral_windows(ms): with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: freq = t.getcol("CHAN_FREQ") return freq.shape[0] + + +def _conditional_flags(table, start, stop, pol_indices, ignore): + tflags = table.getcol("FLAG", startrow=start, nrow=stop - start)[..., pol_indices] + if ignore: + tflags = np.zeros_like(tflags) + return tflags -- GitLab From e6505e23fdb2439fe0d6a36507fb478961e05e85 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 6 Aug 2021 17:12:08 +0200 Subject: [PATCH 02/35] Support export of autocorrelations only --- misc/ms2npz.py | 10 +++++++++- resolve/data/observation.py | 4 ++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 598344a4..811cc287 100644 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -19,6 +19,7 @@ if __name__ == "__main__": parser.add_argument("--ch-end", type=int) parser.add_argument("--ch-jump", type=int) parser.add_argument("--ignore-flags", action="store_true") + parser.add_argument("--autocorrelations-only", action="store_true") parser.add_argument("ms") parser.add_argument( "polarization_mode", @@ -39,6 +40,13 @@ if __name__ == "__main__": args.ignore_flags ) for ifield, oo in enumerate(obs): - fname = join(args.output_folder, f"{name}field{ifield}.npz") + if args.autocorrelations_only: + oo = oo.restrict_to_autocorrelations() + auto = "autocorrelationsonly" + else: + auto = "" + fname = join(args.output_folder, f"{name}field{ifield}{args.data_column}{auto}.npz") print(f"Save {fname}") oo.save(fname, args.compress) + if oo.vis.size == 0: + print(f"WARNING: {fname} is empty") diff --git a/resolve/data/observation.py b/resolve/data/observation.py index 2dbe6c15..ab2e16fd 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -433,6 +433,10 @@ class Observation(BaseObservation): pol = self._polarization.restrict_to_stokes_i() return Observation(self._antpos, vis, wgt, pol, self._freq, self._direction) + def restrict_to_autocorrelations(self): + slc = self._antpos.ant1 == self._antpos.ant2 + return self[slc] + def move_time(self, t0): antpos = self._antpos.move_time(t0) return Observation( -- GitLab From 102bf40b1ba859afc7b8202617f054d63864bfdd Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Tue, 10 Aug 2021 12:31:12 +0200 Subject: [PATCH 03/35] Add documentation --- resolve/multi_frequency/operators.py | 33 +++++++++++++++++----------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/resolve/multi_frequency/operators.py b/resolve/multi_frequency/operators.py index 583e6707..067d00ba 100644 --- a/resolve/multi_frequency/operators.py +++ b/resolve/multi_frequency/operators.py @@ -1,29 +1,36 @@ # SPDX-License-Identifier: GPL-3.0-or-later -# Copyright(C) 2013-2020 Max-Planck-Society +# Copyright(C) 2013-2021 Max-Planck-Society # Authors: Philipp Frank, Philipp Arras, Philipp Haim import numpy as np import nifty8 as ift -from ..util import my_asserteq +from ..util import my_assert, my_assert_isinstance, my_asserteq +from .irg_space import IRGSpace class WienerIntegrations(ift.LinearOperator): - def __init__(self, freqdomain, imagedomain): - # FIXME Write interface checks - self._target = ift.makeDomain((freqdomain, imagedomain)) - dom = list(self._target) - dom = ift.UnstructuredDomain((2, freqdomain.size - 1)), imagedomain + """Operator that performs the integrations necessary for an integrated + Wiener process. + + Parameters + ---------- + time_domain : IRGSpace + Domain that contains the temporal information of the process. + + remaining_domain : DomainTuple or Domain + All integrations are handled independently for this domain. + """ + def __init__(self, time_domain, remaining_domain): + my_assert_isinstance(time_domain, IRGSpace) + self._target = ift.makeDomain((time_domain, remaining_domain)) + dom = ift.UnstructuredDomain((2, time_domain.size - 1)), remaining_domain self._domain = ift.makeDomain(dom) - self._volumes = freqdomain.dvol[:, None, None] + self._volumes = time_domain.dvol[:, None, None] self._capability = self.TIMES | self.ADJOINT_TIMES def apply(self, x, mode): - # FIXME If it turns out that this operator is a performance - # bottleneck we can try implement it in parallel in C++. But - # it may be hard to achieve good scaling because I think it - # becomes memory bound quickly. self._check_input(x, mode) first, second = (0,), (1,) from_second = (slice(1, None),) @@ -50,7 +57,7 @@ class WienerIntegrations(ift.LinearOperator): def IntWProcessInitialConditions(a0, b0, wfop): for op in [a0, b0, wfop]: - ift.is_operator(op) + my_assert(ift.is_operator(op)) my_asserteq(a0.target, b0.target, ift.makeDomain(wfop.target[1])) bc = _FancyBroadcast(wfop.target) factors = ift.full(wfop.target[0], 0) -- GitLab From 2bf1fda1b30d02b17195ee3fcc4a41c19ab831e5 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Tue, 10 Aug 2021 15:42:55 +0200 Subject: [PATCH 04/35] MfWeightingInterpolation takes Field --- resolve/multi_frequency/operators.py | 2 +- test/test_general.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/resolve/multi_frequency/operators.py b/resolve/multi_frequency/operators.py index 067d00ba..11aae3db 100644 --- a/resolve/multi_frequency/operators.py +++ b/resolve/multi_frequency/operators.py @@ -97,7 +97,7 @@ class MfWeightingInterpolation(ift.LinearOperator): # FIXME Try to unify all those operators which loop over freq dimension self._ops = [] for ii in range(nfreq): - op = ift.LinearInterpolator(domain[1], eff_uvw[:, :, ii]) + op = ift.LinearInterpolator(domain[1], eff_uvw.val[:, :, ii]) self._ops.append(op) my_asserteq(self.target.shape[0], 1) diff --git a/test/test_general.py b/test/test_general.py index ec799a03..e5e2f997 100644 --- a/test/test_general.py +++ b/test/test_general.py @@ -309,9 +309,11 @@ def test_randomonmaster(): def test_mfweighting(): nrow = 100 nchan = 4 - effuv = ift.random.current_rng().random((nrow, nchan)) - dom = ift.UnstructuredDomain(nchan), ift.RGSpace(npix, 2 * np.max(effuv) / npix) - op = rve.MfWeightingInterpolation(effuv[None], dom) + npol = 1 + effuv = ift.random.current_rng().random((npol, nrow, nchan)) + effuv = ift.makeField([ift.UnstructuredDomain(ii) for ii in effuv.shape], effuv) + dom = ift.UnstructuredDomain(nchan), ift.RGSpace(npix, 2 * np.max(effuv.val) / npix) + op = rve.MfWeightingInterpolation(effuv, dom) ift.extra.check_linear_operator(op) -- GitLab From 82503ac179bed53f094122d0ba85d6dba9cd5e8e Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 12:07:44 +0200 Subject: [PATCH 05/35] Cosmetics --- misc/ms2npz.py | 1 + resolve/data/ms_import.py | 29 +++++++---------------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 811cc287..cbe817c8 100644 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -1,3 +1,4 @@ +#!/usr/bin/python3 # SPDX-License-Identifier: GPL-3.0-or-later # Copyright(C) 2019-2020 Max-Planck-Society # Author: Philipp Arras diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index bef17c8b..3bf5beca 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -1,15 +1,16 @@ # SPDX-License-Identifier: GPL-3.0-or-later # Copyright(C) 2019-2021 Max-Planck-Society +import os from os.path import isdir, join, splitext import numpy as np +from ..util import my_assert, my_assert_isinstance, my_asserteq from .antenna_positions import AntennaPositions from .direction import Direction from .observation import Observation from .polarization import Polarization -from ..util import my_assert, my_asserteq, my_assert_isinstance def ms_table(path): @@ -17,15 +18,9 @@ def ms_table(path): return table(path, readonly=True, ack=False) -def ms2observations( - ms, - data_column, - with_calib_info, - spectral_window, - polarizations="all", - channel_slice=slice(None), - ignore_flags=False, -): +def ms2observations(ms, data_column, with_calib_info, spectral_window, + polarizations="all", channel_slice=slice(None), + ignore_flags=False): """Read and convert a given measurement set into an array of :class:`Observation` If WEIGHT_SPECTRUM is available this column is used for weighting. @@ -162,18 +157,8 @@ def _determine_weighting(t): return fullwgt, weightcol -def read_ms_i( - name, - data_column, - freq, - field, - spectral_window, - pol_indices, - pol_summation, - with_calib_info, - channel_slice, - ignore_flags, -): +def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, + pol_summation, with_calib_info, channel_slice, ignore_flags): freq = np.array(freq) my_asserteq(freq.ndim, 1) my_assert(len(freq) > 0) -- GitLab From 27dce8a0a1faa885acdbe8642cccfe970becbb86 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 12:07:55 +0200 Subject: [PATCH 06/35] Support "." as ms location --- resolve/data/ms_import.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 3bf5beca..6cb27daa 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -63,7 +63,11 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, """ if ms[-1] == "/": ms = ms[:-1] - if not isdir(ms) or splitext(ms)[1].lower() != ".ms": + if not isdir(ms): + raise RuntimeError + if ms == ".": + ms = os.getcwd() + if splitext(ms)[1].lower() != ".ms": raise RuntimeError if isinstance(polarizations, str): polarizations = [polarizations] -- GitLab From 6d789a3e346f91061a3b96625476ac189362e1fe Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 12:08:10 +0200 Subject: [PATCH 07/35] Docs --- misc/ms2npz.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index cbe817c8..97ee29b5 100644 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -20,7 +20,9 @@ if __name__ == "__main__": parser.add_argument("--ch-end", type=int) parser.add_argument("--ch-jump", type=int) parser.add_argument("--ignore-flags", action="store_true") - parser.add_argument("--autocorrelations-only", action="store_true") + parser.add_argument("--autocorrelations-only", action="store_true", + help=("If this flag is set, all autocorrelations are imported " + "irrespective of whether they are flagged or not.")) parser.add_argument("ms") parser.add_argument( "polarization_mode", -- GitLab From 831bbffde5f165c0e10a5c0bfc225a0d38c3b7c9 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 13:14:57 +0200 Subject: [PATCH 08/35] Add support for auxiliary MS tables --- misc/ms2npz.py | 0 resolve/data/auxiliary_table.py | 45 +++++++++++++++++++++++++++++++ resolve/data/ms_import.py | 11 +++++++- resolve/data/observation.py | 47 +++++++++++++++++++++++++++------ 4 files changed, 94 insertions(+), 9 deletions(-) mode change 100644 => 100755 misc/ms2npz.py create mode 100644 resolve/data/auxiliary_table.py diff --git a/misc/ms2npz.py b/misc/ms2npz.py old mode 100644 new mode 100755 diff --git a/resolve/data/auxiliary_table.py b/resolve/data/auxiliary_table.py new file mode 100644 index 00000000..eb06aea9 --- /dev/null +++ b/resolve/data/auxiliary_table.py @@ -0,0 +1,45 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# Copyright(C) 2021 Max-Planck-Society +# Author: Philipp Arras + +import numpy as np + +from ..util import compare_attributes, my_assert, my_assert_isinstance, my_asserteq + + +class AuxiliaryTable: + def __init__(self, inp_dict): + my_assert_isinstance(inp_dict, dict) + nrow = None + for kk, lst in inp_dict.items(): + my_assert_isinstance(kk, str) + my_assert_isinstance(lst, (list, np.ndarray)) + if nrow is None: + nrow = len(lst) + my_asserteq(nrow, len(lst)) + my_asserteq(type(elem) for elem in lst) + self._dct = inp_dict + self._eq_attributes = [self._dct] + + def __eq__(self, other): + if not isinstance(other, type(self)): + return False + return compare_attributes(self, other, self._eq_attributes) + + def __str__(self): + s = ["AuxiliaryTable:"] + for kk, lst in self._dct.items(): + if isinstance(lst, np.ndarray): + s.append(f" {kk:<20} {str(lst.shape):>10} {str(lst.dtype):>15}") + else: + s.append(f" {kk:<20} {len(lst):>10} {str(type(lst[0])):>15}") + return "\n".join(s) + + def to_list(self): + return [list(self._dct.keys())] + [ss for ss in self._dct.values()] + + @staticmethod + def from_list(lst): + keys = lst[0] + dct = {kk: lst[ii+1] for ii, kk in enumerate(keys)} + return AuxiliaryTable(dct) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 6cb27daa..b0a48ed7 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -8,6 +8,7 @@ import numpy as np from ..util import my_assert, my_assert_isinstance, my_asserteq from .antenna_positions import AntennaPositions +from .auxiliary_table import AuxiliaryTable from .direction import Direction from .observation import Observation from .polarization import Polarization @@ -120,6 +121,12 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, dirs.append(Direction(pc[0], equinox)) dirs = tuple(dirs) + auxtables = {} + with ms_table(join(ms, "ANTENNA")) as t: + keys = ["NAME", "STATION", "TYPE", "MOUNT", "POSITION", "OFFSET", + "DISH_DIAMETER"] + auxtables["ANTENNA"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) + # FIXME Determine which observation is calibration observation # FIXME Import name of source observations = [] @@ -140,7 +147,9 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, vis = _ms2resolve_transpose(vis) wgt = _ms2resolve_transpose(wgt) antpos = AntennaPositions(uvw, ant1, ant2, time) - observations.append(Observation(antpos, vis, wgt, polobj, freq_out, direction)) + obs = Observation(antpos, vis, wgt, polobj, freq_out, direction, + auxiliary_tables=auxtables) + observations.append(obs) return observations diff --git a/resolve/data/observation.py b/resolve/data/observation.py index ab2e16fd..e0455ae6 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -2,16 +2,17 @@ # Copyright(C) 2019-2021 Max-Planck-Society # Author: Philipp Arras -import numpy as np - import nifty8 as ift +import numpy as np -from .antenna_positions import AntennaPositions from ..constants import SPEEDOFLIGHT -from .direction import Direction, Directions from ..mpi import onlymaster +from ..util import (compare_attributes, my_assert, my_assert_isinstance, + my_asserteq) +from .antenna_positions import AntennaPositions +from .auxiliary_table import AuxiliaryTable +from .direction import Direction, Directions from .polarization import Polarization -from ..util import compare_attributes, my_assert, my_assert_isinstance, my_asserteq class BaseObservation: @@ -157,7 +158,7 @@ class BaseObservation: return NotImplementedError def __eq__(self, other): - if not isinstance(other, self.__class__): + if not isinstance(other, type(self)): return False if ( self._vis.dtype != other._vis.dtype @@ -268,13 +269,15 @@ class Observation(BaseObservation): Contains the measured frequencies. Shape (n_channels) direction : Direction Direction information of the data set. + auxiliary_tables : dict + Dictionary of auxiliary tables. Default: None. Note ---- vis and weight must have the same shape. """ - def __init__(self, antenna_positions, vis, weight, polarization, freq, direction): + def __init__(self, antenna_positions, vis, weight, polarization, freq, direction, auxiliary_tables=None): nrows = len(antenna_positions) my_assert_isinstance(direction, Direction) my_assert_isinstance(polarization, Polarization) @@ -304,8 +307,15 @@ class Observation(BaseObservation): self._freq = freq self._direction = direction - self._eq_attributes = "_direction", "_polarization", "_freq", "_antpos", "_vis", "_weight" + if auxiliary_tables is not None: + my_assert_isinstance(auxiliary_tables, dict) + for kk, vv in auxiliary_tables.items(): + my_assert_isinstance(vv, AuxiliaryTable) + my_assert_isinstance(kk, str) + self._auxiliary_tables = auxiliary_tables + self._eq_attributes = ("_direction", "_polarization", "_freq", + "_antpos", "_vis", "_weight", "_auxiliary_tables") @onlymaster def save(self, file_name, compress): @@ -320,6 +330,10 @@ class Observation(BaseObservation): if vv is None: vv = np.array([]) dct[f"antpos{ii}"] = vv + if self._auxiliary_tables is not None: + for kk, auxtable in self._auxiliary_tables.items(): + for ii, elem in enumerate(auxtable.to_list()): + dct[f"auxtable_{kk}_{ii}"] = elem f = np.savez_compressed if compress else np.savez f(file_name, **dct) @@ -332,6 +346,22 @@ class Observation(BaseObservation): if val.size == 0: val = None antpos.append(val) + + # Load auxtables + keys = set(kk.split("_")[1] for kk in dct.keys() if kk[:8] == "auxtable") + if len(keys) == 0: + auxtables = None + else: + auxtables = {} + for kk in keys: + ii = 0 + inp = [] + while f"auxtable_{kk}_{ii}" in dct.keys(): + inp.append(dct[f"auxtable_{kk}_{ii}"]) + ii += 1 + auxtables[kk] = AuxiliaryTable.from_list(inp) + # /Load auxtables + pol = Polarization.from_list(dct["polarization"]) direction = Direction.from_list(dct["direction"]) slc = slice(None) if lo_hi_index is None else slice(*lo_hi_index) @@ -343,6 +373,7 @@ class Observation(BaseObservation): pol, dct["freq"][slc], direction, + auxtables ) def flags_to_nan(self): -- GitLab From a46de75524b4bc8592ccb36cb75dc793d56f246f Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 15:30:40 +0200 Subject: [PATCH 09/35] Improve comparison --- resolve/data/auxiliary_table.py | 2 +- resolve/util.py | 54 ++++++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/resolve/data/auxiliary_table.py b/resolve/data/auxiliary_table.py index eb06aea9..58365b8c 100644 --- a/resolve/data/auxiliary_table.py +++ b/resolve/data/auxiliary_table.py @@ -19,7 +19,7 @@ class AuxiliaryTable: my_asserteq(nrow, len(lst)) my_asserteq(type(elem) for elem in lst) self._dct = inp_dict - self._eq_attributes = [self._dct] + self._eq_attributes = "_dct", def __eq__(self, other): if not isinstance(other, type(self)): diff --git a/resolve/util.py b/resolve/util.py index 4c344455..b8eda2b3 100644 --- a/resolve/util.py +++ b/resolve/util.py @@ -28,20 +28,46 @@ def my_assert_isinstance(*args): def compare_attributes(obj0, obj1, attribute_list): - for a in attribute_list: - compare = getattr(obj0, a) != getattr(obj1, a) - if type(compare) is bool: - if compare: - return False - continue - if isinstance(compare, ift.MultiField): - for vv in compare.val.values(): - if vv.any(): - return False - continue - if compare.any(): - return False - return True + return all(_fancy_equal(getattr(obj0, a), getattr(obj1, a)) + for a in attribute_list) + + +def _fancy_equal(o1, o2): + if not _types_equal(o1, o2): + return False + + # Turn MultiField into dict + if isinstance(o1, ift.MultiField): + o1, o2 = o1.val, o2.val + + # Compare dicts + if isinstance(o1, dict): + return _deep_equal(o1, o2) + + # Compare simple objects and np.ndarrays + return _compare_simple_or_array(o1, o2) + + +def _deep_equal(a, b): + if not isinstance(a, dict) or not isinstance(b, dict): + raise TypeError + + if a.keys() != b.keys(): + return False + + return all(_compare_simple_or_array(a[kk], b[kk]) for kk in a.keys()) + + +def _compare_simple_or_array(a, b): + equal = a == b + try: + return bool(equal) + except ValueError: + return equal.all() + + +def _types_equal(a, b): + return type(a) == type(b) class Reshaper(ift.LinearOperator): -- GitLab From 64dfd86c4dba93cccc420aa5e698b5301f6b1404 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 15:35:56 +0200 Subject: [PATCH 10/35] Expose auxiliary tables --- resolve/data/observation.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index e0455ae6..a3a561e2 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -487,6 +487,9 @@ class Observation(BaseObservation): def antenna_positions(self): return self._antpos + def auxiliary_table(name): + return self._auxiliary_tables[name] + def effective_uvw(self): out = np.einsum("ij,k->jik", self.uvw, self._freq / SPEEDOFLIGHT) my_asserteq(out.shape, (3, self.nrow, self.nfreq)) -- GitLab From 55bc2826d8d7e33b8fa7de4849e8dc069e7c8e85 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 12 Aug 2021 18:28:36 +0200 Subject: [PATCH 11/35] Add more aux tables --- resolve/data/ms_import.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index b0a48ed7..292f7b69 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -123,9 +123,17 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, auxtables = {} with ms_table(join(ms, "ANTENNA")) as t: - keys = ["NAME", "STATION", "TYPE", "MOUNT", "POSITION", "OFFSET", - "DISH_DIAMETER"] + keys = ["NAME", "STATION", "TYPE", "MOUNT", "POSITION", "OFFSET", "DISH_DIAMETER"] auxtables["ANTENNA"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) + with ms_table(join(ms, "FIELD")) as t: + keys = ["NAME", "CODE", "TIME", "NUM_POLY", "DELAY_DIR", "PHASE_DIR", "REFERENCE_DIR", + "SOURCE_ID"] + auxtables["FIELD"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) + with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: + keys = ["NAME", "REF_FREQUENCY", "CHAN_FREQ", "CHAN_WIDTH", "MEAS_FREQ_REF", "EFFECTIVE_BW", + "RESOLUTION", "TOTAL_BANDWIDTH", "NET_SIDEBAND", "IF_CONV_CHAIN", "FREQ_GROUP", + "FREQ_GROUP_NAME"] + auxtables["SPECTRAL_WINDOW"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) # FIXME Determine which observation is calibration observation # FIXME Import name of source -- GitLab From 21fcc98bfebdd2c8e121cb75014e8a85927aa84d Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 09:09:18 +0200 Subject: [PATCH 12/35] fixup! Add more aux tables --- resolve/data/observation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index a3a561e2..fa602c42 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -333,7 +333,7 @@ class Observation(BaseObservation): if self._auxiliary_tables is not None: for kk, auxtable in self._auxiliary_tables.items(): for ii, elem in enumerate(auxtable.to_list()): - dct[f"auxtable_{kk}_{ii}"] = elem + dct[f"auxtable_{kk}_{ii:>04}"] = elem f = np.savez_compressed if compress else np.savez f(file_name, **dct) @@ -348,7 +348,7 @@ class Observation(BaseObservation): antpos.append(val) # Load auxtables - keys = set(kk.split("_")[1] for kk in dct.keys() if kk[:8] == "auxtable") + keys = set(kk[9:-5] for kk in dct.keys() if kk[:9] == "auxtable_") if len(keys) == 0: auxtables = None else: @@ -356,8 +356,8 @@ class Observation(BaseObservation): for kk in keys: ii = 0 inp = [] - while f"auxtable_{kk}_{ii}" in dct.keys(): - inp.append(dct[f"auxtable_{kk}_{ii}"]) + while f"auxtable_{kk}_{ii:>04}" in dct.keys(): + inp.append(dct[f"auxtable_{kk}_{ii:>04}"]) ii += 1 auxtables[kk] = AuxiliaryTable.from_list(inp) # /Load auxtables -- GitLab From 51e6fbb5f8ef223877c359a9208f8d4dad846114 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 09:12:29 +0200 Subject: [PATCH 13/35] Fixup --- resolve/data/observation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index fa602c42..8e035487 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -487,7 +487,7 @@ class Observation(BaseObservation): def antenna_positions(self): return self._antpos - def auxiliary_table(name): + def auxiliary_table(self, name): return self._auxiliary_tables[name] def effective_uvw(self): -- GitLab From cd2bf383614517fcd39afb33c69b86b0d3378077 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 10:32:05 +0200 Subject: [PATCH 14/35] More general ms_analysis --- misc/ms_summary.py | 12 ++++++++++++ resolve/data/ms_import.py | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) mode change 100644 => 100755 misc/ms_summary.py diff --git a/misc/ms_summary.py b/misc/ms_summary.py old mode 100644 new mode 100755 index c36bed83..20f781c0 --- a/misc/ms_summary.py +++ b/misc/ms_summary.py @@ -1,8 +1,10 @@ +#!/usr/bin/python3 # SPDX-License-Identifier: GPL-3.0-or-later # Copyright(C) 2021 Max-Planck-Society # Author: Philipp Arras import argparse +from os.path import join import resolve as rve @@ -16,3 +18,13 @@ if __name__ == "__main__": print() print(f"The data set has {nspec} spectral windows.") print() + + with rve.ms_table(join(args.ms, "FIELD")) as t: + name = t.getcol("NAME") + refdir = t.getcol("REFERENCE_DIR") + deldir = t.getcol("DELAY_DIR") + phdir = t.getcol("PHASE_DIR") + + print("NAME REFERNCE_DIR DELAY_DIR PHASE_DIR") + for nn, rd, dd, pd in zip(name, refdir, deldir, phdir): + print(nn, rd, dd, pd) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 292f7b69..aa7fe428 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -329,8 +329,8 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, def ms_n_spectral_windows(ms): with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: - freq = t.getcol("CHAN_FREQ") - return freq.shape[0] + n_spectral_windows = t.nrows() + return n_spectral_windows def _conditional_flags(table, start, stop, pol_indices, ignore): -- GitLab From 341f972760a55907d343a6980fc6cafb462e897d Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 11:32:19 +0200 Subject: [PATCH 15/35] Cosmetics --- misc/ms2npz.py | 5 +++-- resolve/data/ms_import.py | 17 +++++------------ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 97ee29b5..7c4fa716 100755 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -21,8 +21,9 @@ if __name__ == "__main__": parser.add_argument("--ch-jump", type=int) parser.add_argument("--ignore-flags", action="store_true") parser.add_argument("--autocorrelations-only", action="store_true", - help=("If this flag is set, all autocorrelations are imported " - "irrespective of whether they are flagged or not.")) + help=("If this flag is set, all autocorrelations are " + "imported irrespective of whether they are " + "flagged or not.")) parser.add_argument("ms") parser.add_argument( "polarization_mode", diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index aa7fe428..d089f299 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -275,19 +275,20 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, twgt = twgt[active_rows[start:stop]] twgt = twgt[:, active_channels] - tvis = t.getcol(data_column, startrow=start, nrow=stop - start)[ - ..., pol_indices - ] + # Vis + tvis = t.getcol(data_column, startrow=start, nrow=stop - start)[..., pol_indices] assert tvis.dtype == np.complex64 if not allrows: tvis = tvis[active_rows[start:stop]] tvis = tvis[:, active_channels] + # Flags tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) if not allrows: tflags = tflags[active_rows[start:stop]] tflags = tflags[:, active_channels] + # Polarization summation assert twgt.ndim == tflags.ndim == 3 assert tflags.dtype == np.bool if not ignore_flags: @@ -316,15 +317,7 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, freq = freq[active_channels] my_asserteq(wgt.shape, vis.shape) - return ( - np.ascontiguousarray(uvw), - ant1, - ant2, - time, - np.ascontiguousarray(freq), - np.ascontiguousarray(vis), - np.ascontiguousarray(wgt), - ) + return {"uvw": uvw, "ant1": ant1, "ant2": ant2, "time": time, "freq": freq, "vis": vis, "wgt": wgt, "ptg": ptg} def ms_n_spectral_windows(ms): -- GitLab From 6008e8da4f9d0a2b300913ded29c86e5b12ff529 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 11:32:47 +0200 Subject: [PATCH 16/35] How to deal with empty ms --- misc/ms2npz.py | 2 ++ resolve/data/ms_import.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 7c4fa716..0735f7bb 100755 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -44,6 +44,8 @@ if __name__ == "__main__": args.ignore_flags ) for ifield, oo in enumerate(obs): + if oo is None: + continue if args.autocorrelations_only: oo = oo.restrict_to_autocorrelations() auto = "autocorrelationsonly" diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index d089f299..6ca2f20a 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -242,7 +242,7 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, start = stop nrealrows, nrealchan = np.sum(active_rows), np.sum(active_channels) if nrealrows == 0 or nrealchan == 0: - raise RuntimeError("Empty data set") + return None # Create output arrays if pol_summation: -- GitLab From 4f07aca134223e7afa621877251e8be57ad7197f Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 11:33:05 +0200 Subject: [PATCH 17/35] Cosmetics --- resolve/data/ms_import.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 6ca2f20a..ec454729 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -210,9 +210,7 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, print("First pass:", f"{(start/nrow*100):.1f}%", end="\r") stop = min(nrow, start + step) tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) - twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[ - ..., pol_indices - ] + twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[..., pol_indices] if channel_slice != slice(None): tchslcflags = np.ones_like(tflags) tchslcflags[:, channel_slice] = False -- GitLab From eb0d774055f9c3e7a5588b85b1cdf4ba5c6ff3d5 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 11:33:28 +0200 Subject: [PATCH 18/35] Restructure and add pointing support --- misc/ms_summary.py | 5 ++ resolve/data/ms_import.py | 120 ++++++++++++++++++++++++-------------- 2 files changed, 81 insertions(+), 44 deletions(-) diff --git a/misc/ms_summary.py b/misc/ms_summary.py index 20f781c0..22550dd9 100755 --- a/misc/ms_summary.py +++ b/misc/ms_summary.py @@ -28,3 +28,8 @@ if __name__ == "__main__": print("NAME REFERNCE_DIR DELAY_DIR PHASE_DIR") for nn, rd, dd, pd in zip(name, refdir, deldir, phdir): print(nn, rd, dd, pd) + + with rve.ms_table(join(args.ms, "POINTING")) as t: + print(t.getcol("ANTENNA_ID")) + print(t.getcol("TIME")) + print(t.getcol("DIRECTION").shape) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index ec454729..3ca3187d 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -84,8 +84,6 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, my_assert_isinstance(spectral_window, int) my_assert(spectral_window >= 0) my_assert(spectral_window < ms_n_spectral_windows(ms)) - with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: - freq = t.getcol("CHAN_FREQ")[spectral_window] # Polarization with ms_table(join(ms, "POLARIZATION")) as t: @@ -110,11 +108,15 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, # Field with ms_table(join(ms, "FIELD")) as t: - equinox = t.coldesc("REFERENCE_DIR")["desc"]["keywords"]["MEASINFO"]["Ref"] - equinox = str(equinox)[1:] # FIXME Put proper support for equinox here - if equinox == "1950_VLA": - equinox = 1950 + # FIXME Eventually get rid of this and use auxiliary table + try: + equinox = t.coldesc("REFERENCE_DIR")["desc"]["keywords"]["MEASINFO"]["Ref"] + equinox = str(equinox)[1:] + if equinox == "1950_VLA": + equinox = 1950 + except KeyError: + equinox = 2000 dirs = [] for pc in t.getcol("REFERENCE_DIR"): my_asserteq(pc.shape, (1, 2)) @@ -125,37 +127,32 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, with ms_table(join(ms, "ANTENNA")) as t: keys = ["NAME", "STATION", "TYPE", "MOUNT", "POSITION", "OFFSET", "DISH_DIAMETER"] auxtables["ANTENNA"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) - with ms_table(join(ms, "FIELD")) as t: - keys = ["NAME", "CODE", "TIME", "NUM_POLY", "DELAY_DIR", "PHASE_DIR", "REFERENCE_DIR", - "SOURCE_ID"] - auxtables["FIELD"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: keys = ["NAME", "REF_FREQUENCY", "CHAN_FREQ", "CHAN_WIDTH", "MEAS_FREQ_REF", "EFFECTIVE_BW", "RESOLUTION", "TOTAL_BANDWIDTH", "NET_SIDEBAND", "IF_CONV_CHAIN", "FREQ_GROUP", "FREQ_GROUP_NAME"] - auxtables["SPECTRAL_WINDOW"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) + dct = {kk: t.getcol(kk, startrow=spectral_window, nrow=1) for kk in keys} + auxtables["SPECTRAL_WINDOW"] = AuxiliaryTable(dct) # FIXME Determine which observation is calibration observation - # FIXME Import name of source observations = [] for ifield, direction in enumerate(dirs): - uvw, ant1, ant2, time, freq_out, vis, wgt = read_ms_i( - ms, - data_column, - freq, - ifield, - spectral_window, - pol_ind, - pol_summation, - with_calib_info, - channel_slice, - ignore_flags, - ) - vis[wgt == 0] = 0.0 - vis = _ms2resolve_transpose(vis) - wgt = _ms2resolve_transpose(wgt) - antpos = AntennaPositions(uvw, ant1, ant2, time) - obs = Observation(antpos, vis, wgt, polobj, freq_out, direction, + with ms_table(join(ms, "FIELD")) as t: + keys = ["NAME", "CODE", "TIME", "NUM_POLY", "DELAY_DIR", "PHASE_DIR", "REFERENCE_DIR", + "SOURCE_ID"] + dct = {kk: t.getcol(kk, startrow=ifield, nrow=1) for kk in keys} + auxtables["FIELD"] = AuxiliaryTable(dct) + + mm = read_ms_i(ms, data_column, ifield, spectral_window, pol_ind, pol_summation, + with_calib_info, channel_slice, ignore_flags,) + if mm is None: + print(f"{ms}, field #{ifield} is empty or fully flagged") + observations.append(None) + continue + if mm["ptg"] is not None: + raise NotImplementedError + antpos = AntennaPositions(mm["uvw"], mm["ant1"], mm["ant2"], mm["time"]) + obs = Observation(antpos, mm["vis"], mm["wgt"], polobj, mm["freq"], direction, auxiliary_tables=auxtables) observations.append(obs) return observations @@ -178,12 +175,18 @@ def _determine_weighting(t): return fullwgt, weightcol -def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, - pol_summation, with_calib_info, channel_slice, ignore_flags): - freq = np.array(freq) - my_asserteq(freq.ndim, 1) +def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summation, + with_calib_info, channel_slice, ignore_flags): + + # Freq + with ms_table(join(name, "SPECTRAL_WINDOW")) as t: + freq = t.getcol("CHAN_FREQ", startrow=spectral_window, nrow=1) + my_asserteq(freq.ndim, 2) + my_asserteq(freq.shape[0], 1) + freq = freq[0] my_assert(len(freq) > 0) nchan = len(freq) + assert pol_indices is None or isinstance(pol_indices, list) if pol_indices is None: pol_indices = slice(None) @@ -242,7 +245,11 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, if nrealrows == 0 or nrealchan == 0: return None - # Create output arrays + # Freq + freq = freq[active_channels] + + # Vis, wgt, (flags) + with ms_table(name) as t: if pol_summation: npol = 1 else: @@ -263,9 +270,8 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, if realstop > realstart: allrows = stop - start == realstop - realstart - twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[ - ..., pol_indices - ] + # Weights + twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[..., pol_indices] assert twgt.dtype == np.float32 if not fullwgt: twgt = np.repeat(twgt[:, None], nchan, axis=1) @@ -302,19 +308,45 @@ def read_ms_i(name, data_column, freq, field, spectral_window, pol_indices, wgt[realstart:realstop] = twgt start, realstart = stop, realstop - uvw = t.getcol("UVW")[active_rows] - if with_calib_info: + print("Selected:", 10 * " ") + print(f" shape: {vis.shape}") + print(f" flagged: {(1.0-np.sum(wgt!=0)/wgt.size)*100:.1f} %") + + # UVW + with ms_table(name) as t: + uvw = np.ascontiguousarray(t.getcol("UVW")[active_rows]) + + # Calibration info + if with_calib_info: + with ms_table(name) as t: ant1 = np.ascontiguousarray(t.getcol("ANTENNA1")[active_rows]) ant2 = np.ascontiguousarray(t.getcol("ANTENNA2")[active_rows]) time = np.ascontiguousarray(t.getcol("TIME")[active_rows]) + else: + ant1 = ant2 = time = None + + # Pointing + with ms_table(join(name, "POINTING")) as t: + if t.nrows() == 0: + ptg = None else: - ant1 = ant2 = time = None - print("Selected:", 10 * " ") - print(f" shape: {vis.shape}") - print(f" flagged: {(1.0-np.sum(wgt!=0)/wgt.size)*100:.1f} %") - freq = freq[active_channels] + ptg = np.empty((nrealrows, 1, 2), dtype=np.float64) + start, realstart = 0, 0 + while start < nrow: + print("Second pass:", f"{(start/nrow*100):.1f}%", end="\r") + stop = min(nrow, start + step) + realstop = realstart + np.sum(active_rows[start:stop]) + if realstop > realstart: + allrows = stop - start == realstop - realstart + tptg = t.getcol("DIRECTION", startrow=start, nrow=stop - start) + tptg = tptg[active_rows[start:stop]] + ptg[realstart:realstop] = tptg + start, realstart = stop, realstop my_asserteq(wgt.shape, vis.shape) + vis = np.ascontiguousarray(_ms2resolve_transpose(vis)) + wgt = np.ascontiguousarray(_ms2resolve_transpose(wgt)) + vis[wgt == 0] = 0.0 return {"uvw": uvw, "ant1": ant1, "ant2": ant2, "time": time, "freq": freq, "vis": vis, "wgt": wgt, "ptg": ptg} -- GitLab From 5819d35eb47206a27d80da6c37a8b909d6d5cef8 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 13 Aug 2021 11:57:20 +0200 Subject: [PATCH 19/35] NIFTy_7 -> NIFTy_8 --- misc/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/Dockerfile b/misc/Dockerfile index d7253334..82e729f4 100644 --- a/misc/Dockerfile +++ b/misc/Dockerfile @@ -2,8 +2,8 @@ FROM kernsuite/base:latest ENV DEBIAN_FRONTEND noninteractive RUN docker-apt-install dysco python3-casacore RUN apt-get update -qq && apt-get install -qq python3-pip python3-matplotlib git -RUN pip3 install scipy pybind11 git+https://gitlab.mpcdf.mpg.de/ift/nifty.git@NIFTy_7 -RUN pip3 install git+https://gitlab.mpcdf.mpg.de/mtr/ducc.git@ducc0 +RUN pip3 install scipy pybind11 git+https://gitlab.mpcdf.mpg.de/ift/nifty.git@NIFTy_8 +RUN pip3 install ducc0 COPY . /resolve RUN cd resolve && python3 setup.py install ENTRYPOINT ["python3", "resolve/misc/ms2npz.py"] -- GitLab From 5a541c08bfeeede8b6d8b75ab361277980b78bf1 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 26 Aug 2021 14:40:56 +0200 Subject: [PATCH 20/35] Data can be selected by frequency --- misc/ms2npz.py | 38 ++++++++++++++++++++++++++++++++++++-- misc/ms_summary.py | 14 +++++++++++--- resolve/constants.py | 27 +++++++++++++++++++++++++++ resolve/data/ms_import.py | 2 -- 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 0735f7bb..73505c28 100755 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -4,9 +4,10 @@ # Author: Philipp Arras import argparse -from os.path import join, split, splitext from os import makedirs +from os.path import join, split, splitext +import numpy as np import resolve as rve if __name__ == "__main__": @@ -19,6 +20,8 @@ if __name__ == "__main__": parser.add_argument("--ch-begin", type=int) parser.add_argument("--ch-end", type=int) parser.add_argument("--ch-jump", type=int) + parser.add_argument("--freq-begin") + parser.add_argument("--freq-end") parser.add_argument("--ignore-flags", action="store_true") parser.add_argument("--autocorrelations-only", action="store_true", help=("If this flag is set, all autocorrelations are " @@ -30,6 +33,37 @@ if __name__ == "__main__": help="Can be 'stokesiavg', 'stokesi', 'all', or something like 'LL' or 'XY'.", ) args = parser.parse_args() + if args.freq_begin is not None and args.freq_end is not None: + assert args.ch_begin is None + assert args.ch_jump is None + assert args.ch_end is None + assert args.spectral_window == 0 + + f0 = rve.str2val(args.freq_begin) + f1 = rve.str2val(args.freq_end) + + # Determine spectral window + with rve.ms_table(join(args.ms, "SPECTRAL_WINDOW")) as t: + for ii in range(t.nrows()): + c = t.getcol("CHAN_FREQ", startrow=ii, nrow=1)[0] + fmin, fmax = min(c), max(c) + if fmin <= f0 <= fmax and fmin <= f1 <= fmax: + break + print(f"Load spectral window {ii}") + + # Determine channels + if np.all(np.diff(c) > 0): + begin = np.searchsorted(c, f0) + end = np.searchsorted(c, f1) + elif np.all(np.diff(c) < 0): + begin = c.size - np.searchsorted(c[::-1], f0) + end = c.size - np.searchsorted(c[::-1], f1) + else: + raise RuntimeError("Channels are not sorted") + channels = slice(begin, end) + else: + channels = slice(args.ch_begin, args.ch_end, args.ch_jump), + makedirs(args.output_folder, exist_ok=True) name = splitext(split(args.ms)[1])[0] nspec = rve.ms_n_spectral_windows(args.ms) @@ -40,7 +74,7 @@ if __name__ == "__main__": args.include_calibration_info, args.spectral_window, args.polarization_mode, - slice(args.ch_begin, args.ch_end, args.ch_jump), + channels, args.ignore_flags ) for ifield, oo in enumerate(obs): diff --git a/misc/ms_summary.py b/misc/ms_summary.py index 22550dd9..7a1734dd 100755 --- a/misc/ms_summary.py +++ b/misc/ms_summary.py @@ -15,9 +15,17 @@ if __name__ == "__main__": nspec = rve.ms_n_spectral_windows(args.ms) from casacore.tables import tablesummary tablesummary(args.ms) - print() - print(f"The data set has {nspec} spectral windows.") - print() + + with rve.ms_table(join(args.ms, "SPECTRAL_WINDOW")) as t: + for ii in range(nspec): + print(f"Spectral window #{ii}") + chans = t.getcol("CHAN_FREQ", startrow=ii, nrow=1) + + print(f"Shape: {chans.shape}") + print(f"f1-f0: {(chans[0][1]-chans[0][0])/1e6} MHz") + print("Frequencies (GHz)") + print(chans/1e9) + print() with rve.ms_table(join(args.ms, "FIELD")) as t: name = t.getcol("NAME") diff --git a/resolve/constants.py b/resolve/constants.py index ffd0999e..0a537533 100644 --- a/resolve/constants.py +++ b/resolve/constants.py @@ -38,3 +38,30 @@ def str2rad(s): if unit == kk: return float(s[:nn]) * c[kk] raise RuntimeError("Unit not understood") + + +def str2val(s): + """Convert string of number and unit to value. + + Support the following keys: p n mu m (nothing) k M G T + + Parameters + ---------- + s : str + TODO + + """ + c = { + "p": 1e-12, + "n": 1e-9, + "mu": 1e-6, + "m": 1e-3, + "k": 1e3, + "M": 1e6, + "G": 1e9, + "T": 1e12 + } + keys = set(c.keys()) + if s[-1] in keys: + return float(s[:-1]) * c[s[-1]] + return float(s) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 3ca3187d..1dd4310b 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -68,8 +68,6 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, raise RuntimeError if ms == ".": ms = os.getcwd() - if splitext(ms)[1].lower() != ".ms": - raise RuntimeError if isinstance(polarizations, str): polarizations = [polarizations] if ( -- GitLab From 57dae67b922a9c52968ea6157789db8c4bf30656 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 26 Aug 2021 18:00:40 +0200 Subject: [PATCH 21/35] FIXME --- resolve/data/ms_import.py | 1 + 1 file changed, 1 insertion(+) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 1dd4310b..073f76a2 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -139,6 +139,7 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, keys = ["NAME", "CODE", "TIME", "NUM_POLY", "DELAY_DIR", "PHASE_DIR", "REFERENCE_DIR", "SOURCE_ID"] dct = {kk: t.getcol(kk, startrow=ifield, nrow=1) for kk in keys} + # FIXME Somehow not the correct field is loaded here auxtables["FIELD"] = AuxiliaryTable(dct) mm = read_ms_i(ms, data_column, ifield, spectral_window, pol_ind, pol_summation, -- GitLab From 6ad51f87233d2f8f6b6b5041849cf1c3150f5a1b Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 27 Aug 2021 11:32:50 +0200 Subject: [PATCH 22/35] Correctly read in polarization index --- resolve/data/ms_import.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 073f76a2..67e502cb 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -83,12 +83,15 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, my_assert(spectral_window >= 0) my_assert(spectral_window < ms_n_spectral_windows(ms)) + with ms_table(join(ms, "DATA_DESCRIPTION")) as t: + polid = t.getcol("POLARIZATION_ID")[spectral_window] + # Polarization with ms_table(join(ms, "POLARIZATION")) as t: - pol = t.getcol("CORR_TYPE") + pol = t.getcol("CORR_TYPE", startrow=polid, nrow=1) my_asserteq(pol.ndim, 2) my_asserteq(pol.shape[0], 1) - pol = list(pol[0]) # Not clear what the first dimension is used for + pol = list(pol[0]) polobj = Polarization(pol) if polarizations[0] == "stokesi": polarizations = ["LL", "RR"] if polobj.circular() else ["XX", "YY"] -- GitLab From 443373372c770bd4cfc628a68785d1e5a7bd1cd6 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 27 Aug 2021 11:41:44 +0200 Subject: [PATCH 23/35] Fixup --- resolve/plotter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resolve/plotter.py b/resolve/plotter.py index 598a7081..9d4de344 100644 --- a/resolve/plotter.py +++ b/resolve/plotter.py @@ -203,7 +203,8 @@ def _plot_nifty(state, op, kwargs, fname): sc.add(op.force(ss)) p = ift.Plot() p.add(sc.mean, **kwargs) - p.add(sc.var.sqrt() / sc.mean) + if len(state) > 1: + p.add(sc.var.sqrt() / sc.mean) p.output(nx=2, ny=1, xsize=2 * UNIT, ysize=UNIT, name=fname) else: pos = state if isinstance(state, ift.MultiField) else state.mean -- GitLab From f7c0476171116d2c0b05bb1d141536741bbd885d Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Mon, 30 Aug 2021 17:12:25 +0200 Subject: [PATCH 24/35] Cleanup ms import and use auxiliary tables more --- misc/ms2npz.py | 2 +- resolve/data/auxiliary_table.py | 12 ++- resolve/data/ms_import.py | 161 +++++++++++++++++--------------- resolve/data/observation.py | 39 ++++---- resolve/util.py | 8 +- test/test_general.py | 9 +- 6 files changed, 127 insertions(+), 104 deletions(-) diff --git a/misc/ms2npz.py b/misc/ms2npz.py index 73505c28..7d8f427c 100755 --- a/misc/ms2npz.py +++ b/misc/ms2npz.py @@ -62,7 +62,7 @@ if __name__ == "__main__": raise RuntimeError("Channels are not sorted") channels = slice(begin, end) else: - channels = slice(args.ch_begin, args.ch_end, args.ch_jump), + channels = slice(args.ch_begin, args.ch_end, args.ch_jump) makedirs(args.output_folder, exist_ok=True) name = splitext(split(args.ms)[1])[0] diff --git a/resolve/data/auxiliary_table.py b/resolve/data/auxiliary_table.py index 58365b8c..a2afcb71 100644 --- a/resolve/data/auxiliary_table.py +++ b/resolve/data/auxiliary_table.py @@ -13,7 +13,8 @@ class AuxiliaryTable: nrow = None for kk, lst in inp_dict.items(): my_assert_isinstance(kk, str) - my_assert_isinstance(lst, (list, np.ndarray)) + if not isinstance(lst, (list, np.ndarray)): + raise RuntimeError(f"{kk} neither list nor np.ndarray") if nrow is None: nrow = len(lst) my_asserteq(nrow, len(lst)) @@ -35,6 +36,15 @@ class AuxiliaryTable: s.append(f" {kk:<20} {len(lst):>10} {str(type(lst[0])):>15}") return "\n".join(s) + def __getitem__(self, key): + return self._dct[key] + + def __contains__(self, key): + return key in self._dct + + def row(self, i): + return AuxiliaryTable({kk: vv[i:i+1] for kk, vv in self._dct.items()}) + def to_list(self): return [list(self._dct.keys())] + [ss for ss in self._dct.values()] diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 67e502cb..e3214afd 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -19,9 +19,16 @@ def ms_table(path): return table(path, readonly=True, ack=False) +def _pol_id(ms_path, spectral_window): + """Return id for indexing polarization table for a given spectral window.""" + with ms_table(join(ms_path, "DATA_DESCRIPTION")) as t: + polid = t.getcol("POLARIZATION_ID")[spectral_window] + return polid + + + def ms2observations(ms, data_column, with_calib_info, spectral_window, - polarizations="all", channel_slice=slice(None), - ignore_flags=False): + polarizations="all", channels=slice(None), ignore_flags=False): """Read and convert a given measurement set into an array of :class:`Observation` If WEIGHT_SPECTRUM is available this column is used for weighting. @@ -38,15 +45,15 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, Reads in all information necessary for calibration, if True. If only imaging shall be performed, this can be set to False. spectral_window : int - Index of spectral window which shall be imported. + Index of spectral window that shall be imported. polarizations "all": All polarizations are imported. "stokesi": Only LL/RR or XX/YY polarizations are imported. "stokesiavg": Only LL/RR or XX/YY polarizations are imported and averaged on the fly. List of strings: Strings can be "XX", "YY", "XY", "YX", "LL", "LR", "RL", "RR". The respective polarizations are loaded. - channel_slice : slice - Slice of selected channels. Default: select all channels - FIXME Select channels by indices + channels : slice or list + Select channels. Can be either a slice object or an index list. Default: + select all channels ignore_flags : bool If True, the whole measurement set is imported irrespective of the flags. Default is false. @@ -58,95 +65,63 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, Note ---- - We cannot import multiple spectral windows into one Observation instance + Multiple spectral windows are not imported into one Observation instance because it is not guaranteed by the measurement set data structure that all baselines are present in all spectral windows. """ + # Input checks if ms[-1] == "/": ms = ms[:-1] if not isdir(ms): raise RuntimeError if ms == ".": ms = os.getcwd() - if isinstance(polarizations, str): - polarizations = [polarizations] - if ( - "stokesiavg" in polarizations - or "stokesi" in polarizations - or "all" in polarizations - ) and len(polarizations) > 1: - raise ValueError - - # Spectral windows - my_assert_isinstance(channel_slice, slice) + if isinstance(polarizations, list): + for ll in polarizations: + my_assert(ll in ["XX", "YY", "XY", "YX", "LL", "LR", "RL", "RR"]) + my_asserteq(len(set(polarizations)), len(polarizations)) + else: + my_assert(polarizations in ["stokesi", "stokesiavg", "all"]) + my_assert_isinstance(channels, (slice, list)) my_assert_isinstance(spectral_window, int) my_assert(spectral_window >= 0) my_assert(spectral_window < ms_n_spectral_windows(ms)) - - with ms_table(join(ms, "DATA_DESCRIPTION")) as t: - polid = t.getcol("POLARIZATION_ID")[spectral_window] + # /Input checks # Polarization with ms_table(join(ms, "POLARIZATION")) as t: - pol = t.getcol("CORR_TYPE", startrow=polid, nrow=1) - my_asserteq(pol.ndim, 2) - my_asserteq(pol.shape[0], 1) - pol = list(pol[0]) + pol = t.getcol("CORR_TYPE", startrow=_pol_id(ms, spectral_window), nrow=1)[0] polobj = Polarization(pol) - if polarizations[0] == "stokesi": - polarizations = ["LL", "RR"] if polobj.circular() else ["XX", "YY"] - if polarizations[0] == "stokesiavg": + if polarizations == "stokesiavg": pol_ind = polobj.stokes_i_indices() polobj = Polarization.trivial() pol_summation = True - elif polarizations[0] == "all": + elif polarizations == "all": pol_ind = None pol_summation = False else: + if polarizations == "stokesi": + polarizations = ["LL", "RR"] if polobj.circular() else ["XX", "YY"] polobj = polobj.restrict_by_name(polarizations) pol_ind = [polobj.to_str_list().index(ii) for ii in polarizations] pol_summation = False - # Field - with ms_table(join(ms, "FIELD")) as t: - # FIXME Put proper support for equinox here - # FIXME Eventually get rid of this and use auxiliary table - try: - equinox = t.coldesc("REFERENCE_DIR")["desc"]["keywords"]["MEASINFO"]["Ref"] - equinox = str(equinox)[1:] - if equinox == "1950_VLA": - equinox = 1950 - except KeyError: - equinox = 2000 - dirs = [] - for pc in t.getcol("REFERENCE_DIR"): - my_asserteq(pc.shape, (1, 2)) - dirs.append(Direction(pc[0], equinox)) - dirs = tuple(dirs) - - auxtables = {} - with ms_table(join(ms, "ANTENNA")) as t: - keys = ["NAME", "STATION", "TYPE", "MOUNT", "POSITION", "OFFSET", "DISH_DIAMETER"] - auxtables["ANTENNA"] = AuxiliaryTable({kk: t.getcol(kk) for kk in keys}) - with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: - keys = ["NAME", "REF_FREQUENCY", "CHAN_FREQ", "CHAN_WIDTH", "MEAS_FREQ_REF", "EFFECTIVE_BW", - "RESOLUTION", "TOTAL_BANDWIDTH", "NET_SIDEBAND", "IF_CONV_CHAIN", "FREQ_GROUP", - "FREQ_GROUP_NAME"] - dct = {kk: t.getcol(kk, startrow=spectral_window, nrow=1) for kk in keys} - auxtables["SPECTRAL_WINDOW"] = AuxiliaryTable(dct) - - # FIXME Determine which observation is calibration observation observations = [] - for ifield, direction in enumerate(dirs): - with ms_table(join(ms, "FIELD")) as t: - keys = ["NAME", "CODE", "TIME", "NUM_POLY", "DELAY_DIR", "PHASE_DIR", "REFERENCE_DIR", - "SOURCE_ID"] - dct = {kk: t.getcol(kk, startrow=ifield, nrow=1) for kk in keys} - # FIXME Somehow not the correct field is loaded here - auxtables["FIELD"] = AuxiliaryTable(dct) + for ifield in range(ms_n_fields(ms)): + auxtables = {} + auxtables["ANTENNA"] = _import_aux_table(ms, "ANTENNA") + auxtables["STATE"] = _import_aux_table(ms, "STATE") + auxtables["SPECTRAL_WINDOW"] = _import_aux_table(ms, "SPECTRAL_WINDOW", row=spectral_window, skip=["ASSOC_NATURE"]) + sf = _source_and_field_table(ms, spectral_window, ifield) + if sf is None: + print(f"Field {ifield} cannot be found in SOURCE table") + observations.append(None) + continue + auxtables = {**auxtables, **sf} + print(f"Work on Field {ifield}: {auxtables['SOURCE']['NAME'][0]}") mm = read_ms_i(ms, data_column, ifield, spectral_window, pol_ind, pol_summation, - with_calib_info, channel_slice, ignore_flags,) + with_calib_info, channels, ignore_flags) if mm is None: print(f"{ms}, field #{ifield} is empty or fully flagged") observations.append(None) @@ -154,7 +129,7 @@ def ms2observations(ms, data_column, with_calib_info, spectral_window, if mm["ptg"] is not None: raise NotImplementedError antpos = AntennaPositions(mm["uvw"], mm["ant1"], mm["ant2"], mm["time"]) - obs = Observation(antpos, mm["vis"], mm["wgt"], polobj, mm["freq"], direction, + obs = Observation(antpos, mm["vis"], mm["wgt"], polobj, mm["freq"], auxiliary_tables=auxtables) observations.append(obs) return observations @@ -178,14 +153,12 @@ def _determine_weighting(t): def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summation, - with_calib_info, channel_slice, ignore_flags): + with_calib_info, channels, ignore_flags): # Freq with ms_table(join(name, "SPECTRAL_WINDOW")) as t: - freq = t.getcol("CHAN_FREQ", startrow=spectral_window, nrow=1) - my_asserteq(freq.ndim, 2) - my_asserteq(freq.shape[0], 1) - freq = freq[0] + freq = t.getcol("CHAN_FREQ", startrow=spectral_window, nrow=1)[0] + my_asserteq(freq.ndim, 1) my_assert(len(freq) > 0) nchan = len(freq) @@ -216,10 +189,11 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat stop = min(nrow, start + step) tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[..., pol_indices] - if channel_slice != slice(None): - tchslcflags = np.ones_like(tflags) - tchslcflags[:, channel_slice] = False - tflags = np.logical_or(tflags, tchslcflags) + + tchslcflags = np.ones_like(tflags) + tchslcflags[:, channels] = False + tflags = np.logical_or(tflags, tchslcflags) + if not fullwgt: twgt = np.repeat(twgt[:, None], nchan, axis=1) my_asserteq(twgt.ndim, 3) @@ -358,8 +332,43 @@ def ms_n_spectral_windows(ms): return n_spectral_windows +def ms_n_fields(ms): + with ms_table(join(ms, "FIELD")) as t: + n = t.nrows() + return n + + def _conditional_flags(table, start, stop, pol_indices, ignore): tflags = table.getcol("FLAG", startrow=start, nrow=stop - start)[..., pol_indices] if ignore: tflags = np.zeros_like(tflags) return tflags + + +def _import_aux_table(ms, table_name, row=None, skip=[]): + with ms_table(join(ms, table_name)) as t: + keys = filter(lambda s: s not in skip, t.colnames()) + if row is None: + dct = {kk: t.getcol(kk) for kk in keys} + else: + dct = {kk: t.getcol(kk, startrow=row, nrow=1) for kk in keys} + aux_table = AuxiliaryTable(dct) + return aux_table + + +def _source_and_field_table(ms, spectral_window, ifield): + source_table = _import_aux_table(ms, "SOURCE", skip=["POSITION", "TRANSITION", "REST_FREQUENCY", + "SYSVEL", "SOURCE_MODEL", "PULSAR_ID"]) + field_table = _import_aux_table(ms, "FIELD", row=ifield) + source_id = field_table["SOURCE_ID"][0] + ind = np.where(np.logical_and(source_table["SOURCE_ID"] == source_id, + np.logical_or(source_table["SPECTRAL_WINDOW_ID"] == spectral_window, + source_table["SPECTRAL_WINDOW_ID"] == -1)))[0] + if len(ind) == 0: + return None + elif len(ind) == 1: + ind = ind[0] + else: + raise RuntimeError + + return {"SOURCE": source_table.row(ind), "FIELD": field_table} diff --git a/resolve/data/observation.py b/resolve/data/observation.py index 8e035487..73407d08 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -267,8 +267,6 @@ class Observation(BaseObservation): Polarization information of the data set. freq : numpy.ndarray Contains the measured frequencies. Shape (n_channels) - direction : Direction - Direction information of the data set. auxiliary_tables : dict Dictionary of auxiliary tables. Default: None. @@ -277,9 +275,8 @@ class Observation(BaseObservation): vis and weight must have the same shape. """ - def __init__(self, antenna_positions, vis, weight, polarization, freq, direction, auxiliary_tables=None): + def __init__(self, antenna_positions, vis, weight, polarization, freq, auxiliary_tables=None): nrows = len(antenna_positions) - my_assert_isinstance(direction, Direction) my_assert_isinstance(polarization, Polarization) my_assert_isinstance(antenna_positions, AntennaPositions) my_asserteq(weight.shape, vis.shape) @@ -305,7 +302,6 @@ class Observation(BaseObservation): self._weight = weight self._polarization = polarization self._freq = freq - self._direction = direction if auxiliary_tables is not None: my_assert_isinstance(auxiliary_tables, dict) @@ -314,8 +310,8 @@ class Observation(BaseObservation): my_assert_isinstance(kk, str) self._auxiliary_tables = auxiliary_tables - self._eq_attributes = ("_direction", "_polarization", "_freq", - "_antpos", "_vis", "_weight", "_auxiliary_tables") + self._eq_attributes = ("_polarization", "_freq", "_antpos", "_vis", "_weight", + "_auxiliary_tables") @onlymaster def save(self, file_name, compress): @@ -324,7 +320,6 @@ class Observation(BaseObservation): weight=self._weight, freq=self._freq, polarization=self._polarization.to_list(), - direction=self._direction.to_list(), ) for ii, vv in enumerate(self._antpos.to_list()): if vv is None: @@ -363,7 +358,6 @@ class Observation(BaseObservation): # /Load auxtables pol = Polarization.from_list(dct["polarization"]) - direction = Direction.from_list(dct["direction"]) slc = slice(None) if lo_hi_index is None else slice(*lo_hi_index) # FIXME Put barrier here that makes sure that only one full Observation is loaded at a time return Observation( @@ -372,7 +366,6 @@ class Observation(BaseObservation): dct["weight"][..., slc], pol, dct["freq"][slc], - direction, auxtables ) @@ -382,7 +375,7 @@ class Observation(BaseObservation): vis = self._vis.copy() vis[self.flags.val] = np.nan return Observation(self._antpos, vis, self._weight, self._polarization, self._freq, - self._direction) + self._auxiliary_tables) @staticmethod def load_mf(file_name, n_imaging_bands, comm=None): @@ -413,13 +406,14 @@ class Observation(BaseObservation): return obs_list, nu0 def __getitem__(self, slc): + # FIXME Do I need to change something in self._auxiliary_tables? return Observation( self._antpos[slc], self._vis[:, slc], self._weight[:, slc], self._polarization, self._freq, - self._direction, + self._auxiliary_tables, ) def get_freqs(self, frequency_list): @@ -435,16 +429,18 @@ class Observation(BaseObservation): return self.get_freqs_by_slice(mask) def get_freqs_by_slice(self, slc): + # FIXME Do I need to change something in self._auxiliary_tables? return Observation( self._antpos, self._vis[..., slc], self._weight[..., slc], self._polarization, self._freq[slc], - self._direction, + self._auxiliary_tables, ) def average_stokesi(self): + # FIXME Do I need to change something in self._auxiliary_tables? my_asserteq(self._vis.shape[0], 2) my_asserteq(self._polarization.restrict_to_stokes_i(), self._polarization) vis = np.sum(self._weight * self._vis, axis=0)[None] @@ -453,22 +449,24 @@ class Observation(BaseObservation): vis /= wgt + np.ones_like(wgt) * invmask vis[invmask] = 0.0 return Observation( - self._antpos, vis, wgt, Polarization.trivial(), self._freq, self._direction + self._antpos, vis, wgt, Polarization.trivial(), self._freq, self._auxiliary_tables ) def restrict_to_stokesi(self): + # FIXME Do I need to change something in self._auxiliary_tables? my_asserteq(self._vis.shape[0], 4) ind = self._polarization.stokes_i_indices() vis = self._vis[ind] wgt = self._weight[ind] pol = self._polarization.restrict_to_stokes_i() - return Observation(self._antpos, vis, wgt, pol, self._freq, self._direction) + return Observation(self._antpos, vis, wgt, pol, self._freq, self._auxiliary_tables) def restrict_to_autocorrelations(self): slc = self._antpos.ant1 == self._antpos.ant2 return self[slc] def move_time(self, t0): + # FIXME Do I need to change something in self._auxiliary_tables? antpos = self._antpos.move_time(t0) return Observation( antpos, @@ -476,7 +474,7 @@ class Observation(BaseObservation): self._weight, self._polarization, self._freq, - self._direction, + self._auxiliary_tables ) @property @@ -487,6 +485,15 @@ class Observation(BaseObservation): def antenna_positions(self): return self._antpos + @property + def direction(self): + if self._auxiliary_tables is not None and "FIELD" in self._auxiliary_tables: + equinox = 2000 # FIXME Figure out how to extract this from a measurement set + refdir = self._auxiliary_tables["FIELD"]["REFERENCE_DIR"][0] + my_asserteq(refdir.shape[0] == 0) + return Direction(refdir[0], equinox) + return None + def auxiliary_table(self, name): return self._auxiliary_tables[name] diff --git a/resolve/util.py b/resolve/util.py index b8eda2b3..8093ef2e 100644 --- a/resolve/util.py +++ b/resolve/util.py @@ -60,10 +60,10 @@ def _deep_equal(a, b): def _compare_simple_or_array(a, b): equal = a == b - try: - return bool(equal) - except ValueError: - return equal.all() + if isinstance(equal, np.ndarray): + return np.all(equal) + assert isinstance(equal, bool) + return equal def _types_equal(a, b): diff --git a/test/test_general.py b/test/test_general.py index e5e2f997..13845c50 100644 --- a/test/test_general.py +++ b/test/test_general.py @@ -42,10 +42,7 @@ freqdomain = rve.IRGSpace(np.linspace(1000, 1050, num=10)) FACETS = [(1, 1), (2, 2), (2, 1), (1, 4)] -@pmp( - "ms", - ("CYG-ALL-2052-2MHZ", "CYG-D-6680-64CH-10S", "AM754_A030124_flagged", "1052735056"), -) +@pmp("ms", ("CYG-ALL-2052-2MHZ", "CYG-D-6680-64CH-10S", "AM754_A030124_flagged")) @pmp("with_calib_info", (False, True)) @pmp("compress", (False, True)) def test_save_and_load_observation(ms, with_calib_info, compress): @@ -58,11 +55,11 @@ def test_save_and_load_observation(ms, with_calib_info, compress): assert ob == ob1 -@pmp("slc", [slice(3), slice(14, 15), slice(None, None, None), slice(None, None, 5)]) +@pmp("slc", [slice(3), slice(14, 15), [14, 15], slice(None, None, None), slice(None, None, 5)]) def test_sliced_readin(slc): ms = f"{direc}CYG-D-6680-64CH-10S.ms" obs0 = rve.ms2observations(ms, "DATA", False, 0)[0] - obs = rve.ms2observations(ms, "DATA", False, 0, channel_slice=slc)[0] + obs = rve.ms2observations(ms, "DATA", False, 0, channels=slc)[0] ch0 = obs0.weight.val[..., slc] ch = obs.weight.val assert ch0.ndim == 3 -- GitLab From 443fcff27ef6a2174b8a190d602f8e54995d0d60 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Tue, 31 Aug 2021 13:01:35 +0200 Subject: [PATCH 25/35] ms_import.py: Refactoring --- resolve/data/ms_import.py | 174 ++++++++++++++++++++++---------------- 1 file changed, 99 insertions(+), 75 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index e3214afd..c764a9e7 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -2,14 +2,13 @@ # Copyright(C) 2019-2021 Max-Planck-Society import os -from os.path import isdir, join, splitext +from os.path import isdir, join import numpy as np from ..util import my_assert, my_assert_isinstance, my_asserteq from .antenna_positions import AntennaPositions from .auxiliary_table import AuxiliaryTable -from .direction import Direction from .observation import Observation from .polarization import Polarization @@ -26,7 +25,6 @@ def _pol_id(ms_path, spectral_window): return polid - def ms2observations(ms, data_column, with_calib_info, spectral_window, polarizations="all", channels=slice(None), ignore_flags=False): """Read and convert a given measurement set into an array of :class:`Observation` @@ -140,91 +138,44 @@ def _ms2resolve_transpose(arr): return np.ascontiguousarray(np.transpose(arr, (2, 0, 1))) -def _determine_weighting(t): - fullwgt = False - weightcol = "WEIGHT" - try: - t.getcol("WEIGHT_SPECTRUM", startrow=0, nrow=1) - weightcol = "WEIGHT_SPECTRUM" - fullwgt = True - except RuntimeError: - pass +def _determine_weighting(ms): + with ms_table(ms) as t: + if "WEIGHT_SPECTRUM" in t.colnames(): + weightcol = "WEIGHT_SPECTRUM" + fullwgt = True + else: + weightcol = "WEIGHT" + fullwgt = False return fullwgt, weightcol def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summation, with_calib_info, channels, ignore_flags): - - # Freq - with ms_table(join(name, "SPECTRAL_WINDOW")) as t: - freq = t.getcol("CHAN_FREQ", startrow=spectral_window, nrow=1)[0] - my_asserteq(freq.ndim, 1) - my_assert(len(freq) > 0) - nchan = len(freq) - assert pol_indices is None or isinstance(pol_indices, list) if pol_indices is None: pol_indices = slice(None) if pol_summation: my_asserteq(len(pol_indices), 2) + # Check if data column is available and get shape with ms_table(name) as t: - # FIXME Get rid of fullwgt - fullwgt, weightcol = _determine_weighting(t) - nrow = t.nrows() - nmspol = t.getcol("FLAG", startrow=0, nrow=1).shape[2] - print("Measurement set visibilities:") - print(f" shape: ({nrow}, {nchan}, {nmspol})") - active_rows = np.ones(nrow, dtype=np.bool) - active_channels = np.zeros(nchan, dtype=np.bool) - step = max(1, nrow // 100) # how many rows to read in every step - - # Check if data column is available - t.getcol(data_column, startrow=0, nrow=10) - - # Determine which subset of rows/channels we need to input - start = 0 - while start < nrow: - print("First pass:", f"{(start/nrow*100):.1f}%", end="\r") - stop = min(nrow, start + step) - tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) - twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[..., pol_indices] - - tchslcflags = np.ones_like(tflags) - tchslcflags[:, channels] = False - tflags = np.logical_or(tflags, tchslcflags) - - if not fullwgt: - twgt = np.repeat(twgt[:, None], nchan, axis=1) - my_asserteq(twgt.ndim, 3) - npol = tflags.shape[2] - if pol_summation: - tflags = np.any(tflags.astype(np.bool), axis=2)[..., None] - twgt = np.sum(twgt, axis=2)[..., None] - tflags[twgt == 0] = True - - # Select field and spectral window - tfieldid = t.getcol("FIELD_ID", startrow=start, nrow=stop - start) - tflags[tfieldid != field] = True - tspw = t.getcol("DATA_DESC_ID", startrow=start, nrow=stop - start) - tflags[tspw != spectral_window] = True - - # Inactive if all polarizations are flagged - assert tflags.ndim == 3 - tflags = np.all(tflags, axis=2) - active_rows[start:stop] = np.invert(np.all(tflags, axis=1)) - active_channels = np.logical_or( - active_channels, np.invert(np.all(tflags, axis=0)) - ) - start = stop - nrealrows, nrealchan = np.sum(active_rows), np.sum(active_channels) - if nrealrows == 0 or nrealchan == 0: - return None + nmspol = t.getcol(data_column, startrow=0, nrow=3).shape[2] + nrow = t.nrow() + print("Measurement set visibilities:") + print(f" shape: ({nrow}, {_ms_nchannels(name, spectral_window)}, {nmspol})") + + active_rows, active_channels = _first_pass(name, field, spectral_window, channels, pol_indices, + pol_summation, ignore_flags) + nrealrows, nrealchan = np.sum(active_rows), np.sum(active_channels) + if nrealrows == 0 or nrealchan == 0: + return None # Freq - freq = freq[active_channels] + freq = _ms_channels(name, spectral_window)[active_channels] # Vis, wgt, (flags) + fullwgt, weightcol = _determine_weighting(name) + nchan = _ms_nchannels(name, spectral_window) with ms_table(name) as t: if pol_summation: npol = 1 @@ -241,7 +192,7 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat start, realstart = 0, 0 while start < nrow: print("Second pass:", f"{(start/nrow*100):.1f}%", end="\r") - stop = min(nrow, start + step) + stop = _ms_stop(start, nrow) realstop = realstart + np.sum(active_rows[start:stop]) if realstop > realstart: allrows = stop - start == realstop - realstart @@ -310,7 +261,7 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat start, realstart = 0, 0 while start < nrow: print("Second pass:", f"{(start/nrow*100):.1f}%", end="\r") - stop = min(nrow, start + step) + stop = _ms_stop(start, nrow) realstop = realstart + np.sum(active_rows[start:stop]) if realstop > realstart: allrows = stop - start == realstop - realstart @@ -323,7 +274,80 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat vis = np.ascontiguousarray(_ms2resolve_transpose(vis)) wgt = np.ascontiguousarray(_ms2resolve_transpose(wgt)) vis[wgt == 0] = 0.0 - return {"uvw": uvw, "ant1": ant1, "ant2": ant2, "time": time, "freq": freq, "vis": vis, "wgt": wgt, "ptg": ptg} + return {"uvw": uvw, "ant1": ant1, "ant2": ant2, "time": time, "freq": freq, "vis": vis, + "wgt": wgt, "ptg": ptg} + + +def _first_pass(ms, field, spectral_window, channels, pol_indices, pol_summation, ignore_flags): + """Go through measurement set and determine which rows and which channels are active for a given + field and a given spectral window. + """ + fullwgt, weightcol = _determine_weighting(ms) + nchan = _ms_nchannels(ms, spectral_window) + with ms_table(ms) as t: + nrow = t.nrows() + active_rows = np.ones(nrow, dtype=np.bool) + active_channels = np.zeros(nchan, dtype=np.bool) + + # Determine which subset of rows/channels we need to input + start = 0 + while start < nrow: + print("First pass:", f"{(start/nrow*100):.1f}%", end="\r") + stop = _ms_stop(start, nrow) + tflags = _conditional_flags(t, start, stop, pol_indices, ignore_flags) + twgt = t.getcol(weightcol, startrow=start, nrow=stop - start)[..., pol_indices] + + tchslcflags = np.ones_like(tflags) + tchslcflags[:, channels] = False + tflags = np.logical_or(tflags, tchslcflags) + + if not fullwgt: + twgt = np.repeat(twgt[:, None], nchan, axis=1) + my_asserteq(twgt.ndim, 3) + if pol_summation: + tflags = np.any(tflags.astype(np.bool), axis=2)[..., None] + twgt = np.sum(twgt, axis=2)[..., None] + tflags[twgt == 0] = True + + # Select field and spectral window + tfieldid = t.getcol("FIELD_ID", startrow=start, nrow=stop - start) + tflags[tfieldid != field] = True + tspw = t.getcol("DATA_DESC_ID", startrow=start, nrow=stop - start) + tflags[tspw != spectral_window] = True + + # Inactive if all polarizations are flagged + assert tflags.ndim == 3 + tflags = np.all(tflags, axis=2) + active_rows[start:stop] = np.invert(np.all(tflags, axis=1)) + active_channels = np.logical_or( + active_channels, np.invert(np.all(tflags, axis=0)) + ) + start = stop + return active_rows, active_channels + + +def _ms_stop(start, nrow): + """Compute sensible step size for going through the rows of a measurement set and return stop + index. + """ + step = max(1, nrow // 100) + return min(nrow, start + step) + + +def _ms_nchannels(ms, spectral_window): + """Return number of channels in a given spectral window of a measurement set. + """ + return len(_ms_channels(ms, spectral_window)) + + +def _ms_channels(ms, spectral_window): + """Return frequencies of channels in a given spectral window of a measurement set. + """ + with ms_table(join(ms, "SPECTRAL_WINDOW")) as t: + freq = t.getcol("CHAN_FREQ", startrow=spectral_window, nrow=1)[0] + my_asserteq(freq.ndim, 1) + my_assert(len(freq) > 0) + return freq def ms_n_spectral_windows(ms): -- GitLab From c0ed2537c61357c85978a160d4f891a050af812c Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Tue, 31 Aug 2021 13:04:29 +0200 Subject: [PATCH 26/35] fixup! ms_import.py: Refactoring --- resolve/data/ms_import.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index c764a9e7..0678a31c 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -160,7 +160,7 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat # Check if data column is available and get shape with ms_table(name) as t: nmspol = t.getcol(data_column, startrow=0, nrow=3).shape[2] - nrow = t.nrow() + nrow = t.nrows() print("Measurement set visibilities:") print(f" shape: ({nrow}, {_ms_nchannels(name, spectral_window)}, {nmspol})") @@ -180,10 +180,10 @@ def read_ms_i(name, data_column, field, spectral_window, pol_indices, pol_summat if pol_summation: npol = 1 else: - if pol_indices != slice(None): - npol = len(pol_indices) + if pol_indices == slice(None): + npol = nmspol else: - npol = npol + npol = len(pol_indices) shp = (nrealrows, nrealchan, npol) vis = np.empty(shp, dtype=np.complex64) wgt = np.empty(shp, dtype=np.float32) -- GitLab From c68d914cd750f0fad58d97630e35a5263acc74f0 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Tue, 31 Aug 2021 13:32:35 +0200 Subject: [PATCH 27/35] Cosmetics --- resolve/data/ms_import.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/resolve/data/ms_import.py b/resolve/data/ms_import.py index 0678a31c..f16ce733 100644 --- a/resolve/data/ms_import.py +++ b/resolve/data/ms_import.py @@ -319,9 +319,7 @@ def _first_pass(ms, field, spectral_window, channels, pol_indices, pol_summation assert tflags.ndim == 3 tflags = np.all(tflags, axis=2) active_rows[start:stop] = np.invert(np.all(tflags, axis=1)) - active_channels = np.logical_or( - active_channels, np.invert(np.all(tflags, axis=0)) - ) + active_channels = np.logical_or(active_channels, np.invert(np.all(tflags, axis=0))) start = stop return active_rows, active_channels -- GitLab From f55f8c5d33f03b1c356e4f8a06bd99dc159a218f Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Wed, 1 Sep 2021 13:51:03 +0200 Subject: [PATCH 28/35] Add polarization space --- resolve/__init__.py | 1 + resolve/polarization_space.py | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 resolve/polarization_space.py diff --git a/resolve/__init__.py b/resolve/__init__.py index 4a1ade5b..68cdefef 100644 --- a/resolve/__init__.py +++ b/resolve/__init__.py @@ -23,6 +23,7 @@ from .plotter import MfPlotter, Plotter from .points import PointInserter from .data.polarization import Polarization from .polarization_matrix_exponential import * +from .polarization_space import * from .response import MfResponse, ResponseDistributor, StokesIResponse, SingleResponse from .simple_operators import * from .util import * diff --git a/resolve/polarization_space.py b/resolve/polarization_space.py new file mode 100644 index 00000000..380681fb --- /dev/null +++ b/resolve/polarization_space.py @@ -0,0 +1,45 @@ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# Copyright(C) 2013-2021 Max-Planck-Society + +import nifty8 as ift + +from .util import my_assert + + +class PolarizationSpace(ift.UnstructuredDomain): + """ + + Parameters + ---------- + coordinates : np.ndarray + Must be sorted and strictly ascending. + """ + + _needed_for_hash = ["_shape", "_lbl"] + + def __init__(self, polarization_labels): + if isinstance(polarization_labels, str): + polarization_labels = (polarization_labels,) + self._lbl = tuple(polarization_labels) + for ll in self._lbl: + my_assert(ll in ["I", "Q", "U", "V", "LL", "LR", "RL", "RR", "XX", "XY", "YX", "YY"]) + super(PolarizationSpace, self).__init__(len(self._lbl)) + + def __repr__(self): + return f"PolarizationSpace({self._lbl})" + + @property + def labels(self): + return self._lbl -- GitLab From e3577ea3ae45f6152e83ec442999f7ca669d8dfc Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Wed, 1 Sep 2021 13:53:39 +0200 Subject: [PATCH 29/35] Observation: add ant and time to interface --- resolve/data/observation.py | 12 ++++++++++++ resolve/polarization_space.py | 17 +++-------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index 73407d08..269d00f8 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -485,6 +485,18 @@ class Observation(BaseObservation): def antenna_positions(self): return self._antpos + @property + def ant1(self): + return self._antpos.ant1 + + @property + def ant2(self): + return self._antpos.ant2 + + @property + def time(self): + return self._antpos.time + @property def direction(self): if self._auxiliary_tables is not None and "FIELD" in self._auxiliary_tables: diff --git a/resolve/polarization_space.py b/resolve/polarization_space.py index 380681fb..5cb209e8 100644 --- a/resolve/polarization_space.py +++ b/resolve/polarization_space.py @@ -1,17 +1,6 @@ -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -# Copyright(C) 2013-2021 Max-Planck-Society +# SPDX-License-Identifier: GPL-3.0-or-later +# Copyright(C) 2021 Max-Planck-Society +# Author: Philipp Arras, Jakob Knollmüller import nifty8 as ift -- GitLab From 6a0ead301e59f93e7b027848ee91fc756c7d6ba3 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Wed, 1 Sep 2021 15:58:18 +0200 Subject: [PATCH 30/35] SingleResponse: make mask optional --- resolve/data/observation.py | 8 ++++++++ resolve/response.py | 5 +++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index 269d00f8..ba72f0ad 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -452,6 +452,14 @@ class Observation(BaseObservation): self._antpos, vis, wgt, Polarization.trivial(), self._freq, self._auxiliary_tables ) + def restrict_by_time(self, tmin, tmax): + ind = np.logical_and(tmin <= self.time , self.time < tmax) + return self[ind] + + def restrict_by_freq(self, fmin, fmax): + ind = np.logical_and(fmin <= self.freq, self.freq < fmax) + return self.get_freqs_by_slice(ind) + def restrict_to_stokesi(self): # FIXME Do I need to change something in self._auxiliary_tables? my_asserteq(self._vis.shape[0], 4) diff --git a/resolve/response.py b/resolve/response.py index c9d66ee3..5e074cb9 100644 --- a/resolve/response.py +++ b/resolve/response.py @@ -222,7 +222,7 @@ class FullResponse(ift.LinearOperator): class SingleResponse(ift.LinearOperator): - def __init__(self, domain, uvw, freq, mask, facets=(1, 1)): + def __init__(self, domain, uvw, freq, mask=None, facets=(1, 1)): my_assert_isinstance(facets, tuple) for ii in range(2): if domain.shape[ii] % facets[ii] != 0: @@ -234,7 +234,6 @@ class SingleResponse(ift.LinearOperator): self._args = { "uvw": uvw, "freq": freq, - "mask": mask.astype(np.uint8), "pixsize_x": self._domain[0].distances[0], "pixsize_y": self._domain[0].distances[1], "epsilon": epsilon(), @@ -242,6 +241,8 @@ class SingleResponse(ift.LinearOperator): "nthreads": nthreads(), "flip_v": True, } + if mask is not None: + self._args["mask"] = mask.astype(np.uint8) self._vol = self._domain[0].scalar_dvol self._target_dtype = np_dtype(True) self._domain_dtype = np_dtype(False) -- GitLab From 9096a07d9254ec2bf29a992ee4ce03a7e87e5b84 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Wed, 1 Sep 2021 15:58:43 +0200 Subject: [PATCH 31/35] Observation: restrict by time/freq via searchsorted --- resolve/data/observation.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index ba72f0ad..b194b235 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -294,6 +294,8 @@ class Observation(BaseObservation): my_assert(np.all(np.isfinite(vis[weight > 0.]))) my_assert(np.all(np.isfinite(weight))) + my_assert(np.all(np.diff(freq))) + vis.flags.writeable = False weight.flags.writeable = False @@ -452,13 +454,23 @@ class Observation(BaseObservation): self._antpos, vis, wgt, Polarization.trivial(), self._freq, self._auxiliary_tables ) - def restrict_by_time(self, tmin, tmax): - ind = np.logical_and(tmin <= self.time , self.time < tmax) - return self[ind] - - def restrict_by_freq(self, fmin, fmax): - ind = np.logical_and(fmin <= self.freq, self.freq < fmax) - return self.get_freqs_by_slice(ind) + def restrict_by_time(self, tmin, tmax, with_index=False): + my_assert(all(np.diff(self.time) >= 0)) + start, stop = np.searchsorted(self.time, [tmin, tmax]) + ind = slice(start, stop) + res = self[ind] + if with_index: + return res, ind + return res + + def restrict_by_freq(self, fmin, fmax, with_index=False): + my_assert(all(np.diff(self.freq) > 0)) + start, stop = np.searchsorted(self.freq, [fmin, fmax]) + ind = slice(start, stop) + res = self.get_freqs_by_slice(ind) + if with_index: + return res, ind + return res def restrict_to_stokesi(self): # FIXME Do I need to change something in self._auxiliary_tables? -- GitLab From d3e529770efa462c68ca0c50b384bbdd3bfb42e2 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Wed, 1 Sep 2021 15:59:12 +0200 Subject: [PATCH 32/35] Cosmetics --- resolve/polarization_space.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolve/polarization_space.py b/resolve/polarization_space.py index 5cb209e8..dad3e786 100644 --- a/resolve/polarization_space.py +++ b/resolve/polarization_space.py @@ -27,7 +27,7 @@ class PolarizationSpace(ift.UnstructuredDomain): super(PolarizationSpace, self).__init__(len(self._lbl)) def __repr__(self): - return f"PolarizationSpace({self._lbl})" + return f"PolarizationSpace(polarization_labels={self._lbl})" @property def labels(self): -- GitLab From 13be61630acf96b494562ee65b644c0bc7637b0b Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Thu, 2 Sep 2021 10:40:12 +0200 Subject: [PATCH 33/35] Cleanup --- resolve/multi_frequency/irg_space.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/resolve/multi_frequency/irg_space.py b/resolve/multi_frequency/irg_space.py index 70a07f71..a2a23bc7 100644 --- a/resolve/multi_frequency/irg_space.py +++ b/resolve/multi_frequency/irg_space.py @@ -12,8 +12,6 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # # Copyright(C) 2013-2020 Max-Planck-Society -# -# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik. import numpy as np -- GitLab From deb6e5bf5b081a906a15d5588721a3c4acb0f270 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 3 Sep 2021 18:06:43 +0200 Subject: [PATCH 34/35] Observation: Use PolarizationSpace for data domain --- resolve/data/observation.py | 3 ++- resolve/data/polarization.py | 9 +++++++++ resolve/likelihood.py | 2 +- resolve/multi_frequency/operators.py | 6 +++--- resolve/response.py | 8 +++++--- test/test_general.py | 12 +++++------- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index b194b235..6fb42d80 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -18,7 +18,8 @@ from .polarization import Polarization class BaseObservation: @property def _dom(self): - dom = [ift.UnstructuredDomain(ss) for ss in self._vis.shape] + pol_dom = self.polarization.space + dom = [pol_dom] + [ift.UnstructuredDomain(ss) for ss in self._vis.shape[1:]] return ift.makeDomain(dom) @property diff --git a/resolve/data/polarization.py b/resolve/data/polarization.py index 3bbbd199..48b2c79c 100644 --- a/resolve/data/polarization.py +++ b/resolve/data/polarization.py @@ -68,6 +68,15 @@ class Polarization: def to_str_list(self): return [TABLE[ii] for ii in self._ind] + @property + def space(self): + from ..polarization_space import PolarizationSpace + if self == Polarization.trivial(): + x = "I" + else: + x = [TABLE[ii] for ii in self._ind] + return PolarizationSpace(x) + @staticmethod def from_list(lst): return Polarization(lst) diff --git a/resolve/likelihood.py b/resolve/likelihood.py index 7e75481a..54c9ed46 100644 --- a/resolve/likelihood.py +++ b/resolve/likelihood.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: GPL-3.0-or-later -# Copyright(C) 2020 Max-Planck-Society +# Copyright(C) 2020-2021 Max-Planck-Society # Author: Philipp Arras from functools import reduce diff --git a/resolve/multi_frequency/operators.py b/resolve/multi_frequency/operators.py index 11aae3db..b77f98b2 100644 --- a/resolve/multi_frequency/operators.py +++ b/resolve/multi_frequency/operators.py @@ -2,10 +2,10 @@ # Copyright(C) 2013-2021 Max-Planck-Society # Authors: Philipp Frank, Philipp Arras, Philipp Haim -import numpy as np - import nifty8 as ift +import numpy as np +from ..polarization_space import PolarizationSpace from ..util import my_assert, my_assert_isinstance, my_asserteq from .irg_space import IRGSpace @@ -91,7 +91,7 @@ class MfWeightingInterpolation(ift.LinearOperator): my_asserteq(domain.shape[0], eff_uvw.shape[2]) # freqaxis self._domain = domain nrow, nfreq = eff_uvw.shape[1:] - tgt = [ift.UnstructuredDomain(aa) for aa in [1, nrow, nfreq]] + tgt = [PolarizationSpace("I")] + [ift.UnstructuredDomain(aa) for aa in [nrow, nfreq]] self._target = ift.DomainTuple.make(tgt) self._capability = self.TIMES | self.ADJOINT_TIMES # FIXME Try to unify all those operators which loop over freq dimension diff --git a/resolve/response.py b/resolve/response.py index 5e074cb9..fcb097b3 100644 --- a/resolve/response.py +++ b/resolve/response.py @@ -41,7 +41,7 @@ def StokesIResponse(observation, domain): return contr.adjoint @ sr0 elif npol == 2: sr1 = SingleResponse(domain, observation.uvw, observation.freq, mask[1]) - return ResponseDistributor(sr0, sr1) + return ResponseDistributor(observation.polarization.space, sr0, sr1) raise RuntimeError @@ -186,7 +186,7 @@ class MfResponse(ift.LinearOperator): class ResponseDistributor(ift.LinearOperator): - def __init__(self, *ops): + def __init__(self, iterating_target_space, *ops): dom, tgt = ops[0].domain, ops[0].target cap = self.TIMES | self.ADJOINT_TIMES for op in ops: @@ -194,8 +194,10 @@ class ResponseDistributor(ift.LinearOperator): my_assert(dom is op.domain) my_assert(tgt is op.target) my_assert(self.TIMES & op.capability, self.ADJOINT_TIMES & op.capability) + my_asserteq(len(iterating_target_space.shape), 1) + my_asserteq(len(ops), iterating_target_space.shape[0]) self._domain = ift.makeDomain(dom) - self._target = ift.makeDomain((ift.UnstructuredDomain(len(ops)), *tgt)) + self._target = ift.makeDomain((iterating_target_space, *tgt)) self._capability = cap self._ops = ops diff --git a/test/test_general.py b/test/test_general.py index 13845c50..4c91daf9 100644 --- a/test/test_general.py +++ b/test/test_general.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: GPL-3.0-or-later -# Copyright(C) 2020 Max-Planck-Society +# Copyright(C) 2020-2021 Max-Planck-Society # Author: Philipp Arras from os.path import join @@ -146,7 +146,7 @@ def test_calibration_likelihood(time_mode): npol, nfreq = obs[0].npol, obs[0].nfreq total_N = npol * nants * nfreq dom = [ - ift.UnstructuredDomain(npol), + obs[0].polarization.space, ift.UnstructuredDomain(len(uants)), time_domain, ift.UnstructuredDomain(nfreq), @@ -213,10 +213,8 @@ def test_calibration_distributor(obs): tgt = obs.vis.domain utimes = rve.unique_times(obs) uants = obs.antenna_positions.unique_antennas() - dom = [ - ift.UnstructuredDomain(nn) - for nn in [obs.npol, len(uants), len(utimes), obs.nfreq] - ] + dom = [obs.polarization.space] + \ + [ift.UnstructuredDomain(nn) for nn in [len(uants), len(utimes), obs.nfreq]] uants = rve.unique_antennas(obs) time_dct = {aa: ii for ii, aa in enumerate(utimes)} antenna_dct = {aa: ii for ii, aa in enumerate(uants)} @@ -259,7 +257,7 @@ def test_response_distributor(): dom = ift.UnstructuredDomain(2), ift.UnstructuredDomain(4) op0 = ift.makeOp(ift.makeField(dom, np.arange(8).reshape(2, -1))) op1 = ift.makeOp(ift.makeField(dom, 2 * np.arange(8).reshape(2, -1))) - op = rve.response.ResponseDistributor(op0, op1) + op = rve.response.ResponseDistributor(ift.UnstructuredDomain(2), op0, op1) ift.extra.check_linear_operator(op) -- GitLab From 762b7da1931d3099abd77c97a25ad13070c2d462 Mon Sep 17 00:00:00 2001 From: Philipp Arras <parras@mpa-garching.mpg.de> Date: Fri, 17 Sep 2021 15:14:27 +0200 Subject: [PATCH 35/35] Observation: add time_average --- resolve/data/observation.py | 47 +++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/resolve/data/observation.py b/resolve/data/observation.py index 6fb42d80..21d3bc48 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -498,6 +498,53 @@ class Observation(BaseObservation): self._auxiliary_tables ) + def time_average(self, list_of_timebins): + # FIXME check that timebins do not overlap + # time, ant1, ant2 + ts = self._antpos.time + row_to_bin_map = np.empty(ts.shape) + row_to_bin_map[:] = np.nan + + for ii, (lo, hi) in enumerate(list_of_timebins): + ind = np.logical_and(ts >= lo, ts < hi) + assert np.all(np.isnan(row_to_bin_map[ind])) + row_to_bin_map[ind] = ii + + assert np.all(~np.isnan(row_to_bin_map)) + row_to_bin_map = row_to_bin_map.astype(int) + + ant1 = self._antpos.ant1 + ant2 = self._antpos.ant2 + assert np.max(ant1) < 1000 + assert np.max(ant2) < 1000 + assert np.max(row_to_bin_map) < np.iinfo(np.dtype("int64")).max / 1000000 + atset = set(zip(ant1, ant2, row_to_bin_map)) + dct = {aa: ii for ii, aa in enumerate(atset)} + dct_inv = {yy: xx for xx, yy in dct.items()} + masterindex = np.array([dct[(a1, a2, tt)] for a1, a2, tt in zip(ant1, ant2, row_to_bin_map)]) + + vis, wgt = self.vis.val, self.weight.val + new_vis = np.empty((self.npol, len(atset), self.nfreq), dtype=self.vis.dtype) + new_wgt = np.empty((self.npol, len(atset), self.nfreq), dtype=self.weight.dtype) + for pol in range(self.npol): + for freq in range(self.nfreq): + enum = np.bincount(masterindex, weights=vis[pol, :, freq].real*wgt[pol, :, freq]) + enum = enum + 1j*np.bincount(masterindex, weights=vis[pol, :, freq].imag*wgt[pol, :, freq]) + denom = np.bincount(masterindex, weights=wgt[pol, :, freq]) + new_vis[pol, :, freq] = enum/denom + new_wgt[pol, :, freq] = denom + + new_times = np.array([dct_inv[ii][2] for ii in range(len(atset))]) + new_times = np.mean(np.array(list_of_timebins), axis=1)[new_times] + + new_ant1 = np.array([dct_inv[ii][0] for ii in range(len(atset))]) + new_ant2 = np.array([dct_inv[ii][1] for ii in range(len(atset))]) + new_uvw = ap.uvw + # FIXME Find correct uvw + ap = self._antpos + ap = AntennaPositions(new_uvw, new_ant1, new_ant2, new_times) + return Observation(ap, new_vis, new_wgt, self._polarization, self._freq, self._auxiliary_tables) + @property def uvw(self): return self._antpos.uvw -- GitLab