Skip to content
Snippets Groups Projects
Commit 9f6abdb1 authored by Niclas Esser's avatar Niclas Esser
Browse files

Interpolation method

parent 8afcb7f4
No related branches found
No related tags found
No related merge requests found
Pipeline #255198 failed
This diff is collapsed.
......@@ -38,12 +38,15 @@ class WeightGenerator(Processor):
Returns:
np.ndarray: _description_
"""
x = np.asarray(self.idata[0].data)
weights = np.zeros((self.idata[0].getDimSize('T') // 2,
x = np.vstack([i.data for i in self.idata])
print(x.shape)
for i in self.idata:
print(i.data.shape)
weights = np.zeros((x.shape[0] // 2,
self.idata[0].getDimSize('F'),
self.idata[0].getDimSize('P'),
self.idata[0].getDimSize('E')), dtype='complex')
for t in range(0, self.idata[0].getDimSize('T'), 2):
for t in range(0, x.shape[0], 2):
acm_on = x[t]
acm_off = x[t*2+1]
acm_sn = np.matmul(np.linalg.inv(acm_off), acm_on)
......
......@@ -119,18 +119,7 @@ class Channelizer(Processor):
Returns:
np.ndarray: The output array
"""
self._generateFilter()
print(self.idata)
if np.iscomplexobj(self.idata[0].data):
reshaped = self.idata[0].data.reshape(self.idata[0].getDimSize('E'),
self.idata[0].getDimSize('P'),
-1, self.channels)
return applyShortTimeFFT(self.idata[0].data, self.channels, self.idata[0].meta["fs"]).transpose(2,0,1,3)
# return np.fft.fft(reshaped * self.coeff, axis=-1).transpose(3,0,1,2)
reshaped = self.idata[0].data.reshape(self.idata[0].getDimSize('E'),
self.idata[0].getDimSize('P'),
-1, self.channels*2)
return np.fft.rfft(reshaped * self.coeff, axis=-1).transpose(3,0,1,2)[1:]
return applyShortTimeFFT(self.idata[0].data, self.channels, self.idata[0].meta["fs"]).transpose(2,0,1,3)
#pylint: disable=R0801
def plot(self, path="", figsize=(8,4)):
......
......@@ -95,6 +95,28 @@ def weightSamples(acm, signals):
@ signals[:, i, j, np.newaxis]).flatten()
return signals
def interpolateACM(acms: np.ndarray, frequencies: np.ndarray, fs: float, n: int, k: int=3) -> np.ndarray:
"""Interpolate between ACMs
Args:
acms (np.ndarray): Array containing the ACMS N x M X M
N -> number of ACMs
M -> number of elements
frequencies (np.ndarray): 1D array, containing the frequency labels of the ACMs
fs (float): Sampling rate
n (int): Number of interpolation.
If N is 10 and n is 100, 10 ACMs are interpolate between 2 orginal ACMs
k (int, optional): _description_. Defaults to 3.
Returns:
np.ndarray: Returns the array contianing the interpolated ACMs n x M x M
"""
center_freq = frequencies[0] + (frequencies[-1] - frequencies[0]) / 2
fine_frequencies = np.fft.fftshift(
np.fft.fftfreq(n, d = 1/fs)) + center_freq
interpolater = interpolate.make_interp_spline(frequencies, acms, k=k)
return interpolater(fine_frequencies)
class Generator(Processor):
"""The Generator class generates wideband time series data for all simulated PAF elements.
......@@ -124,12 +146,13 @@ class Generator(Processor):
self.rfi = self.conf.get("rfi", [0])
self.duration = self.conf.get("duration", 1.0)
self.reader_type = self.conf.get("reader_type", "FrontendReader")
self.interpolate = self.conf.get("interpolate_factor", 10)
self.reader = getattr(dm, self.reader_type)(self.dataset)
self.sources_list = []
self.antennas = self.reader.nelements
self.pol = self.reader.npol
self.nchan = self.reader.nchan
self.fs = (self.reader.fmax("noise", self.noise) - self.reader.fmin("noise", self.noise))*2e9
self.nchan = self.reader.nchan * self.interpolate
self.fs = 1e9#(self.reader.fmax("noise", self.noise) - self.reader.fmin("noise", self.noise))*2e9
self.width = int(self.fs * self.duration)
if self.width % self.nchan:
self.width = ((self.width + self.nchan - 1) // self.nchan) * self.nchan
......@@ -156,9 +179,6 @@ class Generator(Processor):
"""
self.noise_series = self._generateNoiseSignals()
self.source_series = self._generateSourceSignals()
print("NOISE shape ", self.noise_series.shape)
print("SRC shape ", self.source_series.shape)
# return noise_series + source_series
return self.noise_series + self.source_series
......@@ -169,13 +189,13 @@ class Generator(Processor):
np.ndarray: Time series of individual elements
"""
acm = self.reader.noise(self.noise, "acm")
if self.interpolate > 1:
acm = interpolateACM(acm, self.reader.frequencies("noise", self.noise), self.fs, self.nchan)
signal = channelizedSignals(self.antennas, self.width, self.nchan, self.fs)
channelized_array = weightSamples(acm, signal)
# De-channelization
tmp = []
for e in range(self.antennas):
tmp.append(applyShortTimeFFT(channelized_array[e], self.nchan, self.fs, False))
return np.expand_dims(np.asarray(tmp), axis=1)[..., :-self.nchan//2]
return np.expand_dims(applyShortTimeFFT(channelized_array, self.nchan, self.fs, False)[..., :self.width], axis=1)
def _generateSourceSignals(self) -> np.ndarray:
......@@ -189,6 +209,8 @@ class Generator(Processor):
acm = self.reader.source(source.name, "acm")
# source.signal = source.gain * (np.random.randn(self.nchan, (self.width // self.nchan)) + 1j\
# * np.random.randn(self.nchan, (self.width // self.nchan))) / np.sqrt(2)
if self.interpolate > 1:
acm = interpolateACM(acm, self.reader.frequencies("noise", self.noise), self.fs, self.nchan)
source.signal = channelizedSignals(1, self.width, self.nchan, self.fs)
signal = np.tile(source.signal, [self.antennas, 1, 1])
channelized_array = weightSamples(acm, signal)
......@@ -197,7 +219,7 @@ class Generator(Processor):
for e in range(self.antennas):
tmp.append(applyShortTimeFFT(channelized_array[e], self.nchan, self.fs, False))
out += onOffCycle(np.expand_dims(np.asarray(tmp), axis=1) * source.gain, self.fs, source.period, source.percentage)
return out[..., :-self.nchan//2]
return out[..., :self.width]
#pylint: disable=R0801
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment