Source code for hermespy.hardware_loop.calibration.leakage

# -*- coding: utf-8 -*-
"""
===================
Leakage Calibration
===================
"""

from __future__ import annotations
from typing import Tuple, Type

import matplotlib.pyplot as plt
import numpy as np
from h5py import Group
from numpy.linalg import svd
from scipy.fft import fft, ifft
from scipy.signal import convolve, find_peaks, peak_widths

from hermespy.core import Serializable, Signal, VAT
from ..physical_device import LeakageCalibrationBase, PhysicalDevice
from .delay import DelayCalibration

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class SelectiveLeakageCalibration(LeakageCalibrationBase, Serializable): """Calibration of a frequency-selective leakage model.""" yaml_tag = "SelectiveLeakageCalibration" __leakage_response: np.ndarray # Impulse response of the leakage model __sampling_rate: float # Sampling rate of the leakage model __delay: float # Implicit delay of the leakage model def __init__( self, leakage_response: np.ndarray, sampling_rate: float, delay: float = 0.0, physical_device: PhysicalDevice | None = None, ) -> None: """ Args: leakage_response (numpy.ndarray): The leakage impulse response matrix. sampling_rate (float): The sampling rate of the leakage model in Hz. delay (float, optional): The implicit delay of the leakage model in seconds. Defaults to zero. """ if leakage_response.ndim != 3: raise ValueError( f"Leakage response matrix must be a three-dimensional array (has {leakage_response.ndim} dimensions)" ) if sampling_rate <= 0.0: raise ValueError(f"Sampling rate must be non-negative (not {sampling_rate} Hz)") if delay < 0.0: raise ValueError(f"Delay must be non-negative (not {delay} seconds)") # Initialize base class LeakageCalibrationBase.__init__(self) # Initialize class attributes self.__leakage_response = leakage_response self.__sampling_rate = sampling_rate self.__delay = delay @property def leakage_response(self) -> np.ndarray: """Leakage impulse response matrix. Returns: Numpy matrix of dimensions :math:`M \\times N \\times L`, where :math:`M` is the number of receive streams and :math:`N` is the number of transmit streams and :math:`L` is the number of samples in the impulse response. """ return self.__leakage_response @property def sampling_rate(self) -> float: """Sampling rate of the leakage model in Hz.""" return self.__sampling_rate @property def delay(self) -> float: """Implicit delay of the leakage model in seconds.""" return self.__delay
[docs] def predict_leakage(self, transmitted_signal: Signal) -> Signal: if transmitted_signal.num_streams != self.leakage_response.shape[1]: raise ValueError( f"Transmitted signal has unxpected number of streams ({transmitted_signal.num_streams} instead of {self.leakage_response.shape[1]})" ) predicted_signal_samples = np.zeros( ( self.leakage_response.shape[0], transmitted_signal.num_samples + self.leakage_response.shape[2] - 1, ), dtype=np.complex128, ) for m, n in np.ndindex(self.leakage_response.shape[0], transmitted_signal.num_streams): # The leaked signal is the convolution of the transmitted signal with the leakage response predicted_signal_samples[m, :] += convolve( self.__leakage_response[m, n, :], transmitted_signal.getitem(n).flatten() ) return transmitted_signal.from_ndarray(predicted_signal_samples)
[docs] def remove_leakage(self, transmitted_signal: Signal, received_signal: Signal) -> Signal: if transmitted_signal.num_streams != self.leakage_response.shape[1]: raise ValueError( f"Transmitted signal has unxpected number of streams ({transmitted_signal.num_streams} instead of {self.leakage_response.shape[1]})" ) if received_signal.num_streams != self.leakage_response.shape[0]: raise ValueError( f"Received signal has unxpected number of streams ({received_signal.num_streams} instead of {self.leakage_response.shape[0]})" ) if transmitted_signal.sampling_rate != received_signal.sampling_rate: raise ValueError( f"Transmitted and received signal must have the same sampling rate ({transmitted_signal.sampling_rate} != {received_signal.sampling_rate})" ) if transmitted_signal.carrier_frequency != received_signal.carrier_frequency: raise ValueError( f"Transmitted and received signal must have the same carrier frequency ({transmitted_signal.carrier_frequency} != {received_signal.carrier_frequency})" ) # Compute the delay correction delay_correction = transmitted_signal.delay + received_signal.delay # The received signal is corrected by subtracting the leaked samples corrected_signal = received_signal.copy() # If nothing has been transmitted, abort if transmitted_signal.num_samples < 1: return corrected_signal # Compute the implicit delay shift in samples delay_sample_shift = round((self.delay - delay_correction) * received_signal.sampling_rate) for m, n in np.ndindex(received_signal.num_streams, transmitted_signal.num_streams): # The leaked signal is the convolution of the transmitted signal with the leakage response predicted_siso_signal = convolve( self.__leakage_response[m, n, :], transmitted_signal.getitem(n).flatten() ) # The correction is achieved by subtracting the leaked signal from the received signal if delay_sample_shift >= 0: leak_range = min( delay_sample_shift + len(predicted_siso_signal), corrected_signal.num_samples ) min_num_samples = min( corrected_signal.num_samples - delay_sample_shift, len(predicted_siso_signal) ) corrected_signal[m, delay_sample_shift:leak_range] = ( corrected_signal.getitem((m, slice(delay_sample_shift, leak_range))) - predicted_siso_signal[:min_num_samples] ) else: leak_range = min( delay_sample_shift + len(predicted_siso_signal), corrected_signal.num_samples + delay_sample_shift, ) min_num_samples = min(corrected_signal.num_samples, len(predicted_siso_signal)) corrected_signal[m, 0:leak_range] = ( corrected_signal.getitem((m, slice(0, leak_range))) - predicted_siso_signal[-delay_sample_shift:min_num_samples] ) return corrected_signal
[docs] def plot(self) -> Tuple[plt.FigureBase, VAT]: """Plot the leakage response in the time and frequency domain.""" num_tx = self.__leakage_response.shape[1] num_rx = self.__leakage_response.shape[0] figure, axes = plt.subplots(num_rx, num_tx, squeeze=False) max_amplitude = 0.0 sample_instances = np.arange(self.__leakage_response.shape[2]) / self.__sampling_rate for m, n in np.ndindex(num_rx, num_tx): # frequency_bins = fftshift(fftfreq(self.__leakage_response.shape[2])) samples = np.abs(self.__leakage_response[m, n, :]) max_amplitude = max(samples.max(), max_amplitude) axes[m, n].plot(sample_instances, samples) # freq_axes.plot( # frequency_bins, # fftshift(abs(fft(self.__leakage_response[m, n, :]))), # label=f"Tx: {n} Rx{m}", # ) axes[m, n].set_xlabel("Time [s]") axes[m, n].set_ylabel("Absolute Amplitude") # freq_axes.set_xlabel("Frequency [Hz]") # Set the y-axis limits to the maximum amplitude for ax in axes.flat: ax.set_ylim(0, max_amplitude) return figure, axes
def to_HDF(self, group: Group) -> None: self._write_dataset(group, "leakage_response", self.leakage_response) group.attrs["sampling_rate"] = self.sampling_rate group.attrs["delay"] = self.delay @classmethod def from_HDF( cls: Type[SelectiveLeakageCalibration], group: Group ) -> SelectiveLeakageCalibration: leakage_response = np.asarray(group.get("leakage_response"), dtype=np.complex128) sampling_rate = group.attrs.get("sampling_rate") delay = group.attrs.get("delay") return SelectiveLeakageCalibration(leakage_response, sampling_rate, delay)
[docs] def estimate_delay(self) -> DelayCalibration: """Estimate the delay of the leakage model. Returns: The delay of the leakage model in seconds. """ # The delay is estimated by finding the maximum of the absolute value of the leakage response delay = float(np.argmax(np.abs(self.leakage_response)) / self.sampling_rate + self.delay) return DelayCalibration(delay)
@staticmethod def __probe_leakage( device: PhysicalDevice, num_probes: int, num_wavelet_samples: int ) -> Tuple[np.ndarray, np.ndarray]: if num_probes < 1: raise ValueError(f"Number of probes must be greater than zero (not {num_probes})") if num_wavelet_samples < 1: raise ValueError( f"Number of samples must be greater than zero (not {num_wavelet_samples})" ) # Generate zadoff-chu sequences to probe the device leakage cf = num_wavelet_samples % 2 q = 1 sample_indices = np.arange(num_wavelet_samples) probe_indices = np.arange(1, 1 + num_probes) zadoff_chu_sequences = np.exp( -1j * np.pi * np.outer(probe_indices, sample_indices * (sample_indices + cf + 2 * q)) / num_wavelet_samples ) # Replicate the (periodic) ZC waveform such that any window of # num_wavelet_samples will always contain an entire ZC sequence # (time-shifted). Essentially, build a huge CP and CS around the center # ZC sequence. At the receiver, we will focus on receiving the second # (i.e. center) ZC waveform. probing_waveforms = np.tile(zadoff_chu_sequences, (1, 3)) probing_frequencies = fft(zadoff_chu_sequences, axis=1, norm="ortho") num_samples = probing_waveforms.shape[1] # Collect received samples received_waveforms = np.zeros( ( num_probes, device.antennas.num_receive_ports, device.antennas.num_transmit_ports, num_wavelet_samples, ), dtype=np.complex128, ) for p, n in np.ndindex(num_probes, device.antennas.num_transmit_ports): tx_samples = np.zeros( (device.antennas.num_transmit_ports, num_samples), dtype=np.complex128 ) tx_samples[n, :] = probing_waveforms[p, :] tx_signal = Signal.Create( tx_samples, sampling_rate=device.sampling_rate, carrier_frequency=device.carrier_frequency, ) rx_signal = device.trigger_direct(tx_signal, calibrate=False) # From the received signal, collect the middle num_wavelet_samples of the transmitted # ZC sequences. This should account for any delays of the transmission, as long as the # sequence is long enough. If it's not long enough, the leakage calculation will fail. # TODO: look at the estimated delay and its reliability. If the delay cannot be estimated # reliably, most probably, the TX signal was not received in the window decided here start = num_wavelet_samples received_waveforms[p, :, n, :] = rx_signal.getitem( (slice(None, None), slice(start, start + num_wavelet_samples)) ) # Compute received frequency spectra received_frequencies = fft(received_waveforms, axis=3, norm="ortho") # Return the collected probing and received frequency spectra return probing_frequencies, received_frequencies
[docs] @staticmethod def LeastSquaresEstimate( device: PhysicalDevice, num_probes: int = 7, num_wavelet_samples: int = 4673, configure_device: bool = True, filter_calibration: bool = True, ) -> SelectiveLeakageCalibration: """Estimate the transmit-receive leakage for a physical device using Leat-Squares estimation. Args: device (PhysicalDevice): Physical device to estimate the covariance matrix for. num_probes (int, optional): Number of probings transmitted to estimate the covariance matrix. :math:`7` by default. num_wavelet_samples (int, optional): Number of samples transmitted per probing to estimate the covariance matrix. :math:`4673` by default. configure_device (bool, optional): Configure the specified device by the estimated leakage calibration. Enabled by default. filter_calibration (bool, optional): Filter the estimated calibration to consider only prominent peaks. Enabled by default. Returns: The initialized :class:`SelectiveLeakageCalibration` instance. Raises: ValueError: If the number of probes is not strictly positive. ValueError: If the number of samples is not strictly positive. """ # Probe the device leakage probing_frequencies, received_frequencies = SelectiveLeakageCalibration.__probe_leakage( device, num_probes, num_wavelet_samples ) num_samples = probing_frequencies.shape[1] estimated_frequency_response = np.zeros( (device.antennas.num_receive_ports, device.antennas.num_transmit_ports, num_samples), dtype=np.complex128, ) for m, n in np.ndindex( device.antennas.num_receive_ports, device.antennas.num_transmit_ports ): # Select the transmitted and received frequency spectra for the current antenna pairs rx_frequencies = received_frequencies[:, m, n, :] tx_frequencies = probing_frequencies[:, :] # Estimate the frequency-selectivity by least-squares estimation Rx = rx_frequencies Tx = tx_frequencies # Solve for X (i.e. the channel frequency response) in the least-squares sense: # Minimize \|Rx - X*Tx\|^2 under the constraint that X is diagonal. # See https://math.stackexchange.com/a/3502842/397295 # results in this expression: # x_ls = np.diag(Tx.conj().T.dot(Rx)) / np.diag(Tx.conj().T.dot(Tx)) # optimized version: x_ls = np.sum(Tx.conj() * Rx, axis=0) / np.sum(Tx.conj() * Tx, axis=0) estimated_frequency_response[m, n, :] = x_ls # Convert the estimated leakage-response into the time-domain leakage_response = ifft(estimated_frequency_response, norm="backward") # Only consider the leakage response around the highest peak if filter_calibration: summed_response = np.sum(abs(leakage_response), axis=(0, 1), keepdims=False) peaks: np.ndarray = find_peaks(summed_response, 0.25 * summed_response.max())[0] if peaks.size != 0: widths: np.ndarray = np.ceil( peak_widths(summed_response, peaks, rel_height=0.75)[0] ) new_leakage_response = np.zeros_like(leakage_response) for p, w in zip(peaks, widths): p_start = max(0, int(p - w // 2)) p_end = min(leakage_response.shape[2], 1 + int(p + w // 2)) new_leakage_response[:, :, p_start:p_end] = leakage_response[ :, :, p_start:p_end ] leakage_response = new_leakage_response calibration = SelectiveLeakageCalibration( leakage_response, device.sampling_rate, device.delay_calibration.delay ) # Configure the device with the estimated calibration if the respective flag is enabled if configure_device: device.leakage_calibration = calibration return calibration
[docs] @staticmethod def MMSEEstimate( device: PhysicalDevice, num_probes: int = 7, num_wavelet_samples: int = 127, noise_power: np.ndarray | None = None, configure_device: bool = True, ) -> SelectiveLeakageCalibration: """Estimate the transmit receive leakage for a physical device using Minimum Mean Square Error (MMSE) estimation. Args: device (PhysicalDevice): Physical device to estimate the covariance matrix for. num_probes (int, optional): Number of probings transmitted to estimate the covariance matrix. :math:`7` by default. num_wavelet_samples (int, optional): Number of samples transmitted per probing to estimate the covariance matrix. :math:`127` by default. noise_power (numpy.ndarray, optional): Noise power at the receiving antennas. If not specified, the device's noise power configuration will be assumed or estimated on-the-fly. configure_device (bool, optional): Configure the specified device by the estimated leakage calibration. Enabled by default. Returns: The initialized :class:`SelectiveLeakageCalibration` instance. Raises: ValueError: If the number of probes is not strictly positive. ValueError: If the number of samples is not strictly positive. ValueError: If the noise power is negative. """ # Probe the device leakage probing_frequencies, received_frequencies = SelectiveLeakageCalibration.__probe_leakage( device, num_probes, num_wavelet_samples ) num_samples = probing_frequencies.shape[1] # Estimate noise power if not specified if noise_power is None: if device.noise_power is None: noise_power = device.estimate_noise_power() else: noise_power = device.noise_power if np.any(noise_power < 0.0): raise ValueError(f"Noise power must be non-negative (not {noise_power})") if noise_power.ndim != 1 or noise_power.shape[0] != device.antennas.num_receive_ports: raise ValueError("Noise power has invalid dimensions") # Estimate frequency spectra via MMSE estimation # https://nowak.ece.wisc.edu/ece830/ece830_fall11_lecture20.pdf # Page 2 Example 1 h = np.zeros((num_samples * num_probes, num_samples), dtype=np.complex128) for p, probe in enumerate(probing_frequencies): h[p * num_samples : (1 + p) * num_samples, :] = np.diag(probe) # For now, the noise power is assumed to be the mean over all receive chains mean_noise_power = np.mean(noise_power) # Compute the pseudo-inverse of the received frequency spectra by singular value decomposition u, s, vh = svd( h @ h.T.conj() + mean_noise_power * np.eye(num_samples * num_probes, num_samples * num_probes), full_matrices=False, hermitian=True, ) u_select = u[:, :num_samples] s_select = s[:num_samples] vh_select = vh[:num_samples, :] mmse_estimator = h.T.conj() @ vh_select.T.conj() @ np.diag(1 / s_select) @ u_select.T.conj() # Estimate the frequency spectra for each antenna probing independently mmse_frequency_selectivity_estimation = np.zeros( (device.antennas.num_receive_ports, device.antennas.num_transmit_ports, num_samples), dtype=np.complex128, ) for m, n in np.ndindex( device.antennas.num_receive_ports, device.antennas.num_transmit_ports ): probing_estimation = mmse_estimator @ received_frequencies[:, m, n, :].flatten() mmse_frequency_selectivity_estimation[m, n, :] = probing_estimation # Initialize the calibration from the estimated frequency spectra leakage_response = ifft(mmse_frequency_selectivity_estimation, axis=2)[ :, :, :num_wavelet_samples ] calibration = SelectiveLeakageCalibration( leakage_response, device.sampling_rate, device.delay_calibration.delay ) # Configure the device with the estimated calibration if the respective flag is enabled if configure_device: device.leakage_calibration = calibration return calibration