# -*- coding: utf-8 -*-
"""
===================
Leakage Calibration
===================
"""
from __future__ import annotations
from typing import Tuple, Type
from typing_extensions import override
import matplotlib.pyplot as plt
import numpy as np
from numpy.linalg import svd
from scipy.fft import fft, ifft
from scipy.signal import convolve, find_peaks, peak_widths
from hermespy.core import Signal, VAT, SerializationProcess, DeserializationProcess
from ..physical_device import LeakageCalibrationBase, PhysicalDevice
from .delay import DelayCalibration
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class SelectiveLeakageCalibration(LeakageCalibrationBase):
"""Calibration of a frequency-selective leakage model."""
__DEFAULT_DELAY: float = 0.0
__leakage_response: np.ndarray # Impulse response of the leakage model
__sampling_rate: float # Sampling rate of the leakage model
__delay: float # Implicit delay of the leakage model
def __init__(
self,
leakage_response: np.ndarray,
sampling_rate: float,
delay: float = __DEFAULT_DELAY,
physical_device: PhysicalDevice | None = None,
) -> None:
"""
Args:
leakage_response:
The leakage impulse response matrix.
sampling_rate:
The sampling rate of the leakage model in Hz.
delay:
The implicit delay of the leakage model in seconds.
Defaults to zero.
"""
if leakage_response.ndim != 3:
raise ValueError(
f"Leakage response matrix must be a three-dimensional array (has {leakage_response.ndim} dimensions)"
)
if sampling_rate <= 0.0:
raise ValueError(f"Sampling rate must be non-negative (not {sampling_rate} Hz)")
if delay < 0.0:
raise ValueError(f"Delay must be non-negative (not {delay} seconds)")
# Initialize base class
LeakageCalibrationBase.__init__(self)
# Initialize class attributes
self.__leakage_response = leakage_response
self.__sampling_rate = sampling_rate
self.__delay = delay
@property
def leakage_response(self) -> np.ndarray:
"""Leakage impulse response matrix.
Returns:
Numpy matrix of dimensions :math:`M \\times N \\times L`,
where :math:`M` is the number of receive streams and :math:`N` is the number of transmit streams and :math:`L` is the number of samples in the impulse response.
"""
return self.__leakage_response
@property
def sampling_rate(self) -> float:
"""Sampling rate of the leakage model in Hz."""
return self.__sampling_rate
@property
def delay(self) -> float:
"""Implicit delay of the leakage model in seconds."""
return self.__delay
[docs]
def predict_leakage(self, transmitted_signal: Signal) -> Signal:
if transmitted_signal.num_streams != self.leakage_response.shape[1]:
raise ValueError(
f"Transmitted signal has unxpected number of streams ({transmitted_signal.num_streams} instead of {self.leakage_response.shape[1]})"
)
predicted_signal_samples = np.zeros(
(
self.leakage_response.shape[0],
transmitted_signal.num_samples + self.leakage_response.shape[2] - 1,
),
dtype=np.complex128,
)
for m, n in np.ndindex(self.leakage_response.shape[0], transmitted_signal.num_streams):
# The leaked signal is the convolution of the transmitted signal with the leakage response
predicted_signal_samples[m, :] += convolve(
self.__leakage_response[m, n, :], transmitted_signal.getitem(n).flatten()
)
return transmitted_signal.from_ndarray(predicted_signal_samples)
[docs]
def remove_leakage(self, transmitted_signal: Signal, received_signal: Signal) -> Signal:
if transmitted_signal.num_streams != self.leakage_response.shape[1]:
raise ValueError(
f"Transmitted signal has unxpected number of streams ({transmitted_signal.num_streams} instead of {self.leakage_response.shape[1]})"
)
if received_signal.num_streams != self.leakage_response.shape[0]:
raise ValueError(
f"Received signal has unxpected number of streams ({received_signal.num_streams} instead of {self.leakage_response.shape[0]})"
)
if transmitted_signal.sampling_rate != received_signal.sampling_rate:
raise ValueError(
f"Transmitted and received signal must have the same sampling rate ({transmitted_signal.sampling_rate} != {received_signal.sampling_rate})"
)
if transmitted_signal.carrier_frequency != received_signal.carrier_frequency:
raise ValueError(
f"Transmitted and received signal must have the same carrier frequency ({transmitted_signal.carrier_frequency} != {received_signal.carrier_frequency})"
)
# Compute the delay correction
delay_correction = transmitted_signal.delay + received_signal.delay
# The received signal is corrected by subtracting the leaked samples
corrected_signal = received_signal.copy()
# If nothing has been transmitted, abort
if transmitted_signal.num_samples < 1:
return corrected_signal
# Compute the implicit delay shift in samples
delay_sample_shift = round((self.delay - delay_correction) * received_signal.sampling_rate)
for m, n in np.ndindex(received_signal.num_streams, transmitted_signal.num_streams):
# The leaked signal is the convolution of the transmitted signal with the leakage response
predicted_siso_signal = convolve(
self.__leakage_response[m, n, :], transmitted_signal.getitem(n).flatten()
)
# The correction is achieved by subtracting the leaked signal from the received signal
if delay_sample_shift >= 0:
leak_range = min(
delay_sample_shift + len(predicted_siso_signal), corrected_signal.num_samples
)
min_num_samples = min(
corrected_signal.num_samples - delay_sample_shift, len(predicted_siso_signal)
)
corrected_signal[m, delay_sample_shift:leak_range] = (
corrected_signal.getitem((m, slice(delay_sample_shift, leak_range)))
- predicted_siso_signal[:min_num_samples]
)
else:
leak_range = min(
delay_sample_shift + len(predicted_siso_signal),
corrected_signal.num_samples + delay_sample_shift,
)
min_num_samples = min(corrected_signal.num_samples, len(predicted_siso_signal))
corrected_signal[m, 0:leak_range] = (
corrected_signal.getitem((m, slice(0, leak_range)))
- predicted_siso_signal[-delay_sample_shift:min_num_samples]
)
return corrected_signal
[docs]
def plot(self) -> Tuple[plt.FigureBase, VAT]:
"""Plot the leakage response in the time and frequency domain."""
num_tx = self.__leakage_response.shape[1]
num_rx = self.__leakage_response.shape[0]
figure, axes = plt.subplots(num_rx, num_tx, squeeze=False)
max_amplitude = 0.0
sample_instances = np.arange(self.__leakage_response.shape[2]) / self.__sampling_rate
for m, n in np.ndindex(num_rx, num_tx):
# frequency_bins = fftshift(fftfreq(self.__leakage_response.shape[2]))
samples = np.abs(self.__leakage_response[m, n, :])
max_amplitude = max(samples.max(), max_amplitude)
axes[m, n].plot(sample_instances, samples)
# freq_axes.plot(
# frequency_bins,
# fftshift(abs(fft(self.__leakage_response[m, n, :]))),
# label=f"Tx: {n} Rx{m}",
# )
axes[m, n].set_xlabel("Time [s]")
axes[m, n].set_ylabel("Absolute Amplitude")
# freq_axes.set_xlabel("Frequency [Hz]")
# Set the y-axis limits to the maximum amplitude
for ax in axes.flat:
ax.set_ylim(0, max_amplitude)
return figure, axes
[docs]
def estimate_delay(self) -> DelayCalibration:
"""Estimate the delay of the leakage model.
Returns:
The delay of the leakage model in seconds.
"""
# The delay is estimated by finding the maximum of the absolute value of the leakage response
delay = float(np.argmax(np.abs(self.leakage_response)) / self.sampling_rate + self.delay)
return DelayCalibration(delay)
@staticmethod
def __probe_leakage(
device: PhysicalDevice, num_probes: int, num_wavelet_samples: int
) -> Tuple[np.ndarray, np.ndarray]:
if num_probes < 1:
raise ValueError(f"Number of probes must be greater than zero (not {num_probes})")
if num_wavelet_samples < 1:
raise ValueError(
f"Number of samples must be greater than zero (not {num_wavelet_samples})"
)
# Generate zadoff-chu sequences to probe the device leakage
cf = num_wavelet_samples % 2
q = 1
sample_indices = np.arange(num_wavelet_samples)
probe_indices = np.arange(1, 1 + num_probes)
zadoff_chu_sequences = np.exp(
-1j
* np.pi
* np.outer(probe_indices, sample_indices * (sample_indices + cf + 2 * q))
/ num_wavelet_samples
)
# Replicate the (periodic) ZC waveform such that any window of
# num_wavelet_samples will always contain an entire ZC sequence
# (time-shifted). Essentially, build a huge CP and CS around the center
# ZC sequence. At the receiver, we will focus on receiving the second
# (i.e. center) ZC waveform.
probing_waveforms = np.tile(zadoff_chu_sequences, (1, 3))
probing_frequencies = fft(zadoff_chu_sequences, axis=1, norm="ortho")
num_samples = probing_waveforms.shape[1]
# Collect received samples
received_waveforms = np.zeros(
(
num_probes,
device.antennas.num_receive_ports,
device.antennas.num_transmit_ports,
num_wavelet_samples,
),
dtype=np.complex128,
)
for p, n in np.ndindex(num_probes, device.antennas.num_transmit_ports):
tx_samples = np.zeros(
(device.antennas.num_transmit_ports, num_samples), dtype=np.complex128
)
tx_samples[n, :] = probing_waveforms[p, :]
tx_signal = Signal.Create(
tx_samples,
sampling_rate=device.sampling_rate,
carrier_frequency=device.carrier_frequency,
)
rx_signal = device.trigger_direct(tx_signal, calibrate=False)
# From the received signal, collect the middle num_wavelet_samples of the transmitted
# ZC sequences. This should account for any delays of the transmission, as long as the
# sequence is long enough. If it's not long enough, the leakage calculation will fail.
# TODO: look at the estimated delay and its reliability. If the delay cannot be estimated
# reliably, most probably, the TX signal was not received in the window decided here
start = num_wavelet_samples
received_waveforms[p, :, n, :] = rx_signal.getitem(
(slice(None, None), slice(start, start + num_wavelet_samples))
)
# Compute received frequency spectra
received_frequencies = fft(received_waveforms, axis=3, norm="ortho")
# Return the collected probing and received frequency spectra
return probing_frequencies, received_frequencies
[docs]
@staticmethod
def LeastSquaresEstimate(
device: PhysicalDevice,
num_probes: int = 7,
num_wavelet_samples: int = 4673,
configure_device: bool = True,
filter_calibration: bool = True,
) -> SelectiveLeakageCalibration:
"""Estimate the transmit-receive leakage for a physical device using Leat-Squares estimation.
Args:
device:
Physical device to estimate the covariance matrix for.
num_probes:
Number of probings transmitted to estimate the covariance matrix.
:math:`7` by default.
num_wavelet_samples:
Number of samples transmitted per probing to estimate the covariance matrix.
:math:`4673` by default.
configure_device:
Configure the specified device by the estimated leakage calibration.
Enabled by default.
filter_calibration:
Filter the estimated calibration to consider only prominent peaks.
Enabled by default.
Returns: The initialized :class:`SelectiveLeakageCalibration` instance.
Raises:
ValueError: If the number of probes is not strictly positive.
ValueError: If the number of samples is not strictly positive.
"""
# Probe the device leakage
probing_frequencies, received_frequencies = SelectiveLeakageCalibration.__probe_leakage(
device, num_probes, num_wavelet_samples
)
num_samples = probing_frequencies.shape[1]
estimated_frequency_response = np.zeros(
(device.antennas.num_receive_ports, device.antennas.num_transmit_ports, num_samples),
dtype=np.complex128,
)
for m, n in np.ndindex(
device.antennas.num_receive_ports, device.antennas.num_transmit_ports
):
# Select the transmitted and received frequency spectra for the current antenna pairs
rx_frequencies = received_frequencies[:, m, n, :]
tx_frequencies = probing_frequencies[:, :]
# Estimate the frequency-selectivity by least-squares estimation
Rx = rx_frequencies
Tx = tx_frequencies
# Solve for X (i.e. the channel frequency response) in the least-squares sense:
# Minimize \|Rx - X*Tx\|^2 under the constraint that X is diagonal.
# See https://math.stackexchange.com/a/3502842/397295
# results in this expression:
# x_ls = np.diag(Tx.conj().T.dot(Rx)) / np.diag(Tx.conj().T.dot(Tx))
# optimized version:
x_ls = np.sum(Tx.conj() * Rx, axis=0) / np.sum(Tx.conj() * Tx, axis=0)
estimated_frequency_response[m, n, :] = x_ls
# Convert the estimated leakage-response into the time-domain
leakage_response = ifft(estimated_frequency_response, norm="backward")
# Only consider the leakage response around the highest peak
if filter_calibration:
summed_response = np.sum(abs(leakage_response), axis=(0, 1), keepdims=False)
peaks: np.ndarray = find_peaks(summed_response, 0.25 * summed_response.max())[0]
if peaks.size != 0:
widths: np.ndarray = np.ceil(
peak_widths(summed_response, peaks, rel_height=0.75)[0]
)
new_leakage_response = np.zeros_like(leakage_response)
for p, w in zip(peaks, widths):
p_start = max(0, int(p - w // 2))
p_end = min(leakage_response.shape[2], 1 + int(p + w // 2))
new_leakage_response[:, :, p_start:p_end] = leakage_response[
:, :, p_start:p_end
]
leakage_response = new_leakage_response
calibration = SelectiveLeakageCalibration(
leakage_response, device.sampling_rate, device.delay_calibration.delay
)
# Configure the device with the estimated calibration if the respective flag is enabled
if configure_device:
device.leakage_calibration = calibration
return calibration
[docs]
@staticmethod
def MMSEEstimate(
device: PhysicalDevice,
num_probes: int = 7,
num_wavelet_samples: int = 127,
noise_power: np.ndarray | None = None,
configure_device: bool = True,
) -> SelectiveLeakageCalibration:
"""Estimate the transmit receive leakage for a physical device using Minimum Mean Square Error (MMSE) estimation.
Args:
device:
Physical device to estimate the covariance matrix for.
num_probes:
Number of probings transmitted to estimate the covariance matrix.
:math:`7` by default.
num_wavelet_samples:
Number of samples transmitted per probing to estimate the covariance matrix.
:math:`127` by default.
noise_power:
Noise power at the receiving antennas.
If not specified, the device's noise power configuration will be assumed or estimated on-the-fly.
configure_device:
Configure the specified device by the estimated leakage calibration.
Enabled by default.
Returns: The initialized :class:`SelectiveLeakageCalibration` instance.
Raises:
ValueError: If the number of probes is not strictly positive.
ValueError: If the number of samples is not strictly positive.
ValueError: If the noise power is negative.
"""
# Probe the device leakage
probing_frequencies, received_frequencies = SelectiveLeakageCalibration.__probe_leakage(
device, num_probes, num_wavelet_samples
)
num_samples = probing_frequencies.shape[1]
# Estimate noise power if not specified
if noise_power is None:
if device.noise_power is None:
noise_power = device.estimate_noise_power()
else:
noise_power = device.noise_power
if np.any(noise_power < 0.0):
raise ValueError(f"Noise power must be non-negative (not {noise_power})")
if noise_power.ndim != 1 or noise_power.shape[0] != device.antennas.num_receive_ports:
raise ValueError("Noise power has invalid dimensions")
# Estimate frequency spectra via MMSE estimation
# https://nowak.ece.wisc.edu/ece830/ece830_fall11_lecture20.pdf
# Page 2 Example 1
h = np.zeros((num_samples * num_probes, num_samples), dtype=np.complex128)
for p, probe in enumerate(probing_frequencies):
h[p * num_samples : (1 + p) * num_samples, :] = np.diag(probe)
# For now, the noise power is assumed to be the mean over all receive chains
mean_noise_power = np.mean(noise_power)
# Compute the pseudo-inverse of the received frequency spectra by singular value decomposition
u, s, vh = svd(
h @ h.T.conj()
+ mean_noise_power * np.eye(num_samples * num_probes, num_samples * num_probes),
full_matrices=False,
hermitian=True,
)
u_select = u[:, :num_samples]
s_select = s[:num_samples]
vh_select = vh[:num_samples, :]
mmse_estimator = h.T.conj() @ vh_select.T.conj() @ np.diag(1 / s_select) @ u_select.T.conj()
# Estimate the frequency spectra for each antenna probing independently
mmse_frequency_selectivity_estimation = np.zeros(
(device.antennas.num_receive_ports, device.antennas.num_transmit_ports, num_samples),
dtype=np.complex128,
)
for m, n in np.ndindex(
device.antennas.num_receive_ports, device.antennas.num_transmit_ports
):
probing_estimation = mmse_estimator @ received_frequencies[:, m, n, :].flatten()
mmse_frequency_selectivity_estimation[m, n, :] = probing_estimation
# Initialize the calibration from the estimated frequency spectra
leakage_response = ifft(mmse_frequency_selectivity_estimation, axis=2)[
:, :, :num_wavelet_samples
]
calibration = SelectiveLeakageCalibration(
leakage_response, device.sampling_rate, device.delay_calibration.delay
)
# Configure the device with the estimated calibration if the respective flag is enabled
if configure_device:
device.leakage_calibration = calibration
return calibration
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
process.serialize_array(self.leakage_response, "leakage_response")
process.serialize_floating(self.sampling_rate, "sampling_rate")
process.serialize_floating(self.delay, "delay")
[docs]
@override
@classmethod
def Deserialize(
cls: Type[SelectiveLeakageCalibration], process: DeserializationProcess
) -> SelectiveLeakageCalibration:
return cls(
process.deserialize_array("leakage_response", np.complex128),
process.deserialize_floating("sampling_rate"),
process.deserialize_floating("delay", cls.__DEFAULT_DELAY),
)