Source code for hermespy.channel.cdl.cluster_delay_lines

# -*- coding: utf-8 -*-

from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum, IntEnum
from functools import cache, cached_property
from math import ceil, sin, cos, sqrt
from typing import Generator, Generic, Literal, List, Set, Tuple, TypeVar

import matplotlib.pyplot as plt
import numpy as np
from h5py import Group
from matplotlib.projections.polar import PolarAxes
from mpl_toolkits.mplot3d import Axes3D  # type: ignore
from scipy.constants import pi, speed_of_light

from hermespy.core import (
    AntennaMode,
    ChannelStateInformation,
    ChannelStateFormat,
    Direction,
    Executable,
    HDFSerializable,
    ScatterVisualization,
    SignalBlock,
    StemVisualization,
    VisualizableAttribute,
)
from hermespy.core.visualize import VAT
from hermespy.tools import db2lin
from ..channel import (
    Channel,
    ChannelRealization,
    ChannelSample,
    LinkState,
    ChannelSampleHook,
    InterpolationMode,
)
from ..consistent import (
    ConsistentGaussian,
    ConsistentRealization,
    ConsistentUniform,
    ConsistentGenerator,
    ConsistentSample,
)

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class DelayNormalization(Enum):
    """Normalization routine applied to a set of sampled delays.

    Configuration option to :class:.ClusterDelayLineBase models.
    """

    ZERO = 0
    """Normalize the delays, so that the minimal delay is zero."""

    TOF = 1
    """The minimal delay is the time of flight between two devices."""


[docs] class _PowerDelayVisualization(VisualizableAttribute[StemVisualization]): """Visualization of cluster delay sample power delay profile.""" __sample: ClusterDelayLineSample # Sample to be visualized def __init__(self, sample: ClusterDelayLineSample) -> None: self.__sample = sample def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> StemVisualization: _axes: plt.Axes = axes[0, 0] colors = self._get_color_cycle() container = _axes.stem( self.__sample.cluster_delays + self.__sample.delay_offset, self.__sample.cluster_powers, markerfmt="x", ) container.stemlines.set_colors(colors) container.markerline.set_color("white") _axes.set_yscale("log") _axes.xaxis.set_label_text("Delay") _axes.yaxis.set_label_text("Power") _axes.set_xlim(0, self.__sample.max_delay) _axes.set_ylim(self.__sample.cluster_powers.min(), 1) return StemVisualization(figure, axes, container) def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None: # Update stem markers visualization.container.markerline.set_data( self.__sample.cluster_delays + self.__sample.delay_offset, self.__sample.cluster_powers ) # Update stem lines leading to markers visualization.container.stemlines.set_paths( [ np.array([[xx, 0.0], [xx, yy]]) for (xx, yy) in zip( self.__sample.cluster_delays + self.__sample.delay_offset, self.__sample.cluster_powers, ) ] ) # Update axes limits axes: plt.Axes = visualization.axes[0, 0] axes.set_xlim(0, max(self.__sample.max_delay, axes.get_xlim()[1])) axes.set_ylim(min(self.__sample.cluster_powers.min(), axes.get_ylim()[0]), 1.0) @property def title(self) -> str: return "Cluster Power Delay Profile"
[docs] class _AngleVisualization(VisualizableAttribute[ScatterVisualization]): """Visualization of the angles of arrival and departure.""" __sample: ClusterDelayLineSample # Sample to be visualized def __init__(self, sample: ClusterDelayLineSample) -> None: self.__sample = sample def _axes_dimensions(self, **kwargs) -> Tuple[int, int]: return (1, 2)
[docs] def create_figure(self, **kwargs) -> Tuple[plt.FigureBase, VAT]: return plt.subplots( *self._axes_dimensions(**kwargs), subplot_kw={"projection": "polar"}, squeeze=False )
def __generate_scatter_locations(self, center_los=True) -> np.ndarray: delta = self.__sample.transmitter_pose.translation - self.__sample.receiver_pose.translation abs_delta = np.linalg.norm(delta, 2) los_zenith = np.arccos((-delta[2] / abs_delta, delta[2] / abs_delta)) los_azimuth = np.arctan((-delta[1] / delta[0], delta[1] / delta[0])) azimuth_offset = -los_azimuth if center_los else np.zeros(2) zenith_offset = -los_zenith if center_los else np.zeros(2) scatter_locations = np.empty( (self.__sample.num_clusters, 2, self.__sample.num_rays, 2), dtype=np.float64 ) for i, (azimuth_clusters, zenith_clusters) in enumerate( [ [self.__sample.azimuth_of_arrival, self.__sample.zenith_of_arrival], [self.__sample.azimuth_of_departure, self.__sample.zenith_of_departure], ] ): azimuth_clusters = azimuth_clusters + azimuth_offset[0] zenith_clusters = abs(180 * (zenith_clusters + zenith_offset[0]) / pi) % 180 # azimuth_points[zenith_points > 90] -= pi zenith_clusters[zenith_clusters > 90] = 90 - zenith_clusters[zenith_clusters > 90] % 90 scatter_locations[:, i, :, 0] = azimuth_clusters scatter_locations[:, i, :, 1] = zenith_clusters return scatter_locations def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> ScatterVisualization: _axes = axes[0, :] colors = self._get_color_cycle() num_colors = len(colors) markers = ["x", "+", "o"] paths = np.empty((1, 2), dtype=np.object_) scatter_locations = self.__generate_scatter_locations() paths = np.empty((1, 2, 25), dtype=np.object_) c = 0 for c, cluster in enumerate(scatter_locations): paths[0, 0, c] = _axes[0].scatter( cluster[0, :, 0], cluster[0, :, 1], marker=markers[int(c / num_colors)], color=colors[c % num_colors], ) paths[0, 1, c] = _axes[1].scatter( cluster[1, :, 0], cluster[1, :, 1], marker=markers[int(c / num_colors)], color=colors[c % num_colors], ) # Add additional invisible clusters in case the number of clusters changes during redraws for c in range(c + 1, 25): paths[0, 0, c] = _axes[0].scatter( [], [], marker=markers[int(c / num_colors)], color=colors[c % num_colors] ) paths[0, 1, c] = _axes[1].scatter( [], [], marker=markers[int(c / num_colors)], color=colors[c % num_colors] ) ax: PolarAxes for ax in _axes: ax.set_rmax(90) ax.grid(True) ax.set_xlabel("Azimuth") ax.set_ylabel("Zenith") ax.set_rlabel_position(22.5) _axes[0].set_title("Angles of Arrival") _axes[1].set_title("Angles of Departure") return ScatterVisualization(figure, _axes, paths) def _update_visualization(self, visualization: ScatterVisualization, **kwargs) -> None: scatter_locations = self.__generate_scatter_locations() c = 0 for c, cluster in enumerate(scatter_locations): visualization.paths[0, 0, c].set_offsets(cluster[0]) visualization.paths[0, 1, c].set_offsets(cluster[1]) # Hide additional clusters for c in range(c + 1, 25): visualization.paths[0, 0, c].set_offsets(np.empty((0, 2), dtype=np.float64)) visualization.paths[0, 1, c].set_offsets(np.empty((0, 2), dtype=np.float64)) @property def title(self) -> str: return "Ray Angles"
[docs] class ClusterDelayLineSample(ChannelSample): """Sample of a 3GPP Cluster Delay Line channel model.""" # Sub-cluster partitions for the three strongest clusters subcluster_indices: List[List[int]] = [ [0, 1, 2, 3, 4, 5, 6, 7, 18, 19], [8, 9, 10, 11, 16, 17], [12, 13, 14, 15], ] __line_of_sight: bool __rice_factor: float __azimuth_of_arrival: np.ndarray __zenith_of_arrival: np.ndarray __azimuth_of_departure: np.ndarray __zenith_of_departure: np.ndarray __delay_offset: float __cluster_delays: np.ndarray __cluster_delay_spread: float __cluster_powers: np.ndarray __polarization_transformations: np.ndarray __max_delay: float __power_delay_visualization: _PowerDelayVisualization __angle_visualization: _AngleVisualization def __init__( self, line_of_sight: bool, rice_factor: float, azimuth_of_arrival: np.ndarray, zenith_of_arrival: np.ndarray, azimuth_of_departure: np.ndarray, zenith_of_departure: np.ndarray, delay_offset: float, cluster_delays: np.ndarray, cluster_delay_spread: float, cluster_powers: np.ndarray, polarization_transformations: np.ndarray, state: LinkState, ) -> None: # Initialize base class ChannelSample.__init__(self, state) # Initialize attributes self.__line_of_sight = line_of_sight self.__rice_factor = rice_factor self.__azimuth_of_arrival = azimuth_of_arrival self.__zenith_of_arrival = zenith_of_arrival self.__azimuth_of_departure = azimuth_of_departure self.__zenith_of_departure = zenith_of_departure self.__delay_offset = delay_offset self.__cluster_delays = cluster_delays self.__cluster_delay_spread = cluster_delay_spread self.__cluster_powers = cluster_powers self.__polarization_transformations = polarization_transformations self.__power_delay_visualization = _PowerDelayVisualization(self) self.__angle_visualization = _AngleVisualization(self) # Infer additional parameters self.__num_clusters = cluster_delays.shape[0] self.__num_rays = azimuth_of_arrival.shape[1] self.__max_delay = ( max(np.max(cluster_delays[:3] + cluster_delay_spread * 2.56), cluster_delays.max()) + self.__delay_offset ) @property def line_of_sight(self) -> bool: """Does the realization include direct line of sight between the two devices?""" return self.__line_of_sight @property def rice_factor(self) -> float: """Rice factor.""" return self.__rice_factor @property def azimuth_of_arrival(self) -> np.ndarray: """Azimuth of arrival angles in radians.""" return self.__azimuth_of_arrival @property def zenith_of_arrival(self) -> np.ndarray: """Zenith of arrival angles in radians.""" return self.__zenith_of_arrival @property def azimuth_of_departure(self) -> np.ndarray: """Azimuth of departure angles in radians.""" return self.__azimuth_of_departure @property def zenith_of_departure(self) -> np.ndarray: """Zenith of departure angles in radians.""" return self.__zenith_of_departure @property def cluster_delays(self) -> np.ndarray: """Cluster delays in seconds.""" return self.__cluster_delays @property def cluster_delay_spread(self) -> float: """Cluster delay spread in seconds.""" return self.__cluster_delay_spread @property def cluster_powers(self) -> np.ndarray: """Cluster powers.""" return self.__cluster_powers @property def polarization_transformations(self) -> np.ndarray: """Polarization transformations.""" return self.__polarization_transformations @property def num_clusters(self) -> int: """Number of realized scatter clusters.""" return self.__num_clusters @property def num_rays(self) -> int: """Number of rays within each cluster.""" return self.__num_rays @property def max_delay(self) -> float: """Maximum expected delay in seconds.""" return self.__max_delay @property def delay_offset(self) -> float: """Delay offset in seconds.""" return self.__delay_offset @property def expected_energy_scale(self) -> float: return float(np.sum(self.cluster_powers)) def __ray_impulse_generator( self, sampling_rate: float, num_samples: int, center_frequency: float ) -> Generator[Tuple[np.ndarray, np.ndarray, float], None, None]: """Sequential generation of individual ray impulse responses. Subroutine of :meth:`._propagate`. Args: sampling_rate (float): Sampling rate in Hertz. num_samples (int): Number of samples to generate. center_frequency (float): Center frequency in Hertz. """ # First summand scaling of equation 7.5-30 in ETSI TR 138 901 v17.0.0 rice_factor_lin = db2lin(self.rice_factor) nlos_scale = (1 + rice_factor_lin) ** -0.5 if self.line_of_sight else 1.0 # Compute the number of clusters, considering the first two clusters get split into 3 partitions num_split_clusters = min(2, self.num_clusters) virtual_num_clusters = 3 * num_split_clusters + max(0, self.num_clusters - 2) # Prepare the cluster delays, equation 7.5-26 subcluster_delays: np.ndarray = np.repeat( self.cluster_delays[:num_split_clusters, None], 3, axis=1 ) + self.cluster_delay_spread * np.array([0.0, 1.28, 2.56]) virtual_cluster_delays = np.concatenate( (subcluster_delays.flatten(), self.cluster_delays[num_split_clusters:]) ) # Wavelength factor wavelength_factor = center_frequency / speed_of_light relative_velocity = self.receiver_velocity - self.transmitter_velocity fast_fading = wavelength_factor * np.arange(num_samples) / sampling_rate # Compute the cluster propagation parameters for subcluster_idx in range(0, virtual_num_clusters): cluster_idx = int(subcluster_idx / 3) if subcluster_idx < 6 else subcluster_idx - 4 ray_indices = ( ClusterDelayLineSample.subcluster_indices[cluster_idx] if cluster_idx < num_split_clusters else range(self.num_rays) ) delay: float = virtual_cluster_delays[subcluster_idx] for aoa, zoa, aod, zod, jones in zip( self.azimuth_of_arrival[cluster_idx, ray_indices], self.zenith_of_arrival[cluster_idx, ray_indices], self.azimuth_of_departure[cluster_idx, ray_indices], self.zenith_of_departure[cluster_idx, ray_indices], self.polarization_transformations[:, :, cluster_idx, ray_indices].transpose( 2, 0, 1 ), ): # Compute directive unit vectors tx_direction = Direction.From_Spherical(aod, zod) rx_direction = Direction.From_Spherical(aoa, zoa) # Combination of Equation 7.5-23, 7.5.24 and 7.5.28 tx_array_response = self.transmitter_antennas.cartesian_array_response( center_frequency, tx_direction.view(np.ndarray), "global", AntennaMode.TX ) rx_array_response = self.receiver_antennas.cartesian_array_response( center_frequency, rx_direction.view(np.ndarray), "global", AntennaMode.RX ) # Equation 7.5-22 and 7.5.28 in ETSI TR 138 901 v17.0.0 channel: np.ndarray = ( rx_array_response @ jones @ tx_array_response.T * (sqrt(self.cluster_powers[cluster_idx] / self.num_rays) * nlos_scale) ) wave_vector = np.array( [cos(aoa) * sin(zoa), sin(aoa) * sin(zoa), cos(zoa)], dtype=float ) impulse: np.ndarray = np.exp( np.inner(wave_vector, relative_velocity) * fast_fading * 2j * pi ) yield channel, impulse, delay # If line of sight is enabled, yield an additional parameter touple for the line of sight component if self.__line_of_sight: device_vector = self.receiver_pose.translation - self.transmitter_pose.translation los_distance = np.linalg.norm(device_vector, 2) rx_wave_vector = device_vector / los_distance # Equation 7.5-29 tx_array_response = self.transmitter_antennas.cartesian_array_response( center_frequency, self.receiver_pose.translation, "global", AntennaMode.TX ) rx_array_response = self.receiver_antennas.cartesian_array_response( center_frequency, self.transmitter_pose.translation, "global", AntennaMode.RX ) # Second summand of equation 7.5-30, including equation 7.5-29 of ETSI TR 138 901 v17.0.0 los_delay: float = self.cluster_delays[0] los_channel: np.ndarray = ( rx_array_response @ np.array([[1, 0], [-1, 0]]) @ tx_array_response.T * (rice_factor_lin / (1 + rice_factor_lin)) ** 0.5 ) los_impulse: np.ndarray = np.exp(-2j * pi * los_distance * wavelength_factor) * np.exp( np.inner(rx_wave_vector, relative_velocity) * fast_fading * 2j * pi ) yield los_channel, los_impulse, los_delay def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock: max_delay_in_samples = ceil((self.max_delay) * self.bandwidth) propagated_samples = np.zeros( (self.num_receive_antennas, signal.num_samples + max_delay_in_samples), dtype=np.complex128, ) # Prepare the optimal einsum path ahead of time for faster execution channel = np.zeros( (self.num_receive_antennas, self.num_transmit_antennas), dtype=np.complex128 ) impulse = np.zeros(signal.num_samples, dtype=np.complex128) einsum_subscripts = "ij,jk,k->ik" einsum_path = np.einsum_path( einsum_subscripts, channel, signal, impulse, optimize="optimal" )[0] for channel, impulse, delay in self.__ray_impulse_generator( self.bandwidth, signal.num_samples, self.carrier_frequency ): if interpolation == InterpolationMode.NEAREST: delay_in_samples = int((delay + self.delay_offset) * self.bandwidth) propagated_samples[ :, delay_in_samples : delay_in_samples + signal.num_samples ] += np.einsum(einsum_subscripts, channel, signal, impulse, optimize=einsum_path) return SignalBlock(propagated_samples, signal._offset)
[docs] def state( self, num_samples: int, max_num_taps: int, interpolation_mode: InterpolationMode = InterpolationMode.NEAREST, ) -> ChannelStateInformation: max_delay_in_samples = min(max_num_taps, ceil(self.max_delay * self.bandwidth)) raw_state = np.zeros( ( self.num_receive_antennas, self.num_transmit_antennas, num_samples, 1 + max_delay_in_samples, ), dtype=np.complex128, ) for channel, impulse, delay in self.__ray_impulse_generator( self.bandwidth, num_samples, self.carrier_frequency ): # Compute the delay in samples delay_tap_index = int((delay + self.delay_offset) * self.bandwidth) if delay_tap_index >= max_num_taps: continue # pragma: no cover # Compute the resulting channel state raw_state[:, :, :, delay_tap_index] += np.einsum( "ij,k->ijk", channel, impulse, optimize=False ) return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)
@property def plot_power_delay(self) -> _PowerDelayVisualization: return self.__power_delay_visualization @property def plot_angles(self) -> _AngleVisualization: return self.__angle_visualization
[docs] def plot_rays(self, title: str | None = None) -> plt.Figure: with Executable.style_context(): axis: Axes3D figure, axis = plt.subplots(subplot_kw={"projection": "3d"}) colors = plt.rcParams["axes.prop_cycle"].by_key()["color"] magnitude = np.linalg.norm( self.transmitter_pose.translation - self.receiver_pose.translation ) for i, (aoa, zoa, aod, zod) in enumerate( zip( self.azimuth_of_arrival, self.zenith_of_arrival, self.azimuth_of_departure, self.zenith_of_departure, ) ): tx_vectors = np.array( [np.sin(zod) * np.cos(aod), np.sin(zod) * np.sin(aod), np.cos(zod)] ) rx_vectors = -np.array( [np.sin(zoa) * np.cos(aoa), np.sin(zoa) * np.sin(aoa), np.cos(zoa)] ) for vectors, origin in zip( (tx_vectors, rx_vectors), (self.transmitter_pose.translation, self.receiver_pose.translation), ): stops = vectors * magnitude + origin[:, np.newaxis] for stop in stops.T: axis.plot( [origin[0], stop[0]], [origin[1], stop[1]], [origin[2], stop[2]], color=colors[i % len(colors)], ) """start_arrival = self.transmitter_position stop_arrival = self.transmitter_position + arrival start_departure = self.receiver_position stop_departure = self.receiver_position + departure""" axis.set_zlim(-magnitude, magnitude) axis.scatter(*self.transmitter_pose.translation, marker="o", color="w") axis.text(*self.transmitter_pose.translation, "Tx", zorder=1) axis.scatter(*self.receiver_pose.translation, marker="o", color="w") axis.text(*self.receiver_pose.translation, "Rx", zorder=1) figure.suptitle("Rays" if title is None else title) return figure
@staticmethod def _angular_spread(angles: np.ndarray, powers: np.ndarray) -> float: """Compute the angular spread of a set of data. Implements equation (A-1) of :footcite:t:`3GPP:TR38901`. Args: angles (numpy.ndarray): Numpy array of angles in radians for all paths of propagation. powers (numpy.ndarray): Numpy angles of power for all paths of propagation. Returns: The angluar spread in radians. Raises: ValueError: If the dimensions of `angles` and `powers` don't match. """ if angles.shape != powers.shape: raise ValueError("Dimensions of angles and powers don't match") summed_power = np.sum(powers, axis=None) weighted_angle_sum = np.sum(np.exp(1j * angles) * powers, axis=None) spread = np.sqrt(-2 * np.log(np.abs(weighted_angle_sum / summed_power))) return spread @cached_property def azimuth_arrival_spread(self) -> float: """Spread of azimuth of arrival angles. Returns: Angular spread in radians. """ return self._angular_spread( self.azimuth_of_arrival, np.repeat(self.cluster_powers[:, None], self.azimuth_of_arrival.shape[1], axis=1), ) @cached_property def azimuth_departure_spread(self) -> float: """Spread of azimuth of departure angles. Returns: Angular spread in radians. """ return self._angular_spread( self.azimuth_of_departure, np.repeat(self.cluster_powers[:, None], self.azimuth_of_departure.shape[1], axis=1), ) @cached_property def zenith_arrival_spread(self) -> float: """Spread of zenith of arrival angles. Returns: Angular spread in radians. """ return self._angular_spread( self.zenith_of_arrival, np.repeat(self.cluster_powers[:, None], self.zenith_of_arrival.shape[1], axis=1), ) @cached_property def zenith_departure_spread(self) -> float: """Spread of zenith of departure angles. Returns: Angular spread in radians. """ return self._angular_spread( self.zenith_of_departure, np.repeat(self.cluster_powers[:, None], self.zenith_of_departure.shape[1], axis=1), )
[docs] def reciprocal(self, state: LinkState) -> ClusterDelayLineSample: """Generate a reciprocal sample of the current sample. Args: state (ChannelState): State of the reciprocal sample. Returns: A reciprocal sample. """ return ClusterDelayLineSample( self.line_of_sight, self.rice_factor, self.azimuth_of_departure, self.zenith_of_departure, self.azimuth_of_arrival, self.zenith_of_arrival, self.delay_offset, self.cluster_delays, self.cluster_delay_spread, self.cluster_powers, self.polarization_transformations, state, )
[docs] class LargeScaleState(object): """Large scale state of a 3GPP Cluster Delay Line channel model.""" @property @abstractmethod def line_of_sight(self) -> bool: """Direct line of sight between two linked devices.""" ... # pragma: no cover @property @abstractmethod def value(self) -> int: """Integer value of the state.""" ... # pragma: no cover
class LOSState(LargeScaleState, IntEnum): """Line of sight state of the cluster delay line model.""" LOS = 0 """Direct line of sight connection without blockage between base station and user terminal. """ NLOS = 1 """Physical blockage between base station and user terminal.""" @property def line_of_sight(self) -> bool: return self == LOSState.LOS @property def value(self) -> int: return int(self) class O2IState(LargeScaleState, IntEnum): """O2I state of the cluster delay line model.""" LOS = 0 """Direct line of sight connection without blockage between base station and user terminal. """ NLOS = 1 """Physical blockage between base station and user terminal.""" O2I = 2 """User terminal loacated inside a building.""" @property def line_of_sight(self) -> bool: return self == LOSState.LOS @property def value(self) -> int: return int(self) LSST = TypeVar("LSST", bound=LargeScaleState) """Type variable for large scale state types.""" class ClusterDelayLineSampleParameters(object): """Data class for initialization parameters of a 3GPP Cluster Delay Line channel model sample.""" __carrier_frequency: float __distance_3d: float __distance_2d: float __base_height: float __terminal_height: float def __init__( self, carrier_frequency: float, distance_3d: float, distance_2d: float, base_height: float, terminal_height: float, ) -> None: """ Args: carrier_frequency (float): Carrier frequency in Hz. distance_3d (float): Distance between the base station and the user terminal in meters. distance_2d (float): Horizontal distance between the base station and the user terminal in meters. base_height (float): Height of the base station in meters. terminal_height (float): Height of the user terminal in meters. """ self.__carrier_frequency = carrier_frequency self.__distance_3d = distance_3d self.__distance_2d = distance_2d self.__base_height = base_height self.__terminal_height = terminal_height @property def carrier_frequency(self) -> float: """Carrier frequency in Hz.""" return self.__carrier_frequency @property def distance_3d(self) -> float: """Distance between the base station and the user terminal in meters.""" return self.__distance_3d @property def distance_2d(self) -> float: """Horizontal distance between the base station and the user terminal in meters.""" return self.__distance_2d @property def base_height(self) -> float: """Height of the base station in meters.""" return self.__base_height @property def terminal_height(self) -> float: """Height of the user terminal in meters.""" return self.__terminal_height @dataclass class ClusterDelayLineRealizationParameters(object): """Data class for initialization parameters of a 3GPP Cluster Delay Line channel model realization.""" state_variable: ConsistentUniform delay_normalization: DelayNormalization oxygen_absorption: bool large_scale_parameters: np.ndarray """Large scale parameters of the channel model. Numpy vector with the following elements with the elements containing samples of a standard normal distribution with specific cross-correlations. """ cluster_shadowing_log_variable: ConsistentGaussian cluster_delays_variable: ConsistentUniform azimuth_angle_variation_variable: ConsistentGaussian azimuth_spread_sign: np.ndarray zenith_angle_variation_variable: ConsistentGaussian zenith_spread_sign: np.ndarray angle_coupling_indices: np.ndarray cross_polarization_power_variable: ConsistentGaussian cross_polarization_phase_variable: ConsistentUniform
[docs] class ClusterDelayLineRealization(ChannelRealization[ClusterDelayLineSample], Generic[LSST]): """Realization of a 3GPP Cluster Delay Line channel model.""" # Cluster scaling factors for azimuth angles # Table 7.5-2 in ETSI TR 138 901 v17.0.0 __azimuth_scaling_factors = np.array( [ [4, 0.779], [5, 0.86], [8, 1.018], [10, 1.090], [11, 1.123], [12, 1.146], [14, 1.19], [15, 1.211], [16, 1.226], [19, 1.273], [20, 1.289], [25, 1.358], ], dtype=np.float64, ) # Cluster scaling factors for zenith angles # Table 7.5-4 in ETSI TR 138 901 v17.0.0 __zenith_scaling_factors = np.array( [ [8, 0.889], [10, 0.957], [11, 1.031], [12, 1.104], [15, 1.1088], [19, 1.184], [20, 1.178], [25, 1.282], ], dtype=np.float64, ) # Ray offset angles # Table 7.5-3 in ETSI TR 138 901 v17.0.0 _ray_offset_angles = np.array( [ 0.0447, -0.0447, 0.1413, -0.1413, 0.2492, -0.2492, 0.3715, -0.3715, 0.5129, -0.5129, 0.6797, -0.6797, 0.8844, -0.8844, 1.1481, -1.1481, 1.5195, -1.5195, 2.1551, -2.1551, ], dtype=np.float64, ) # Oxygen absorption factors # Table 7.6-1 in ETSI TR 138 901 v17.0.0 # First column: Carrier frequency in GHz # Second column: Oxygen absorption factor in dB/km __oxygen_loss = np.array( [ [52.0, 0.0], [53.0, 1.0], [54.0, 2.0], [55.0, 4.0], [65.0, 6.6], [75.0, 9.7], [58.0, 12.6], [59.0, 14.6], [60.0, 15.0], [61.0, 14.6], [62.0, 14.3], [63.0, 10.5], [64.0, 6.8], [65.0, 3.9], [66.0, 1.9], [67.0, 1.0], [68.0, 0.0], ], dtype=np.float64, ) def __init__( self, expected_state: LSST | None, state_realization: ConsistentRealization, parameters: ClusterDelayLineRealizationParameters, sample_hooks: Set[ChannelSampleHook[ClusterDelayLineSample]], gain: float = 1.0, ) -> None: """ Args: expected_state (LSST | None): Expected large-scale state of the channel. If not specified, the large-scale state is randomly generated. state_realization (ConsistentRealization): Realization of the large scale state. parameters (ClusterDelayLineRealizationParameters): General parameters of the cluster delay line realization. sample_hooks (Set[ChannelSampleHook[ClusterDelayLineSample]]): Hooks for callback functions during the sample generation. gain (float, optional): Linear amplitude scaling factor if signals propagated over the channel. """ # Initialize base class ChannelRealization.__init__(self, sample_hooks, gain) # Initialize attributes self.__expected_state = expected_state self.__state_realization = state_realization self.__state_variable = parameters.state_variable self.__delay_normalization = parameters.delay_normalization self.__oxygen_absorption = parameters.oxygen_absorption self.__large_scale_parameters = parameters.large_scale_parameters self.__cluster_shadowing_log_variable = parameters.cluster_shadowing_log_variable self.__cluster_delays_variable = parameters.cluster_delays_variable self.__azimuth_angle_variation_variable = parameters.azimuth_angle_variation_variable self.__azimuth_spread_sign = parameters.azimuth_spread_sign self.__zenith_angle_variation_variable = parameters.zenith_angle_variation_variable self.__zenith_spread_sign = parameters.zenith_spread_sign self.__angle_coupling_indices = parameters.angle_coupling_indices self.__xpr_variable = parameters.cross_polarization_power_variable self.__xpr_phase = parameters.cross_polarization_phase_variable @property def expected_state(self) -> LSST | None: """Expected state of the channel realization. If None, the state will be randomly drawn from the state variable for each sample. """ return self.__expected_state @abstractmethod def _pathloss_dB(self, state: LSST, parameters: ClusterDelayLineSampleParameters) -> float: """Power loss during signal propagation in dB. Args: state (LSST): Large scale state of the channel. parameters (ClusterDelayLineSampleParameters): Parameters of the channel realization. Returns: Pathloss in dB. """ ... # pragma: no cover @abstractmethod def _small_scale_realization(self, state: LSST) -> ConsistentRealization: """Fetch the random realization of the small scale state. Args: state (LSST): Large scale state of the channel. Returns: ConsistentRealization: Realization of the small scale state. """ ... # pragma: no cover @abstractmethod def _sample_large_scale_state( self, state_variable_realization: float, parameters: ClusterDelayLineSampleParameters ) -> LSST: """Sample the large scale state of the channel. Returns: LSST: Large scale state of the channel. """ ... # pragma: no cover @abstractmethod def _delay_spread_mean(self, state: LSST, carrier_frequency: float) -> float: """Mean of the cluster delay spread. The spread realization and its mean are referred to as :math:`\\mathrm{DS}` and :math:`\\mu_{\\mathrm{lgDS}}` within the the standard, respectively. Returns: float: Mean delay spread in seconds. """ ... # pragma: no cover @abstractmethod def _delay_spread_std(self, state: LSST, carrier_frequency: float) -> float: """Standard deviation of the cluster delay spread. The spread realization and its standard deviation are referred to as :math:`\\mathrm{DS}` and :math:`\\sigma_{\\mathrm{lgDS}}` within the the standard, respectively. Returns: float: Delay spread standard deviation in seconds. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _aod_spread_mean(self, state: LSST, carrier_frequency: float) -> float: """Mean of the Azimuth Angle-of-Departure spread. The spread realization and its mean are referred to as :math:`\\mathrm{ASD}` and :math:`\\mu_{\\mathrm{lgASD}}` within the the standard, respectively. Returns: float: Mean angle spread in seconds """ ... # pragma: no cover @abstractmethod def _aod_spread_std(self, state: LSST, carrier_frequency: float) -> float: """Standard deviation of the Azimuth Angle-of-Departure spread. The spread realization and its standard deviation are referred to as :math:`\\mathrm{ASD}` and :math:`\\sigma_{\\mathrm{lgASD}}` within the the standard, respectively. Returns: float: Angle spread standard deviation in seconds. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _aoa_spread_mean(self, state: LSST, carrier_frequency: float) -> float: """Mean of the Azimuth Angle-of-Arriaval spread. The spread realization and its mean are referred to as :math:`\\mathrm{ASA}` and :math:`\\mu_{\\mathrm{lgASA}}` within the the standard, respectively. Returns: float: Mean angle spread in seconds """ ... # pragma: no cover @abstractmethod def _aoa_spread_std(self, state: LSST, carrier_frequency: float) -> float: """Standard deviation of the Azimuth Angle-of-Arrival spread. The spread realization and its standard deviation are referred to as :math:`\\mathrm{ASA}` and :math:`\\sigma_{\\mathrm{lgASA}}` within the the standard, respectively. Returns: float: Angle spread standard deviation in seconds. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _zoa_spread_mean(self, state: LSST, carrier_frequency: float) -> float: """Mean of the Zenith Angle-of-Arriaval spread. The spread realization and its mean are referred to as :math:`\\mathrm{ZSA}` and :math:`\\mu_{\\mathrm{lgZSA}}` within the the standard, respectively. Returns: float: Mean angle spread in seconds """ ... # pragma: no cover @abstractmethod def _zoa_spread_std(self, state: LSST, carrier_frequency: float) -> float: """Standard deviation of the Zenith Angle-of-Arrival spread. The spread realization and its standard deviation are referred to as :math:`\\mathrm{ZSA}` and :math:`\\sigma_{\\mathrm{lgZSA}}` within the the standard, respectively. Returns: float: Angle spread standard deviation in seconds. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover ############################### # ToDo: Shadow fading function ############################### @abstractmethod def _rice_factor_mean(self) -> float: """Mean of the rice factor distribution. The rice factor realization and its mean are referred to as :math:`K` and :math:`\\mu_K` within the the standard, respectively. Returns: float: Rice factor mean in dB. """ ... # pragma: no cover @abstractmethod def _rice_factor_std(self) -> float: """Standard deviation of the rice factor distribution. The rice factor realization and its standard deviation are referred to as :math:`K` and :math:`\\sigma_K` within the the standard, respectively. Returns: float: Rice factor standard deviation in dB. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _delay_scaling(self, state: LSST) -> float: """Delay scaling proportionality factor Referred to as :math:`r_{\\tau}` within the standard. Returns: float: Scaling factor. Raises: ValueError: If scaling factor is smaller than one. """ ... # pragma: no cover @abstractmethod def _cross_polarization_power_mean(self, state: LSST) -> float: """Mean of the cross-polarization power. The cross-polarization power and its mean are referred to as :math:`\\mathrm{XPR}` and :math:`\\mu_{\\mathrm{XPR}}` within the the standard, respectively. Returns: float: Mean power in dB. """ ... # pragma: no cover @abstractmethod def _cross_polarization_power_std(self, state: LSST) -> float: """Standard deviation of the cross-polarization power. The cross-polarization power and its standard deviation are referred to as :math:`\\mathrm{XPR}` and :math:`\\sigma_{\\mathrm{XPR}}` within the the standard, respectively. Returns: float: Power standard deviation in dB. Raises: ValueError: If the standard deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _num_clusters(self, state: LSST) -> int: """Number of clusters in the channel realization.""" ... # pragma: no cover @abstractmethod def _cluster_aod_spread(self, state: LSST) -> float: """Azimuth Angle-of-Departure spread within an individual cluster. Referred to as :math:`c_{ASD}` within the standard. Returns: float: Angle spread in degrees. Raises: ValueError: If spread is smaller than zero. """ ... # pragma: no cover @abstractmethod def _cluster_aoa_spread(self, state: LSST) -> float: """Azimuth Angle-of-Arrival spread within an individual cluster. Referred to as :math:`c_{ASA}` within the standard. Returns: float: Angle spread in degrees. Raises: ValueError: If spread is smaller than zero. """ ... # pragma: no cover @abstractmethod def _cluster_zoa_spread(self, state: LSST) -> float: """Zenith Angle-of-Arrival spread within an individual cluster. Referred to as :math:`c_{ZSA}` within the standard. Returns: float: Angle spread in degrees. Raises: ValueError: If spread is smaller than zero. """ ... # pragma: no cover @abstractmethod def _cluster_shadowing_std(self, state: LSST) -> float: """Standard deviation of the cluster shadowing. Referred to as :math:`\\zeta` within the the standard. Returns: float: Cluster shadowing standard deviation. Raises: ValueError: If the deviation is smaller than zero. """ ... # pragma: no cover @abstractmethod def _zod_spread_mean( self, state: LSST, parameters: ClusterDelayLineSampleParameters ) -> float: ... # pragma: no cover @abstractmethod def _zod_spread_std( self, state: LSST, parameters: ClusterDelayLineSampleParameters ) -> float: ... # pragma: no cover @abstractmethod def _zod_offset(self, state: LSST, parameters: ClusterDelayLineSampleParameters) -> float: """Offset between Zenith Angle-of-Arrival and Angle-of-Departure. The offset is referred to as :math:`\\mu_{\\mathrm{offset,ZOD}}` within the standard. Returns: float: The offset in degrees. """ ... # pragma: no cover def _sample_large_scale_parameters( self, state: LSST, parameters: ClusterDelayLineSampleParameters ) -> Tuple[float, ...]: """Sample large-scale parameters. Representation of step 4 in ETSI TR 138.901 v17.0.0. Args: state (LSST): Large scale state of the channel. parameters (ClusterDelayLineSampleParameters): Parameters of the channel realization. Returns: Tuple of large-scale parameters. """ mean = np.array( [ self._delay_spread_mean(state, parameters.carrier_frequency), self._aod_spread_mean(state, parameters.carrier_frequency), self._aoa_spread_mean(state, parameters.carrier_frequency), self._zoa_spread_mean(state, parameters.carrier_frequency), self._zod_spread_mean(state, parameters), self._rice_factor_mean(), 0.0, ], dtype=np.float64, ) std = np.array( [ self._delay_spread_std(state, parameters.carrier_frequency), self._aod_spread_std(state, parameters.carrier_frequency), self._aoa_spread_std(state, parameters.carrier_frequency), self._zoa_spread_std(state, parameters.carrier_frequency), self._zod_spread_std(state, parameters), self._rice_factor_std(), 1.0, ], dtype=np.float64, ) # Enforce the proper cross-correlations between the large-scale parameters # The routine is described in section 3.3.1 of the WINNERII final report S = mean + std * self.__large_scale_parameters[state.value, ...] # Transform to the proper distribution DS = 10 ** S[0] ASD = min(10 ** S[1], 104) ASA = min(10 ** S[2], 104) ZSA = min(10 ** S[3], 52) ZSD = min(10 ** S[4], 52) K = S[5] SF = S[6] return DS, ASD, ASA, ZSA, ZSD, K, SF def _cluster_delays( self, consistent_sample: ConsistentSample, state: LSST, delay_spread: float, rice_factor: float, ) -> Tuple[np.ndarray, np.ndarray, float]: """Compute a single sample set of normalized cluster delays. A single cluster delay is referred to as :math:`\\tau_n` within the the standard. Implementation of equations 7.5-1 to 7.5-4 in TR 138.901 v17.0.0. Args: consistent_sample (ConsistentSample): Sample of the consistent random process. state (LSST): Large scale state of the channel. line_of_sight (bool): Is the realization line of sight? delay_spread (float): Delay spread in seconds. Denoted by :math:`\\mathrm{DS}` in the standard. rice_factor (float): Rice factor K in dB. Denoted by :math:`K` in the standard. Returns: Tuple of - Raw delay samples - True delays - Minimal delay """ # Equation 7.5-1 in TR 138.901 v17.0.0 r_tau = self._delay_scaling(state) X_n = self.__cluster_delays_variable.sample(consistent_sample)[: self._num_clusters(state)] unsorted_delays = -r_tau * delay_spread * np.log(X_n) # Equation 7.5-2 in TR 138.901 v17.0.0 # Sort the delays in ascending order sorted_delays = np.sort(unsorted_delays) minimal_delay = sorted_delays[0] sorted_delays -= minimal_delay # In case of line of sight, scale the delays by the appropriate K-factor # Equations 7.5-3 and 7.5-4 in TR 138.901 v17.0.0 scaled_delays = sorted_delays.copy() if state.line_of_sight: rice_scale = ( 0.775 - 0.0433 * rice_factor + 2e-4 * rice_factor**2 + 17e-5 * rice_factor**3 ) scaled_delays /= rice_scale minimal_delay = 0.0 # This is required for the oxygen absorption computation # Return the raw and scaled delays, since they are both required for further processing return sorted_delays, scaled_delays, minimal_delay def _cluster_powers( self, consistent_sample: ConsistentSample, state: LSST, delay_spread: float, delays: np.ndarray, rice_factor: float, ) -> np.ndarray: """Compute a single sample set of normalized cluster power factors from delays. A single cluster power factor is referred to as :math:`P_n` within the the standard. Args: consistent_sample (ConsistentSample): Sample of the consistent random process. state (LSST): Large scale state of the channel. delay_spread (float): Delay spread in seconds. Denoted by :math:`\\mathrm{DS}` in the standard. delays (numpy.ndarray): Vector of cluster delays. Denoted by :math:`\\tau` in the standard. rice_factor (float): Rice factor in dB. Denoted by :math:`K` in the standard. Returns: np.ndarray: Vector of cluster power scales. """ delay_scaling = self._delay_scaling(state) shadowing = 10 ** ( -0.1 * self.__cluster_shadowing_log_variable.sample( consistent_sample, 0.0, self._cluster_shadowing_std(state) )[: self._num_clusters(state)] ) # Equation 7.5-5 in TR 138.901 v17.0.0 powers = np.exp(-delays * (delay_scaling - 1) / (delay_scaling * delay_spread)) * shadowing # In case of line of sight, add a specular component to the cluster delays if state.line_of_sight: # Equations 7.5-7 and 7.5-8 in TR 138.901 v17.0.0 linear_rice_factor = db2lin(rice_factor) powers /= (1 + linear_rice_factor) * np.sum(powers.flat) powers[0] += linear_rice_factor / (1 + linear_rice_factor) else: # Equation 7.5-6 in TR 138.901 v17.0.0 powers /= np.sum(powers.flat) # Eliminate clusters with powers less than -25 dB with respect to the strongest cluster cutoff_power = powers[0] * 10 ** (-25 / 10) powers = powers[np.where(powers >= cutoff_power)] return powers def _ray_azimuth_angles( self, consistent_sample: ConsistentSample, state: LSST, cluster_powers: np.ndarray, spread: float, rice_factor: float, los_azimuth: float, direction: Literal["arrival", "departure"], ) -> np.ndarray: """Compute cluster ray azimuth angles of arrival or departure. Args: consistent_sample (ConsistentSample): Sample of the consistent random process. state (LSST): Large scale state of the channel. cluster_powers (numpy.ndarray): Vector of cluster powers. The length determines the number of clusters. spread (float): Root-mean squared Azimuth spread in degrees. Denoted by :math:`\\mathrm{ASD}` or :math:`\\mathrm{ASA}` in the standard. rice_factor (float): Rice factor in dB. Denoted by :math:`K` in the standard. los_azimuth (float): Line of sight azimuth angle to the target in degrees. direction (Literal["arrival", "departure"]): Direction of the angles. Either "arrival" or "departure". Returns: np.ndarray: Matrix of angles in degrees. The first dimension indicates the cluster index, the second dimension the ray index. """ # Determine the closest scaling factor scale_index = np.argmin(np.abs(self.__azimuth_scaling_factors[:, 0] - len(cluster_powers))) angle_scale = self.__azimuth_scaling_factors[scale_index, 1] # Scale the scale (hehe) in the line of sight case if state.line_of_sight: angle_scale *= ( 1.1035 - 0.028 * rice_factor - 2e-3 * rice_factor**2 + 1e-4 * rice_factor**3 ) angles: np.ndarray = ( 2 * spread / 1.4 / angle_scale * np.sqrt(-np.log(cluster_powers / cluster_powers.max())) ) # Assign positive / negative integers and add some noise, Equation 7.5-11 in TR 138.901 v17.0.0 angle_variation = (spread / 7) * self.__azimuth_angle_variation_variable.sample( consistent_sample ) spread_angles = ( self.__azimuth_spread_sign[: cluster_powers.size] * angles + angle_variation[: cluster_powers.size] ) # Add the actual line of sight term if state.line_of_sight: # The first angle within the list is exactly the line of sight component spread_angles += los_azimuth - spread_angles[0] else: spread_angles += los_azimuth # Spread the angles cluster_spread = ( self._cluster_aoa_spread(state) if direction == "arrival" else self._cluster_aod_spread(state) ) ray_offsets = cluster_spread * self._ray_offset_angles ray_angles = np.tile(spread_angles[:, None], len(ray_offsets)) + ray_offsets return ray_angles def _ray_zoa( self, consistent_sample: ConsistentSample, state: LSST, cluster_powers: np.ndarray, spread: float, rice_factor: float, los_zenith: float, ) -> np.ndarray: """Compute cluster ray zenith angles of arrival. Args: consistent_sample (ConsistentSample): Sample of the consistent random process. state (LSST): Large scale state of the channel. cluster_powers (numpy.ndarray): Vector of cluster powers. The length determines the number of clusters. spread (float): Zenith of arrival angular spread in degrees. Denoted by :math:`\\mathrm{ZSA}` in the standard. rice_factor (float): Rice factor in dB. Denoted by :math:`K` in the standard. los_zenith (float): Line of sight zenith angle to the target in degrees. Returns: np.ndarray: Matrix of angles in degrees. The first dimension indicates the cluster index, the second dimension the ray index. """ # Equation 7.5-15 of TR 138.901 v17.0.0 scaling_factor_index = np.argmin( np.abs(self.__zenith_scaling_factors[:, 0] - cluster_powers.size) ) zenith_scaling_factor = self.__zenith_scaling_factors[scaling_factor_index, 1] if state.line_of_sight: zenith_scaling_factor *= ( 1.3086 + 0.0339 * rice_factor - 0.0077 * rice_factor**2 + 2e-4 * rice_factor**3 ) # Generate angle starting point # Equation 7.5-14 of TR 138.901 v17.0.0 zenith_centroids: np.ndarray = ( -spread * np.log(cluster_powers / cluster_powers.max()) / zenith_scaling_factor ) # Equation 7.5-16 of TR 138.901 v17.0.0 # ToDo: Treat the BST-UT case!!!! (los_zenith = 90°) cluster_variation = self.__zenith_angle_variation_variable.sample( consistent_sample, 0.0, (spread / 7) ) cluster_zenith = ( self.__zenith_spread_sign[: cluster_powers.size] * zenith_centroids + cluster_variation[: cluster_powers.size] ) # Equation 7.5-17 of TR 138.901 v17.0.0 if state == LOSState.LOS: cluster_zenith += los_zenith - cluster_zenith[0] else: cluster_zenith += los_zenith # Spread the angles # Equation 7.5 -18 of TR 138.901 v17.0.0 ray_offsets = self._cluster_zoa_spread(state) * self._ray_offset_angles ray_zenith = np.tile(cluster_zenith[:, None], len(ray_offsets)) + ray_offsets return ray_zenith def _ray_zod( self, consistent_sample: ConsistentSample, state: LSST, parameters: ClusterDelayLineSampleParameters, cluster_powers: np.ndarray, spread: float, rice_factor: float, los_zenith: float, ) -> np.ndarray: """Compute cluster ray zenith angles of departure. Args: consistent_sample (ConsistentSample): Sample of the consistent random process. state (LSST): Large scale state of the channel. cluster_powers (numpy.ndarray): Vector of cluster powers. The length determines the number of clusters. spread (float): Zenith of departure angular spread in degrees. Denoted by :math:`\\mathrm{ZSD}` in the standard. rice_factor (float): Rice factor in dB. los_zenith (float): Line of sight zenith angle to the target in degrees. Returns: np.ndarray: Matrix of angles in degrees. The first dimension indicates the cluster index, the second dimension the ray index. """ # Equation 7.5-15 of TR 138.901 v17.0.0 scaling_factor_index = np.argmin( np.abs(self.__zenith_scaling_factors[:, 0] - cluster_powers.size) ) zenith_scaling_factor = self.__zenith_scaling_factors[scaling_factor_index, 1] if state.line_of_sight: zenith_scaling_factor *= ( 1.3086 + 0.0339 * rice_factor - 0.0077 * rice_factor**2 + 2e-4 * rice_factor**3 ) # Generate angle starting point # Equation 7.5-14 of TR 138.901 v17.0.0 zenith_centroids: np.ndarray = ( -spread * np.log(cluster_powers / cluster_powers.max()) / zenith_scaling_factor ) # Equation 7.5-19 of TR 138.901 v17.0.0 # ToDo: Treat the BST-UT case!!!! (los_zenith = 90°) cluster_variation = self.__zenith_angle_variation_variable.sample( consistent_sample, 0.0, (spread / 7) ) cluster_zenith = ( self.__zenith_spread_sign[: cluster_powers.size] * zenith_centroids + cluster_variation[: cluster_powers.size] + self._zod_offset( state, parameters ) # This line is the only difference to equation 7.5-16 ) # Equation 7.5-17 of TR 138.901 v17.0.0 if state == LOSState.LOS: cluster_zenith += los_zenith - cluster_zenith[0] else: cluster_zenith += los_zenith # Spread the angles # Equation 7.5-20 of TR 138.901 v17.0.0 zod_spread_mean = self._zod_spread_mean(state, parameters) ray_offsets = 3 / 8 * 10**zod_spread_mean * self._ray_offset_angles ray_zenith = np.tile(cluster_zenith[:, None], len(ray_offsets)) + ray_offsets return ray_zenith def _sample(self, channel_state: LinkState) -> ClusterDelayLineSample: # Compute the line of sight distance between the devices, i.e. base station to user terminal distance_3d: float = np.linalg.norm(channel_state.transmitter.position - channel_state.receiver.position, 2) # type: ignore[assignment] distance_2d: float = np.linalg.norm( # type: ignore[assignment] channel_state.transmitter.position[:2] - channel_state.receiver.position[:2], 2 ) base_height: float = max( channel_state.transmitter.position[2], channel_state.receiver.position[2] ) terminal_height: float = min( channel_state.transmitter.position[2], channel_state.receiver.position[2] ) parameters = ClusterDelayLineSampleParameters( channel_state.carrier_frequency, distance_3d, distance_2d, base_height, terminal_height ) # Ensure transmiter and receiver are not located at identical positions if distance_3d == 0.0: raise RuntimeError( "Identical device positions violate the far-field assumption of the 3GPP CDL channel model" ) # Decide on the overall large-scale state of the channel if self.expected_state is None: state_variable_sample = float( self.__state_variable.sample( self.__state_realization.sample( channel_state.transmitter.position, channel_state.receiver.position ) ) ) state = self._sample_large_scale_state(state_variable_sample, parameters) else: state = self.expected_state small_scale_realization = self._small_scale_realization(state) parameters_sample = small_scale_realization.sample( channel_state.transmitter.position, channel_state.receiver.position ) # Sample large-scale parameters DS, ASD, ASA, ZSA, ZSD, K, SF = self._sample_large_scale_parameters(state, parameters) # Compute line of sight directions in local coordinates for both devices tx_los_direction = channel_state.transmitter.pose.invert().transform_direction( channel_state.receiver.position - channel_state.transmitter.position, True ) rx_los_direction = channel_state.receiver.pose.invert().transform_direction( channel_state.transmitter.position - channel_state.receiver.position, True ) # Extract directive angles in spherical coordinates tx_los_angles = tx_los_direction.to_spherical() rx_los_angles = rx_los_direction.to_spherical() raw_cluster_delays, cluster_delays, minimal_delay = self._cluster_delays( parameters_sample, state, DS, K ) cluster_powers = self._cluster_powers(parameters_sample, state, DS, raw_cluster_delays, K) raw_cluster_delays = raw_cluster_delays[: cluster_powers.size] cluster_delays = cluster_delays[: cluster_powers.size] # Compute cluster angles ray_aod = ( pi / 180 * self._ray_azimuth_angles( parameters_sample, state, cluster_powers, ASD, K, 180 * tx_los_angles[0] / pi, "departure", ) ) ray_aoa = ( pi / 180 * self._ray_azimuth_angles( parameters_sample, state, cluster_powers, ASA, K, 180 * rx_los_angles[0] / pi, "arrival", ) ) ray_zod = ( pi / 180 * self._ray_zod( parameters_sample, state, parameters, cluster_powers, ZSD, K, 180 * tx_los_angles[1] / pi, ) ) ray_zoa = ( pi / 180 * self._ray_zoa( parameters_sample, state, cluster_powers, ZSA, K, 180 * rx_los_angles[1] / pi ) ) # Couple cluster angles randomly # Step 8 in the small-scale parameter generation of ETSI TR 138.901 v17.0.0 # This is equivalent of shuffeling the cluster ray indices according to the coupling indices # ToDo: Re-check if this is correct shuffled_ray_aoa = np.take_along_axis( ray_aoa, self.__angle_coupling_indices[0, : cluster_powers.size, ...], axis=1 ) shuffled_ray_zoa = np.take_along_axis( ray_zoa, self.__angle_coupling_indices[1, : cluster_powers.size, ...], axis=1 ) shuffled_ray_aod = np.take_along_axis( ray_aod, self.__angle_coupling_indices[2, : cluster_powers.size, ...], axis=1 ) shuffled_ray_zod = np.take_along_axis( ray_zod, self.__angle_coupling_indices[3, : cluster_powers.size, ...], axis=1 ) # Generate cross-polarization power ratios (step 9) # Equation 7.5-21 in TR 138.901 v17.0.0 cross_polarization_factor = 10 ** ( -0.05 * self.__xpr_variable.sample( parameters_sample, self._cross_polarization_power_mean(state), self._cross_polarization_power_std(state), ) ) # Draw initial random phases (step 10) # A single 2x2 slice represents the jones matrix transforming the polarization of a single ray polarization_transformations = np.exp(2j * pi * self.__xpr_phase.sample(parameters_sample)) polarization_transformations[0, 1, ::] *= cross_polarization_factor polarization_transformations[1, 0, ::] *= cross_polarization_factor # Compute a delay offset accounting for the time of flight over the line of sight # Equations 7.6-43 and 7.6-44 in TR 138.901 v17.0.0 if self.__delay_normalization == DelayNormalization.TOF: delay_offset = distance_3d / speed_of_light else: delay_offset = 0.0 # Step 12 # Model pathloss # Table 7.4.1-1 in TR 138.901 v17.0.0 path_power_scale = db2lin(-self._pathloss_dB(state, parameters)) # Model oxygen power loss # 7.6-1 in TR 138.901 v17.0.0 if self.__oxygen_absorption: oxygen_loss_factor = np.interp( channel_state.carrier_frequency * 1e-9, self.__oxygen_loss[0, :], self.__oxygen_loss[1, :], ) oxygen_loss = 10 ** ( -1e-2 * oxygen_loss_factor * (distance_3d + speed_of_light * (cluster_delays + minimal_delay)) ) else: oxygen_loss = 1.0 # Compute the expected amplitude scale factor of the sample expected_loss = self.gain * oxygen_loss true_cluster_powers = cluster_powers * path_power_scale * expected_loss**2 return ClusterDelayLineSample( state.line_of_sight, K, shuffled_ray_aoa, shuffled_ray_zoa, shuffled_ray_aod, shuffled_ray_zod, delay_offset, cluster_delays, DS, true_cluster_powers, polarization_transformations, channel_state, ) def _reciprocal_sample( self, sample: ClusterDelayLineSample, state: LinkState ) -> ClusterDelayLineSample: # Check if an optimized resampling is possible, otherwise sample the realization again # Resample if the sample parameters don't match if ( state.bandwidth != sample.bandwidth or state.carrier_frequency != sample.carrier_frequency ): return self._sample(state) if np.any(state.transmitter.position != sample.receiver_pose.translation) or np.any( state.receiver.position != sample.transmitter_pose.translation ): return self._sample(state) return sample.reciprocal(state)
[docs] def to_HDF(self, group: Group) -> None: self.__state_realization.to_HDF(HDFSerializable._create_group(group, "state_realization")) group.attrs["gain"] = self.gain group.attrs["delay_normalization"] = self.__delay_normalization.value group.attrs["oxygen_absorption"] = self.__oxygen_absorption group.create_dataset("large_scale_parameters", data=self.__large_scale_parameters) group.create_dataset("zenith_spread_sign", data=self.__zenith_spread_sign) group.create_dataset("azimuth_spread_sign", data=self.__azimuth_spread_sign) group.create_dataset("angle_coupling_indices", data=self.__angle_coupling_indices)
CDLRT = TypeVar("CDLRT", bound=ClusterDelayLineRealization) """Type of a 3GPP Cluster Delay Line channel realization."""
[docs] class ClusterDelayLineBase(Channel[CDLRT, ClusterDelayLineSample], Generic[CDLRT, LSST]): """Base class for all 3GPP Cluster Delay Line channel models.""" __delay_normalization: DelayNormalization __oxygen_absorption: bool __expected_state: LSST | None def __init__( self, gain: float = 1.0, delay_normalization: DelayNormalization = DelayNormalization.ZERO, oxygen_absorption: bool = True, expected_state: LSST | None = None, **kwargs, ) -> None: """ Args: gain (float, optional): Linear gain factor a signal amplitude experiences when being propagated over this realization. :math:`1.0` by default. delay_normalization (DelayNormalization, optional): The delay normalization routine applied during channel sampling. oxygen_absorption (bool, optional): Model oxygen absorption in the channel. Enabled by default. expected_state (LSST, optional): Expected large-scale state of the channel. If `None`, the state is randomly generated during each sample of the channel's realization. \**kwargs: Additional keyword arguments passed to the base class. """ # Initialize base class Channel.__init__(self, gain, **kwargs) # Initialize class attributes self.delay_normalization = delay_normalization self.oxygen_absorption = oxygen_absorption self.expected_state = expected_state # Generate dual consistent random variables self.__state_generator = ConsistentGenerator(self) self.__parameter_generator = ConsistentGenerator(self) # Configure the random variable distributions self.__state_variable = self.__state_generator.uniform() num_clusters = self.max_num_clusters num_rays = self.max_num_rays self.__cluster_shadowing_log_variable = self.__parameter_generator.gaussian((num_clusters,)) self.__cluster_delays_variable = self.__parameter_generator.uniform((num_clusters,)) self.__azimuth_variation_variable = self.__parameter_generator.gaussian((num_clusters,)) self.__zenith_variation_variable = self.__parameter_generator.gaussian((num_clusters,)) self.__cross_polarization_power_variable = self.__parameter_generator.gaussian( (num_clusters, num_rays) ) self.__cross_polarization_phase_variable = self.__parameter_generator.uniform( (2, 2, num_clusters, num_rays) ) @property def delay_normalization(self) -> DelayNormalization: """The delay normalization routine applied during channel sampling.""" return self.__delay_normalization @delay_normalization.setter def delay_normalization(self, normalization: DelayNormalization) -> None: self.__delay_normalization = normalization @property def oxygen_absorption(self) -> bool: """Model oxygen absorption in the channel.""" return self.__oxygen_absorption @oxygen_absorption.setter def oxygen_absorption(self, absorption: bool) -> None: self.__oxygen_absorption = absorption @property def expected_state(self) -> LSST | None: """Expected large-scale state of the channel. If `None`, the state is randomly generated during each sample of the channel's realization. """ return self.__expected_state @expected_state.setter def expected_state(self, state: LSST | None) -> None: self.__expected_state = state @property @abstractmethod def max_num_clusters(self) -> int: """Maximum number of clusters a realization will generate.""" ... # pragma: no cover @property @abstractmethod def max_num_rays(self) -> int: """Maximum number of rays a realization will generate per cluster.""" ... # pragma: no cover @property @abstractmethod def _large_scale_correlations(self) -> np.ndarray: """Correlation factors between the large-scale parameters.""" ... # pragma: no cover @abstractmethod def _initialize_realization( self, state_generator: ConsistentGenerator, parameter_generator: ConsistentGenerator, parameters: ClusterDelayLineRealizationParameters, ) -> CDLRT: """Initialize a new realization of the channel model.""" ... # pragma: no cover @cache def __C_MM(self) -> np.ndarray: """Subroutine to compute the cross-correlation matrix of the large-scale parameters. Args: state (int): Large scale state index of the channel. Returns: Lower-triangular root of the cross-correlation matrix. """ C_MM = np.empty((3, 7, 7), dtype=np.float64) for c, C in enumerate(self._large_scale_correlations): C_MM_squared = np.array( [ # DS, ASD, ASA, ZSA, ZSD, K, SF [1.0, C[0], C[1], C[15], C[14], C[8], C[4]], # DS [C[0], 1.0, C[5], C[17], C[16], C[6], C[3]], # ASD [C[1], C[5], 1.0, C[19], C[18], C[7], C[2]], # ASA [C[15], C[17], C[19], 1.0, C[20], C[13], C[11]], # ZSA [C[14], C[16], C[18], C[20], 1.0, C[12], C[10]], # ZSD [C[8], C[6], C[7], C[13], C[12], 1.0, C[9]], # K [C[4], C[3], C[2], C[11], C[10], C[9], 1.0], # SF ], dtype=np.float64, ) # Section 4 of ETSI TR 138.901 v17.0.0 hints at using the cholesky decomposition # to enforce the expected cross-correlations between the large-scale parameters C_MM[c, ...] = np.linalg.cholesky(C_MM_squared) return C_MM def _realize(self) -> CDLRT: # Realize static random variables azimuth_spread_sign = self._rng.choice([-1.0, 1.0], size=(self.max_num_clusters,)) zenith_spread_sign = self._rng.choice([-1.0, 1.0], size=(self.max_num_clusters,)) # Random coupling indices for step 8 in ETSI TR 138 901 v17.0.0 angle_candidate_indices = np.arange(self.max_num_rays) angle_coupling_indices = np.array( [ [ self._rng.permutation(angle_candidate_indices) for _ in range(self.max_num_clusters) ] for _ in range(4) ] ) # Generate standard normal realizations of the large-scale parameters LSPs = self.__C_MM() @ self._rng.standard_normal(7) parameters = ClusterDelayLineRealizationParameters( self.__state_variable, self.delay_normalization, self.oxygen_absorption, LSPs, self.__cluster_shadowing_log_variable, self.__cluster_delays_variable, self.__azimuth_variation_variable, azimuth_spread_sign, self.__zenith_variation_variable, zenith_spread_sign, angle_coupling_indices, self.__cross_polarization_power_variable, self.__cross_polarization_phase_variable, ) return self._initialize_realization( self.__state_generator, self.__parameter_generator, parameters ) @abstractmethod def _recall_specific_realization( self, group: Group, parameters: ClusterDelayLineRealizationParameters ) -> CDLRT: ... # pragma: no cover
[docs] def recall_realization(self, group: Group) -> CDLRT: # Recall the static random variables LSPs = np.array(group["large_scale_parameters"], dtype=np.float64) azimuth_spread_sign = np.array(group["azimuth_spread_sign"], dtype=np.float64) zenith_spread_sign = np.array(group["zenith_spread_sign"], dtype=np.float64) angle_coupling_indices = np.array(group["angle_coupling_indices"], dtype=np.int_) parameters = ClusterDelayLineRealizationParameters( self.__state_variable, self.delay_normalization, self.oxygen_absorption, LSPs, self.__cluster_shadowing_log_variable, self.__cluster_delays_variable, self.__azimuth_variation_variable, azimuth_spread_sign, self.__zenith_variation_variable, zenith_spread_sign, angle_coupling_indices, self.__cross_polarization_power_variable, self.__cross_polarization_phase_variable, ) return self._recall_specific_realization(group, parameters)