# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum, IntEnum
from functools import cache, cached_property
from math import ceil, sin, cos, sqrt
from typing import Generator, Generic, Literal, List, Set, Tuple, TypeVar
import matplotlib.pyplot as plt
import numpy as np
from h5py import Group
from matplotlib.projections.polar import PolarAxes
from scipy.constants import pi, speed_of_light
from hermespy.core import (
AntennaMode,
ChannelStateInformation,
ChannelStateFormat,
Direction,
Executable,
HDFSerializable,
ScatterVisualization,
SignalBlock,
StemVisualization,
VisualizableAttribute,
)
from hermespy.core.visualize import VAT
from hermespy.tools import db2lin
from ..channel import (
Channel,
ChannelRealization,
ChannelSample,
LinkState,
ChannelSampleHook,
InterpolationMode,
)
from ..consistent import (
ConsistentGaussian,
ConsistentRealization,
ConsistentUniform,
ConsistentGenerator,
ConsistentSample,
)
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class DelayNormalization(Enum):
"""Normalization routine applied to a set of sampled delays.
Configuration option to :class:.ClusterDelayLineBase models.
"""
ZERO = 0
"""Normalize the delays, so that the minimal delay is zero."""
TOF = 1
"""The minimal delay is the time of flight between two devices."""
[docs]
class _PowerDelayVisualization(VisualizableAttribute[StemVisualization]):
"""Visualization of cluster delay sample power delay profile."""
__sample: ClusterDelayLineSample # Sample to be visualized
def __init__(self, sample: ClusterDelayLineSample) -> None:
self.__sample = sample
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> StemVisualization:
_axes: plt.Axes = axes[0, 0]
colors = self._get_color_cycle()
container = _axes.stem(
self.__sample.cluster_delays + self.__sample.delay_offset,
self.__sample.cluster_powers,
markerfmt="x",
)
container.stemlines.set_colors(colors)
container.markerline.set_color("white")
_axes.set_yscale("log")
_axes.xaxis.set_label_text("Delay")
_axes.yaxis.set_label_text("Power")
_axes.set_xlim(0, self.__sample.max_delay)
_axes.set_ylim(self.__sample.cluster_powers.min(), 1)
return StemVisualization(figure, axes, container)
def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None:
# Update stem markers
visualization.container.markerline.set_data(
self.__sample.cluster_delays + self.__sample.delay_offset, self.__sample.cluster_powers
)
# Update stem lines leading to markers
visualization.container.stemlines.set_paths(
[
np.array([[xx, 0.0], [xx, yy]])
for (xx, yy) in zip(
self.__sample.cluster_delays + self.__sample.delay_offset,
self.__sample.cluster_powers,
)
]
)
# Update axes limits
axes: plt.Axes = visualization.axes[0, 0]
axes.set_xlim(0, max(self.__sample.max_delay, axes.get_xlim()[1]))
axes.set_ylim(min(self.__sample.cluster_powers.min(), axes.get_ylim()[0]), 1.0)
@property
def title(self) -> str:
return "Cluster Power Delay Profile"
[docs]
class _AngleVisualization(VisualizableAttribute[ScatterVisualization]):
"""Visualization of the angles of arrival and departure."""
__sample: ClusterDelayLineSample # Sample to be visualized
def __init__(self, sample: ClusterDelayLineSample) -> None:
self.__sample = sample
def _axes_dimensions(self, **kwargs) -> Tuple[int, int]:
return (1, 2)
def __generate_scatter_locations(self, center_los=True) -> np.ndarray:
delta = self.__sample.transmitter_pose.translation - self.__sample.receiver_pose.translation
abs_delta = np.linalg.norm(delta, 2)
los_zenith = np.arccos((-delta[2] / abs_delta, delta[2] / abs_delta))
los_azimuth = np.arctan((-delta[1] / delta[0], delta[1] / delta[0]))
azimuth_offset = -los_azimuth if center_los else np.zeros(2)
zenith_offset = -los_zenith if center_los else np.zeros(2)
scatter_locations = np.empty(
(self.__sample.num_clusters, 2, self.__sample.num_rays, 2), dtype=np.float_
)
for i, (azimuth_clusters, zenith_clusters) in enumerate(
[
[self.__sample.azimuth_of_arrival, self.__sample.zenith_of_arrival],
[self.__sample.azimuth_of_departure, self.__sample.zenith_of_departure],
]
):
azimuth_clusters = azimuth_clusters + azimuth_offset[0]
zenith_clusters = abs(180 * (zenith_clusters + zenith_offset[0]) / pi) % 180
# azimuth_points[zenith_points > 90] -= pi
zenith_clusters[zenith_clusters > 90] = 90 - zenith_clusters[zenith_clusters > 90] % 90
scatter_locations[:, i, :, 0] = azimuth_clusters
scatter_locations[:, i, :, 1] = zenith_clusters
return scatter_locations
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> ScatterVisualization:
_axes = axes[0, :]
colors = self._get_color_cycle()
num_colors = len(colors)
markers = ["x", "+", "o"]
paths = np.empty((1, 2), dtype=np.object_)
scatter_locations = self.__generate_scatter_locations()
paths = np.empty((1, 2, 25), dtype=np.object_)
c = 0
for c, cluster in enumerate(scatter_locations):
paths[0, 0, c] = _axes[0].scatter(
cluster[0, :, 0],
cluster[0, :, 1],
marker=markers[int(c / num_colors)],
color=colors[c % num_colors],
)
paths[0, 1, c] = _axes[1].scatter(
cluster[1, :, 0],
cluster[1, :, 1],
marker=markers[int(c / num_colors)],
color=colors[c % num_colors],
)
# Add additional invisible clusters in case the number of clusters changes during redraws
for c in range(c + 1, 25):
paths[0, 0, c] = _axes[0].scatter(
[], [], marker=markers[int(c / num_colors)], color=colors[c % num_colors]
)
paths[0, 1, c] = _axes[1].scatter(
[], [], marker=markers[int(c / num_colors)], color=colors[c % num_colors]
)
ax: PolarAxes
for ax in _axes:
ax.set_rmax(90)
ax.grid(True)
ax.set_xlabel("Azimuth")
ax.set_ylabel("Zenith")
ax.set_rlabel_position(22.5)
_axes[0].set_title("Angles of Arrival")
_axes[1].set_title("Angles of Departure")
return ScatterVisualization(figure, _axes, paths)
def _update_visualization(self, visualization: ScatterVisualization, **kwargs) -> None:
scatter_locations = self.__generate_scatter_locations()
c = 0
for c, cluster in enumerate(scatter_locations):
visualization.paths[0, 0, c].set_offsets(cluster[0])
visualization.paths[0, 1, c].set_offsets(cluster[1])
# Hide additional clusters
for c in range(c + 1, 25):
visualization.paths[0, 0, c].set_offsets(np.empty((0, 2), dtype=np.float_))
visualization.paths[0, 1, c].set_offsets(np.empty((0, 2), dtype=np.float_))
@property
def title(self) -> str:
return "Ray Angles"
[docs]
class ClusterDelayLineSample(ChannelSample):
"""Sample of a 3GPP Cluster Delay Line channel model."""
# Sub-cluster partitions for the three strongest clusters
subcluster_indices: List[List[int]] = [
[0, 1, 2, 3, 4, 5, 6, 7, 18, 19],
[8, 9, 10, 11, 16, 17],
[12, 13, 14, 15],
]
__line_of_sight: bool
__rice_factor: float
__azimuth_of_arrival: np.ndarray
__zenith_of_arrival: np.ndarray
__azimuth_of_departure: np.ndarray
__zenith_of_departure: np.ndarray
__delay_offset: float
__cluster_delays: np.ndarray
__cluster_delay_spread: float
__cluster_powers: np.ndarray
__polarization_transformations: np.ndarray
__max_delay: float
__power_delay_visualization: _PowerDelayVisualization
__angle_visualization: _AngleVisualization
def __init__(
self,
line_of_sight: bool,
rice_factor: float,
azimuth_of_arrival: np.ndarray,
zenith_of_arrival: np.ndarray,
azimuth_of_departure: np.ndarray,
zenith_of_departure: np.ndarray,
delay_offset: float,
cluster_delays: np.ndarray,
cluster_delay_spread: float,
cluster_powers: np.ndarray,
polarization_transformations: np.ndarray,
state: LinkState,
) -> None:
# Initialize base class
ChannelSample.__init__(self, state)
# Initialize attributes
self.__line_of_sight = line_of_sight
self.__rice_factor = rice_factor
self.__azimuth_of_arrival = azimuth_of_arrival
self.__zenith_of_arrival = zenith_of_arrival
self.__azimuth_of_departure = azimuth_of_departure
self.__zenith_of_departure = zenith_of_departure
self.__delay_offset = delay_offset
self.__cluster_delays = cluster_delays
self.__cluster_delay_spread = cluster_delay_spread
self.__cluster_powers = cluster_powers
self.__polarization_transformations = polarization_transformations
self.__power_delay_visualization = _PowerDelayVisualization(self)
self.__angle_visualization = _AngleVisualization(self)
# Infer additional parameters
self.__num_clusters = cluster_delays.shape[0]
self.__num_rays = azimuth_of_arrival.shape[1]
self.__max_delay = (
max(np.max(cluster_delays[:3] + cluster_delay_spread * 2.56), cluster_delays.max())
+ self.__delay_offset
)
@property
def line_of_sight(self) -> bool:
"""Does the realization include direct line of sight between the two devices?"""
return self.__line_of_sight
@property
def rice_factor(self) -> float:
"""Rice factor."""
return self.__rice_factor
@property
def azimuth_of_arrival(self) -> np.ndarray:
"""Azimuth of arrival angles in radians."""
return self.__azimuth_of_arrival
@property
def zenith_of_arrival(self) -> np.ndarray:
"""Zenith of arrival angles in radians."""
return self.__zenith_of_arrival
@property
def azimuth_of_departure(self) -> np.ndarray:
"""Azimuth of departure angles in radians."""
return self.__azimuth_of_departure
@property
def zenith_of_departure(self) -> np.ndarray:
"""Zenith of departure angles in radians."""
return self.__zenith_of_departure
@property
def cluster_delays(self) -> np.ndarray:
"""Cluster delays in seconds."""
return self.__cluster_delays
@property
def cluster_delay_spread(self) -> float:
"""Cluster delay spread in seconds."""
return self.__cluster_delay_spread
@property
def cluster_powers(self) -> np.ndarray:
"""Cluster powers."""
return self.__cluster_powers
@property
def polarization_transformations(self) -> np.ndarray:
"""Polarization transformations."""
return self.__polarization_transformations
@property
def num_clusters(self) -> int:
"""Number of realized scatter clusters."""
return self.__num_clusters
@property
def num_rays(self) -> int:
"""Number of rays within each cluster."""
return self.__num_rays
@property
def max_delay(self) -> float:
"""Maximum expected delay in seconds."""
return self.__max_delay
@property
def delay_offset(self) -> float:
"""Delay offset in seconds."""
return self.__delay_offset
@property
def expected_energy_scale(self) -> float:
return float(np.sum(self.cluster_powers))
def __ray_impulse_generator(
self, sampling_rate: float, num_samples: int, center_frequency: float
) -> Generator[Tuple[np.ndarray, np.ndarray, float], None, None]:
"""Sequential generation of individual ray impulse responses.
Subroutine of :meth:`._propagate`.
Args:
sampling_rate (float):
Sampling rate in Hertz.
num_samples (int):
Number of samples to generate.
center_frequency (float):
Center frequency in Hertz.
"""
# First summand scaling of equation 7.5-30 in ETSI TR 138 901 v17.0.0
rice_factor_lin = db2lin(self.rice_factor)
nlos_scale = (1 + rice_factor_lin) ** -0.5 if self.line_of_sight else 1.0
# Compute the number of clusters, considering the first two clusters get split into 3 partitions
num_split_clusters = min(2, self.num_clusters)
virtual_num_clusters = 3 * num_split_clusters + max(0, self.num_clusters - 2)
# Prepare the cluster delays, equation 7.5-26
subcluster_delays: np.ndarray = np.repeat(
self.cluster_delays[:num_split_clusters, None], 3, axis=1
) + self.cluster_delay_spread * np.array([0.0, 1.28, 2.56])
virtual_cluster_delays = np.concatenate(
(subcluster_delays.flatten(), self.cluster_delays[num_split_clusters:])
)
# Wavelength factor
wavelength_factor = center_frequency / speed_of_light
relative_velocity = self.receiver_velocity - self.transmitter_velocity
fast_fading = wavelength_factor * np.arange(num_samples) / sampling_rate
# Compute the cluster propagation parameters
for subcluster_idx in range(0, virtual_num_clusters):
cluster_idx = int(subcluster_idx / 3) if subcluster_idx < 6 else subcluster_idx - 4
ray_indices = (
ClusterDelayLineSample.subcluster_indices[cluster_idx]
if cluster_idx < num_split_clusters
else range(self.num_rays)
)
delay: float = virtual_cluster_delays[subcluster_idx]
for aoa, zoa, aod, zod, jones in zip(
self.azimuth_of_arrival[cluster_idx, ray_indices],
self.zenith_of_arrival[cluster_idx, ray_indices],
self.azimuth_of_departure[cluster_idx, ray_indices],
self.zenith_of_departure[cluster_idx, ray_indices],
self.polarization_transformations[:, :, cluster_idx, ray_indices].transpose(
2, 0, 1
),
):
# Compute directive unit vectors
tx_direction = Direction.From_Spherical(aod, zod)
rx_direction = Direction.From_Spherical(aoa, zoa)
# Combination of Equation 7.5-23, 7.5.24 and 7.5.28
tx_array_response = self.transmitter_antennas.cartesian_array_response(
center_frequency, tx_direction.view(np.ndarray), "global", AntennaMode.TX
).conj()
rx_array_response = self.receiver_antennas.cartesian_array_response(
center_frequency, rx_direction.view(np.ndarray), "global", AntennaMode.RX
)
# Equation 7.5-22 and 7.5.28 in ETSI TR 138 901 v17.0.0
channel: np.ndarray = (
rx_array_response
@ jones
@ tx_array_response.T
* (sqrt(self.cluster_powers[cluster_idx] / self.num_rays) * nlos_scale)
)
wave_vector = np.array(
[cos(aoa) * sin(zoa), sin(aoa) * sin(zoa), cos(zoa)], dtype=float
)
impulse: np.ndarray = np.exp(
np.inner(wave_vector, relative_velocity) * fast_fading * 2j * pi
)
yield channel, impulse, delay
# If line of sight is enabled, yield an additional parameter touple for the line of sight component
if self.__line_of_sight:
device_vector = self.receiver_pose.translation - self.transmitter_pose.translation
los_distance = np.linalg.norm(device_vector, 2)
rx_wave_vector = device_vector / los_distance
# Equation 7.5-29
tx_array_response = self.transmitter_antennas.cartesian_array_response(
center_frequency, self.receiver_pose.translation, "global", AntennaMode.TX
).conj()
rx_array_response = self.receiver_antennas.cartesian_array_response(
center_frequency, self.transmitter_pose.translation, "global", AntennaMode.RX
)
# Second summand of equation 7.5-30, including equation 7.5-29 of ETSI TR 138 901 v17.0.0
los_delay: float = self.cluster_delays[0]
los_channel: np.ndarray = (
rx_array_response
@ np.array([[1, 0], [-1, 0]])
@ tx_array_response.T
* (rice_factor_lin / (1 + rice_factor_lin)) ** 0.5
)
los_impulse: np.ndarray = np.exp(-2j * pi * los_distance * wavelength_factor) * np.exp(
np.inner(rx_wave_vector, relative_velocity) * fast_fading * 2j * pi
)
yield los_channel, los_impulse, los_delay
def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
max_delay_in_samples = ceil((self.max_delay) * self.bandwidth)
propagated_samples = np.zeros(
(self.num_receive_antennas, signal.num_samples + max_delay_in_samples),
dtype=np.complex_,
)
# Prepare the optimal einsum path ahead of time for faster execution
channel = np.zeros(
(self.num_receive_antennas, self.num_transmit_antennas), dtype=np.complex_
)
impulse = np.zeros(signal.num_samples, dtype=np.complex_)
einsum_subscripts = "ij,jk,k->ik"
einsum_path = np.einsum_path(
einsum_subscripts, channel, signal, impulse, optimize="optimal"
)[0]
for channel, impulse, delay in self.__ray_impulse_generator(
self.bandwidth, signal.num_samples, self.carrier_frequency
):
if interpolation == InterpolationMode.NEAREST:
delay_in_samples = int((delay + self.delay_offset) * self.bandwidth)
propagated_samples[
:, delay_in_samples : delay_in_samples + signal.num_samples
] += np.einsum(einsum_subscripts, channel, signal, impulse, optimize=einsum_path)
return SignalBlock(propagated_samples, signal._offset)
[docs]
def state(
self,
num_samples: int,
max_num_taps: int,
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
) -> ChannelStateInformation:
max_delay_in_samples = min(max_num_taps, ceil(self.max_delay * self.bandwidth))
raw_state = np.zeros(
(
self.num_receive_antennas,
self.num_transmit_antennas,
num_samples,
1 + max_delay_in_samples,
),
dtype=np.complex_,
)
for channel, impulse, delay in self.__ray_impulse_generator(
self.bandwidth, num_samples, self.carrier_frequency
):
# Compute the delay in samples
delay_tap_index = int((delay + self.delay_offset) * self.bandwidth)
if delay_tap_index >= max_num_taps:
continue # pragma: no cover
# Compute the resulting channel state
raw_state[:, :, :, delay_tap_index] += np.einsum(
"ij,k->ijk", channel, impulse, optimize=False
)
return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)
@property
def plot_power_delay(self) -> _PowerDelayVisualization:
return self.__power_delay_visualization
@property
def plot_angles(self) -> _AngleVisualization:
return self.__angle_visualization
[docs]
def plot_rays(self, title: str | None = None) -> plt.Figure:
with Executable.style_context():
figure, axis = plt.subplots(subplot_kw={"projection": "3d"})
colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
magnitude = np.linalg.norm(
self.transmitter_pose.translation - self.receiver_pose.translation
)
for i, (aoa, zoa, aod, zod) in enumerate(
zip(
self.azimuth_of_arrival,
self.zenith_of_arrival,
self.azimuth_of_departure,
self.zenith_of_departure,
)
):
tx_vectors = np.array(
[np.sin(zod) * np.cos(aod), np.sin(zod) * np.sin(aod), np.cos(zod)]
)
rx_vectors = -np.array(
[np.sin(zoa) * np.cos(aoa), np.sin(zoa) * np.sin(aoa), np.cos(zoa)]
)
for vectors, origin in zip(
(tx_vectors, rx_vectors),
(self.transmitter_pose.translation, self.receiver_pose.translation),
):
stops = vectors * magnitude + origin[:, np.newaxis]
for stop in stops.T:
axis.plot(
[origin[0], stop[0]],
[origin[1], stop[1]],
[origin[2], stop[2]],
color=colors[i % len(colors)],
)
"""start_arrival = self.transmitter_position
stop_arrival = self.transmitter_position + arrival
start_departure = self.receiver_position
stop_departure = self.receiver_position + departure"""
axis.set_zlim(-magnitude, magnitude)
axis.scatter(*self.transmitter_pose.translation, marker="o", color="w")
axis.text(*self.transmitter_pose.translation, "Tx", zorder=1)
axis.scatter(*self.receiver_pose.translation, marker="o", color="w")
axis.text(*self.receiver_pose.translation, "Rx", zorder=1)
figure.suptitle("Rays" if title is None else title)
return figure
@staticmethod
def _angular_spread(angles: np.ndarray, powers: np.ndarray) -> float:
"""Compute the angular spread of a set of data.
Implements equation (A-1) of :footcite:t:`3GPP:TR38901`.
Args:
angles (np.ndarray):
Numpy array of angles in radians for all paths of propagation.
powers (np.ndarray):
Numpy angles of power for all paths of propagation.
Returns: The angluar spread in radians.
Raises:
ValueError: If the dimensions of `angles` and `powers` don't match.
"""
if angles.shape != powers.shape:
raise ValueError("Dimensions of angles and powers don't match")
summed_power = np.sum(powers, axis=None)
weighted_angle_sum = np.sum(np.exp(1j * angles) * powers, axis=None)
spread = np.sqrt(-2 * np.log(np.abs(weighted_angle_sum / summed_power)))
return spread
@cached_property
def azimuth_arrival_spread(self) -> float:
"""Spread of azimuth of arrival angles.
Returns: Angular spread in radians.
"""
return self._angular_spread(
self.azimuth_of_arrival,
np.repeat(self.cluster_powers[:, None], self.azimuth_of_arrival.shape[1], axis=1),
)
@cached_property
def azimuth_departure_spread(self) -> float:
"""Spread of azimuth of departure angles.
Returns: Angular spread in radians.
"""
return self._angular_spread(
self.azimuth_of_departure,
np.repeat(self.cluster_powers[:, None], self.azimuth_of_departure.shape[1], axis=1),
)
@cached_property
def zenith_arrival_spread(self) -> float:
"""Spread of zenith of arrival angles.
Returns: Angular spread in radians.
"""
return self._angular_spread(
self.zenith_of_arrival,
np.repeat(self.cluster_powers[:, None], self.zenith_of_arrival.shape[1], axis=1),
)
@cached_property
def zenith_departure_spread(self) -> float:
"""Spread of zenith of departure angles.
Returns: Angular spread in radians.
"""
return self._angular_spread(
self.zenith_of_departure,
np.repeat(self.cluster_powers[:, None], self.zenith_of_departure.shape[1], axis=1),
)
[docs]
def reciprocal(self, state: LinkState) -> ClusterDelayLineSample:
"""Generate a reciprocal sample of the current sample.
Args:
state (ChannelState):
State of the reciprocal sample.
Returns: A reciprocal sample.
"""
return ClusterDelayLineSample(
self.line_of_sight,
self.rice_factor,
self.azimuth_of_departure,
self.zenith_of_departure,
self.azimuth_of_arrival,
self.zenith_of_arrival,
self.delay_offset,
self.cluster_delays,
self.cluster_delay_spread,
self.cluster_powers,
self.polarization_transformations,
state,
)
[docs]
class LargeScaleState(object):
"""Large scale state of a 3GPP Cluster Delay Line channel model."""
@property
@abstractmethod
def line_of_sight(self) -> bool:
"""Direct line of sight between two linked devices."""
... # pragma: no cover
@property
@abstractmethod
def value(self) -> int:
"""Integer value of the state."""
... # pragma: no cover
class LOSState(LargeScaleState, IntEnum):
"""Line of sight state of the cluster delay line model."""
LOS = 0
"""Direct line of sight connection without blockage between
base station and user terminal.
"""
NLOS = 1
"""Physical blockage between base station and user terminal."""
@property
def line_of_sight(self) -> bool:
return self == LOSState.LOS
@property
def value(self) -> int:
return int(self)
class O2IState(LargeScaleState, IntEnum):
"""O2I state of the cluster delay line model."""
LOS = 0
"""Direct line of sight connection without blockage between
base station and user terminal.
"""
NLOS = 1
"""Physical blockage between base station and user terminal."""
O2I = 2
"""User terminal loacated inside a building."""
@property
def line_of_sight(self) -> bool:
return self == LOSState.LOS
@property
def value(self) -> int:
return int(self)
LSST = TypeVar("LSST", bound=LargeScaleState)
"""Type variable for large scale state types."""
class ClusterDelayLineSampleParameters(object):
"""Data class for initialization parameters of a 3GPP Cluster Delay Line channel model sample."""
__carrier_frequency: float
__distance_3d: float
__distance_2d: float
__base_height: float
__terminal_height: float
def __init__(
self,
carrier_frequency: float,
distance_3d: float,
distance_2d: float,
base_height: float,
terminal_height: float,
) -> None:
"""
Args:
carrier_frequency (float):
Carrier frequency in Hz.
distance_3d (float):
Distance between the base station and the user terminal in meters.
distance_2d (float):
Horizontal distance between the base station and the user terminal in meters.
base_height (float):
Height of the base station in meters.
terminal_height (float):
Height of the user terminal in meters.
"""
self.__carrier_frequency = carrier_frequency
self.__distance_3d = distance_3d
self.__distance_2d = distance_2d
self.__base_height = base_height
self.__terminal_height = terminal_height
@property
def carrier_frequency(self) -> float:
"""Carrier frequency in Hz."""
return self.__carrier_frequency
@property
def distance_3d(self) -> float:
"""Distance between the base station and the user terminal in meters."""
return self.__distance_3d
@property
def distance_2d(self) -> float:
"""Horizontal distance between the base station and the user terminal in meters."""
return self.__distance_2d
@property
def base_height(self) -> float:
"""Height of the base station in meters."""
return self.__base_height
@property
def terminal_height(self) -> float:
"""Height of the user terminal in meters."""
return self.__terminal_height
@dataclass
class ClusterDelayLineRealizationParameters(object):
"""Data class for initialization parameters of a 3GPP Cluster Delay Line channel model realization."""
state_variable: ConsistentUniform
delay_normalization: DelayNormalization
oxygen_absorption: bool
large_scale_parameters: np.ndarray
"""Large scale parameters of the channel model.
Numpy vector with the following elements with the elements containing
samples of a standard normal distribution with specific cross-correlations.
"""
cluster_shadowing_log_variable: ConsistentGaussian
cluster_delays_variable: ConsistentUniform
azimuth_angle_variation_variable: ConsistentGaussian
azimuth_spread_sign: np.ndarray
zenith_angle_variation_variable: ConsistentGaussian
zenith_spread_sign: np.ndarray
angle_coupling_indices: np.ndarray
cross_polarization_power_variable: ConsistentGaussian
cross_polarization_phase_variable: ConsistentUniform
[docs]
class ClusterDelayLineRealization(ChannelRealization[ClusterDelayLineSample], Generic[LSST]):
"""Realization of a 3GPP Cluster Delay Line channel model."""
# Cluster scaling factors for azimuth angles
# Table 7.5-2 in ETSI TR 138 901 v17.0.0
__azimuth_scaling_factors = np.array(
[
[4, 0.779],
[5, 0.86],
[8, 1.018],
[10, 1.090],
[11, 1.123],
[12, 1.146],
[14, 1.19],
[15, 1.211],
[16, 1.226],
[19, 1.273],
[20, 1.289],
[25, 1.358],
],
dtype=np.float_,
)
# Cluster scaling factors for zenith angles
# Table 7.5-4 in ETSI TR 138 901 v17.0.0
__zenith_scaling_factors = np.array(
[
[8, 0.889],
[10, 0.957],
[11, 1.031],
[12, 1.104],
[15, 1.1088],
[19, 1.184],
[20, 1.178],
[25, 1.282],
],
dtype=np.float_,
)
# Ray offset angles
# Table 7.5-3 in ETSI TR 138 901 v17.0.0
_ray_offset_angles = np.array(
[
0.0447,
-0.0447,
0.1413,
-0.1413,
0.2492,
-0.2492,
0.3715,
-0.3715,
0.5129,
-0.5129,
0.6797,
-0.6797,
0.8844,
-0.8844,
1.1481,
-1.1481,
1.5195,
-1.5195,
2.1551,
-2.1551,
],
dtype=np.float_,
)
# Oxygen absorption factors
# Table 7.6-1 in ETSI TR 138 901 v17.0.0
# First column: Carrier frequency in GHz
# Second column: Oxygen absorption factor in dB/km
__oxygen_loss = np.array(
[
[52.0, 0.0],
[53.0, 1.0],
[54.0, 2.0],
[55.0, 4.0],
[65.0, 6.6],
[75.0, 9.7],
[58.0, 12.6],
[59.0, 14.6],
[60.0, 15.0],
[61.0, 14.6],
[62.0, 14.3],
[63.0, 10.5],
[64.0, 6.8],
[65.0, 3.9],
[66.0, 1.9],
[67.0, 1.0],
[68.0, 0.0],
],
dtype=np.float_,
)
def __init__(
self,
expected_state: LSST | None,
state_realization: ConsistentRealization,
parameters: ClusterDelayLineRealizationParameters,
sample_hooks: Set[ChannelSampleHook[ClusterDelayLineSample]],
gain: float = 1.0,
) -> None:
"""
Args:
expected_state (LSST | None):
Expected large-scale state of the channel.
If not specified, the large-scale state is randomly generated.
state_realization (ConsistentRealization):
Realization of the large scale state.
parameters (ClusterDelayLineRealizationParameters):
General parameters of the cluster delay line realization.
sample_hooks (Set[ChannelSampleHook[ClusterDelayLineSample]]):
Hooks for callback functions during the sample generation.
gain (float, optional):
Linear amplitude scaling factor if signals propagated over the channel.
"""
# Initialize base class
ChannelRealization.__init__(self, sample_hooks, gain)
# Initialize attributes
self.__expected_state = expected_state
self.__state_realization = state_realization
self.__state_variable = parameters.state_variable
self.__delay_normalization = parameters.delay_normalization
self.__oxygen_absorption = parameters.oxygen_absorption
self.__large_scale_parameters = parameters.large_scale_parameters
self.__cluster_shadowing_log_variable = parameters.cluster_shadowing_log_variable
self.__cluster_delays_variable = parameters.cluster_delays_variable
self.__azimuth_angle_variation_variable = parameters.azimuth_angle_variation_variable
self.__azimuth_spread_sign = parameters.azimuth_spread_sign
self.__zenith_angle_variation_variable = parameters.zenith_angle_variation_variable
self.__zenith_spread_sign = parameters.zenith_spread_sign
self.__angle_coupling_indices = parameters.angle_coupling_indices
self.__xpr_variable = parameters.cross_polarization_power_variable
self.__xpr_phase = parameters.cross_polarization_phase_variable
@property
def expected_state(self) -> LSST | None:
"""Expected state of the channel realization.
If None, the state will be randomly drawn from the state variable for each sample.
"""
return self.__expected_state
@abstractmethod
def _pathloss_dB(self, state: LSST, parameters: ClusterDelayLineSampleParameters) -> float:
"""Power loss during signal propagation in dB.
Args:
state (LSST):
Large scale state of the channel.
parameters (ClusterDelayLineSampleParameters):
Parameters of the channel realization.
Returns: Pathloss in dB.
"""
... # pragma: no cover
@abstractmethod
def _small_scale_realization(self, state: LSST) -> ConsistentRealization:
"""Fetch the random realization of the small scale state.
Args:
state (LSST):
Large scale state of the channel.
Returns:
ConsistentRealization: Realization of the small scale state.
"""
... # pragma: no cover
@abstractmethod
def _sample_large_scale_state(
self, state_variable_realization: float, parameters: ClusterDelayLineSampleParameters
) -> LSST:
"""Sample the large scale state of the channel.
Returns:
LSST: Large scale state of the channel.
"""
... # pragma: no cover
@abstractmethod
def _delay_spread_mean(self, state: LSST, carrier_frequency: float) -> float:
"""Mean of the cluster delay spread.
The spread realization and its mean are referred to as
:math:`\\mathrm{DS}` and :math:`\\mu_{\\mathrm{lgDS}}` within the the standard, respectively.
Returns:
float: Mean delay spread in seconds.
"""
... # pragma: no cover
@abstractmethod
def _delay_spread_std(self, state: LSST, carrier_frequency: float) -> float:
"""Standard deviation of the cluster delay spread.
The spread realization and its standard deviation are referred to as
:math:`\\mathrm{DS}` and :math:`\\sigma_{\\mathrm{lgDS}}` within the the standard, respectively.
Returns:
float: Delay spread standard deviation in seconds.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _aod_spread_mean(self, state: LSST, carrier_frequency: float) -> float:
"""Mean of the Azimuth Angle-of-Departure spread.
The spread realization and its mean are referred to as
:math:`\\mathrm{ASD}` and :math:`\\mu_{\\mathrm{lgASD}}` within the the standard, respectively.
Returns:
float: Mean angle spread in seconds
"""
... # pragma: no cover
@abstractmethod
def _aod_spread_std(self, state: LSST, carrier_frequency: float) -> float:
"""Standard deviation of the Azimuth Angle-of-Departure spread.
The spread realization and its standard deviation are referred to as
:math:`\\mathrm{ASD}` and :math:`\\sigma_{\\mathrm{lgASD}}` within the the standard, respectively.
Returns:
float: Angle spread standard deviation in seconds.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _aoa_spread_mean(self, state: LSST, carrier_frequency: float) -> float:
"""Mean of the Azimuth Angle-of-Arriaval spread.
The spread realization and its mean are referred to as
:math:`\\mathrm{ASA}` and :math:`\\mu_{\\mathrm{lgASA}}` within the the standard, respectively.
Returns:
float: Mean angle spread in seconds
"""
... # pragma: no cover
@abstractmethod
def _aoa_spread_std(self, state: LSST, carrier_frequency: float) -> float:
"""Standard deviation of the Azimuth Angle-of-Arrival spread.
The spread realization and its standard deviation are referred to as
:math:`\\mathrm{ASA}` and :math:`\\sigma_{\\mathrm{lgASA}}` within the the standard, respectively.
Returns:
float: Angle spread standard deviation in seconds.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _zoa_spread_mean(self, state: LSST, carrier_frequency: float) -> float:
"""Mean of the Zenith Angle-of-Arriaval spread.
The spread realization and its mean are referred to as
:math:`\\mathrm{ZSA}` and :math:`\\mu_{\\mathrm{lgZSA}}` within the the standard, respectively.
Returns:
float: Mean angle spread in seconds
"""
... # pragma: no cover
@abstractmethod
def _zoa_spread_std(self, state: LSST, carrier_frequency: float) -> float:
"""Standard deviation of the Zenith Angle-of-Arrival spread.
The spread realization and its standard deviation are referred to as
:math:`\\mathrm{ZSA}` and :math:`\\sigma_{\\mathrm{lgZSA}}` within the the standard, respectively.
Returns:
float: Angle spread standard deviation in seconds.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
###############################
# ToDo: Shadow fading function
###############################
@abstractmethod
def _rice_factor_mean(self) -> float:
"""Mean of the rice factor distribution.
The rice factor realization and its mean are referred to as
:math:`K` and :math:`\\mu_K` within the the standard, respectively.
Returns:
float: Rice factor mean in dB.
"""
... # pragma: no cover
@abstractmethod
def _rice_factor_std(self) -> float:
"""Standard deviation of the rice factor distribution.
The rice factor realization and its standard deviation are referred to as
:math:`K` and :math:`\\sigma_K` within the the standard, respectively.
Returns:
float: Rice factor standard deviation in dB.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _delay_scaling(self, state: LSST) -> float:
"""Delay scaling proportionality factor
Referred to as :math:`r_{\\tau}` within the standard.
Returns:
float: Scaling factor.
Raises:
ValueError:
If scaling factor is smaller than one.
"""
... # pragma: no cover
@abstractmethod
def _cross_polarization_power_mean(self, state: LSST) -> float:
"""Mean of the cross-polarization power.
The cross-polarization power and its mean are referred to as
:math:`\\mathrm{XPR}` and :math:`\\mu_{\\mathrm{XPR}}` within the the standard, respectively.
Returns:
float: Mean power in dB.
"""
... # pragma: no cover
@abstractmethod
def _cross_polarization_power_std(self, state: LSST) -> float:
"""Standard deviation of the cross-polarization power.
The cross-polarization power and its standard deviation are referred to as
:math:`\\mathrm{XPR}` and :math:`\\sigma_{\\mathrm{XPR}}` within the the standard, respectively.
Returns:
float: Power standard deviation in dB.
Raises:
ValueError: If the standard deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _num_clusters(self, state: LSST) -> int:
"""Number of clusters in the channel realization."""
... # pragma: no cover
@abstractmethod
def _cluster_aod_spread(self, state: LSST) -> float:
"""Azimuth Angle-of-Departure spread within an individual cluster.
Referred to as :math:`c_{ASD}` within the standard.
Returns:
float: Angle spread in degrees.
Raises:
ValueError: If spread is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _cluster_aoa_spread(self, state: LSST) -> float:
"""Azimuth Angle-of-Arrival spread within an individual cluster.
Referred to as :math:`c_{ASA}` within the standard.
Returns:
float: Angle spread in degrees.
Raises:
ValueError: If spread is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _cluster_zoa_spread(self, state: LSST) -> float:
"""Zenith Angle-of-Arrival spread within an individual cluster.
Referred to as :math:`c_{ZSA}` within the standard.
Returns:
float: Angle spread in degrees.
Raises:
ValueError: If spread is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _cluster_shadowing_std(self, state: LSST) -> float:
"""Standard deviation of the cluster shadowing.
Referred to as :math:`\\zeta` within the the standard.
Returns:
float: Cluster shadowing standard deviation.
Raises:
ValueError: If the deviation is smaller than zero.
"""
... # pragma: no cover
@abstractmethod
def _zod_spread_mean(
self, state: LSST, parameters: ClusterDelayLineSampleParameters
) -> float: ... # pragma: no cover
@abstractmethod
def _zod_spread_std(
self, state: LSST, parameters: ClusterDelayLineSampleParameters
) -> float: ... # pragma: no cover
@abstractmethod
def _zod_offset(self, state: LSST, parameters: ClusterDelayLineSampleParameters) -> float:
"""Offset between Zenith Angle-of-Arrival and Angle-of-Departure.
The offset is referred to as :math:`\\mu_{\\mathrm{offset,ZOD}}` within the standard.
Returns:
float: The offset in degrees.
"""
... # pragma: no cover
def _sample_large_scale_parameters(
self, state: LSST, parameters: ClusterDelayLineSampleParameters
) -> Tuple[float, ...]:
"""Sample large-scale parameters.
Representation of step 4 in ETSI TR 138.901 v17.0.0.
Args:
state (LSST): Large scale state of the channel.
parameters (ClusterDelayLineSampleParameters): Parameters of the channel realization.
Returns: Tuple of large-scale parameters.
"""
mean = np.array(
[
self._delay_spread_mean(state, parameters.carrier_frequency),
self._aod_spread_mean(state, parameters.carrier_frequency),
self._aoa_spread_mean(state, parameters.carrier_frequency),
self._zoa_spread_mean(state, parameters.carrier_frequency),
self._zod_spread_mean(state, parameters),
self._rice_factor_mean(),
0.0,
],
dtype=np.float_,
)
std = np.array(
[
self._delay_spread_std(state, parameters.carrier_frequency),
self._aod_spread_std(state, parameters.carrier_frequency),
self._aoa_spread_std(state, parameters.carrier_frequency),
self._zoa_spread_std(state, parameters.carrier_frequency),
self._zod_spread_std(state, parameters),
self._rice_factor_std(),
1.0,
],
dtype=np.float_,
)
# Enforce the proper cross-correlations between the large-scale parameters
# The routine is described in section 3.3.1 of the WINNERII final report
S = mean + std * self.__large_scale_parameters[state.value, ...]
# Transform to the proper distribution
DS = 10 ** S[0]
ASD = min(10 ** S[1], 104)
ASA = min(10 ** S[2], 104)
ZSA = min(10 ** S[3], 52)
ZSD = min(10 ** S[4], 52)
K = S[5]
SF = S[6]
return DS, ASD, ASA, ZSA, ZSD, K, SF
def _cluster_delays(
self,
consistent_sample: ConsistentSample,
state: LSST,
delay_spread: float,
rice_factor: float,
) -> Tuple[np.ndarray, np.ndarray, float]:
"""Compute a single sample set of normalized cluster delays.
A single cluster delay is referred to as :math:`\\tau_n` within the the standard.
Implementation of equations 7.5-1 to 7.5-4 in TR 138.901 v17.0.0.
Args:
consistent_sample (ConsistentSample):
Sample of the consistent random process.
state (LSST):
Large scale state of the channel.
line_of_sight (bool):
Is the realization line of sight?
delay_spread (float):
Delay spread in seconds.
Denoted by :math:`\\mathrm{DS}` in the standard.
rice_factor (float):
Rice factor K in dB.
Denoted by :math:`K` in the standard.
Returns:
Tuple of
- Raw delay samples
- True delays
- Minimal delay
"""
# Equation 7.5-1 in TR 138.901 v17.0.0
r_tau = self._delay_scaling(state)
X_n = self.__cluster_delays_variable.sample(consistent_sample)[: self._num_clusters(state)]
unsorted_delays = -r_tau * delay_spread * np.log(X_n)
# Equation 7.5-2 in TR 138.901 v17.0.0
# Sort the delays in ascending order
sorted_delays = np.sort(unsorted_delays)
minimal_delay = sorted_delays[0]
sorted_delays -= minimal_delay
# In case of line of sight, scale the delays by the appropriate K-factor
# Equations 7.5-3 and 7.5-4 in TR 138.901 v17.0.0
scaled_delays = sorted_delays.copy()
if state.line_of_sight:
rice_scale = (
0.775 - 0.0433 * rice_factor + 2e-4 * rice_factor**2 + 17e-5 * rice_factor**3
)
scaled_delays /= rice_scale
minimal_delay = 0.0 # This is required for the oxygen absorption computation
# Return the raw and scaled delays, since they are both required for further processing
return sorted_delays, scaled_delays, minimal_delay
def _cluster_powers(
self,
consistent_sample: ConsistentSample,
state: LSST,
delay_spread: float,
delays: np.ndarray,
rice_factor: float,
) -> np.ndarray:
"""Compute a single sample set of normalized cluster power factors from delays.
A single cluster power factor is referred to as :math:`P_n` within the the standard.
Args:
consistent_sample (ConsistentSample):
Sample of the consistent random process.
state (LSST):
Large scale state of the channel.
delay_spread (float):
Delay spread in seconds.
Denoted by :math:`\\mathrm{DS}` in the standard.
delays (np.ndarray):
Vector of cluster delays.
Denoted by :math:`\\tau` in the standard.
rice_factor (float):
Rice factor in dB.
Denoted by :math:`K` in the standard.
Returns:
np.ndarray:
Vector of cluster power scales.
"""
delay_scaling = self._delay_scaling(state)
shadowing = 10 ** (
-0.1
* self.__cluster_shadowing_log_variable.sample(
consistent_sample, 0.0, self._cluster_shadowing_std(state)
)[: self._num_clusters(state)]
)
# Equation 7.5-5 in TR 138.901 v17.0.0
powers = np.exp(-delays * (delay_scaling - 1) / (delay_scaling * delay_spread)) * shadowing
# In case of line of sight, add a specular component to the cluster delays
if state.line_of_sight:
# Equations 7.5-7 and 7.5-8 in TR 138.901 v17.0.0
linear_rice_factor = db2lin(rice_factor)
powers /= (1 + linear_rice_factor) * np.sum(powers.flat)
powers[0] += linear_rice_factor / (1 + linear_rice_factor)
else:
# Equation 7.5-6 in TR 138.901 v17.0.0
powers /= np.sum(powers.flat)
# Eliminate clusters with powers less than -25 dB with respect to the strongest cluster
cutoff_power = powers[0] * 10 ** (-25 / 10)
powers = powers[np.where(powers >= cutoff_power)]
return powers
def _ray_azimuth_angles(
self,
consistent_sample: ConsistentSample,
state: LSST,
cluster_powers: np.ndarray,
spread: float,
rice_factor: float,
los_azimuth: float,
direction: Literal["arrival", "departure"],
) -> np.ndarray:
"""Compute cluster ray azimuth angles of arrival or departure.
Args:
consistent_sample (ConsistentSample):
Sample of the consistent random process.
state (LSST):
Large scale state of the channel.
cluster_powers (np.ndarray):
Vector of cluster powers. The length determines the number of clusters.
spread (float):
Root-mean squared Azimuth spread in degrees.
Denoted by :math:`\\mathrm{ASD}` or :math:`\\mathrm{ASA}` in the standard.
rice_factor (float):
Rice factor in dB.
Denoted by :math:`K` in the standard.
los_azimuth (float):
Line of sight azimuth angle to the target in degrees.
direction (Literal["arrival", "departure"]):
Direction of the angles. Either "arrival" or "departure".
Returns:
np.ndarray:
Matrix of angles in degrees.
The first dimension indicates the cluster index, the second dimension the ray index.
"""
# Determine the closest scaling factor
scale_index = np.argmin(np.abs(self.__azimuth_scaling_factors[:, 0] - len(cluster_powers)))
angle_scale = self.__azimuth_scaling_factors[scale_index, 1]
# Scale the scale (hehe) in the line of sight case
if state.line_of_sight:
angle_scale *= (
1.1035 - 0.028 * rice_factor - 2e-3 * rice_factor**2 + 1e-4 * rice_factor**3
)
angles: np.ndarray = (
2 * spread / 1.4 / angle_scale * np.sqrt(-np.log(cluster_powers / cluster_powers.max()))
)
# Assign positive / negative integers and add some noise, Equation 7.5-11 in TR 138.901 v17.0.0
angle_variation = (spread / 7) * self.__azimuth_angle_variation_variable.sample(
consistent_sample
)
spread_angles = (
self.__azimuth_spread_sign[: cluster_powers.size] * angles
+ angle_variation[: cluster_powers.size]
)
# Add the actual line of sight term
if state.line_of_sight:
# The first angle within the list is exactly the line of sight component
spread_angles += los_azimuth - spread_angles[0]
else:
spread_angles += los_azimuth
# Spread the angles
cluster_spread = (
self._cluster_aoa_spread(state)
if direction == "arrival"
else self._cluster_aod_spread(state)
)
ray_offsets = cluster_spread * self._ray_offset_angles
ray_angles = np.tile(spread_angles[:, None], len(ray_offsets)) + ray_offsets
return ray_angles
def _ray_zoa(
self,
consistent_sample: ConsistentSample,
state: LSST,
cluster_powers: np.ndarray,
spread: float,
rice_factor: float,
los_zenith: float,
) -> np.ndarray:
"""Compute cluster ray zenith angles of arrival.
Args:
consistent_sample (ConsistentSample):
Sample of the consistent random process.
state (LSST):
Large scale state of the channel.
cluster_powers (np.ndarray):
Vector of cluster powers. The length determines the number of clusters.
spread (float):
Zenith of arrival angular spread in degrees.
Denoted by :math:`\\mathrm{ZSA}` in the standard.
rice_factor (float):
Rice factor in dB.
Denoted by :math:`K` in the standard.
los_zenith (float):
Line of sight zenith angle to the target in degrees.
Returns:
np.ndarray:
Matrix of angles in degrees.
The first dimension indicates the cluster index, the second dimension the ray index.
"""
# Equation 7.5-15 of TR 138.901 v17.0.0
scaling_factor_index = np.argmin(
np.abs(self.__zenith_scaling_factors[:, 0] - cluster_powers.size)
)
zenith_scaling_factor = self.__zenith_scaling_factors[scaling_factor_index, 1]
if state.line_of_sight:
zenith_scaling_factor *= (
1.3086 + 0.0339 * rice_factor - 0.0077 * rice_factor**2 + 2e-4 * rice_factor**3
)
# Generate angle starting point
# Equation 7.5-14 of TR 138.901 v17.0.0
zenith_centroids: np.ndarray = (
-spread * np.log(cluster_powers / cluster_powers.max()) / zenith_scaling_factor
)
# Equation 7.5-16 of TR 138.901 v17.0.0
# ToDo: Treat the BST-UT case!!!! (los_zenith = 90°)
cluster_variation = self.__zenith_angle_variation_variable.sample(
consistent_sample, 0.0, (spread / 7)
)
cluster_zenith = (
self.__zenith_spread_sign[: cluster_powers.size] * zenith_centroids
+ cluster_variation[: cluster_powers.size]
)
# Equation 7.5-17 of TR 138.901 v17.0.0
if state == LOSState.LOS:
cluster_zenith += los_zenith - cluster_zenith[0]
else:
cluster_zenith += los_zenith
# Spread the angles
# Equation 7.5 -18 of TR 138.901 v17.0.0
ray_offsets = self._cluster_zoa_spread(state) * self._ray_offset_angles
ray_zenith = np.tile(cluster_zenith[:, None], len(ray_offsets)) + ray_offsets
return ray_zenith
def _ray_zod(
self,
consistent_sample: ConsistentSample,
state: LSST,
parameters: ClusterDelayLineSampleParameters,
cluster_powers: np.ndarray,
spread: float,
rice_factor: float,
los_zenith: float,
) -> np.ndarray:
"""Compute cluster ray zenith angles of departure.
Args:
consistent_sample (ConsistentSample):
Sample of the consistent random process.
state (LSST):
Large scale state of the channel.
cluster_powers (np.ndarray):
Vector of cluster powers. The length determines the number of clusters.
spread (float):
Zenith of departure angular spread in degrees.
Denoted by :math:`\\mathrm{ZSD}` in the standard.
rice_factor (float):
Rice factor in dB.
los_zenith (float):
Line of sight zenith angle to the target in degrees.
Returns:
np.ndarray:
Matrix of angles in degrees.
The first dimension indicates the cluster index, the second dimension the ray index.
"""
# Equation 7.5-15 of TR 138.901 v17.0.0
scaling_factor_index = np.argmin(
np.abs(self.__zenith_scaling_factors[:, 0] - cluster_powers.size)
)
zenith_scaling_factor = self.__zenith_scaling_factors[scaling_factor_index, 1]
if state.line_of_sight:
zenith_scaling_factor *= (
1.3086 + 0.0339 * rice_factor - 0.0077 * rice_factor**2 + 2e-4 * rice_factor**3
)
# Generate angle starting point
# Equation 7.5-14 of TR 138.901 v17.0.0
zenith_centroids: np.ndarray = (
-spread * np.log(cluster_powers / cluster_powers.max()) / zenith_scaling_factor
)
# Equation 7.5-19 of TR 138.901 v17.0.0
# ToDo: Treat the BST-UT case!!!! (los_zenith = 90°)
cluster_variation = self.__zenith_angle_variation_variable.sample(
consistent_sample, 0.0, (spread / 7)
)
cluster_zenith = (
self.__zenith_spread_sign[: cluster_powers.size] * zenith_centroids
+ cluster_variation[: cluster_powers.size]
+ self._zod_offset(
state, parameters
) # This line is the only difference to equation 7.5-16
)
# Equation 7.5-17 of TR 138.901 v17.0.0
if state == LOSState.LOS:
cluster_zenith += los_zenith - cluster_zenith[0]
else:
cluster_zenith += los_zenith
# Spread the angles
# Equation 7.5-20 of TR 138.901 v17.0.0
zod_spread_mean = self._zod_spread_mean(state, parameters)
ray_offsets = 3 / 8 * 10**zod_spread_mean * self._ray_offset_angles
ray_zenith = np.tile(cluster_zenith[:, None], len(ray_offsets)) + ray_offsets
return ray_zenith
def _sample(self, channel_state: LinkState) -> ClusterDelayLineSample:
# Compute the line of sight distance between the devices, i.e. base station to user terminal
distance_3d: float = np.linalg.norm(channel_state.transmitter.position - channel_state.receiver.position, 2) # type: ignore[assignment]
distance_2d: float = np.linalg.norm( # type: ignore[assignment]
channel_state.transmitter.position[:2] - channel_state.receiver.position[:2], 2
)
base_height: float = max(
channel_state.transmitter.position[2], channel_state.receiver.position[2]
)
terminal_height: float = min(
channel_state.transmitter.position[2], channel_state.receiver.position[2]
)
parameters = ClusterDelayLineSampleParameters(
channel_state.carrier_frequency, distance_3d, distance_2d, base_height, terminal_height
)
# Ensure transmiter and receiver are not located at identical positions
if distance_3d == 0.0:
raise RuntimeError(
"Identical device positions violate the far-field assumption of the 3GPP CDL channel model"
)
# Decide on the overall large-scale state of the channel
if self.expected_state is None:
state_variable_sample = float(
self.__state_variable.sample(
self.__state_realization.sample(
channel_state.transmitter.position, channel_state.receiver.position
)
)
)
state = self._sample_large_scale_state(state_variable_sample, parameters)
else:
state = self.expected_state
small_scale_realization = self._small_scale_realization(state)
parameters_sample = small_scale_realization.sample(
channel_state.transmitter.position, channel_state.receiver.position
)
# Sample large-scale parameters
DS, ASD, ASA, ZSA, ZSD, K, SF = self._sample_large_scale_parameters(state, parameters)
# Compute line of sight directions in local coordinates for both devices
tx_los_direction = channel_state.transmitter.pose.invert().transform_direction(
channel_state.receiver.position - channel_state.transmitter.position, True
)
rx_los_direction = channel_state.receiver.pose.invert().transform_direction(
channel_state.transmitter.position - channel_state.receiver.position, True
)
# Extract directive angles in spherical coordinates
tx_los_angles = tx_los_direction.to_spherical()
rx_los_angles = rx_los_direction.to_spherical()
raw_cluster_delays, cluster_delays, minimal_delay = self._cluster_delays(
parameters_sample, state, DS, K
)
cluster_powers = self._cluster_powers(parameters_sample, state, DS, raw_cluster_delays, K)
raw_cluster_delays = raw_cluster_delays[: cluster_powers.size]
cluster_delays = cluster_delays[: cluster_powers.size]
# Compute cluster angles
ray_aod = (
pi
/ 180
* self._ray_azimuth_angles(
parameters_sample,
state,
cluster_powers,
ASD,
K,
180 * tx_los_angles[0] / pi,
"departure",
)
)
ray_aoa = (
pi
/ 180
* self._ray_azimuth_angles(
parameters_sample,
state,
cluster_powers,
ASA,
K,
180 * rx_los_angles[0] / pi,
"arrival",
)
)
ray_zod = (
pi
/ 180
* self._ray_zod(
parameters_sample,
state,
parameters,
cluster_powers,
ZSD,
K,
180 * tx_los_angles[1] / pi,
)
)
ray_zoa = (
pi
/ 180
* self._ray_zoa(
parameters_sample, state, cluster_powers, ZSA, K, 180 * rx_los_angles[1] / pi
)
)
# Couple cluster angles randomly
# Step 8 in the small-scale parameter generation of ETSI TR 138.901 v17.0.0
# This is equivalent of shuffeling the cluster ray indices according to the coupling indices
# ToDo: Re-check if this is correct
shuffled_ray_aoa = np.take_along_axis(
ray_aoa, self.__angle_coupling_indices[0, : cluster_powers.size, ...], axis=1
)
shuffled_ray_zoa = np.take_along_axis(
ray_zoa, self.__angle_coupling_indices[1, : cluster_powers.size, ...], axis=1
)
shuffled_ray_aod = np.take_along_axis(
ray_aod, self.__angle_coupling_indices[2, : cluster_powers.size, ...], axis=1
)
shuffled_ray_zod = np.take_along_axis(
ray_zod, self.__angle_coupling_indices[3, : cluster_powers.size, ...], axis=1
)
# Generate cross-polarization power ratios (step 9)
# Equation 7.5-21 in TR 138.901 v17.0.0
cross_polarization_factor = 10 ** (
-0.05
* self.__xpr_variable.sample(
parameters_sample,
self._cross_polarization_power_mean(state),
self._cross_polarization_power_std(state),
)
)
# Draw initial random phases (step 10)
# A single 2x2 slice represents the jones matrix transforming the polarization of a single ray
polarization_transformations = np.exp(2j * pi * self.__xpr_phase.sample(parameters_sample))
polarization_transformations[0, 1, ::] *= cross_polarization_factor
polarization_transformations[1, 0, ::] *= cross_polarization_factor
# Compute a delay offset accounting for the time of flight over the line of sight
# Equations 7.6-43 and 7.6-44 in TR 138.901 v17.0.0
if self.__delay_normalization == DelayNormalization.TOF:
delay_offset = distance_3d / speed_of_light
else:
delay_offset = 0.0
# Step 12
# Model pathloss
# Table 7.4.1-1 in TR 138.901 v17.0.0
path_power_scale = db2lin(-self._pathloss_dB(state, parameters))
# Model oxygen power loss
# 7.6-1 in TR 138.901 v17.0.0
if self.__oxygen_absorption:
oxygen_loss_factor = np.interp(
channel_state.carrier_frequency * 1e-9,
self.__oxygen_loss[0, :],
self.__oxygen_loss[1, :],
)
oxygen_loss = 10 ** (
-1e-2
* oxygen_loss_factor
* (distance_3d + speed_of_light * (cluster_delays + minimal_delay))
)
else:
oxygen_loss = 1.0
# Compute the expected amplitude scale factor of the sample
expected_loss = self.gain * oxygen_loss
true_cluster_powers = cluster_powers * path_power_scale * expected_loss**2
return ClusterDelayLineSample(
state.line_of_sight,
K,
shuffled_ray_aoa,
shuffled_ray_zoa,
shuffled_ray_aod,
shuffled_ray_zod,
delay_offset,
cluster_delays,
DS,
true_cluster_powers,
polarization_transformations,
channel_state,
)
def _reciprocal_sample(
self, sample: ClusterDelayLineSample, state: LinkState
) -> ClusterDelayLineSample:
# Check if an optimized resampling is possible, otherwise sample the realization again
# Resample if the sample parameters don't match
if (
state.bandwidth != sample.bandwidth
or state.carrier_frequency != sample.carrier_frequency
):
return self._sample(state)
if np.any(state.transmitter.position != sample.receiver_pose.translation) or np.any(
state.receiver.position != sample.transmitter_pose.translation
):
return self._sample(state)
return sample.reciprocal(state)
[docs]
def to_HDF(self, group: Group) -> None:
self.__state_realization.to_HDF(HDFSerializable._create_group(group, "state_realization"))
group.attrs["gain"] = self.gain
group.attrs["delay_normalization"] = self.__delay_normalization.value
group.attrs["oxygen_absorption"] = self.__oxygen_absorption
group.create_dataset("large_scale_parameters", data=self.__large_scale_parameters)
group.create_dataset("zenith_spread_sign", data=self.__zenith_spread_sign)
group.create_dataset("azimuth_spread_sign", data=self.__azimuth_spread_sign)
group.create_dataset("angle_coupling_indices", data=self.__angle_coupling_indices)
CDLRT = TypeVar("CDLRT", bound=ClusterDelayLineRealization)
"""Type of a 3GPP Cluster Delay Line channel realization."""
[docs]
class ClusterDelayLineBase(Channel[CDLRT, ClusterDelayLineSample], Generic[CDLRT, LSST]):
"""Base class for all 3GPP Cluster Delay Line channel models."""
__delay_normalization: DelayNormalization
__oxygen_absorption: bool
__expected_state: LSST | None
def __init__(
self,
gain: float = 1.0,
delay_normalization: DelayNormalization = DelayNormalization.ZERO,
oxygen_absorption: bool = True,
expected_state: LSST | None = None,
**kwargs,
) -> None:
"""
Args:
gain (float, optional):
Linear gain factor a signal amplitude experiences when being propagated over this realization.
:math:`1.0` by default.
delay_normalization (DelayNormalization, optional):
The delay normalization routine applied during channel sampling.
oxygen_absorption (bool, optional):
Model oxygen absorption in the channel.
Enabled by default.
expected_state (LSST, optional):
Expected large-scale state of the channel.
If `None`, the state is randomly generated during each sample of the channel's realization.
\**kwargs:
Additional keyword arguments passed to the base class.
"""
# Initialize base class
Channel.__init__(self, gain, **kwargs)
# Initialize class attributes
self.delay_normalization = delay_normalization
self.oxygen_absorption = oxygen_absorption
self.expected_state = expected_state
# Generate dual consistent random variables
self.__state_generator = ConsistentGenerator(self)
self.__parameter_generator = ConsistentGenerator(self)
# Configure the random variable distributions
self.__state_variable = self.__state_generator.uniform()
num_clusters = self.max_num_clusters
num_rays = self.max_num_rays
self.__cluster_shadowing_log_variable = self.__parameter_generator.gaussian((num_clusters,))
self.__cluster_delays_variable = self.__parameter_generator.uniform((num_clusters,))
self.__azimuth_variation_variable = self.__parameter_generator.gaussian((num_clusters,))
self.__zenith_variation_variable = self.__parameter_generator.gaussian((num_clusters,))
self.__cross_polarization_power_variable = self.__parameter_generator.gaussian(
(num_clusters, num_rays)
)
self.__cross_polarization_phase_variable = self.__parameter_generator.uniform(
(2, 2, num_clusters, num_rays)
)
@property
def delay_normalization(self) -> DelayNormalization:
"""The delay normalization routine applied during channel sampling."""
return self.__delay_normalization
@delay_normalization.setter
def delay_normalization(self, normalization: DelayNormalization) -> None:
self.__delay_normalization = normalization
@property
def oxygen_absorption(self) -> bool:
"""Model oxygen absorption in the channel."""
return self.__oxygen_absorption
@oxygen_absorption.setter
def oxygen_absorption(self, absorption: bool) -> None:
self.__oxygen_absorption = absorption
@property
def expected_state(self) -> LSST | None:
"""Expected large-scale state of the channel.
If `None`, the state is randomly generated during each sample of the channel's realization.
"""
return self.__expected_state
@expected_state.setter
def expected_state(self, state: LSST | None) -> None:
self.__expected_state = state
@property
@abstractmethod
def max_num_clusters(self) -> int:
"""Maximum number of clusters a realization will generate."""
... # pragma: no cover
@property
@abstractmethod
def max_num_rays(self) -> int:
"""Maximum number of rays a realization will generate per cluster."""
... # pragma: no cover
@property
@abstractmethod
def _large_scale_correlations(self) -> np.ndarray:
"""Correlation factors between the large-scale parameters."""
... # pragma: no cover
@abstractmethod
def _initialize_realization(
self,
state_generator: ConsistentGenerator,
parameter_generator: ConsistentGenerator,
parameters: ClusterDelayLineRealizationParameters,
) -> CDLRT:
"""Initialize a new realization of the channel model."""
... # pragma: no cover
@cache
def __C_MM(self) -> np.ndarray:
"""Subroutine to compute the cross-correlation matrix of the large-scale parameters.
Args:
state (int): Large scale state index of the channel.
Returns: Lower-triangular root of the cross-correlation matrix.
"""
C_MM = np.empty((3, 7, 7), dtype=np.float_)
for c, C in enumerate(self._large_scale_correlations):
C_MM_squared = np.array(
[
# DS, ASD, ASA, ZSA, ZSD, K, SF
[1.0, C[0], C[1], C[15], C[14], C[8], C[4]], # DS
[C[0], 1.0, C[5], C[17], C[16], C[6], C[3]], # ASD
[C[1], C[5], 1.0, C[19], C[18], C[7], C[2]], # ASA
[C[15], C[17], C[19], 1.0, C[20], C[13], C[11]], # ZSA
[C[14], C[16], C[18], C[20], 1.0, C[12], C[10]], # ZSD
[C[8], C[6], C[7], C[13], C[12], 1.0, C[9]], # K
[C[4], C[3], C[2], C[11], C[10], C[9], 1.0], # SF
],
dtype=np.float_,
)
# Section 4 of ETSI TR 138.901 v17.0.0 hints at using the cholesky decomposition
# to enforce the expected cross-correlations between the large-scale parameters
C_MM[c, ...] = np.linalg.cholesky(C_MM_squared)
return C_MM
def _realize(self) -> CDLRT:
# Realize static random variables
azimuth_spread_sign = self._rng.choice([-1.0, 1.0], size=(self.max_num_clusters,))
zenith_spread_sign = self._rng.choice([-1.0, 1.0], size=(self.max_num_clusters,))
# Random coupling indices for step 8 in ETSI TR 138 901 v17.0.0
angle_candidate_indices = np.arange(self.max_num_rays)
angle_coupling_indices = np.array(
[
[
self._rng.permutation(angle_candidate_indices)
for _ in range(self.max_num_clusters)
]
for _ in range(4)
]
)
# Generate standard normal realizations of the large-scale parameters
LSPs = self.__C_MM() @ self._rng.standard_normal(7)
parameters = ClusterDelayLineRealizationParameters(
self.__state_variable,
self.delay_normalization,
self.oxygen_absorption,
LSPs,
self.__cluster_shadowing_log_variable,
self.__cluster_delays_variable,
self.__azimuth_variation_variable,
azimuth_spread_sign,
self.__zenith_variation_variable,
zenith_spread_sign,
angle_coupling_indices,
self.__cross_polarization_power_variable,
self.__cross_polarization_phase_variable,
)
return self._initialize_realization(
self.__state_generator, self.__parameter_generator, parameters
)
@abstractmethod
def _recall_specific_realization(
self, group: Group, parameters: ClusterDelayLineRealizationParameters
) -> CDLRT: ... # pragma: no cover
[docs]
def recall_realization(self, group: Group) -> CDLRT:
# Recall the static random variables
LSPs = np.array(group["large_scale_parameters"], dtype=np.float_)
azimuth_spread_sign = np.array(group["azimuth_spread_sign"], dtype=np.float_)
zenith_spread_sign = np.array(group["zenith_spread_sign"], dtype=np.float_)
angle_coupling_indices = np.array(group["angle_coupling_indices"], dtype=np.int_)
parameters = ClusterDelayLineRealizationParameters(
self.__state_variable,
self.delay_normalization,
self.oxygen_absorption,
LSPs,
self.__cluster_shadowing_log_variable,
self.__cluster_delays_variable,
self.__azimuth_variation_variable,
azimuth_spread_sign,
self.__zenith_variation_variable,
zenith_spread_sign,
angle_coupling_indices,
self.__cross_polarization_power_variable,
self.__cross_polarization_phase_variable,
)
return self._recall_specific_realization(group, parameters)