Skip to content
Snippets Groups Projects
Commit e1427da9 authored by Philipp Arras's avatar Philipp Arras
Browse files

Merge branch 'master' into devel

parents ebfc4936 5baca2aa
No related branches found
No related tags found
1 merge request!27Devel
Pipeline #110610 passed
......@@ -27,3 +27,4 @@ from .polarization_space import *
from .response import MfResponse, ResponseDistributor, StokesIResponse, SingleResponse
from .simple_operators import *
from .util import *
from .extra import mpi_load
......@@ -188,6 +188,7 @@ class SingleDishObservation(BaseObservation):
freq : numpy.ndarray
Contains the measured frequencies. Shape (n_channels)
"""
def __init__(self, pointings, data, weight, polarization, freq):
my_assert_isinstance(pointings, Directions)
my_assert_isinstance(polarization, Polarization)
......@@ -337,7 +338,7 @@ class Observation(BaseObservation):
@staticmethod
def load(file_name, lo_hi_index=None):
dct = dict(np.load(file_name))
dct = np.load(file_name)
antpos = []
for ii in range(4):
val = dct[f"antpos{ii}"]
......@@ -361,16 +362,18 @@ class Observation(BaseObservation):
# /Load auxtables
pol = Polarization.from_list(dct["polarization"])
slc = slice(None) if lo_hi_index is None else slice(*lo_hi_index)
# FIXME Put barrier here that makes sure that only one full Observation is loaded at a time
return Observation(
AntennaPositions.from_list(antpos),
dct["vis"][..., slc],
dct["weight"][..., slc],
pol,
dct["freq"][slc],
auxtables
)
vis = dct["vis"]
wgt = dct["weight"]
freq = dct["freq"]
if lo_hi_index is not None:
slc = slice(*lo_hi_index)
# Convert view into its own array
vis = vis[..., slc].copy()
wgt = wgt[..., slc].copy()
freq = freq[slc].copy()
del dct
antpos = AntennaPositions.from_list(antpos)
return Observation(antpos, vis, wgt, pol, freq, auxtables)
@staticmethod
def legacy_load(file_name, lo_hi_index=None):
......@@ -417,33 +420,28 @@ class Observation(BaseObservation):
if comm is None:
local_imaging_bands = range(n_imaging_bands)
else:
local_imaging_bands = range(
*ift.utilities.shareRange(
n_imaging_bands, comm.Get_size(), comm.Get_rank()
)
)
lo, hi = ift.utilities.shareRange( n_imaging_bands, comm.Get_size(), comm.Get_rank())
local_imaging_bands = range(lo, hi)
full_obs = Observation.load(file_name)
obs_list = [
full_obs.get_freqs_by_slice(
slice(*ift.utilities.shareRange(len(global_freqs), n_imaging_bands, ii))
)
for ii in local_imaging_bands
]
obs_list = []
for ii in local_imaging_bands:
slc = slice(*ift.utilities.shareRange(len(global_freqs), n_imaging_bands, ii))
obs_list.append(full_obs.get_freqs_by_slice(slc))
nu0 = global_freqs.mean()
return obs_list, nu0
def __getitem__(self, slc):
def __getitem__(self, slc, copy=False):
# FIXME Do I need to change something in self._auxiliary_tables?
return Observation(
self._antpos[slc],
self._vis[:, slc],
self._weight[:, slc],
self._polarization,
self._freq,
self._auxiliary_tables,
)
def get_freqs(self, frequency_list):
ap = self._antpos[slc]
vis = self._vis[slc]
wgt = self._weight[:, slc]
if copy:
ap = ap.copy()
vis = vis.copy()
wgt = wgt.copy()
return Observation(ap, vis, wgt, self._polarization, self._freq, self._auxiliary_tables)
def get_freqs(self, frequency_list, copy=False):
"""Return observation that contains a subset of the present frequencies
Parameters
......@@ -453,18 +451,18 @@ class Observation(BaseObservation):
"""
mask = np.zeros(self.nfreq, dtype=bool)
mask[frequency_list] = 1
return self.get_freqs_by_slice(mask)
return self.get_freqs_by_slice(mask, copy)
def get_freqs_by_slice(self, slc):
def get_freqs_by_slice(self, slc, copy=False):
# FIXME Do I need to change something in self._auxiliary_tables?
return Observation(
self._antpos,
self._vis[..., slc],
self._weight[..., slc],
self._polarization,
self._freq[slc],
self._auxiliary_tables,
)
vis = self._vis[..., slc]
wgt = self._weight[..., slc]
freq = self._freq[slc]
if copy:
vis = vis.copy()
wgt = wgt.copy()
freq = freq.copy()
return Observation( self._antpos, vis, wgt, self._polarization, freq, self._auxiliary_tables)
def average_stokesi(self):
# FIXME Do I need to change something in self._auxiliary_tables?
......@@ -564,11 +562,26 @@ class Observation(BaseObservation):
new_ant1 = np.array([dct_inv[ii][0] for ii in range(len(atset))])
new_ant2 = np.array([dct_inv[ii][1] for ii in range(len(atset))])
new_uvw = ap.uvw
raise NotImplementedError("FIXME The implementation of this is not complete.")
# FIXME Find correct uvw
ap = self._antpos
ap = AntennaPositions(new_uvw, new_ant1, new_ant2, new_times)
return Observation(ap, new_vis, new_wgt, self._polarization, self._freq, self._auxiliary_tables)
def flag_baseline(self, ant1_index, ant2_index):
ant1 = self.antenna_positions.ant1
ant2 = self.antenna_positions.ant2
if ant1 is None or ant2 is None:
raise RuntimeError("The calibration information needed for flagging a baseline is not "
"available. Please import the measurement set with "
"`with_calib_info=True`.")
assert np.all(ant1 < ant2)
ind = np.logical_and(ant1 == ant1_index, ant2 == ant2_index)
wgt = self._weight.copy()
wgt[:, ind] = 0.
print(f"INFO: Flag baseline {ant1_index}-{ant2_index}, {np.sum(ind)}/{self.nrow} rows flagged.")
return Observation(self._antpos, self._vis, wgt, self._polarization, self._freq, self._auxiliary_tables)
@property
def uvw(self):
return self._antpos.uvw
......
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright(C) 2019-2021 Max-Planck-Society
# Author: Philipp Arras
import nifty8 as ift
from .mpi import master
from .data.observation import Observation
def split_data_file(data_path, n_task, target_folder, base_name, n_work, compress):
from os import makedirs
makedirs(target_folder, exist_ok=True)
obs = Observation.load(data_path)
for rank in range(n_task):
lo, hi = ift.utilities.shareRange(n_work, n_task, rank)
sliced_obs = obs.get_freqs_by_slice(slice(*(lo, hi)))
sliced_obs.save(f"{target_folder}/{base_name}_{rank}.npz", compress=compress)
def mpi_load(data_folder, base_name, full_data_set, n_work, comm=None, compress=False):
if master:
from os.path import isdir
if not isdir(data_folder):
split_data_file(full_data_set, comm.Get_size(), data_folder, base_name, n_work, compress)
if comm is None:
return Observation.load(full_data_set)
comm.Barrier()
return Observation.load(f"{data_folder}/{base_name}_{comm.Get_rank()}.npz")
......@@ -106,12 +106,17 @@ class MfResponse(ift.LinearOperator):
observation : Observation
Instance of the :class:`Observation` that represents the measured data.
frequency_domain : IRGSpace
Contains the :class:`IRGSpace` for the frequencies.
Contains the :class:`IRGSpace` for the frequencies. The coordinates of
the space can be either frequencies or normalized log-frequencies. In
the latter case set `log_freq` to True.
position_domain : nifty8.RGSpace
Contains the the :class:`nifty8.RGSpace` for the positions.
log_freq : bool
If true, the coordinates of `frequency_domain` are interpreted to be
normalized log-frequencies. Default is False.
"""
def __init__( self, observation, frequency_domain, position_domain):
def __init__(self, observation, frequency_domain, position_domain, log_freq=False):
my_assert_isinstance(observation, Observation)
# FIXME Add polarization support
my_assert(observation.npol in [1, 2])
......@@ -123,7 +128,11 @@ class MfResponse(ift.LinearOperator):
self._target = observation.vis.domain
self._capability = self.TIMES | self.ADJOINT_TIMES
if log_freq:
data_freq = np.log(observation.freq / observation.freq.mean())
else:
data_freq = observation.freq
my_assert(np.all(np.diff(data_freq) > 0))
sky_freq = np.array(frequency_domain.coordinates)
band_indices = self.band_indices(sky_freq, data_freq)
......
......@@ -71,6 +71,12 @@ def test_legacy_load():
rve.Observation.legacy_load(f"{direc}legacy.npz")
def test_flag_baseline():
ms = f"{direc}CYG-D-6680-64CH-10S.ms"
obs = rve.ms2observations(ms, "DATA", True, 0)[0]
obs.flag_baseline(3, 5)
def try_operator(op):
pos = ift.from_random(op.domain)
op(pos)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment