Source code for hermespy.channel.quadriga_channel

# -*- coding: utf-8 -*-

from __future__ import annotations
from math import ceil
from typing import Type


import numpy as np
from h5py import Group

from hermespy.core import (
    ChannelStateInformation,
    ChannelStateFormat,
    Device,
    HDFSerializable,
    Signal,
)
from hermespy.channel import Channel, ChannelRealization, InterpolationMode, QuadrigaInterface

__author__ = "Tobias Kronauer"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Tobias Kronauer", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class QuadrigaChannelRealization(ChannelRealization): """Realization of a Quadriga channel. Generated by the :meth:`realize<QuadrigaChannel.realize>` method of :class:`QuadrigaChannel`. """ __path_gains: np.ndarray __path_delays: np.ndarray def __init__( self, alpha_device: Device, beta_device: Device, gain: float, path_gains: np.ndarray, path_delays: np.ndarray, interpolation_mode: InterpolationMode = InterpolationMode.NEAREST, ) -> None: """ Args: alpha_device (Device): First device linked by the realized channel. beta_device (Device): Second device linked by the realized channel. gain (float): Channel gain. path_gains (np.ndarray): Path gains. path_delays (np.ndarray): Path delays. interpolation_mode (InterpolationMode, optional): Interpolation mode. Defaults to InterpolationMode.NEAREST. """ # Initialize base class ChannelRealization.__init__(self, alpha_device, beta_device, gain, interpolation_mode) # Initialize class attributes self.__path_gains = path_gains self.__path_delays = path_delays @property def path_gains(self) -> np.ndarray: """Path gains.""" return self.__path_gains @property def path_delays(self) -> np.ndarray: """Path delays.""" return self.__path_delays def _propagate( self, signal: Signal, transmitter: Device, receiver: Device, interpolation: InterpolationMode, ) -> Signal: if signal.num_samples > self.path_gains.shape[3]: raise ValueError( f"Quadriga channel realization does not support signals with more samples than the channel realization ({signal.num_samples} > {self.path_gains.shape[2]})." ) max_delay_in_samples = ceil(np.max(self.path_delays) * signal.sampling_rate) propagated_signal = np.zeros( (receiver.antennas.num_receive_antennas, signal.num_samples + max_delay_in_samples), dtype=np.complex_, ) for tx_antenna in range(transmitter.antennas.num_transmit_antennas): for rx_antenna in range(receiver.antennas.num_receive_antennas): # of dimension, #paths x #snap_shots, along the third dimension are the samples # choose first snapshot, i.e. assume static cir_txa_rxa = self.__path_gains[rx_antenna, tx_antenna, ::] tau_txa_rxa = self.__path_delays[rx_antenna, tx_antenna, ::] time_delay_in_samples_vec = np.around(tau_txa_rxa * signal.sampling_rate).astype( int ) for delay_idx, delay_in_samples in enumerate(time_delay_in_samples_vec): propagated_signal[ rx_antenna, delay_in_samples : delay_in_samples + signal.num_samples ] += ( cir_txa_rxa[delay_idx, : signal.num_samples] * signal.samples[tx_antenna, :] ) return Signal(propagated_signal, signal.sampling_rate, signal.carrier_frequency)
[docs] def state( self, transmitter: Device, receiver: Device, delay: float, sampling_rate: float, num_samples: int, max_num_taps: int, ) -> ChannelStateInformation: max_delay_in_samples = ceil(np.max(self.path_delays) * sampling_rate) num_taps = min(max_num_taps, max_delay_in_samples + 1) impulse_response = np.zeros( ( receiver.antennas.num_receive_antennas, transmitter.antennas.num_transmit_antennas, num_samples, num_taps, ), dtype=np.complex_, ) for tx_antenna in range(receiver.antennas.num_receive_antennas): for rx_antenna in range(transmitter.antennas.num_transmit_antennas): # of dimension, #paths x #snap_shots, along the third dimension are the samples # choose first snapshot, i.e. assume static cir_txa_rxa = self.path_gains[rx_antenna, tx_antenna, ::] tau_txa_rxa = self.path_delays[rx_antenna, tx_antenna, :] time_delay_in_samples_vec = np.around(tau_txa_rxa * sampling_rate).astype(int) time_delay_in_samples_vec = time_delay_in_samples_vec[ time_delay_in_samples_vec < num_taps ] for delay_idx, delay_in_samples in enumerate(time_delay_in_samples_vec): impulse_response[rx_antenna, tx_antenna, :, delay_in_samples] += cir_txa_rxa[ delay_idx ] impulse_response *= np.sqrt(self.gain) return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, impulse_response)
[docs] def to_HDF(self, group: Group) -> None: ChannelRealization.to_HDF(self, group) HDFSerializable._write_dataset(group, "path_gains", self.path_gains) HDFSerializable._write_dataset(group, "path_delays", self.path_delays)
[docs] @classmethod def From_HDF( cls: Type[QuadrigaChannelRealization], group: Group, alpha_device: Device, beta_device: Device, ) -> QuadrigaChannelRealization: params = cls._parameters_from_HDF(group) params["path_gains"] = np.array(group["path_gains"], dtype=np.complex_) params["path_delays"] = np.array(group["path_delays"], dtype=np.float_) return cls(alpha_device, beta_device, **params)
[docs] class QuadrigaChannel(Channel): """Quadriga Channel Model. Maps the output of the :class:`QuadrigaInterface<hermespy.channel.quadriga_interface.QuadrigaInterface>` to fit into Hermes' software architecture. """ yaml_tag = "Quadriga" __interface: QuadrigaInterface | None # Reference to the interface class def __init__(self, *args, interface: QuadrigaInterface | None = None, **kwargs) -> None: """ Args: interface (QuadrigaInterface, optional): Specifies the consisdered Quadriga interface. Defaults to None. """ # Init base channel class Channel.__init__(self, *args, **kwargs) # Save interface settings self.__interface = interface # Register this channel at the interface self._quadriga_interface.register_channel(self) def __del__(self) -> None: """Quadriga channel object destructor. Automatically un-registers channel objects at the interface. """ self._quadriga_interface.unregister_channel(self) @property def _quadriga_interface(self) -> QuadrigaInterface: """Access global Quadriga interface as property. Returns: QuadrigaInterface: Global Quadriga interface. """ return QuadrigaInterface.GlobalInstance() if self.__interface is None else self.__interface # type: ignore def _realize(self) -> ChannelRealization: # Query the quadriga interface for a new impulse response (path_gains, path_delays) = self._quadriga_interface.get_impulse_response(self) # Return the channel realization return QuadrigaChannelRealization( self.alpha_device, self.beta_device, self.gain, path_gains, path_delays, self.interpolation_mode, )
[docs] def recall_realization(self, group: Group) -> QuadrigaChannelRealization: return QuadrigaChannelRealization.From_HDF(group, self.alpha_device, self.beta_device)