# -*- coding: utf-8 -*-
from __future__ import annotations
from math import exp, log, log10
from typing import Mapping, Set, Tuple
from typing_extensions import override
import numpy as np
from hermespy.core import SerializableEnum, SerializationProcess, DeserializationProcess
from .cluster_delay_lines import (
ClusterDelayLineBase,
ClusterDelayLineRealizationParameters,
ClusterDelayLineSample,
ClusterDelayLineSampleParameters,
ClusterDelayLineRealization,
DelayNormalization,
LOSState,
)
from ..channel import ChannelSampleHook
from ..consistent import ConsistentGenerator, ConsistentRealization
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class FactoryType(SerializableEnum):
"""Type of indoor factory.
Defined in TR 138.901 v17.0.0 Table 7.2-4.
"""
SL = 0, 10.0, 0.20
SH = 1, 10.0, 0.20
DL = 2, 02.0, 0.60
DH = 3, 02.0, 0.60
HH = 4, 05.0, 0.00
def __new__(cls, *args, **kwargs):
obj = object.__new__(cls)
obj._value_ = args[0]
return obj
def __init__(self, _: int, clutter_size: float, clutter_density: float) -> None:
self.__clutter_size = clutter_size
self.__clutter_density = clutter_density
@property
def clutter_size(self) -> float:
return self.__clutter_size
@property
def clutter_density(self) -> float:
return self.__clutter_density
[docs]
class IndoorFactoryRealization(ClusterDelayLineRealization[LOSState]):
"""Realization of the indoor factory channel model."""
__los_realization: ConsistentRealization
__nlos_realization: ConsistentRealization
__volume: float
__surface: float
__factory_type: FactoryType
__clutter_height: float
def __init__(
self,
expected_state: LOSState | None,
state_realization: ConsistentRealization,
los_realization: ConsistentRealization,
nlos_realization: ConsistentRealization,
parameters: ClusterDelayLineRealizationParameters,
volume: float,
surface: float,
factory_type: FactoryType,
clutter_height: float,
sample_hooks: Set[ChannelSampleHook[ClusterDelayLineSample]],
gain: float = 1.0,
) -> None:
"""
Args:
expected_state:
Expected large-scale state of the channel.
If not specified, the large-scale state is randomly generated.
state_realization:
Realization of a spatially consistent random number generator for the large-scale state.
los_realization:
Realization of a spatially consistent random number generator for small-scale parameters in the LOS state.
nlos_realization:
Realization of a spatially consistent random number generator for small-scale parameters in the NLOS state.
parameters:
General parameters of the cluster delay line realization.
volume:
Volume of the modeled factory hall in :math:`\\mathrm{m}^3`.
surface:
Surface area of the modeled factory hall in :math:`\\mathrm{m}^2`.
factory_type:
Type of the factory.
clutter_height:
Height of the clutter in the factory hall in meters above the floor.
gain:
Linear amplitude scaling factor if signals propagated over the channel.
"""
# Initialize base class
ClusterDelayLineRealization.__init__(
self, expected_state, state_realization, parameters, sample_hooks, gain
)
# Initialize class attributes
self.__los_realization = los_realization
self.__nlos_realization = nlos_realization
self.__volume = volume
self.__surface = surface
self.__factory_type = factory_type
self.__clutter_height = clutter_height
# Table 7.4.4-1 in TR 138.901 v17.0.0
def _pathloss_dB(self, state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
PL_LOS = (
31.84
+ 21.5 * log10(parameters.distance_3d)
+ 19 * log10(parameters.carrier_frequency / 1e9)
)
if state == LOSState.LOS:
return PL_LOS
if self.__factory_type == FactoryType.SL:
PL_NLOS = (
33
+ 25.5 * log10(parameters.distance_3d)
+ 20 * log10(parameters.carrier_frequency / 1e9)
)
elif self.__factory_type == FactoryType.DL:
PL_NLOS = (
18.6
+ 35.7 * log10(parameters.distance_3d)
+ 20 * log10(parameters.carrier_frequency / 1e9)
)
elif self.__factory_type == FactoryType.SH:
PL_NLOS = (
32.4
+ 23.0 * log10(parameters.distance_3d)
+ 20 * log10(parameters.carrier_frequency / 1e9)
)
else: # FactoryType.DH
PL_NLOS = (
33.63
+ 21.9 * log10(parameters.distance_3d)
+ 20 * log10(parameters.carrier_frequency / 1e9)
)
return max(PL_LOS, PL_NLOS)
def _small_scale_realization(self, state: LOSState) -> ConsistentRealization:
if state == LOSState.LOS:
return self.__los_realization
else:
return self.__nlos_realization
def _sample_large_scale_state(
self, state_variable_sample: float, parameters: ClusterDelayLineSampleParameters
) -> LOSState:
# Implementation of TR 138.901 v17.0.0 Table 7.4.2-1
if self.__factory_type == FactoryType.HH:
return LOSState.LOS
if self.__factory_type == FactoryType.SL or self.__factory_type == FactoryType.DL:
k_subspace = -self.__factory_type.clutter_size / log(
1 - self.__factory_type.clutter_density
)
else:
k_subspace = (
-self.__factory_type.clutter_size
* (parameters.base_height - parameters.terminal_height)
/ (
log(1 - self.__factory_type.clutter_density)
* (self.__clutter_height - parameters.terminal_height)
)
)
los_probability = exp(-parameters.distance_2d / k_subspace) if k_subspace > 0.0 else 0.0
return LOSState.LOS if state_variable_sample < los_probability else LOSState.NLOS
@staticmethod
def __parameter_dependency(carrier_frequency: float, factor: float, summand: float) -> float:
"""An implementation of the frequently used equation
.. math::
y = a \\log_{10}(1 + f_c) + b
Args:
carrier_frequency: Carrier frequency
factor: Factor scaling the logarithmic frequency dependency.
summand: Added constant.
Returns: The result.
"""
# Note that the standard does not lower-bound the frequency for the indoor-office scenario
# This might be an error!!!!
fc = carrier_frequency * 1e-9
return factor * log10(1 + fc) + summand
# Parameters for computing the mean delay spread
# TR 138.901 v17.0.0 Table 7.5-6
__delay_spread_mean: Mapping[LOSState, Tuple[float, float, float]] = {
LOSState.LOS: (26, 14, -9.35),
LOSState.NLOS: (30, 32, -9.44),
}
def _delay_spread_mean(self, state: LOSState, carrier_frequency: float) -> float:
parameters = IndoorFactoryRealization.__delay_spread_mean[state]
return log10(parameters[0] * self.__volume / self.__surface + parameters[1]) + parameters[2]
# Parameters for computing the standard deviation of the delay spread
# TR 138.901 v17.0.0 Table 7.5-6
__delay_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.15, LOSState.NLOS: 0.19}
@staticmethod
def _delay_spread_std(state: LOSState, carrier_frequency: float) -> float:
return IndoorFactoryRealization.__delay_spread_std[state]
# Parameters for computing the mean angle of departure spread
# TR 138.901 v17.0.0 Table 7.5-6
__aod_spread_mean: Mapping[LOSState, float] = {LOSState.LOS: 1.56, LOSState.NLOS: 1.57}
@staticmethod
def _aod_spread_mean(state: LOSState, carrier_frequency: float) -> float:
return IndoorFactoryRealization.__aod_spread_mean[state]
# Parameters for computing the standard deviation of the angle of departure spread
# TR 138.901 v17.0.0 Table 7.5-6
__aod_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.25, LOSState.NLOS: 0.2}
@staticmethod
def _aod_spread_std(state: LOSState, carrier_frequency: float) -> float:
return IndoorFactoryRealization.__aod_spread_std[state]
# Parameters for computing the mean angle of arrival spread
# TR 138.901 v17.0.0 Table 7.5-6
__aoa_spread_mean: Mapping[LOSState, Tuple[float, float]] = {
LOSState.LOS: (-0.18, 1.78),
LOSState.NLOS: (0.0, 1.72),
}
@staticmethod
def _aoa_spread_mean(state: LOSState, carrier_frequency: float) -> float:
mean_parameters = IndoorFactoryRealization.__aoa_spread_mean[state]
return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *mean_parameters)
# Parameters for computing the standard deviation of the angle of arrival spread
# TR 138.901 v17.0.0 Table 7.5-6
__aoa_spread_std: Mapping[LOSState, Tuple[float, float]] = {
LOSState.LOS: (0.12, 0.2),
LOSState.NLOS: (0.0, 0.3),
}
@staticmethod
def _aoa_spread_std(state: LOSState, carrier_frequency: float) -> float:
std_parameters = IndoorFactoryRealization.__aoa_spread_std[state]
return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *std_parameters)
# Parameters for computing the mean zenith of arrival spread
# TR 138.901 v17.0.0 Table 7.5-6
__zoa_spread_mean: Mapping[LOSState, Tuple[float, float]] = {
LOSState.LOS: (-0.2, 1.5),
LOSState.NLOS: (-0.13, 1.45),
}
@staticmethod
def _zoa_spread_mean(state: LOSState, carrier_frequency: float) -> float:
mean_parameters = IndoorFactoryRealization.__zoa_spread_mean[state]
return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *mean_parameters)
# Parameters for computing the standard deviation of the zenith of arrival spread
# TR 138.901 v17.0.0 Table 7.5-6
__zoa_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.35, LOSState.NLOS: 0.45}
@staticmethod
def _zoa_spread_std(state: LOSState, carrier_frequency: float) -> float:
return IndoorFactoryRealization.__zoa_spread_std[state]
@staticmethod
def _rice_factor_mean() -> float:
return 7.0
@staticmethod
def _rice_factor_std() -> float:
return 8.0
# Delay scaling factors for different LOS states
# TR 138.901 v17.0.0 Table 7.5-6
__delay_scaling: Mapping[LOSState, float] = {LOSState.LOS: 2.7, LOSState.NLOS: 3.0}
@staticmethod
def _delay_scaling(state: LOSState) -> float:
return IndoorFactoryRealization.__delay_scaling[state]
# Mean cross-polarization power ratio for different LOS states
# TR 138.901 v17.0.0 Table 7.5-6
__cross_polarization_power_mean: Mapping[LOSState, float] = {
LOSState.LOS: 12.0,
LOSState.NLOS: 11.0,
}
@staticmethod
def _cross_polarization_power_mean(state: LOSState) -> float:
return IndoorFactoryRealization.__cross_polarization_power_mean[state]
@staticmethod
def _cross_polarization_power_std(state: LOSState) -> float:
# TR 138.901 v17.0.0 Table 7.5-6
return 6.0
@staticmethod
def _num_clusters(state: LOSState) -> int:
return 25
@staticmethod
def _cluster_delay_spread(state: LOSState, carrier_frequency: float) -> float:
return 0.0 # pragma: no cover
@staticmethod
def _cluster_aod_spread(state: LOSState) -> float:
return 5.0
@staticmethod
def _cluster_aoa_spread(state: LOSState) -> float:
return 8.0
@staticmethod
def _cluster_zoa_spread(state: LOSState) -> float:
return 9.0
# Standard deviation of the shadowing for different LOS states in dB
# TR 138.901 v17.0.0 Table 7.5-6
__cluster_shadowing_std: Mapping[LOSState, float] = {LOSState.LOS: 4.0, LOSState.NLOS: 3.0}
@staticmethod
def _cluster_shadowing_std(state: LOSState) -> float:
return IndoorFactoryRealization.__cluster_shadowing_std[state]
# Mean zenith of departure spread for different LOS states
# TR 138.901 v17.0.0 Table 7.5-11
__zod_spread_mean: Mapping[LOSState, float] = {LOSState.LOS: 1.35, LOSState.NLOS: 1.2}
@staticmethod
def _zod_spread_mean(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
return IndoorFactoryRealization.__zod_spread_mean[state]
# Standard deviation of the zenith of departure spread for different LOS states
# TR 138.901 v17.0.0 Table 7.5-11
__zod_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.35, LOSState.NLOS: 0.5}
@staticmethod
def _zod_spread_std(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
return IndoorFactoryRealization.__zod_spread_std[state]
@staticmethod
def _zod_offset(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
# TR 138.901 v17.0.0 Table 7.5-11
return 0.0
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
ClusterDelayLineRealization.serialize(self, process)
process.serialize_object(self.__los_realization, "los_realization")
process.serialize_object(self.__nlos_realization, "nlos_realization")
process.serialize_floating(self.__volume, "volume")
process.serialize_floating(self.__surface, "surface")
process.serialize_object(self.__factory_type, "factory_type")
process.serialize_floating(self.__clutter_height, "clutter_height")
[docs]
@classmethod
@override
def Deserialize(cls, process: DeserializationProcess) -> IndoorFactoryRealization:
return IndoorFactoryRealization(
expected_state=process.deserialize_object("expected_state", LOSState, None),
los_realization=process.deserialize_object("los_realization", ConsistentRealization),
nlos_realization=process.deserialize_object("nlos_realization", ConsistentRealization),
volume=process.deserialize_floating("volume"),
surface=process.deserialize_floating("surface"),
factory_type=process.deserialize_object("factory_type", FactoryType),
clutter_height=process.deserialize_floating("clutter_height"),
sample_hooks=set(),
**ClusterDelayLineRealization._DeserializeParameters(process), # type: ignore[arg-type]
)
[docs]
class IndoorFactory(ClusterDelayLineBase[IndoorFactoryRealization, LOSState]):
"""3GPP cluster delay line preset modeling an indoor factory scenario."""
__DEFAULT_CLUTTER_HEIGHT = 0.0
__volume: float # Hall volume in m3
__surface: float # Total surface hall area in m2 (walls/floor/ceiling)
__factory_type: FactoryType
__clutter_height: float
def __init__(
self,
volume: float,
surface: float,
factory_type: FactoryType,
clutter_height: float = __DEFAULT_CLUTTER_HEIGHT,
delay_normalization: DelayNormalization = ClusterDelayLineBase._DEFAULT_DELAY_NORMALIZATION,
oxygen_absorption: bool = ClusterDelayLineBase._DEFAULT_OXYGEN_ABSORPTION,
expected_state: LOSState | None = None,
gain: float = ClusterDelayLineBase._DEFAULT_GAIN,
seed: int | None = None,
) -> None:
"""
Args:
volume:
Hall volume in :math:`\\mathrm{m}^3`.
surfac:
Total surface hall area in :math:`\\mathrm{m}^2`. (walls/floor/ceiling).
factory_type:
Type of the factory.
clutter_height:
Height of the clutter in the factory hall in meters above the floor.
Zero by default, meaning virtually no clutter.
gain:
Linear channel energy gain factor.
Initializes the :meth:`gain<hermespy.channel.channel.Channel.gain>` property.
:math:`1.0` by default.
seed:
Seed used to initialize the pseudo-random number generator.
"""
# Initialize base class
ClusterDelayLineBase.__init__(
self, delay_normalization, oxygen_absorption, expected_state, gain, seed
)
# Initialize class attributes
self.volume = volume
self.surface = surface
self.factory_type = factory_type
self.clutter_height = clutter_height
@property
def volume(self) -> float:
"""Assumed factory hall volume in :math:`\\mathrm{m}^3`.
Raises:
ValueError: For values smaller or equal to zero.
"""
return self.__volume
@volume.setter
def volume(self, value: float) -> None:
if value <= 0.0:
raise ValueError("Hall volume must be greater than zero")
self.__volume = value
@property
def surface(self) -> float:
"""Assumed factory hall surface in :math:`\\mathrm{m}^2`.
Raises:
ValueError: For values smaller or equal to zero.
"""
return self.__surface
@surface.setter
def surface(self, value: float) -> None:
if value <= 0.0:
raise ValueError("Hall surface area must be greater than zero")
self.__surface = value
@property
def factory_type(self) -> FactoryType:
"""Assumed type of factory."""
return self.__factory_type
@factory_type.setter
def factory_type(self, value: FactoryType) -> None:
self.__factory_type = value
@property
def clutter_height(self) -> float:
"""Cluter height in m.
Denoted by :math:`h_c` within the respective equations.
Should be lower than ceiling height and in between zero and 10m.
"""
return self.__clutter_height
@clutter_height.setter
def clutter_height(self, value: float) -> None:
if value < 0.0 or value > 10.0:
raise ValueError("Clutter height should be in the interval 0-10m")
self.__clutter_height = value
@property
def max_num_clusters(self) -> int:
return 25
@property
def max_num_rays(self) -> int:
return 20
@property
def _large_scale_correlations(self) -> np.ndarray:
# Large scale cross correlations
# TR 138.901 v17.0.0 Table 7.5-6
return np.array(
[
# LOS NLOS
[+0.00, +0.00], # 0: ASD vs DS
[+0.00, +0.00], # 1: ASA vs DS
[+0.00, +0.00], # 2: ASA VS SF
[+0.00, +0.00], # 3: ASD vs SF
[+0.00, +0.00], # 4: DS vs SF
[+0.00, +0.00], # 5: ASD vs ASA
[-0.50, +0.00], # 6: ASD vs K
[+0.00, +0.00], # 7: ASA vs K
[-0.70, +0.00], # 8: DS vs K
[+0.00, +0.00], # 9: SF vs K
[+0.00, +0.00], # 10: ZSD vs SF
[+0.00, +0.00], # 11: ZSA vs SF
[+0.00, +0.00], # 12: ZSD vs K
[+0.00, +0.00], # 13: ZSA vs K
[+0.00, +0.00], # 14: ZSD vs DS
[+0.00, +0.00], # 15: ZSA vs DS
[+0.00, +0.00], # 16: ZSD vs ASD
[+0.00, +0.00], # 17: ZSA vs ASD
[+0.00, +0.00], # 18: ZSD vs ASA
[+0.00, +0.00], # 19: ZSA vs ASA
[+0.00, +0.00], # 20: ZSD vs ZSA
],
dtype=np.float64,
).T
def _initialize_realization(
self,
state_generator: ConsistentGenerator,
parameter_generator: ConsistentGenerator,
parameters: ClusterDelayLineRealizationParameters,
) -> IndoorFactoryRealization:
# Generate realizations for each large scale state
# TR 138.901 v17.0.0 Table 7.6.3.1-2
state_realization = state_generator.realize(0.5 * self.factory_type.clutter_size)
los_realization = parameter_generator.realize(10.0)
nlos_realization = parameter_generator.realize(10.0)
return IndoorFactoryRealization(
self.expected_state,
state_realization,
los_realization,
nlos_realization,
parameters,
self.volume,
self.surface,
self.factory_type,
self.clutter_height,
self.sample_hooks,
self.gain,
)
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
ClusterDelayLineBase.serialize(self, process)
process.serialize_object(self.factory_type, "factory_type")
process.serialize_floating(self.volume, "volume")
process.serialize_floating(self.surface, "surface")
process.serialize_floating(self.clutter_height, "clutter_height")
[docs]
@classmethod
@override
def Deserialize(cls, process: DeserializationProcess) -> IndoorFactory:
return IndoorFactory(
process.deserialize_floating("volume"),
process.deserialize_floating("surface"),
process.deserialize_object("factory_type", FactoryType),
process.deserialize_floating("clutter_height", cls.__DEFAULT_CLUTTER_HEIGHT),
**ClusterDelayLineBase._DeserializeParameters(process), # type: ignore[arg-type]
)