# Source code for hermespy.hardware_loop.calibration.delay

# -*- coding: utf-8 -*-
"""
=================
Delay Calibration
=================
"""

from __future__ import annotations
from time import sleep
from typing import List, Type

from h5py import Group
import numpy as np

from hermespy.core import Serializable, Signal
from ..physical_device import DelayCalibrationBase, PhysicalDevice

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class DelayCalibration(DelayCalibrationBase, Serializable):
    """Static delay calibration.

    Holds a single, fixed delay value and offers HDF (de-)serialization of it.
    Use :meth:`Estimate` to measure the value on a physical device.
    """

    yaml_tag = "DelayCalibration"

    __delay: float

    def __init__(self, delay: float, physical_device: PhysicalDevice | None = None) -> None:
        """
        Args:

            delay (float):
                The delay value stored by this calibration.
                :meth:`Estimate` computes it as a sample offset divided by the
                sampling rate.

            physical_device (PhysicalDevice, optional):
                Device handed to the calibration base class.
        """

        # Initialize base class
        DelayCalibrationBase.__init__(self, physical_device)

        # Initialize class attributes
        self.delay = delay

    @property
    def delay(self) -> float:
        """The stored calibration delay."""

        return self.__delay

    @delay.setter
    def delay(self, value: float) -> None:
        self.__delay = value

    def to_HDF(self, group: Group) -> None:
        """Write the calibration to the attributes of an HDF group.

        Args:

            group (Group):
                Target HDF group; its ``"delay"`` attribute is overwritten.
        """

        # Serialize attributes
        group.attrs["delay"] = self.delay

    @classmethod
    def from_HDF(cls: Type[DelayCalibration], group: Group) -> DelayCalibration:
        """Restore a calibration from the attributes of an HDF group.

        Args:

            group (Group):
                HDF group providing a ``"delay"`` attribute.

        Returns: A new instance initialized with the stored delay.
        """

        # Deserialize attributes
        delay = group.attrs["delay"]

        # Return new instance
        return cls(delay=delay)

    @staticmethod
    def Estimate(
        device: PhysicalDevice, max_delay: float, num_iterations: int = 10, wait: float = 0.0
    ) -> DelayCalibration:
        """Estimate a physical device's inherent transmit-receive delay.

        Ideally, the transmit and receive channels of the device should be connected by a patch cable.
        WARNING: An attenuator element may be required! Be careful!!!!

        A single-sample pulse is transmitted repeatedly. The delay is the mean offset
        between the transmitted and the received pulse position, divided by the
        sampling rate of the received signal.

        Args:

            device (PhysicalDevice):
                The physical device to calibrate, i.e. the device of which a delay is to be estimated.

            max_delay (float):
                The maximum expected delay which the calibration should compensate for in seconds.

            num_iterations (int, optional):
                Number of calibration iterations.
                Default is 10.

            wait (float, optional):
                Idle time between iteration transmissions in seconds.
                Zero by default.

        Returns: An initialized delay calibration instance.

        Raises:

            ValueError: If `num_iterations` is smaller than one.
            ValueError: If `wait` is negative.
            ValueError: If `max_delay` spans too few samples at the device's maximum sampling rate.
        """

        if num_iterations < 1:
            raise ValueError("The number of iterations must be greater or equal to one")

        if wait < 0.0:
            raise ValueError("The waiting time must be greater or equal to zero")

        sampling_rate = device.max_sampling_rate

        # The frame is twice the maximum delay long, with the pulse placed at max_delay
        num_samples = int(2 * max_delay * sampling_rate)
        if num_samples <= 1:
            raise ValueError(
                "The assumed maximum delay is not resolvable by the configured sampling rate"
            )

        dirac_index = int(max_delay * sampling_rate)
        waveform = np.zeros((device.num_antennas, num_samples), dtype=complex)
        waveform[:, dirac_index] = 1.0
        calibration_signal = Signal.Create(waveform, sampling_rate, device.carrier_frequency)

        propagated_dirac_indices = np.empty(num_iterations, dtype=int)

        # Make multiple iteration calls for calibration
        for n in range(num_iterations):
            propagated_signal = device.trigger_direct(calibration_signal)

            # Infer the implicit delay by estimating the sample index of the propagated dirac.
            # The peak must be searched in the magnitude: NumPy orders complex values by
            # their real part first, so a phase-rotated pulse would be missed otherwise.
            # NOTE(review): the index is relative to each block of the first stream;
            # block offsets are not considered here - confirm this is intended.
            propagated_dirac_indices[n] = np.max(
                [np.argmax(np.abs(block[0])) for block in propagated_signal]
            )

            # Wait the configured amount of time between iterations
            sleep(wait)

        # Compute calibration delay
        # This is just a ML estimation
        mean_dirac_index = np.mean(propagated_dirac_indices)
        calibration_delay = (mean_dirac_index - dirac_index) / propagated_signal.sampling_rate

        # NOTE: Visualization of the calibration results is currently deactivated

        return DelayCalibration(calibration_delay)