diff --git a/resolve/__init__.py b/resolve/__init__.py index 68cdefef795f56efc4bf8836d86271b2eab1df6b..49833d48efc104ea791beb6125767150501aa3b3 100644 --- a/resolve/__init__.py +++ b/resolve/__init__.py @@ -27,3 +27,4 @@ from .polarization_space import * from .response import MfResponse, ResponseDistributor, StokesIResponse, SingleResponse from .simple_operators import * from .util import * +from .extra import mpi_load diff --git a/resolve/data/observation.py b/resolve/data/observation.py index b4c7934d80e8cd6791bda4fc348faeebe0a6310d..e5636b5497488ef23ed022789469b3dd189bc872 100644 --- a/resolve/data/observation.py +++ b/resolve/data/observation.py @@ -162,8 +162,8 @@ class BaseObservation: if not isinstance(other, type(self)): return False if ( - self._vis.dtype != other._vis.dtype - or self._weight.dtype != other._weight.dtype + self._vis.dtype != other._vis.dtype + or self._weight.dtype != other._weight.dtype ): return False return compare_attributes(self, other, self._eq_attributes) @@ -188,6 +188,7 @@ class SingleDishObservation(BaseObservation): freq : numpy.ndarray Contains the measured frequencies. Shape (n_channels) """ + def __init__(self, pointings, data, weight, polarization, freq): my_assert_isinstance(pointings, Directions) my_assert_isinstance(polarization, Polarization) @@ -337,7 +338,7 @@ class Observation(BaseObservation): @staticmethod def load(file_name, lo_hi_index=None): - dct = dict(np.load(file_name)) + dct = np.load(file_name) antpos = [] for ii in range(4): val = dct[f"antpos{ii}"] @@ -361,16 +362,18 @@ class Observation(BaseObservation): # /Load auxtables pol = Polarization.from_list(dct["polarization"]) - slc = slice(None) if lo_hi_index is None else slice(*lo_hi_index) - # FIXME Put barrier here that makes sure that only one full Observation is loaded at a time - return Observation( - AntennaPositions.from_list(antpos), - dct["vis"][..., slc], - dct["weight"][..., slc], - pol, - dct["freq"][slc], - auxtables - ) + vis = dct["vis"] + wgt = dct["weight"] + freq = dct["freq"] + if lo_hi_index is not None: + slc = slice(*lo_hi_index) + # Convert view into its own array + vis = vis[..., slc].copy() + wgt = wgt[..., slc].copy() + freq = freq[slc].copy() + del dct + antpos = AntennaPositions.from_list(antpos) + return Observation(antpos, vis, wgt, pol, freq, auxtables) @staticmethod def legacy_load(file_name, lo_hi_index=None): @@ -417,33 +420,28 @@ class Observation(BaseObservation): if comm is None: local_imaging_bands = range(n_imaging_bands) else: - local_imaging_bands = range( - *ift.utilities.shareRange( - n_imaging_bands, comm.Get_size(), comm.Get_rank() - ) - ) + lo, hi = ift.utilities.shareRange( n_imaging_bands, comm.Get_size(), comm.Get_rank()) + local_imaging_bands = range(lo, hi) full_obs = Observation.load(file_name) - obs_list = [ - full_obs.get_freqs_by_slice( - slice(*ift.utilities.shareRange(len(global_freqs), n_imaging_bands, ii)) - ) - for ii in local_imaging_bands - ] + obs_list = [] + for ii in local_imaging_bands: + slc = slice(*ift.utilities.shareRange(len(global_freqs), n_imaging_bands, ii)) + obs_list.append(full_obs.get_freqs_by_slice(slc)) nu0 = global_freqs.mean() return obs_list, nu0 - def __getitem__(self, slc): + def __getitem__(self, slc, copy=False): # FIXME Do I need to change something in self._auxiliary_tables? - return Observation( - self._antpos[slc], - self._vis[:, slc], - self._weight[:, slc], - self._polarization, - self._freq, - self._auxiliary_tables, - ) - - def get_freqs(self, frequency_list): + ap = self._antpos[slc] + vis = self._vis[slc] + wgt = self._weight[:, slc] + if copy: + ap = ap.copy() + vis = vis.copy() + wgt = wgt.copy() + return Observation(ap, vis, wgt, self._polarization, self._freq, self._auxiliary_tables) + + def get_freqs(self, frequency_list, copy=False): """Return observation that contains a subset of the present frequencies Parameters @@ -453,18 +451,18 @@ class Observation(BaseObservation): """ mask = np.zeros(self.nfreq, dtype=bool) mask[frequency_list] = 1 - return self.get_freqs_by_slice(mask) + return self.get_freqs_by_slice(mask, copy) - def get_freqs_by_slice(self, slc): + def get_freqs_by_slice(self, slc, copy=False): # FIXME Do I need to change something in self._auxiliary_tables? - return Observation( - self._antpos, - self._vis[..., slc], - self._weight[..., slc], - self._polarization, - self._freq[slc], - self._auxiliary_tables, - ) + vis = self._vis[..., slc] + wgt = self._weight[..., slc] + freq = self._freq[slc] + if copy: + vis = vis.copy() + wgt = wgt.copy() + freq = freq.copy() + return Observation( self._antpos, vis, wgt, self._polarization, freq, self._auxiliary_tables) def average_stokesi(self): # FIXME Do I need to change something in self._auxiliary_tables? @@ -564,11 +562,26 @@ class Observation(BaseObservation): new_ant1 = np.array([dct_inv[ii][0] for ii in range(len(atset))]) new_ant2 = np.array([dct_inv[ii][1] for ii in range(len(atset))]) new_uvw = ap.uvw + raise NotImplementedError("FIXME The implementation of this is not complete.") # FIXME Find correct uvw ap = self._antpos ap = AntennaPositions(new_uvw, new_ant1, new_ant2, new_times) return Observation(ap, new_vis, new_wgt, self._polarization, self._freq, self._auxiliary_tables) + def flag_baseline(self, ant1_index, ant2_index): + ant1 = self.antenna_positions.ant1 + ant2 = self.antenna_positions.ant2 + if ant1 is None or ant2 is None: + raise RuntimeError("The calibration information needed for flagging a baseline is not " + "available. Please import the measurement set with " + "`with_calib_info=True`.") + assert np.all(ant1 < ant2) + ind = np.logical_and(ant1 == ant1_index, ant2 == ant2_index) + wgt = self._weight.copy() + wgt[:, ind] = 0. + print(f"INFO: Flag baseline {ant1_index}-{ant2_index}, {np.sum(ind)}/{self.nrow} rows flagged.") + return Observation(self._antpos, self._vis, wgt, self._polarization, self._freq, self._auxiliary_tables) + @property def uvw(self): return self._antpos.uvw diff --git a/resolve/extra.py b/resolve/extra.py new file mode 100644 index 0000000000000000000000000000000000000000..6d8e85acc8431fb762360d6a7cd19c657fb5fbb7 --- /dev/null +++ b/resolve/extra.py @@ -0,0 +1,32 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# Copyright(C) 2019-2021 Max-Planck-Society +# Author: Philipp Arras + +import nifty8 as ift + +from .mpi import master +from .data.observation import Observation + + +def split_data_file(data_path, n_task, target_folder, base_name, n_work, compress): + from os import makedirs + makedirs(target_folder, exist_ok=True) + + obs = Observation.load(data_path) + + for rank in range(n_task): + lo, hi = ift.utilities.shareRange(n_work, n_task, rank) + sliced_obs = obs.get_freqs_by_slice(slice(*(lo, hi))) + sliced_obs.save(f"{target_folder}/{base_name}_{rank}.npz", compress=compress) + + +def mpi_load(data_folder, base_name, full_data_set, n_work, comm=None, compress=False): + if master: + from os.path import isdir + if not isdir(data_folder): + split_data_file(full_data_set, comm.Get_size(), data_folder, base_name, n_work, compress) + if comm is None: + return Observation.load(full_data_set) + + comm.Barrier() + return Observation.load(f"{data_folder}/{base_name}_{comm.Get_rank()}.npz") diff --git a/resolve/response.py b/resolve/response.py index fcb097b37d095eb5b611d7fe30a470a2e5084dfd..c92713390b0b568030f093e96194439db8128b58 100644 --- a/resolve/response.py +++ b/resolve/response.py @@ -106,12 +106,17 @@ class MfResponse(ift.LinearOperator): observation : Observation Instance of the :class:`Observation` that represents the measured data. frequency_domain : IRGSpace - Contains the :class:`IRGSpace` for the frequencies. + Contains the :class:`IRGSpace` for the frequencies. The coordinates of + the space can be either frequencies or normalized log-frequencies. In + the latter case set `log_freq` to True. position_domain : nifty8.RGSpace Contains the the :class:`nifty8.RGSpace` for the positions. + log_freq : bool + If true, the coordinates of `frequency_domain` are interpreted to be + normalized log-frequencies. Default is False. """ - def __init__( self, observation, frequency_domain, position_domain): + def __init__(self, observation, frequency_domain, position_domain, log_freq=False): my_assert_isinstance(observation, Observation) # FIXME Add polarization support my_assert(observation.npol in [1, 2]) @@ -123,7 +128,11 @@ class MfResponse(ift.LinearOperator): self._target = observation.vis.domain self._capability = self.TIMES | self.ADJOINT_TIMES - data_freq = observation.freq + if log_freq: + data_freq = np.log(observation.freq / observation.freq.mean()) + else: + data_freq = observation.freq + my_assert(np.all(np.diff(data_freq) > 0)) sky_freq = np.array(frequency_domain.coordinates) band_indices = self.band_indices(sky_freq, data_freq) @@ -278,9 +287,9 @@ class SingleResponse(ift.LinearOperator): if self._ofac is not None: return self._ofac maxuv = ( - np.max(np.abs(self._args["uvw"][:, 0:2]), axis=0) - * np.max(self._args["freq"]) - / SPEEDOFLIGHT + np.max(np.abs(self._args["uvw"][:, 0:2]), axis=0) + * np.max(self._args["freq"]) + / SPEEDOFLIGHT ) hspace = self._domain[0].get_default_codomain() hvol = np.array(hspace.shape) * np.array(hspace.distances) / 2 @@ -308,7 +317,7 @@ class SingleResponse(ift.LinearOperator): for xx, yy in product(range(nfacets_x), range(nfacets_y)): cx = ((0.5 + xx) / nfacets_x - 0.5) * self._args["pixsize_x"] * npix_x cy = ((0.5 + yy) / nfacets_y - 0.5) * self._args["pixsize_y"] * npix_y - facet = x[nx * xx : nx * (xx + 1), ny * yy : ny * (yy + 1)] + facet = x[nx * xx: nx * (xx + 1), ny * yy: ny * (yy + 1)] foo = dirty2vis( dirty=facet, center_x=cx, @@ -340,5 +349,5 @@ class SingleResponse(ift.LinearOperator): verbosity=verbosity(), **self._args ) - res[nx * xx : nx * (xx + 1), ny * yy : ny * (yy + 1)] = im + res[nx * xx: nx * (xx + 1), ny * yy: ny * (yy + 1)] = im return res diff --git a/test/test_general.py b/test/test_general.py index 5ec89ebc45b01fdde333752a1f16128cd1dbb3f8..4936fb2f23928e1b2a606bec1e446da37c64c160 100644 --- a/test/test_general.py +++ b/test/test_general.py @@ -71,6 +71,12 @@ def test_legacy_load(): rve.Observation.legacy_load(f"{direc}legacy.npz") +def test_flag_baseline(): + ms = f"{direc}CYG-D-6680-64CH-10S.ms" + obs = rve.ms2observations(ms, "DATA", True, 0)[0] + obs.flag_baseline(3, 5) + + def try_operator(op): pos = ift.from_random(op.domain) op(pos)