Source code for hermespy.channel.sionna_rt_channel

# -*- coding: utf-8 -*-

from __future__ import annotations
from typing import Set, Any
from typing_extensions import override
from math import ceil

import numpy as np

from sionna import rt  # type: ignore

from hermespy.channel.channel import ChannelSampleHook, InterpolationMode, LinkState
from hermespy.core import (
    ChannelStateInformation,
    ChannelStateFormat,
    DeserializationProcess,
    SerializationProcess,
    SignalBlock,
)
from .channel import Channel, ChannelRealization, ChannelSample

__author__ = "Egor Achkasov"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Egor Achkasov"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class SionnaRTChannelSample(ChannelSample):
    """Sample of a Sionna RT channel realization.

    Generated by sampling a :class:`SionnaRTChannelRealization`.
    """

    paths: rt.Paths

    # Channel states and delays before Doppler effect application.
    # These members are Tensorflow tensors and SHOULD NOT be used directly.
    # They are a backup of the sionna.rt.Paths properties, used to reverse
    # the Doppler effect after each propagation / state query.
    __a: Any
    __tau: Any
    __gain: float

    def __init__(self, paths: rt.Paths, gain: float, state: LinkState) -> None:
        """
        Args:
            paths:
                Ray-Tracing paths in this sample.
                Should be generated with _realize of Realization.
            gain: Linear channel power factor.
            state: State of the channel at the time of sampling.
        """

        # Initialize base class
        ChannelSample.__init__(self, state)

        # Initialize class attributes
        self.paths = paths
        self.__a = rt.tf.identity(paths._a)
        self.__tau = rt.tf.identity(paths._tau)
        self.__gain = gain

    @property
    def expected_energy_scale(self) -> float:
        """Expected linear scaling of a propagated signal's energy at each receiving antenna.

        Required to compute the expected energy of a signal after propagation,
        and therefore signal-to-noise ratios (SNRs) and
        signal-to-interference-plus-noise ratios (SINRs).

        TODO Current implementation is technically incorrect.
        """

        return np.abs(np.sum(self.__a))

    def __apply_doppler(self, num_samples: int) -> tuple:
        """Apply Doppler effect on the paths (via sionna.rt.Paths.apply_doppler).

        Cast and reshape the channel state (a and tau) as assumed in
        :meth:`state` and :meth:`_propagate`, then write the backed-up ``_a``
        and ``_tau`` into ``self.paths`` again to reverse the Doppler effect.
        This workflow allows reusing the same Paths object for different
        propagations. The restore step runs even if Sionna raises, so a failed
        call does not leave the shared Paths object Doppler-shifted.

        Args:
            num_samples: Number of time steps the Doppler effect is evaluated for.

        Returns:
            a: gains. Shape (num_rx_ants, num_tx_ants, num_paths, num_samples)
            tau: delays. Shape (num_paths,), a single delay per path.
        """

        try:
            # Apply doppler
            self.paths.apply_doppler(
                sampling_frequency=self.bandwidth,
                num_time_steps=num_samples,
                tx_velocities=self.transmitter_velocity,
                rx_velocities=self.receiver_velocity,
            )

            # Get and cast CIR; only the first batch / device entries are kept
            a, tau = self.paths.cir()
            a = a.numpy()[0, 0, :, 0, :, :, :]
            tau = tau.numpy()[0, 0, 0, :]
        finally:
            # Restore paths to the original state
            self.paths._a = rt.tf.identity(self.__a)
            self.paths._tau = rt.tf.identity(self.__tau)

        return a, tau

    @override
    def state(
        self,
        num_samples: int,
        max_num_taps: int,
        interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
    ) -> ChannelStateInformation:
        """Generate the channel impulse response of this sample.

        Args:
            num_samples: Number of time-domain samples of the state.
            max_num_taps: Paths with a delay tap index at or above this value are dropped.
            interpolation_mode: Not used by this implementation.

        Returns:
            Impulse response of shape
            (num_receive_antennas, num_transmit_antennas, num_samples, num_taps).
        """

        # Apply Doppler effect and get the channel impulse response
        a, tau = self.__apply_doppler(num_samples)

        # Init result.
        # Negative delays mark paths to be skipped, so the maximum may be negative
        # when no path is valid; clamp to zero to keep the tap dimension valid.
        max_delay = np.max(tau) if tau.size != 0 else 0
        max_delay_in_samples = max(0, min(max_num_taps, ceil(max_delay * self.bandwidth)))
        raw_state = np.zeros(
            (
                self.num_receive_antennas,
                self.num_transmit_antennas,
                num_samples,
                1 + max_delay_in_samples,
            ),
            dtype=np.complex128,
        )

        # If no paths hit the target, then return an empty state
        if a.size == 0 or tau.size == 0:
            return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)

        # Accumulate each path's gains into the tap matching its delay
        for a_p, tau_p in zip(np.moveaxis(a, -2, 0), np.moveaxis(tau, -1, 0)):
            if tau_p < 0:
                continue
            delay_tap_index = int(tau_p * self.bandwidth)
            if delay_tap_index >= max_num_taps:
                continue  # pragma: no cover
            raw_state[:, :, :, delay_tap_index] += a_p

        # The gain is a power factor, amplitudes scale with its square root
        raw_state *= np.sqrt(self.__gain)
        return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)

    @override
    def _propagate(
        self, signal_block: SignalBlock, interpolation: InterpolationMode
    ) -> SignalBlock:
        """Propagate a block of samples over the ray-traced paths.

        Args:
            signal_block: Samples to be propagated, one row per transmit antenna.
            interpolation: Not used by this implementation.

        Returns:
            The propagated samples, one row per receive antenna, extended by
            the largest path delay in samples.
        """

        # Calculate the resulting signal block parameters
        sr_ratio = self.receiver_state.sampling_rate / self.transmitter_state.sampling_rate
        offset_new = int(signal_block.offset * sr_ratio)
        num_streams_new = self.num_receive_antennas
        num_samples_new = int(signal_block.num_samples * sr_ratio)

        # Apply Doppler effect and get the channel impulse response
        a, tau = self.__apply_doppler(signal_block.num_samples)

        # If no paths hit the target, then return a zeroed signal
        if a.size == 0 or tau.size == 0:
            return SignalBlock(
                np.zeros((num_streams_new, num_samples_new), signal_block.dtype), offset_new
            )

        # Set other attributes.
        # Clamp to zero: if every path is marked invalid (negative delay) the
        # maximum is negative and would otherwise shrink the output buffer.
        max_delay = np.max(tau)
        max_delay_in_samples = max(0, ceil(max_delay * self.bandwidth))
        propagated_samples = np.zeros(
            (num_streams_new, signal_block.num_samples + max_delay_in_samples),
            dtype=signal_block.dtype,
        )

        # Prepare the optimal einsum path ahead of time for faster execution
        einsum_subscripts = "ijk,jk->ik"
        einsum_path = np.einsum_path(
            einsum_subscripts, a[:, :, 0, :], signal_block, optimize="optimal"
        )[0]

        # For each path
        for a_p, tau_p in zip(np.moveaxis(a, -2, 0), np.moveaxis(tau, -1, 0)):
            # Skip invalid paths; same criterion as in state(), so a negative
            # delay can never turn into a negative slice start below.
            if tau_p < 0:
                continue
            t = int(tau_p * self.bandwidth)
            propagated_samples[:, t : t + signal_block.num_samples] += np.einsum(
                einsum_subscripts, a_p, signal_block, optimize=einsum_path
            )

        # The gain is a power factor, amplitudes scale with its square root
        propagated_samples *= np.sqrt(self.__gain)
        return SignalBlock(propagated_samples, offset_new)
class SionnaRTChannelRealization(ChannelRealization[SionnaRTChannelSample]):
    """Realization of a Sionna RT channel.

    Generated by the :meth:`_realize()<SionnaRTChannel._realize>` routine of
    :class:`SionnaRTChannels<SionnaRTChannel>`.
    """

    __scene: rt.scene.Scene
    __scene_file: str

    def __init__(
        self,
        scene: rt.scene.Scene,
        scene_file: str,
        sample_hooks: Set[ChannelSampleHook] | None = None,
        gain: float = ChannelRealization._DEFAULT_GAIN,
    ) -> None:
        """
        Args:
            scene: The loaded scene object.
            scene_file: Path to the scene file. Required for simple serialization.
            sample_hooks: Hooks to be called when a new sample is generated.
            gain: Linear channel power gain factor. Defaults to 1.0.
        """

        ChannelRealization.__init__(self, sample_hooks, gain)
        self.__scene = scene
        self.__scene_file = scene_file

    @property
    def scene(self) -> rt.Scene:
        """Sionna scene object used to generate samples."""

        return self.__scene

    @override
    def _sample(self, state: LinkState) -> SionnaRTChannelSample:
        """Ray-trace the scene for the given link state.

        The scene object is reused between calls: its transmitters, receivers
        and antenna arrays are reset and rebuilt from ``state`` every time.

        Args:
            state: Link state providing the device positions, antenna positions
                and the transmitter's carrier frequency.

        Returns:
            A new sample wrapping the computed paths.
        """

        # Clear the scene
        self.__scene._transmitters.clear()
        self.__scene._receivers.clear()
        self.__scene._tx_array = None
        self.__scene._rx_array = None

        # init self.scene.tx_array ("iso" pattern, "V" polarization)
        tx_antenna = rt.Antenna("iso", "V")
        tx_positions = [a.position for a in state.transmitter.antennas.transmit_antennas]
        self.__scene.tx_array = rt.AntennaArray(tx_antenna, tx_positions)

        # init self.scene.rx_array ("iso" pattern, "V" polarization)
        rx_antenna = rt.Antenna("iso", "V")
        rx_positions = [a.position for a in state.receiver.antennas.receive_antennas]
        self.__scene.rx_array = rt.AntennaArray(rx_antenna, rx_positions)

        # init tx and rx
        self.__scene.add(rt.Transmitter("Alpha device", state.transmitter.position))
        self.__scene.add(rt.Receiver("Beta device", state.receiver.position))

        # set other self.scene params
        self.__scene.frequency = state.transmitter.carrier_frequency
        self.__scene.synthetic_array = True

        # calculate paths
        paths = self.__scene.compute_paths()
        # NOTE(review): presumably keeps absolute path delays - confirm against Sionna docs
        paths.normalize_delays = False

        # construct the sample
        return SionnaRTChannelSample(paths, self.gain, state)

    @override
    def _reciprocal_sample(
        self, sample: SionnaRTChannelSample, state: LinkState
    ) -> SionnaRTChannelSample:
        """Generate the sample for the reciprocal link.

        The given ``sample`` is not reused; the scene is ray-traced again
        for ``state``.
        """

        return self._sample(state)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the base realization and the scene file path.

        Only the path is stored under the ``"scene"`` field, not the scene object.
        """

        ChannelRealization.serialize(self, process)
        process.serialize_string(self.__scene_file, "scene")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> SionnaRTChannelRealization:
        """Recreate a realization by reloading the scene from its serialized path."""

        # Read the scene path once and use it for both loading and bookkeeping
        scene_file = process.deserialize_string("scene")
        return cls(
            rt.load_scene(scene_file),
            scene_file,
            **ChannelRealization._DeserializeParameters(process),  # type: ignore[arg-type]
        )
class SionnaRTChannel(Channel[SionnaRTChannelRealization, SionnaRTChannelSample]):
    """Sionna ray-tracing channel.

    Refer to :doc:`/api/channel/sionna-rt` for further information.
    """

    __scene_file: str
    __scene: rt.scene.Scene

    def __init__(
        self, scene: str, gain: float = Channel._DEFAULT_GAIN, seed: int | None = None
    ) -> None:
        """
        Args:
            scene: Path to the scene file.
            gain: Linear channel power gain factor. Defaults to 1.0.
            seed: Seed used to initialize the pseudo-random number generator.
        """

        # Base channel setup comes first
        Channel.__init__(self, gain, seed)

        # Remember where the scene came from (needed for serialization),
        # then load the scene object itself
        self.__scene_file = scene
        loaded_scene = rt.load_scene(scene)
        self.__scene = loaded_scene

    @property
    def scene(self) -> rt.Scene:
        """Scene object used to generate realizations."""

        return self.__scene

    @override
    def _realize(self) -> SionnaRTChannelRealization:
        """Create a realization sharing this channel's scene, hooks and gain."""

        realization = SionnaRTChannelRealization(
            self.__scene, self.__scene_file, self.sample_hooks, self.gain
        )
        return realization

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the base channel and the scene file path as ``"scene"``."""

        Channel.serialize(self, process)
        process.serialize_string(self.__scene_file, "scene")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> SionnaRTChannel:
        """Recreate a channel from its serialized scene path and base parameters."""

        scene_path = process.deserialize_string("scene")
        base_parameters = Channel._DeserializeParameters(process)
        return cls(scene_path, **base_parameters)  # type: ignore[arg-type]