Skip to content
Snippets Groups Projects
Commit baf8e909 authored by User's avatar User
Browse files

[add]: hdf5 file reader: h5reader.py

[add]: signal generator given array correlation matrix: signal_generator.py
parent fb378100
No related branches found
No related tags found
1 merge request!3[add]: hdf5 file reader: h5reader.py
Pipeline #212423 passed
#!/usr/bin/env python3
import h5py
def open_hdf5_file(filename):
return h5py.File(filename)
def get_item_by_path(dataset, path):
indice = path.split('/')
indice_length = len(indice)
try:
for index in range(indice_length):
if index == indice_length - 1:
name = indice[index]
if name in dataset.attrs.keys():
dataset = dataset.attrs[name]
else:
data = dataset[name]
if data.ndim != 0:
dataset = data[:]
else:
dataset = data[()]
else:
dataset = dataset[indice[index]]
except Exception as e:
dataset = None
print(str(e))
return dataset
def get_structure_tree(h5_object):
def print_attrs(name, obj):
nonlocal string_pointer
level = name.count('/')
if level > 0:
shift = (level - 1) * ' ' + '' + ''
else:
shift = ''
item_name = name.split("/")[-1]
if hasattr(obj, 'nbytes'):
size_string = f' ({str(obj.size)})'
else:
size_string = ''
string_pointer += shift + item_name + size_string + '\n'
if hasattr(obj.attrs, 'items'):
for key, val in obj.attrs.items():
string_pointer += ' ' + shift + f"{key}: {val}" + '\n'
string_pointer = ""
h5_object.visititems(print_attrs)
return string_pointer
#!/usr/bin/env python3
import numpy as np
import sys
from matplotlib import pyplot as plt
import matplotlib
from scipy import interpolate
import h5reader
def generate_complex_samples(shape):
'''
Generate complex samples in given shape
arguments:
shape of the desired array of samples
returns:
uncorrelated samples in given shape
'''
shape[-1] = shape[-1] * 2
uncorrelated_samples = np.random.normal(
0, np.sqrt(2)/2, size=shape).astype("float64").view("complex128")
return uncorrelated_samples;
def get_eigen(matrix):
'''
Get eigen values and vectors for a matrix
arguments:
matrix
returns:
eigen values
eigen vectors
'''
eigenvalues, eigenvectors = np.linalg.eigh(matrix)
eigenvalues[eigenvalues < 0] = 0
return eigenvalues, eigenvectors
def create_interpolater(iv, matrix, k=3):
'''
Create an interpolater from the input matrices
arguments:
iv: independent variable
matrix: matrix to be interpolated
k: order
returns:
interpolater
'''
interpolater = interpolate.make_interp_spline(iv, matrix, k=k)
return interpolater
def generate_wide_band_response(dataset, length, signals, noises, rfis,
frequencies, fft_frequencies):
'''
Generate a wide band response
arguments:
dataset: dataset contains matrixes of array correlation matrix
length: shape of the desired samples
signals: array correlation matrix for signals
noises: array correlation matrix for noises
rfis: array correlation matrix for RFIs
frequencies: orignal frequencies for the array correlation matrix
fft_frequencies: frequencies to be interpolated on
returns:
wide band response
'''
signal_list = []
noise_list = []
rfi_list = []
for signal in signals:
signal_list.append(h5reader.get_item_by_path(dataset, signal))
# for noise in noises:
# noise_list.append(get_item_by_path(dataset, noise))
# for rfi in rfis:
# rfi_list.append(get_item_by_path(dataset, rfi))
signal_acm = signal_list[0]
element_num = signal_acm.shape[-1]
# signal_acm = generate_complex_samples([length, element_num, element_num])
interpolater = create_interpolater(frequencies, signal_acm)
# interpolated = interpolater(fft_frequencies, extrapolate=False)
interpolated = interpolater(fft_frequencies)
return interpolated
def plot_correlated_samples(interpolated, fft_frequencies, element_num, length):
'''
Plot correlated samples
arguments:
interpolated: interpolated samples
fft_frequencies: interpolated frequencies
element_num: number of elements
length: length of the samples
returns:
none
'''
random_complex = generate_complex_samples([length,])
# np.pad(random_complex, 500, 'constant', constant_values=0)
fourier_samples = np.fft.fft(random_complex)
# fourier_samples = np.ones(length*2).astype('float64').view(np.complex128)
fourier_samples_matrix = np.tile(fourier_samples, [element_num, 1])
for i in range(length):
eigenvalues, eigenvectors = get_eigen(interpolated[i])
fourier_samples_matrix[:, i] = (eigenvectors @ np.sqrt(np.diag(eigenvalues))
@ fourier_samples_matrix[:, i, np.newaxis]).flatten()
correlated = np.empty([element_num, element_num, length], dtype=np.complex128)
for row, elem1 in enumerate(fourier_samples_matrix):
for col, elem2 in enumerate(fourier_samples_matrix):
correlated[row, col] = elem1 * np.conjugate(elem2)
'''
dpi = 1080
fig = plt.figure(figsize=(4096/dpi, 4096/dpi), dpi=dpi)
ax = fig.add_subplot()
ax.scatter(fft_frequencies, np.angle(interpolated[:, 7, 11]),
c='blue', alpha=0.5, marker='+')
ax.scatter(fft_frequencies, np.angle(correlated[7, 11, :]),
c='yellow', alpha=0.5)
plt.savefig("single_element.png")
'''
# fig = plt.figure()
matplotlib.rcParams['axes.linewidth'] = 0.2
dpi = 1080
fig = plt.figure(figsize=(4096/dpi, 4096/dpi), dpi=dpi)
gs = fig.add_gridspec(element_num, element_num, hspace=0, wspace=0)
ax = gs.subplots(sharex='col', sharey='row')
correlated = np.empty([element_num, element_num, length], dtype=np.complex128)
for row, elem1 in enumerate(fourier_samples_matrix):
for col, elem2 in enumerate(fourier_samples_matrix):
correlated[row, col] = elem1 * np.conjugate(elem2)
ax[row, col].scatter(fft_frequencies/1e9,
np.angle(interpolated[:, row, col]),
c='blue', alpha=0.7, marker=',', s=0.01)
ax[row, col].scatter(fft_frequencies/1e9,
np.angle(correlated[row, col, :]),
c='yellow', marker=',', alpha=0.7, s=0.01)
ax[row, col].xaxis.set_tick_params(labelsize=3, width=0.2)
ax[row, col].yaxis.set_tick_params(labelsize=3, width=0.2)
# ax[row, col].xaxis.set_major_locator(plt.MaxNLocator(2))
fig.supxlabel('Frequencies (GHz)', fontsize=5)
fig.supylabel('Angle (Rad)', fontsize=5, horizontalalignment='right')
plt.subplots_adjust(0.08, 0.07, 0.99, 0.99, 0.0, 0.0)
plt.savefig("all_elements.png")
# time_series_list = np.fft.ifft(fourier_samples_matrix)
# gs = fig.add_gridspec(element_num, 1, hspace=0, wspace=0)
# ax = gs.subplots(sharex='col', sharey='row')
# for time_seris in enumerate(time_series_list
# ax
def main():
acm_file= sys.argv[1]
acm_matrix = h5reader.open_hdf5_file(acm_file)
signal_path = "acms/signal/0/acm"
sys_path = "acms/noise/0/sys/acm"
signal_frequency_path = "acms/signal/0/frequencies"
output_length = 1000
'''
structure_tree = h5reader.get_structure_tree(acm_matrix)
print(structure_tree)
'''
acm_signal = h5reader.get_item_by_path(acm_matrix, signal_path)
frequencies_signal = h5reader.get_item_by_path(acm_matrix, signal_frequency_path)
frequencies = frequencies_signal * 1e9
center_freq = frequencies[0] + (frequencies[-1] - frequencies[0]) / 2
fine_frequencies = np.fft.fftfreq(output_length, d = 1/(1e9)) + center_freq
interpolated_samples = generate_wide_band_response(acm_matrix, output_length,
[signal_path,] , None, None,frequencies, fine_frequencies)
plot_correlated_samples(interpolated_samples, fine_frequencies,
acm_signal.shape[-1], output_length)
acm_matrix.close()
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment