Source code for hermespy.channel.quadriga.quadriga

# -*- coding: utf-8 -*-

from __future__ import annotations
from typing import Set


import numpy as np
from h5py import Group

from hermespy.core import ChannelStateInformation, ChannelStateFormat, SignalBlock
from ..channel import (
    Channel,
    ChannelRealization,
    ChannelSample,
    ChannelSampleHook,
    LinkState,
    InterpolationMode,
)
from .matlab import MatlabEngine
from .octave import Oct2Py

if MatlabEngine is not None:  # pragma: no cover
    from .matlab import QuadrigaMatlabInterface as QuadrigaInterface  # type: ignore
elif Oct2Py is not None:  # pragma: no cover
    from .octave import QuadrigaOctaveInterface as QuadrigaInterface  # type: ignore
else:  # pragma: no cover
    from .interface import QuadrigaInterface  # type: ignore

__author__ = "Tobias Kronauer"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Tobias Kronauer", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class QuadrigaChannelSample(ChannelSample): """Sample of a quadriga channel model.""" __path_gains: np.ndarray __path_delays: np.ndarray def __init__( self, path_gains: np.ndarray, path_delays: np.ndarray, gain: float, state: LinkState ) -> None: """ Args: path_gains (numpy.ndarray): Path gains. path_delays (numpy.ndarray): Path delays. gain (float): Channel gain. state (ChannelState): Channel state at which the sample was generated. """ # Initialize base class ChannelSample.__init__(self, state) # Initialize class attributes self.__gain = gain self.__path_gains = path_gains self.__path_delays = path_delays @property def path_gains(self) -> np.ndarray: """Path gains.""" return self.__path_gains @property def path_delays(self) -> np.ndarray: """Path delays.""" return self.__path_delays @property def expected_energy_scale(self) -> float: return self.__gain * float(np.sum(self.__path_gains)) def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock: max_delay_in_samples = int(np.round(np.max(self.path_delays) * self.bandwidth)) propagated_signal = np.zeros( ( self.transmitter_state.antennas.num_receive_antennas, signal.num_samples + max_delay_in_samples, ), dtype=np.complex128, ) for channel, delay in zip( self.path_gains.transpose((2, 0, 1)), self.path_delays.transpose((2, 0, 1)) ): time_delay = int(np.round(delay * self.bandwidth)) propagated_signal[:, time_delay : time_delay + signal.num_samples] += channel @ signal propagated_signal *= np.sqrt(self.__gain) return SignalBlock(propagated_signal, signal._offset)
[docs] def state( self, num_samples: int, max_num_taps: int, interpolation_mode: InterpolationMode = InterpolationMode.NEAREST, ) -> ChannelStateInformation: max_delay_in_samples = int(np.round(np.max(self.path_delays) * self.bandwidth)) num_taps = min(max_num_taps, max_delay_in_samples + 1) impulse_response = np.zeros( ( self.receiver_state.antennas.num_receive_antennas, self.transmitter_state.antennas.num_transmit_antennas, num_samples, num_taps, ), dtype=np.complex128, ) for channel, delay in zip( self.path_gains.transpose((2, 0, 1)), self.path_delays.transpose((2, 0, 1)) ): time_delay = int(np.round(delay * self.bandwidth)) impulse_response[:, :, :, time_delay] += channel impulse_response *= np.sqrt(self.__gain) return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, impulse_response)
[docs] class QuadrigaChannelRealization(ChannelRealization[QuadrigaChannelSample]): """Realization of a quadriga channel model.""" def __init__( self, interface: QuadrigaInterface, sample_hooks: Set[ChannelSampleHook[QuadrigaChannelSample]], gain: float, ) -> None: """ Args: quadriga_interface (QuadrigaInterface): Interface to the Quadriga channel model. sample_hooks (Set[ChannelSampleHook[QuadrigaChannelSample]]): Hooks to be called when a new sample is generated. gain (float): Linear channel power gain factor. """ # Initialize base class ChannelRealization.__init__(self, sample_hooks, gain) # Save interface settings self.__interface = interface def _sample(self, state: LinkState) -> QuadrigaChannelSample: # Execute the matlab backend to fetch a channel impulse response cirs = self.__interface.sample_quadriga(state) # Return the sample return QuadrigaChannelSample(cirs[0, 0].coefficients, cirs[0, 0].delays, self.gain, state) def _reciprocal_sample( self, sample: QuadrigaChannelSample, state: LinkState ) -> QuadrigaChannelSample: # pragma: no cover return self._sample(state)
[docs] def to_HDF(self, group: Group) -> None: group.attrs["gain"] = self.gain
[docs] @staticmethod def From_HDF( group: Group, quadriga_interface: QuadrigaInterface, sample_hooks: Set[ChannelSampleHook[QuadrigaChannelSample]], ) -> QuadrigaChannelRealization: return QuadrigaChannelRealization(quadriga_interface, sample_hooks, group.attrs["gain"])
[docs] class QuadrigaChannel(Channel[QuadrigaChannelRealization, QuadrigaChannelSample]): """Quadriga Channel Model. Maps the output of the :class:`QuadrigaInterface<hermespy.channel.quadriga_interface.QuadrigaInterface>` to fit into Hermes' software architecture. """ yaml_tag = "Quadriga" __interface: QuadrigaInterface | None # Reference to the interface class def __init__(self, *args, interface: QuadrigaInterface | None = None, **kwargs) -> None: """ Args: interface (QuadrigaInterface, optional): Specifies the consisdered Quadriga interface. """ # Init base channel class Channel.__init__(self, *args, **kwargs) # Save interface settings self.__interface = QuadrigaInterface() if interface is None else interface # type: ignore def _realize(self) -> QuadrigaChannelRealization: return QuadrigaChannelRealization(self.__interface, self.sample_hooks, self.gain)
[docs] def recall_realization(self, group: Group) -> QuadrigaChannelRealization: return QuadrigaChannelRealization.From_HDF(group, self.__interface, self.sample_hooks)