From ada0013a14cbffa8564b564495c0651675fa3dd4 Mon Sep 17 00:00:00 2001 From: Niclas Esser <nesser@mpifr-bonn.mpg.de> Date: Fri, 2 Aug 2024 11:15:52 +0200 Subject: [PATCH] Remove deprecated signal_generator folder --- pafsim/signal_generator/h5reader.py | 80 -------- pafsim/signal_generator/signal_generator.py | 205 -------------------- 2 files changed, 285 deletions(-) delete mode 100755 pafsim/signal_generator/h5reader.py delete mode 100755 pafsim/signal_generator/signal_generator.py diff --git a/pafsim/signal_generator/h5reader.py b/pafsim/signal_generator/h5reader.py deleted file mode 100755 index 1303f4c..0000000 --- a/pafsim/signal_generator/h5reader.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 - -import h5py - -def open_hdf5_file(filename): - ''' - Open hdf5 file - - arguments - filename of the hdf5 file - - returns - h5py.File object - ''' - return h5py.File(filename) - - -def get_item_by_path(dataset, path): - ''' - Get item by path - - arguments - dataset: h5py.File object - path: path of the item - - returns - item - ''' - indice = path.split('/') - indice_length = len(indice) - try: - for index in range(indice_length): - if index == indice_length - 1: - name = indice[index] - if name in dataset.attrs.keys(): - dataset = dataset.attrs[name] - else: - data = dataset[name] - if data.ndim != 0: - dataset = data[:] - else: - dataset = data[()] - else: - dataset = dataset[indice[index]] - except Exception as e: - dataset = None - print(str(e)) - - return dataset - -def get_structure_tree(h5_object): - ''' - Get structure tree of the hdf5 file - - arguments - h5_object: h5py.File object - - returns - string representation of the structure tree - ''' - def print_attrs(name, obj): - nonlocal string_pointer - level = name.count('/') - if level > 0: - shift = (level - 1) * ' ' + '└' + '─' - else: - shift = '' - item_name = name.split("/")[-1] - if hasattr(obj, 'nbytes'): - size_string = f' ({str(obj.size)})' - else: - size_string = '' - string_pointer += shift + item_name + size_string + '\n' - if hasattr(obj.attrs, 'items'): - for key, val in obj.attrs.items(): - string_pointer += ' ' + shift + f"{key}: {val}" + '\n' - - string_pointer = "" - h5_object.visititems(print_attrs) - return string_pointer diff --git a/pafsim/signal_generator/signal_generator.py b/pafsim/signal_generator/signal_generator.py deleted file mode 100755 index 4bf4a8b..0000000 --- a/pafsim/signal_generator/signal_generator.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python3 - -import numpy as np -import sys -from matplotlib import pyplot as plt -import matplotlib -from scipy import interpolate - -import h5reader - -def generate_complex_samples(shape): - ''' - Generate complex samples in given shape - - arguments: - shape of the desired array of samples - - returns: - uncorrelated samples in given shape - ''' - shape[-1] = shape[-1] * 2 - uncorrelated_samples = np.random.normal( - 0, np.sqrt(2)/2, size=shape).astype("float64").view("complex128") - return uncorrelated_samples; - -def get_eigen(matrix): - ''' - Get eigen values and vectors for a matrix - - arguments: - matrix - - returns: - eigen values - eigen vectors - ''' - eigenvalues, eigenvectors = np.linalg.eigh(matrix) - eigenvalues[eigenvalues < 0] = 0 - - return eigenvalues, eigenvectors - -def create_interpolater(iv, matrix, k=3): - ''' - Create an interpolater from the input matrices - - arguments: - iv: independent variable - matrix: matrix to be interpolated - k: order - - returns: - interpolater - ''' - interpolater = interpolate.make_interp_spline(iv, matrix, k=k) - return interpolater - -def generate_interpolated_acm(dataset, signals, noises, rfis, - frequencies, fft_frequencies): - ''' - Interpolate the acm through frequencies. - - arguments: - dataset: dataset contains matrixes of array correlation matrix - signals: array correlation matrix for signals - noises: array correlation matrix for noises - rfis: array correlation matrix for RFIs - frequencies: orignal frequencies for the array correlation matrix - fft_frequencies: frequencies to be interpolated on - - returns: - inteperolated acm - ''' - signal_list = [] - noise_list = [] - rfi_list = [] - - if signals is not None: - for signal in signals: - signal_list.append(h5reader.get_item_by_path(dataset, signal)) - if noises is not None: - for noise in noises: - noise_list.append(h5reader.get_item_by_path(dataset, noise)) - if rfis is not None: - for rfi in rfis: - rfi_list.append(get_item_by_path(dataset, rfi)) - - noise_list = np.array(noise_list) - signal_list = np.array(signal_list) - rfi_list = np.array(rfi_list) - - combined_acm = np.sum(signal_list, axis=0) + np.sum(noise_list, axis=0) + np.sum(rfi_list, axis=0) - element_num = combined_acm.shape[-1] - # signal_acm = generate_complex_samples([length, element_num, element_num]) - - acm_interpolater = create_interpolater(frequencies, combined_acm) - # interpolated = interpolater(fft_frequencies, extrapolate=False) - interpolated_acm = acm_interpolater(fft_frequencies) - - return interpolated_acm - -def generate_samples_from_acm(acm, length): - ''' - Generate samples from given acm and length - - arguments: - acm: array correlation matrix - length: shape of the desired samples - - returns: - correlated samples - ''' - random_complex = generate_complex_samples([length,]) - # np.pad(random_complex, 500, 'constant', constant_values=0) - fourier_samples = np.fft.fft(random_complex) - # fourier_samples = np.ones(length*2).astype('float64').view(np.complex128) - element_num = acm.shape[-1] - - fourier_samples_matrix = np.tile(fourier_samples, [element_num, 1]) - - #loop over the frequency bins - for i in range(length): - eigenvalues, eigenvectors = get_eigen(acm[i]) - fourier_samples_matrix[:, i] = (eigenvectors @ np.sqrt(np.diag(eigenvalues)) - @ fourier_samples_matrix[:, i, np.newaxis]).flatten() - - time_series = np.fft.ifft(fourier_samples_matrix) - - return time_series - - -def plot_interpolated_correlation(correlated_samples, interpolated_acm, - fft_frequencies, element_num, length): - ''' - Plot correlated samples - - arguments: - correlated_samples: correlated samples - interpolated_acm: interpolated array correlation matrix - fft_frequencies: interpolated frequencies - element_num: number of elements - length: length of the samples - - returns: - none - ''' - - # fig = plt.figure() - matplotlib.rcParams['axes.linewidth'] = 0.2 - dpi = 1080 - fig = plt.figure(figsize=(4096/dpi, 4096/dpi), dpi=dpi) - gs = fig.add_gridspec(element_num, element_num, hspace=0, wspace=0) - ax = gs.subplots(sharex='col', sharey='row') - - fourier_samples_matrix = np.fft.fft(correlated_samples) - correlated = np.empty([element_num, element_num, length], dtype=np.complex128) - for row, elem1 in enumerate(fourier_samples_matrix): - for col, elem2 in enumerate(fourier_samples_matrix): - correlated[row, col] = elem1 * np.conjugate(elem2) - - ax[row, col].scatter(fft_frequencies/1e9, - np.angle(interpolated_acm[:, row, col]), - c='blue', alpha=0.7, marker=',', s=0.01) - ax[row, col].scatter(fft_frequencies/1e9, - np.angle(correlated[row, col, :]), - c='yellow', marker=',', alpha=0.7, s=0.01) - ax[row, col].xaxis.set_tick_params(labelsize=3, width=0.2) - ax[row, col].yaxis.set_tick_params(labelsize=3, width=0.2) - # ax[row, col].xaxis.set_major_locator(plt.MaxNLocator(2)) - fig.supxlabel('Frequencies (GHz)', fontsize=5) - fig.supylabel('Angle (Rad)', fontsize=5, horizontalalignment='right') - plt.subplots_adjust(0.08, 0.07, 0.99, 0.99, 0.0, 0.0) - plt.savefig("all_elements.png") - -def main(): - - acm_file= sys.argv[1] - acm_matrix = h5reader.open_hdf5_file(acm_file) - - signal_path = "acms/signal/0/acm" - t_noise_path = "acms/noise/0/t/acm" - lna_noise_path = "acms/noise/0/lna/acm" - signal_frequency_path = "acms/signal/0/frequencies" - output_length = 1000 - - ''' - structure_tree = h5reader.get_structure_tree(acm_matrix) - print(structure_tree) - ''' - acm_signal = h5reader.get_item_by_path(acm_matrix, signal_path) - frequencies_signal = h5reader.get_item_by_path(acm_matrix, signal_frequency_path) - frequencies = frequencies_signal * 1e9 - center_freq = frequencies[0] + (frequencies[-1] - frequencies[0]) / 2 - fine_frequencies = np.fft.fftfreq(output_length, d = 1/(1e9)) + center_freq - - interpolated_acm = generate_interpolated_acm(acm_matrix, - [signal_path,], [t_noise_path, lna_noise_path], None, frequencies, fine_frequencies) - acm_matrix.close() - - correlated_samples = generate_samples_from_acm(interpolated_acm, output_length) - plot_interpolated_correlation(correlated_samples, interpolated_acm, - fine_frequencies, acm_signal.shape[-1], output_length) - - -if __name__ == "__main__": - main() -- GitLab