Source code for hermespy.channel.cdl.indoor_factory

# -*- coding: utf-8 -*-

from __future__ import annotations
from math import exp, log, log10
from typing import Mapping, Set, Tuple
from typing_extensions import override

import numpy as np

from hermespy.core import SerializableEnum, SerializationProcess, DeserializationProcess
from .cluster_delay_lines import (
    ClusterDelayLineBase,
    ClusterDelayLineRealizationParameters,
    ClusterDelayLineSample,
    ClusterDelayLineSampleParameters,
    ClusterDelayLineRealization,
    DelayNormalization,
    LOSState,
)
from ..channel import ChannelSampleHook
from ..consistent import ConsistentGenerator, ConsistentRealization

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class FactoryType(SerializableEnum):
    """Type of indoor factory.

    Defined in TR 138.901 v17.0.0 Table 7.2-4.

    Every member is declared as a tuple ``(value, clutter_size, clutter_density)``.
    Only the first entry becomes the enum value; the remaining two entries are
    exposed through the :attr:`clutter_size` and :attr:`clutter_density` properties.
    """

    SL = 0, 10.0, 0.20
    SH = 1, 10.0, 0.20
    DL = 2, 2.0, 0.60
    DH = 3, 2.0, 0.60
    HH = 4, 5.0, 0.00

    def __new__(cls, *args, **kwargs):
        # Use only the leading tuple entry as the enum value so that the
        # clutter parameters do not become part of the member's value.
        member = object.__new__(cls)
        member._value_ = args[0]
        return member

    def __init__(self, _: int, clutter_size: float, clutter_density: float) -> None:
        """
        Args:

            _: Enum value, already consumed by :meth:`__new__`.
            clutter_size: Clutter size associated with the factory type.
            clutter_density: Clutter density associated with the factory type.
        """

        self.__clutter_size = clutter_size
        self.__clutter_density = clutter_density

    @property
    def clutter_size(self) -> float:
        """Clutter size associated with this factory type."""

        return self.__clutter_size

    @property
    def clutter_density(self) -> float:
        """Clutter density associated with this factory type, as a fraction (e.g. 0.2)."""

        return self.__clutter_density
class IndoorFactoryRealization(ClusterDelayLineRealization[LOSState]):
    """Realization of the indoor factory channel model.

    Implements the scenario-specific parameter methods (path loss, line-of-sight
    probability and the large-/small-scale parameter tables) of the cluster
    delay line model. Table references are to TR 138.901 v17.0.0.
    """

    __los_realization: ConsistentRealization
    __nlos_realization: ConsistentRealization
    __volume: float
    __surface: float
    __factory_type: FactoryType
    __clutter_height: float

    def __init__(
        self,
        expected_state: LOSState | None,
        state_realization: ConsistentRealization,
        los_realization: ConsistentRealization,
        nlos_realization: ConsistentRealization,
        parameters: ClusterDelayLineRealizationParameters,
        volume: float,
        surface: float,
        factory_type: FactoryType,
        clutter_height: float,
        sample_hooks: Set[ChannelSampleHook[ClusterDelayLineSample]],
        gain: float = 1.0,
    ) -> None:
        """
        Args:

            expected_state:
                Expected large-scale state of the channel.
                If not specified, the large-scale state is randomly generated.

            state_realization:
                Realization of a spatially consistent random number generator for the
                large-scale state.

            los_realization:
                Realization of a spatially consistent random number generator for
                small-scale parameters in the LOS state.

            nlos_realization:
                Realization of a spatially consistent random number generator for
                small-scale parameters in the NLOS state.

            parameters:
                General parameters of the cluster delay line realization.

            volume:
                Volume of the modeled factory hall in :math:`\\mathrm{m}^3`.

            surface:
                Surface area of the modeled factory hall in :math:`\\mathrm{m}^2`.

            factory_type:
                Type of the factory.

            clutter_height:
                Height of the clutter in the factory hall in meters above the floor.

            sample_hooks:
                Set of channel sample hooks forwarded to the base class.

            gain:
                Linear amplitude scaling factor of signals propagated over the channel.
        """

        # Initialize base class
        ClusterDelayLineRealization.__init__(
            self, expected_state, state_realization, parameters, sample_hooks, gain
        )

        # Initialize class attributes
        self.__los_realization = los_realization
        self.__nlos_realization = nlos_realization
        self.__volume = volume
        self.__surface = surface
        self.__factory_type = factory_type
        self.__clutter_height = clutter_height

    # Table 7.4.1-1 in TR 138.901 v17.0.0
    def _pathloss_dB(self, state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
        """Compute the path loss in dB for the given large-scale state.

        Args:

            state: Large-scale state for which the path loss is evaluated.
            parameters: Sample parameters providing the 3D distance and carrier frequency.

        Returns: Path loss in dB.
        """

        log_distance = log10(parameters.distance_3d)

        # The carrier frequency is scaled by 1e-9, i.e. presumably converted from Hz to GHz
        log_frequency = log10(parameters.carrier_frequency / 1e9)

        PL_LOS = 31.84 + 21.5 * log_distance + 19 * log_frequency
        if state == LOSState.LOS:
            return PL_LOS

        # The sparse-clutter / low-base-station path loss is also required as a
        # lower bound for the dense-clutter / low-base-station type
        PL_SL = 33 + 25.5 * log_distance + 20 * log_frequency

        if self.__factory_type == FactoryType.SL:
            PL_NLOS = PL_SL

        elif self.__factory_type == FactoryType.DL:
            # The standard defines PL_NLOS = max(PL, PL_LOS, PL_InF-SL) for InF-DL
            PL_NLOS = max(18.6 + 35.7 * log_distance + 20 * log_frequency, PL_SL)

        elif self.__factory_type == FactoryType.SH:
            PL_NLOS = 32.4 + 23.0 * log_distance + 20 * log_frequency

        else:
            # FactoryType.DH coefficients.
            # This branch is also reached by FactoryType.HH if a NLOS state is requested,
            # since no dedicated HH branch exists.
            PL_NLOS = 33.63 + 21.9 * log_distance + 20 * log_frequency

        # The NLOS path loss is lower-bounded by the LOS path loss
        return max(PL_LOS, PL_NLOS)

    def _small_scale_realization(self, state: LOSState) -> ConsistentRealization:
        """Select the small-scale random realization matching the large-scale state."""

        if state == LOSState.LOS:
            return self.__los_realization

        return self.__nlos_realization

    def _sample_large_scale_state(
        self, state_variable_sample: float, parameters: ClusterDelayLineSampleParameters
    ) -> LOSState:
        """Decide between LOS and NLOS for a single channel sample.

        Args:

            state_variable_sample: Random sample compared against the LOS probability.
            parameters: Sample parameters providing distances and antenna heights.

        Returns: The sampled large-scale state.
        """

        # Implementation of TR 138.901 v17.0.0 Table 7.4.2-1
        factory_type = self.__factory_type

        # The HH type is always in line of sight
        if factory_type == FactoryType.HH:
            return LOSState.LOS

        if factory_type in (FactoryType.SL, FactoryType.DL):
            k_subspace = -factory_type.clutter_size / log(1 - factory_type.clutter_density)

        else:
            # SH and DH additionally scale with the base and terminal heights
            # relative to the clutter height
            k_subspace = (
                -factory_type.clutter_size
                * (parameters.base_height - parameters.terminal_height)
                / (
                    log(1 - factory_type.clutter_density)
                    * (self.__clutter_height - parameters.terminal_height)
                )
            )

        # Non-positive scale factors are treated as zero LOS probability
        los_probability = exp(-parameters.distance_2d / k_subspace) if k_subspace > 0.0 else 0.0
        return LOSState.LOS if state_variable_sample < los_probability else LOSState.NLOS

    @staticmethod
    def __parameter_dependency(carrier_frequency: float, factor: float, summand: float) -> float:
        """An implementation of the frequently used equation

        .. math::

            y = a \\log_{10}(1 + f_c) + b

        Args:

            carrier_frequency: Carrier frequency, scaled by 1e-9 before evaluation.
            factor: Factor scaling the logarithmic frequency dependency.
            summand: Added constant.

        Returns: The result.
        """

        # Note that no lower bound is applied to the carrier frequency here.
        # The original author flagged this as a possible deviation from the standard.
        fc = carrier_frequency * 1e-9

        return factor * log10(1 + fc) + summand

    # Parameters for computing the mean delay spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __delay_spread_mean: Mapping[LOSState, Tuple[float, float, float]] = {
        LOSState.LOS: (26, 14, -9.35),
        LOSState.NLOS: (30, 32, -9.44),
    }

    def _delay_spread_mean(self, state: LOSState, carrier_frequency: float) -> float:
        # The mean delay spread depends on the hall's volume-to-surface ratio
        # and not on the carrier frequency
        parameters = IndoorFactoryRealization.__delay_spread_mean[state]
        return log10(parameters[0] * self.__volume / self.__surface + parameters[1]) + parameters[2]

    # Parameters for computing the standard deviation of the delay spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __delay_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.15, LOSState.NLOS: 0.19}

    @staticmethod
    def _delay_spread_std(state: LOSState, carrier_frequency: float) -> float:
        return IndoorFactoryRealization.__delay_spread_std[state]

    # Parameters for computing the mean angle of departure spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __aod_spread_mean: Mapping[LOSState, float] = {LOSState.LOS: 1.56, LOSState.NLOS: 1.57}

    @staticmethod
    def _aod_spread_mean(state: LOSState, carrier_frequency: float) -> float:
        return IndoorFactoryRealization.__aod_spread_mean[state]

    # Parameters for computing the standard deviation of the angle of departure spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __aod_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.25, LOSState.NLOS: 0.2}

    @staticmethod
    def _aod_spread_std(state: LOSState, carrier_frequency: float) -> float:
        return IndoorFactoryRealization.__aod_spread_std[state]

    # Parameters for computing the mean angle of arrival spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __aoa_spread_mean: Mapping[LOSState, Tuple[float, float]] = {
        LOSState.LOS: (-0.18, 1.78),
        LOSState.NLOS: (0.0, 1.72),
    }

    @staticmethod
    def _aoa_spread_mean(state: LOSState, carrier_frequency: float) -> float:
        mean_parameters = IndoorFactoryRealization.__aoa_spread_mean[state]
        return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *mean_parameters)

    # Parameters for computing the standard deviation of the angle of arrival spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __aoa_spread_std: Mapping[LOSState, Tuple[float, float]] = {
        LOSState.LOS: (0.12, 0.2),
        LOSState.NLOS: (0.0, 0.3),
    }

    @staticmethod
    def _aoa_spread_std(state: LOSState, carrier_frequency: float) -> float:
        std_parameters = IndoorFactoryRealization.__aoa_spread_std[state]
        return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *std_parameters)

    # Parameters for computing the mean zenith of arrival spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __zoa_spread_mean: Mapping[LOSState, Tuple[float, float]] = {
        LOSState.LOS: (-0.2, 1.5),
        LOSState.NLOS: (-0.13, 1.45),
    }

    @staticmethod
    def _zoa_spread_mean(state: LOSState, carrier_frequency: float) -> float:
        mean_parameters = IndoorFactoryRealization.__zoa_spread_mean[state]
        return IndoorFactoryRealization.__parameter_dependency(carrier_frequency, *mean_parameters)

    # Parameters for computing the standard deviation of the zenith of arrival spread
    # TR 138.901 v17.0.0 Table 7.5-6
    __zoa_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.35, LOSState.NLOS: 0.45}

    @staticmethod
    def _zoa_spread_std(state: LOSState, carrier_frequency: float) -> float:
        return IndoorFactoryRealization.__zoa_spread_std[state]

    @staticmethod
    def _rice_factor_mean() -> float:
        return 7.0

    @staticmethod
    def _rice_factor_std() -> float:
        return 8.0

    # Delay scaling factors for different LOS states
    # TR 138.901 v17.0.0 Table 7.5-6
    __delay_scaling: Mapping[LOSState, float] = {LOSState.LOS: 2.7, LOSState.NLOS: 3.0}

    @staticmethod
    def _delay_scaling(state: LOSState) -> float:
        return IndoorFactoryRealization.__delay_scaling[state]

    # Mean cross-polarization power ratio for different LOS states
    # TR 138.901 v17.0.0 Table 7.5-6
    __cross_polarization_power_mean: Mapping[LOSState, float] = {
        LOSState.LOS: 12.0,
        LOSState.NLOS: 11.0,
    }

    @staticmethod
    def _cross_polarization_power_mean(state: LOSState) -> float:
        return IndoorFactoryRealization.__cross_polarization_power_mean[state]

    @staticmethod
    def _cross_polarization_power_std(state: LOSState) -> float:
        # TR 138.901 v17.0.0 Table 7.5-6
        return 6.0

    @staticmethod
    def _num_clusters(state: LOSState) -> int:
        return 25

    @staticmethod
    def _cluster_delay_spread(state: LOSState, carrier_frequency: float) -> float:
        return 0.0  # pragma: no cover

    @staticmethod
    def _cluster_aod_spread(state: LOSState) -> float:
        return 5.0

    @staticmethod
    def _cluster_aoa_spread(state: LOSState) -> float:
        return 8.0

    @staticmethod
    def _cluster_zoa_spread(state: LOSState) -> float:
        return 9.0

    # Standard deviation of the shadowing for different LOS states in dB
    # TR 138.901 v17.0.0 Table 7.5-6
    __cluster_shadowing_std: Mapping[LOSState, float] = {LOSState.LOS: 4.0, LOSState.NLOS: 3.0}

    @staticmethod
    def _cluster_shadowing_std(state: LOSState) -> float:
        return IndoorFactoryRealization.__cluster_shadowing_std[state]

    # Mean zenith of departure spread for different LOS states
    # TR 138.901 v17.0.0 Table 7.5-11
    __zod_spread_mean: Mapping[LOSState, float] = {LOSState.LOS: 1.35, LOSState.NLOS: 1.2}

    @staticmethod
    def _zod_spread_mean(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
        return IndoorFactoryRealization.__zod_spread_mean[state]

    # Standard deviation of the zenith of departure spread for different LOS states
    # TR 138.901 v17.0.0 Table 7.5-11
    # NOTE(review): confirm the NLOS value against the referenced table
    __zod_spread_std: Mapping[LOSState, float] = {LOSState.LOS: 0.35, LOSState.NLOS: 0.5}

    @staticmethod
    def _zod_spread_std(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
        return IndoorFactoryRealization.__zod_spread_std[state]

    @staticmethod
    def _zod_offset(state: LOSState, parameters: ClusterDelayLineSampleParameters) -> float:
        # TR 138.901 v17.0.0 Table 7.5-11
        return 0.0

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the base class state followed by the indoor factory attributes."""

        ClusterDelayLineRealization.serialize(self, process)
        process.serialize_object(self.__los_realization, "los_realization")
        process.serialize_object(self.__nlos_realization, "nlos_realization")
        process.serialize_floating(self.__volume, "volume")
        process.serialize_floating(self.__surface, "surface")
        process.serialize_object(self.__factory_type, "factory_type")
        process.serialize_floating(self.__clutter_height, "clutter_height")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> IndoorFactoryRealization:
        """Recover a realization from a serialization process.

        Sample hooks are not recovered; the returned realization starts with an empty set.
        """

        return IndoorFactoryRealization(
            expected_state=process.deserialize_object("expected_state", LOSState, None),
            los_realization=process.deserialize_object("los_realization", ConsistentRealization),
            nlos_realization=process.deserialize_object("nlos_realization", ConsistentRealization),
            volume=process.deserialize_floating("volume"),
            surface=process.deserialize_floating("surface"),
            factory_type=process.deserialize_object("factory_type", FactoryType),
            clutter_height=process.deserialize_floating("clutter_height"),
            sample_hooks=set(),
            **ClusterDelayLineRealization._DeserializeParameters(process),  # type: ignore[arg-type]
        )
class IndoorFactory(ClusterDelayLineBase[IndoorFactoryRealization, LOSState]):
    """3GPP cluster delay line preset modeling an indoor factory scenario."""

    __DEFAULT_CLUTTER_HEIGHT = 0.0

    __volume: float  # Hall volume in m3
    __surface: float  # Total surface hall area in m2 (walls/floor/ceiling)
    __factory_type: FactoryType
    __clutter_height: float

    def __init__(
        self,
        volume: float,
        surface: float,
        factory_type: FactoryType,
        clutter_height: float = __DEFAULT_CLUTTER_HEIGHT,
        delay_normalization: DelayNormalization = ClusterDelayLineBase._DEFAULT_DELAY_NORMALIZATION,
        oxygen_absorption: bool = ClusterDelayLineBase._DEFAULT_OXYGEN_ABSORPTION,
        expected_state: LOSState | None = None,
        gain: float = ClusterDelayLineBase._DEFAULT_GAIN,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            volume:
                Hall volume in :math:`\\mathrm{m}^3`.

            surface:
                Total surface hall area in :math:`\\mathrm{m}^2`.
                (walls/floor/ceiling).

            factory_type:
                Type of the factory.

            clutter_height:
                Height of the clutter in the factory hall in meters above the floor.
                Zero by default, meaning virtually no clutter.

            delay_normalization:
                Delay normalization routine, forwarded to the base class.

            oxygen_absorption:
                Oxygen absorption flag, forwarded to the base class.

            expected_state:
                Expected large-scale state of the channel.
                If not specified, the large-scale state is randomly generated.

            gain:
                Linear channel energy gain factor.
                Initializes the :meth:`gain<hermespy.channel.channel.Channel.gain>` property.
                :math:`1.0` by default.

            seed:
                Seed used to initialize the pseudo-random number generator.

        Raises:

            ValueError: If `volume`, `surface` or `clutter_height` are rejected by their setters.
        """

        # Initialize base class
        ClusterDelayLineBase.__init__(
            self, delay_normalization, oxygen_absorption, expected_state, gain, seed
        )

        # Route all attributes through their property setters for validation
        self.volume = volume
        self.surface = surface
        self.factory_type = factory_type
        self.clutter_height = clutter_height

    @property
    def volume(self) -> float:
        """Assumed factory hall volume in :math:`\\mathrm{m}^3`.

        Raises:

            ValueError: For values smaller or equal to zero.
        """

        return self.__volume

    @volume.setter
    def volume(self, value: float) -> None:
        if value <= 0.0:
            raise ValueError("Hall volume must be greater than zero")

        self.__volume = value

    @property
    def surface(self) -> float:
        """Assumed factory hall surface in :math:`\\mathrm{m}^2`.

        Raises:

            ValueError: For values smaller or equal to zero.
        """

        return self.__surface

    @surface.setter
    def surface(self, value: float) -> None:
        if value <= 0.0:
            raise ValueError("Hall surface area must be greater than zero")

        self.__surface = value

    @property
    def factory_type(self) -> FactoryType:
        """Assumed type of factory."""

        return self.__factory_type

    @factory_type.setter
    def factory_type(self, value: FactoryType) -> None:
        self.__factory_type = value

    @property
    def clutter_height(self) -> float:
        """Clutter height in m.

        Denoted by :math:`h_c` within the respective equations.
        Should be lower than ceiling height and in between zero and 10m.

        Raises:

            ValueError: For values smaller than zero or larger than 10.
        """

        return self.__clutter_height

    @clutter_height.setter
    def clutter_height(self, value: float) -> None:
        if value < 0.0 or value > 10.0:
            raise ValueError("Clutter height should be in the interval 0-10m")

        self.__clutter_height = value

    @property
    def max_num_clusters(self) -> int:
        return 25

    @property
    def max_num_rays(self) -> int:
        return 20

    @property
    def _large_scale_correlations(self) -> np.ndarray:
        # Large scale cross correlations
        # TR 138.901 v17.0.0 Table 7.5-6
        # One row per parameter pair, first column LOS, second column NLOS
        correlation_rows = (
            # LOS   NLOS
            (+0.00, +0.00),  # 0: ASD vs DS
            (+0.00, +0.00),  # 1: ASA vs DS
            (+0.00, +0.00),  # 2: ASA VS SF
            (+0.00, +0.00),  # 3: ASD vs SF
            (+0.00, +0.00),  # 4: DS vs SF
            (+0.00, +0.00),  # 5: ASD vs ASA
            (-0.50, +0.00),  # 6: ASD vs K
            (+0.00, +0.00),  # 7: ASA vs K
            (-0.70, +0.00),  # 8: DS vs K
            (+0.00, +0.00),  # 9: SF vs K
            (+0.00, +0.00),  # 10: ZSD vs SF
            (+0.00, +0.00),  # 11: ZSA vs SF
            (+0.00, +0.00),  # 12: ZSD vs K
            (+0.00, +0.00),  # 13: ZSA vs K
            (+0.00, +0.00),  # 14: ZSD vs DS
            (+0.00, +0.00),  # 15: ZSA vs DS
            (+0.00, +0.00),  # 16: ZSD vs ASD
            (+0.00, +0.00),  # 17: ZSA vs ASD
            (+0.00, +0.00),  # 18: ZSD vs ASA
            (+0.00, +0.00),  # 19: ZSA vs ASA
            (+0.00, +0.00),  # 20: ZSD vs ZSA
        )

        # Transpose so that the state is the leading dimension
        return np.array(correlation_rows, dtype=np.float64).T

    def _initialize_realization(
        self,
        state_generator: ConsistentGenerator,
        parameter_generator: ConsistentGenerator,
        parameters: ClusterDelayLineRealizationParameters,
    ) -> IndoorFactoryRealization:
        # Generate realizations for each large scale state
        # TR 138.901 v17.0.0 Table 7.6.3.1-2
        # The large-scale state is realized with half the clutter size,
        # the small-scale parameters with a constant of 10.0
        # (presumably the respective decorrelation distances)
        state_distance = 0.5 * self.factory_type.clutter_size
        state_realization = state_generator.realize(state_distance)
        los_realization = parameter_generator.realize(10.0)
        nlos_realization = parameter_generator.realize(10.0)

        return IndoorFactoryRealization(
            expected_state=self.expected_state,
            state_realization=state_realization,
            los_realization=los_realization,
            nlos_realization=nlos_realization,
            parameters=parameters,
            volume=self.volume,
            surface=self.surface,
            factory_type=self.factory_type,
            clutter_height=self.clutter_height,
            sample_hooks=self.sample_hooks,
            gain=self.gain,
        )

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the base class state followed by the indoor factory attributes."""

        ClusterDelayLineBase.serialize(self, process)
        process.serialize_object(self.factory_type, "factory_type")
        process.serialize_floating(self.volume, "volume")
        process.serialize_floating(self.surface, "surface")
        process.serialize_floating(self.clutter_height, "clutter_height")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> IndoorFactory:
        """Recover an indoor factory channel from a serialization process.

        The clutter height falls back to the class default if it was not serialized.
        """

        volume = process.deserialize_floating("volume")
        surface = process.deserialize_floating("surface")
        factory_type = process.deserialize_object("factory_type", FactoryType)
        clutter_height = process.deserialize_floating(
            "clutter_height", cls.__DEFAULT_CLUTTER_HEIGHT
        )
        base_parameters = ClusterDelayLineBase._DeserializeParameters(process)

        return IndoorFactory(
            volume,
            surface,
            factory_type,
            clutter_height,
            **base_parameters,  # type: ignore[arg-type]
        )