# -*- coding: utf-8 -*-
"""
===============
Device Modeling
===============
.. mermaid::
%%{init: {'theme': 'dark'}}%%
classDiagram-v2
direction RL
class Device {
topology
carrier_frequency
transmit()
receive()
}
class SimulatedDevice {
position
orientation
velocity
acceleration
}
class HardwareDevice {
trigger()
}
class Operator {
transmit()
receive()
}
class Modem {
tx_bits
rx_bits
...
waveform
transmit()
receive()
}
class Radar {
waveform
...
target_estimates
transmit()
receive()
}
Operator <|-- Modem
Operator <|-- Radar
Device *-- Operator
Device <|-- SimulatedDevice
Device <|-- HardwareDevice
.. mermaid::
%%{init: {'theme': 'dark'}}%%
flowchart LR
channel{Real World}
subgraph devicea[HardwareDevice]
direction TB
deviceatx>Tx Slot] --> deviceabinding[Hardware Binding] --> devicearx>Rx Slot]
end
subgraph deviceb[HardwareDevice]
direction TB
devicebtx>Tx Slot] --> devicebbinding[Hardware Binding] --> devicebrx>Rx Slot]
end
deviceabinding <--> channel
devicebbinding <--> channel
.. mermaid::
%%{init: {'theme': 'dark'}}%%
flowchart LR
channel{Channel Modeling}
subgraph devicea[SimulatedDevice]
direction TB
deviceatx>Tx Slot]
devicearx>Rx Slot]
end
subgraph deviceb[SimulatedDevice]
direction TB
devicebtx>Tx Slot]
devicebrx>Rx Slot]
end
deviceatx --> channel --> devicearx
devicebtx --> channel --> devicebrx
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from itertools import chain
from math import ceil
from typing import Generic, Iterator, List, Optional, overload, Type, TypeVar
from h5py import Group
from scipy.constants import speed_of_light
from .antennas import AntennaArray
from .factory import HDFSerializable, Serializable
from .random_node import RandomNode
from .signal_model import Signal
from .transformation import Transformable, Transformation
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class FloatingError(RuntimeError):
    """Exception raised if an operation fails because the operator is currently considered floating.

    Operators are considered floating while they are not attached to a device slot.
    """
# Type variables used for generic annotations within this module.
# The bounds are given as strings (forward references) because the
# respective classes are only defined further below in this file.
OperationResultType = TypeVar("OperationResultType", bound="OperationResult")
"""Type of OperationResult"""
OperatorType = TypeVar("OperatorType", bound="Operator")
"""Type of operator."""
DeviceType = TypeVar("DeviceType", bound="Device")
"""Type of device."""
[docs]
class OperationResult(HDFSerializable):
    """Common base of all results generated by device operators.

    Wraps the base-band signal model that was processed by the operator
    and implements its serialization to and from HDF groups.
    """

    signal: Signal
    """Base-band model of the processed signal."""

    def __init__(self, signal: Signal) -> None:
        """
        Args:

            signal (Signal):
                Base-band waveform model.
        """

        self.signal = signal

    @classmethod
    def from_HDF(cls: Type[OperationResultType], group: Group) -> OperationResultType:
        # The signal model is stored in the dedicated "signal" sub-group
        return cls(signal=Signal.from_HDF(group["signal"]))

    def to_HDF(self, group: Group) -> None:
        # Mirror of from_HDF: the signal model goes into the "signal" sub-group
        signal_group = self._create_group(group, "signal")
        self.signal.to_HDF(signal_group)
[docs]
class Transmission(OperationResult):
    """Result of a transmit operator's transmit routine.

    Adds nothing to :class:`OperationResult`; serves as the common
    base type of all transmit results.
    """
[docs]
class Reception(OperationResult):
    """Result of a receive operator's receive routine.

    Adds nothing to :class:`OperationResult`; serves as the common
    base type of all receive results.
    """
# Type variables parameterizing receive operators and operators in general.
# SlotType is bound via a string (forward reference) because OperatorSlot
# is only defined further below in this file.
ReceptionType = TypeVar("ReceptionType", bound=Reception)
"""Type variable of a :class:`Reception`."""
SlotType = TypeVar("SlotType", bound="OperatorSlot")
"""Type of slot."""
class Operator(Generic[SlotType], Serializable):
    """Base class for operators of devices.

    In HermesPy, operators may configure devices, broadcast signals over them or capture signals from them.
    Each operator is attached to a single device instance it operates on.
    An operator that is not attached to any device slot is considered floating.
    """

    property_blacklist = {"slot"}
    __slot: Optional[SlotType]  # Slot within a device this operator 'operates'

    def __init__(self, slot: Optional[SlotType] = None) -> None:
        """
        Args:

            slot (OperatorSlot, optional):
                Device slot this operator is attached to.
                By default, the operator is not attached and considered floating.
        """

        # Routed through the property setter so that the slot registers this operator
        self.slot = slot

    @property
    def slot(self) -> Optional[OperatorSlot]:
        """Device slot this operator operates.

        Returns:
            Handle to the device slot.
            `None` if the operator is currently considered floating.

        :meta private:
        """

        return self.__slot

    @slot.setter
    def slot(self, value: Optional[SlotType]) -> None:
        # The attribute might not exist yet if the setter is called during initialization
        previous_slot = getattr(self, "_Operator__slot", None)

        # A None argument indicates the slot should be unbound
        if value is None:
            # Clear the reference *before* notifying the old slot:
            # the slot's remove routine assigns operator.slot again,
            # clearing first prevents an endless mutual recursion.
            self.__slot = None

            if previous_slot is not None and previous_slot.registered(self):
                previous_slot.remove(self)

            return

        # Nothing to do if the operator is already bound to the requested slot
        if previous_slot is value:
            return

        # Note that the operator is deliberately not removed from its previous slot
        self.__slot = value

        if not value.registered(self):
            value.add(self)

    @property
    def slot_index(self) -> int | None:
        """Index of the operator within its slot.

        Returns:
            Index of the operator.
            `None` if the operator is currently considered floating.

        :meta private:
        """

        bound_slot = self.__slot
        return None if bound_slot is None else bound_slot.operator_index(self)

    @property
    def device(self) -> Device | None:
        """Device this object is assigned to.

        :obj:`None` if this object is currently considered floating / unassigned.
        """

        bound_slot = self.__slot
        return None if bound_slot is None else bound_slot.device

    @property
    def attached(self) -> bool:
        """Attachment state of the operator.

        Indicates whether this object is currently assigned a :class:`.Device` instance.
        """

        return not (self.__slot is None)

    @property
    @abstractmethod
    def frame_duration(self) -> float:
        """Duration of a single sample frame in seconds.

        Denoted as :math:`T_{\\mathrm{F}}` of unit :math:`\\left[ T_{\\mathrm{F}} \\right] = \\mathrm{s}` in literature.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def sampling_rate(self) -> float:
        """The operator's preferred sampling rate in Hz.

        Denoted as :math:`f_{\\mathrm{S}}` of unit :math:`\\left[ f_{\\mathrm{S}} \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in literature.
        """
        ...  # pragma: no cover
[docs]
class DeviceOutput(HDFSerializable):
    """Base class for the output of a device.

    Stores the single mixed signal emerging from the device and derives
    all remaining properties from that signal model.
    """

    __mixed_signal: Signal | None

    def __init__(self, mixed_signal: Signal) -> None:
        """
        Args:

            mixed_signal (Signal): Mixed signal emerging from the device.
        """

        self.__mixed_signal = mixed_signal

    @property
    def sampling_rate(self) -> float:
        """Sampling rate at which the device operates.

        Returns: Sampling rate in Hz, as reported by the mixed signal.
        """

        signal = self.__mixed_signal
        return signal.sampling_rate

    @property
    def num_antennas(self) -> int:
        """Number of transmitting device antennas.

        Returns: Number of antennas, i.e. the stream count of the mixed signal.
        """

        signal = self.__mixed_signal
        return signal.num_streams

    @property
    def carrier_frequency(self) -> float:
        """Carrier frequency at which the device operates.

        Returns: Carrier frequency in Hz, as reported by the mixed signal.
        """

        signal = self.__mixed_signal
        return signal.carrier_frequency

    @property
    def emerging_signals(self) -> Sequence[Signal]:
        """Models of electromagnetic signals generated by the device.

        Returns: List containing the mixed signal as its only element.
        """

        return [self.__mixed_signal]

    @property
    def num_emerging_signals(self) -> int:
        """Number of emerging signal models.

        Returns: Count of models.
        """

        # Deliberately routed through the property so that overriding subclasses are respected
        emerging = self.emerging_signals
        return len(emerging)

    @property
    def mixed_signal(self) -> Signal:
        """Combined signal of the device's transmissions.

        Returns: Signal model of the device's electromagnetic transmission.
        """

        return self.__mixed_signal

    @classmethod
    def from_HDF(cls: Type[DeviceOutput], group: Group) -> DeviceOutput:
        # The mixed signal is the only serialized state, stored in its own sub-group
        return cls(Signal.from_HDF(group["mixed_signal"]))

    def to_HDF(self, group: Group) -> None:
        # Mirror of from_HDF: store the mixed signal in the "mixed_signal" sub-group
        signal_group = self._create_group(group, "mixed_signal")
        self.mixed_signal.to_HDF(signal_group)
[docs]
class DeviceTransmission(DeviceOutput):
    """Information generated by transmitting over a device.

    Extends the device output by the individual transmissions
    of all transmit operators.
    """

    __operator_transmissions: Sequence[Transmission]

    def __init__(
        self, operator_transmissions: Sequence[Transmission], mixed_signal: Signal
    ) -> None:
        """
        Args:

            operator_transmissions (Sequence[Transmission]):
                List of information generated by transmit operators.

            mixed_signal (Signal):
                Mixed signal transmitted by the device.
        """

        DeviceOutput.__init__(self, mixed_signal)
        self.__operator_transmissions = operator_transmissions

    @classmethod
    def From_Output(
        cls: Type[DeviceTransmission],
        output: DeviceOutput,
        operator_transmissions: Sequence[Transmission],
    ) -> DeviceTransmission:
        """Initialize a device transmission from a device output.

        Args:

            output (DeviceOutput):
                Device output object providing the mixed signal.

            operator_transmissions (Sequence[Transmission]):
                List of information generated by transmit operators.

        Returns: The initialized device transmission.
        """

        mixed_signal = output.mixed_signal
        return cls(operator_transmissions, mixed_signal)

    @property
    def operator_transmissions(self) -> Sequence[Transmission]:
        """Information transmitted by device operators.

        Returns: Operator information.
        """

        return self.__operator_transmissions

    @property
    def num_operator_transmissions(self) -> int:
        """Number of operator transmissions."""

        return len(self.__operator_transmissions)

    @classmethod
    def from_HDF(cls: Type[DeviceTransmission], group: Group) -> DeviceTransmission:
        # Recall the state serialized by the base class
        recalled_output = DeviceOutput.from_HDF(group)

        # A single transmission is assumed if the count attribute is missing
        transmission_count = group.attrs.get("num_transmissions", 1)

        # Recall each transmission from its enumerated sub-group
        recalled_transmissions = []
        for index in range(transmission_count):
            transmission_group = group[f"transmission_{index:02d}"]
            recalled_transmissions.append(Transmission.from_HDF(transmission_group))

        return cls.From_Output(recalled_output, recalled_transmissions)

    @classmethod
    def Recall(cls: Type[DeviceTransmission], group: Group, device: Device) -> DeviceTransmission:
        """Recall a device transmission with the help of the device's transmit operators.

        In contrast to :meth:`from_HDF`, each transmission is recalled
        by the respective transmitter registered at the `device`.

        Args:

            group (Group):
                HDF group containing the serialized device transmission.

            device (Device):
                Device whose transmitters recall the individual transmissions.

        Returns: The recalled device transmission.
        """

        # Recall the state serialized by the base class
        recalled_output = DeviceOutput.from_HDF(group)

        # A single transmission is assumed if the count attribute is missing
        transmission_count = group.attrs.get("num_transmissions", 1)

        # Zipping limits the recall to the smaller of serialized transmissions and transmitters
        recalled_transmissions = []
        for index, transmitter in zip(range(transmission_count), device.transmitters):
            transmission_group = group[f"transmission_{index:02d}"]
            recalled_transmissions.append(transmitter.recall_transmission(transmission_group))

        return cls.From_Output(recalled_output, recalled_transmissions)

    def to_HDF(self, group: Group) -> None:
        # Serialize the base class state
        DeviceOutput.to_HDF(self, group)

        # The count attribute is required to recall the enumerated sub-groups
        group.attrs["num_transmissions"] = self.num_operator_transmissions

        # One sub-group per operator transmission
        for index, transmission in enumerate(self.operator_transmissions):
            transmission_group = self._create_group(group, f"transmission_{index:02d}")
            transmission.to_HDF(transmission_group)
# Bound via a string (forward reference) because DeviceReception is defined right below.
DRT = TypeVar("DRT", bound="DeviceReception")
"""Type of device reception."""
[docs]
class DeviceReception(ProcessedDeviceInput):
    """Information generated by receiving over a device and its operators."""

    __operator_receptions: Sequence[Reception]

    def __init__(
        self,
        impinging_signals: Signal | Sequence[Signal],
        operator_inputs: Sequence[Signal],
        operator_receptions: Sequence[Reception],
    ) -> None:
        """
        Args:

            impinging_signals (Signal | Sequence[Signal]):
                Signal models impinging onto the device.

            operator_inputs (Sequence[Signal]):
                Information cached by the device operators.

            operator_receptions (Sequence[Reception]):
                Information inferred from receive operators.
        """

        # Initialize base class
        ProcessedDeviceInput.__init__(self, impinging_signals, operator_inputs)

        # Initialize attributes
        self.__operator_receptions = operator_receptions

    @classmethod
    def From_ProcessedDeviceInput(
        cls: Type[DRT],
        device_input: ProcessedDeviceInput,
        operator_receptions: Sequence[Reception],
    ) -> DRT:
        """Initialize a device reception from a processed device input.

        Required by :meth:`from_HDF` and :meth:`Recall`.

        Args:

            device_input (ProcessedDeviceInput):
                Processed device input providing the base class state.

            operator_receptions (Sequence[Reception]):
                Information inferred from receive operators.

        Returns: The initialized device reception.
        """

        # NOTE(review): assumes ProcessedDeviceInput exposes the constructor arguments
        # as `impinging_signals` and `operator_inputs` attributes - confirm against its definition
        return cls(
            device_input.impinging_signals, device_input.operator_inputs, operator_receptions
        )

    @property
    def operator_receptions(self) -> Sequence[Reception]:
        """Information received by device operators.

        Returns: Operator information.
        """

        return self.__operator_receptions

    @property
    def num_operator_receptions(self) -> int:
        """Number of operator receptions.

        Returns: Number of receptions.
        """

        return len(self.__operator_receptions)

    @classmethod
    def from_HDF(cls: Type[DRT], group: Group) -> DRT:
        # Recall base class
        device_input = ProcessedDeviceInput.from_HDF(group)

        # No receptions are assumed if the count attribute is missing
        num_receptions = group.attrs.get("num_operator_receptions", 0)

        # Recall operator receptions from their enumerated sub-groups
        operator_receptions = [
            Reception.from_HDF(group[f"reception_{r:02d}"]) for r in range(num_receptions)
        ]

        # Initialize object
        return cls.From_ProcessedDeviceInput(device_input, operator_receptions)

    @classmethod
    def Recall(cls: Type[DRT], group: Group, device: Device) -> DRT:
        """Recall a device reception with the help of the device's receive operators.

        In contrast to :meth:`from_HDF`, each reception is recalled
        by the respective receiver registered at the `device`.

        Args:

            group (Group):
                HDF group containing the serialized device reception.

            device (Device):
                Device whose receivers recall the individual receptions.

        Returns: The recalled device reception.
        """

        # Recall base class
        device_input = ProcessedDeviceInput.from_HDF(group)

        # No receptions are assumed if the count attribute is missing
        num_receptions = group.attrs.get("num_operator_receptions", 0)

        # Zipping limits the recall to the smaller of serialized receptions and receivers
        receptions = [
            receiver.recall_reception(group[f"reception_{r:02d}"])
            for r, receiver in zip(range(num_receptions), device.receivers)
        ]

        # Initialize object
        return cls.From_ProcessedDeviceInput(device_input, receptions)

    def to_HDF(self, group: Group) -> None:
        # Serialize base class
        ProcessedDeviceInput.to_HDF(self, group)

        # Serialize groups, one sub-group per operator reception
        for r, reception in enumerate(self.operator_receptions):
            reception.to_HDF(self._create_group(group, f"reception_{r:02d}"))

        # The count attribute is required to recall the enumerated sub-groups
        group.attrs["num_operator_receptions"] = self.num_operator_receptions
class MixingOperator(ABC, Generic[SlotType], Operator[SlotType]):
    """Base class for operators performing mixing operations.

    Adds an optional carrier frequency to the operator,
    falling back to the operated device's carrier frequency if unspecified.
    """

    __carrier_frequency: Optional[float]  # Carrier frequency, None if inherited from the device

    def __init__(
        self, carrier_frequency: Optional[float] = None, slot: Optional[SlotType] = None
    ) -> None:
        """
        Args:

            carrier_frequency (float, optional):
                Central frequency of the mixed signal in radio-frequency transmission band.

            slot (SlotType, optional):
                Device slot this operator operates.
        """

        # The carrier frequency is configured before the operator gets attached to its slot
        self.carrier_frequency = carrier_frequency
        Operator.__init__(self, slot)

    @property
    def carrier_frequency(self) -> float:
        """Central frequency of the mixed signal in radio-frequency transmission band in Hz.

        Denoted by :math:`f_c` with unit :math:`\\left[ f_c \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in the literature.

        By default, the carrier frequency of the assigned :class:`.Device` is returned.
        If no device is assigned, we assume a base band, i.e. :math:`f_c = 0`.

        Raises:
            ValueError: If the carrier frequency is smaller than zero.
        """

        # An explicitly configured frequency always takes precedence
        if self.__carrier_frequency is not None:
            return self.__carrier_frequency

        # Floating operators are assumed to operate in base-band
        return 0.0 if self.device is None else self.device.carrier_frequency

    @carrier_frequency.setter
    def carrier_frequency(self, value: Optional[float]) -> None:
        # None resets the operator to the device's carrier frequency
        if value is not None and value < 0.0:
            raise ValueError("Carrier frequency must be greater or equal to zero")

        self.__carrier_frequency = value
[docs]
class Receiver(MixingOperator["ReceiverSlot"], Generic[ReceptionType], RandomNode):
    """Operator receiving from a device."""

    __reference: Device | None
    __signal: Signal | None
    __reception: ReceptionType | None
    __selected_receive_ports: Sequence[int] | None

    def __init__(
        self,
        seed: int | None = None,
        reference: Device | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Args:

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.

            reference (Device, optional):
                Reference device to which the channel shall be estimated.

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            *args:
                Operator base class initialization parameters.

            \\**kwargs:
                Operator base class initialization parameters.
        """

        # Initialize base classes
        # The random node must be initialized first, attaching to a slot updates its random mother
        RandomNode.__init__(self, seed=seed)
        MixingOperator.__init__(self, *args, **kwargs)

        # Initialize class attributes
        self.reference = reference
        self.selected_receive_ports = selected_receive_ports
        self.__signal = None
        self.__reception = None

    @Operator.device.setter  # type: ignore
    def device(self, value: Device | None) -> None:  # type: ignore
        # Assigning None detaches the receiver from its current device
        if value is None:
            self.slot = None
            return

        # Registering at the device's receiver slot also updates this operator's slot
        value.receivers.add(self)

    @Operator.slot.setter  # type: ignore
    def slot(self, value: Optional[ReceiverSlot]) -> None:
        Operator.slot.fset(self, value)  # type: ignore

        # The operated device acts as the mother node of this random node
        self.random_mother = None if value is None else value.device

    @property
    def reference(self) -> Optional[Device]:
        """Reference transmitter for this receiver.

        Returns:
            A handle to the referenced device.
            `None` if the device was not specified.
        """

        return self.__reference

    @reference.setter
    def reference(self, value: Optional[Device]) -> None:
        self.__reference = value

    @property
    def selected_receive_ports(self) -> Sequence[int] | None:
        """Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.

        If :py:obj:`None`, all available antenna ports will be considered.

        Raises:
            ValueError: If the selected ports don't match the configured device's receive antenna array configuration.
        """

        return self.__selected_receive_ports

    @selected_receive_ports.setter
    def selected_receive_ports(self, value: Sequence[int] | None) -> None:
        if value is None:
            self.__selected_receive_ports = None
            return

        # Make sure the provided indices match the underlying device's antenna array configuration
        # The check is only possible while the receiver is attached to a device
        if self.device is not None:
            if max(value) > (self.device.antennas.num_receive_ports - 1):
                raise ValueError(
                    "Receive antenna selection indices don't match the device's receive antenna configuration"
                )

        self.__selected_receive_ports = value

    @property
    def num_receive_ports(self) -> int:
        """Number of considered receiving ports.

        Zero if no ports are selected and the receiver is currently floating.
        """

        if self.selected_receive_ports is None:
            if self.device is None:
                return 0

            return self.device.antennas.num_receive_ports

        return len(self.selected_receive_ports)

    @property
    def num_receive_antennas(self) -> int:
        """Number of considered receiving antennas.

        Zero if the receiver is currently floating.
        """

        if self.device is None:
            return 0

        if self.selected_receive_ports is None:
            return self.device.antennas.num_receive_antennas

        return self.device.antennas.count_receive_antennas(self.selected_receive_ports)

    @property
    def reception(self) -> ReceptionType | None:
        """Information inferred from the most recent reception.

        Updated during the :meth:`receive<receive>` routine.
        :py:obj:`None` if the cache has been cleared or :meth:`receive<receive>`
        has not been called yet.
        """

        return self.__reception

    def receive(self, signal: Signal | None = None, cache: bool = True) -> ReceptionType:
        """Process a received signal by the receiver.

        Wrapper around the abstract :meth:`_receive<Receiver._receive>` method.

        Args:

            signal (Signal, optional): Signal model to be processed.
                If not specified, the signal cached by :meth:`cache_reception` is processed.
            cache (bool, optional): Cache the received information.
                Enabled by default.

        Returns: Information received by this operator.

        Raises:

            RuntimeError: If signal model was not provided and no signal is cached.
            ValueError: If the number of signal streams does not match the number of receive ports.
        """

        # Fall back to the cached signal if none was provided
        if signal is None:
            if self.signal is None:
                raise RuntimeError("Error attempting to fetch a cached receiver signal")

            signal = self.signal

        # The stream count is not validated while the number of receive ports is zero
        if signal.num_streams != self.num_receive_ports and self.num_receive_ports > 0:
            raise ValueError(
                f"Number of signal streams does not match the number of receive ports ({signal.num_streams} != {self.num_receive_ports})"
            )

        # Generate received information
        reception = self._receive(signal)

        # Cache the reception if the respective flag is enabled
        if cache:
            self.__reception = reception

        # Return received information
        return reception

    @abstractmethod
    def _receive(self, signal: Signal) -> ReceptionType:
        """Process a received signal by the receiver.

        Subroutine of the public :meth:`receive<hermespy.core.device.Receiver.receive>`
        method that performs the pipeline-specific receive processing
        and consolidates the inferred information into a single
        :class:`Reception<hermespy.core.device.Reception>` object.

        Args:

            signal (Signal): Multi-stream signal model to be processed.

        Returns:
            Information inferred from the received signal.
        """
        ...  # pragma: no cover

    @property
    def signal(self) -> Signal | None:
        """Cached signal model to be received.

        Signal model assumed by :meth:`receive` if no signal model is provided.
        Updated with :meth:`cache_reception`.

        :obj:`None` if the cache has been cleared or no signal has been cached yet.
        """

        return self.__signal

    def cache_reception(self, signal: Signal | None) -> None:
        """Cache recent reception at this receiver.

        Args:

            signal (Signal | None):
                Signal model to be cached for the next reception.
                :obj:`None` clears the cache.
        """

        self.__signal = signal

    def recall_reception(self, group: Group, cache: bool = True) -> ReceptionType:
        """Recall a specific reception from a serialization.

        Internally calls the abstract method
        :meth:`_recall_reception<Receiver._recall_reception>`.
        Caches the reception if the respective flag is enabled.

        Args:

            group (Group):
                HDF group containing the reception.

            cache (bool, optional):
                Cache the reception.
                Enabled by default.

        Returns: The recalled reception.

        :meta private:
        """

        recalled_reception = self._recall_reception(group)

        if cache:
            self.__reception = recalled_reception

        return recalled_reception

    @abstractmethod
    def _recall_reception(self, group: Group) -> ReceptionType:
        """Recall a specific reception from a serialization.

        Args:

            group (Group):
                HDF group containing the reception.

        Returns: The recalled reception.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def power(self) -> float:
        """Expected power of the received signal in Watts.

        .. note::
            Applies only to the signal-carrying parts of the transmission,
            silent parts such as guard intervals should not be considered.
        """
        ...  # pragma: no cover
class OperatorSlot(Generic[OperatorType], Sequence[OperatorType]):
    """Ordered collection of the operators registered at a single device."""

    __device: Device  # Device this slot belongs to
    __operators: List[OperatorType]  # Registered operators in order of their registration

    def __init__(self, device: Device) -> None:
        """
        Args:

            device (Device):
                Device this slot belongs to.
        """

        self.__operators = []
        self.__device = device

    @property
    def device(self) -> Device:
        """Device this operator slot belongs to.

        Returns:
            Device: Handle to the device.
        """

        return self.__device

    def operator_index(self, operator: OperatorType) -> int:
        """Index of an operator within this slot.

        Args:

            operator (OperatorType):
                The operator in question.

        Returns:
            index (int): The `operator`'s index.

        Raises:
            ValueError: If the `operator` is not registered at this slot.
        """

        registered_operators = self.__operators
        return registered_operators.index(operator)

    def add(self, operator: OperatorType) -> None:
        """Add a new operator to this slot.

        Does nothing if the `operator` is already registered at this slot.

        Args:

            operator (OperatorType):
                Operator to be added to this slot.
        """

        if operator not in self.__operators:
            # Register first, so that the operator's slot setter does not add it a second time
            self.__operators.append(operator)
            operator.slot = self

    def remove(self, operator: OperatorType) -> None:
        """Remove an operator from this slot.

        Does nothing if the `operator` is not registered at this slot.

        Args:

            operator (OperatorType):
                Handle to the operator to be removed.
        """

        if operator not in self.__operators:
            return

        self.__operators.remove(operator)

        # Only detach the operator if its current slot matches this slot's type.
        # This is required to prevent duplex operators from accidentally detaching themselves.
        if isinstance(operator.slot, type(self)):
            operator.slot = None

    def registered(self, operator: OperatorType) -> bool:
        """Check if an operator is registered at this slot.

        Returns:
            bool: Boolean indicating the registration status.
        """

        registered_operators = self.__operators
        return operator in registered_operators

    @property
    def num_operators(self) -> int:
        """Number of operators on this slot.

        Returns:
            int: Number of operators.
        """

        registered_operators = self.__operators
        return len(registered_operators)

    @property
    def max_sampling_rate(self) -> float:
        """Maximum requested sampling rate of all operators.

        Returns:
            The maximum sampling rate in Hz.
            Zero if no operator is registered.
        """

        # The leading zero acts as lower bound and covers the empty slot
        return max([0.0, *(operator.sampling_rate for operator in self.__operators)])

    @property
    def min_frame_duration(self) -> float:
        """The minimum frame duration required to process at least a single frame for each operator.

        Returns:
            The minimum frame duration in seconds, i.e. the longest frame duration of all operators.
            Zero if no operator is registered.
        """

        # The leading zero acts as lower bound and covers the empty slot
        return max([0.0, *(operator.frame_duration for operator in self.__operators)])

    @property
    def min_num_samples_per_frame(self) -> int:
        """The number of samples required to collect at least a single frame for each operator.

        Returns:
            Number of samples.
        """

        rate = self.max_sampling_rate
        duration = self.min_frame_duration

        # Round up so that the longest frame is always covered completely
        return ceil(duration * rate)

    @overload
    def __getitem__(self, item: int) -> OperatorType: ...  # pragma: no cover

    @overload
    def __getitem__(self, item: slice) -> Sequence[OperatorType]: ...  # pragma: no cover

    def __getitem__(self, item: int | slice) -> OperatorType | Sequence[OperatorType]:
        registered_operators = self.__operators
        return registered_operators[item]

    def __iter__(self) -> Iterator[OperatorType]:
        """Iterating over the slot iterates over its registered operators."""

        return iter(self.__operators)

    def __contains__(self, operator: object) -> bool:
        """Membership test, equivalent to :meth:`registered`.

        Returns: Registration indicator.
        """

        registered_operators = self.__operators
        return operator in registered_operators

    def __len__(self) -> int:
        return self.num_operators
# Type variable parameterizing the result type of transmit operators.
TransmissionType = TypeVar("TransmissionType", bound="Transmission")
"""Type variable of a :class:`Transmission`."""
[docs]
class Transmitter(MixingOperator["TransmitterSlot"], Generic[TransmissionType], RandomNode):
    """Operator transmitting over a device."""

    __transmission: TransmissionType | None
    __selected_transmit_ports: Sequence[int] | None

    def __init__(
        self,
        seed: int | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Args:

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            *args:
                Operator base class initialization parameters.

            \\**kwargs:
                Operator base class initialization parameters.
        """

        # Initialize operator base class
        # The random node must be initialized first, attaching to a slot updates its random mother
        RandomNode.__init__(self, seed=seed)
        MixingOperator.__init__(self, *args, **kwargs)

        # Initialize class attributes
        self.__selected_transmit_ports = selected_transmit_ports
        self.__transmission = None

    def transmit(self, duration: float = 0.0, cache: bool = True) -> TransmissionType:
        """Transmit a signal.

        Wrapper around the abstract :meth:`_transmit<Transmitter._transmit>` method.

        Args:

            duration (float, optional):
                Duration of the transmitted signal.
                If not specified, the duration will be inferred by the transmitter.

            cache (bool, optional):
                Cache the transmission by the transmitter.
                Enabled by default.

        Returns:
            Information generated while transmitting over this operator.

        Raises:
            FloatingError: If the transmitter is currently considered floating.
        """

        transmission = self._transmit(duration)

        # Cache the transmission if the respective flag is enabled
        if cache:
            self.__transmission = transmission

        return transmission

    @abstractmethod
    def _transmit(self, duration: float = -1.0) -> TransmissionType:
        """Generate information to be transmitted.

        Subroutine of the public :meth:`transmit<Transmitter.transmit>` method that performs the pipeline-specific transmit-processing
        and consolidates the generated information into a single :class:`Transmission` object.

        Args:

            duration (float, optional):
                Duration of the transmitted signal in seconds.
                If not specified, the duration of a single frame will be assumed.

        Returns: Information to be transmitted.
        """
        ...  # pragma: no cover

    @property
    def transmission(self) -> TransmissionType | None:
        """Recent transmission of the transmitter.

        Updated during the :meth:`transmit<Transmitter.transmit>` routine.
        :py:obj:`None` if the cache has been cleared or :meth:`transmit<Transmitter.transmit>`
        has not been called yet.
        """

        return self.__transmission

    def cache_transmission(self, transmission: TransmissionType) -> None:
        """Update the information resulting from the latest transmission.

        Called by the :meth:`recall_transmission<Transmitter.recall_transmission>` routine
        if caching is enabled.

        Args:

            transmission (TransmissionType):
                The transmission to be cached.
        """

        self.__transmission = transmission

    def recall_transmission(self, group: Group, cache: bool = True) -> TransmissionType:
        """Recall a specific transmission from a serialization.

        Internally calls the abstract method :meth:`Transmitter._recall_transmission`.
        Caches the transmission if the respective flag is enabled.

        Args:

            group (Group):
                HDF group containing the transmission.

            cache (bool, optional):
                Cache the transmission.
                Enabled by default.

        Returns: The recalled transmission

        :meta private:
        """

        recalled_transmission = self._recall_transmission(group)

        if cache:
            self.cache_transmission(recalled_transmission)

        return recalled_transmission

    @abstractmethod
    def _recall_transmission(self, group: Group) -> TransmissionType:
        """Recall a specific transmission from a serialization.

        Args:

            group (Group):
                HDF group containing the transmission.

        Returns: The recalled transmission
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def power(self) -> float:
        """Expected power of the transmitted signal in Watts.

        .. note::
            Applies only to the signal-carrying parts of the transmission,
            silent parts such as guard intervals should not be considered.
        """
        ...  # pragma: no cover

    @property
    def selected_transmit_ports(self) -> Sequence[int] | None:
        """Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.

        If `None`, all available transmit ports will be considered.

        Raises:
            ValueError: If the selected ports don't match the configured device's transmit antenna array configuration.
        """

        return self.__selected_transmit_ports

    @selected_transmit_ports.setter
    def selected_transmit_ports(self, value: Sequence[int] | None) -> None:
        if value is None:
            self.__selected_transmit_ports = None
            return

        # Make sure the provided indices match the underlying device's antenna array configuration
        # The check is only possible while the transmitter is attached to a device
        if self.device is not None:
            if max(value) > (self.device.antennas.num_transmit_ports - 1):
                raise ValueError(
                    "Transmit antenna selection indices don't match the device's transmit antenna configuration"
                )

        self.__selected_transmit_ports = value

    @property
    def num_transmit_ports(self) -> int:
        """Number of considered transmitting ports.

        Zero if no ports are selected and the transmitter is currently floating.
        """

        if self.selected_transmit_ports is None:
            if self.device is None:
                return 0

            return self.device.antennas.num_transmit_ports

        return len(self.selected_transmit_ports)

    @property
    def num_transmit_antennas(self) -> int:
        """Number of considered transmitting antennas.

        Zero if the transmitter is currently floating.
        """

        if self.device is None:
            return 0

        # Without an explicit port selection, all transmit antennas of the device are considered
        if self.selected_transmit_ports is None:
            return self.device.antennas.num_transmit_antennas

        return self.device.antennas.count_transmit_antennas(self.selected_transmit_ports)

    @Operator.device.setter  # type: ignore
    def device(self, value: Device | None) -> None:  # type: ignore
        # Assigning None detaches the transmitter from its current device
        if value is None:
            self.slot = None
            return

        # Registering at the device's transmitter slot also updates this operator's slot
        value.transmitters.add(self)

    @Operator.slot.setter  # type: ignore
    def slot(self, value: Optional[TransmitterSlot]) -> None:
        Operator.slot.fset(self, value)  # type: ignore

        # The operated device acts as the mother node of this random node
        self.random_mother = None if value is None else value.device
class TransmitterSlot(OperatorSlot[Transmitter]):
    """Slot for transmitting operators within devices.

    In addition to the registered transmitters, the slot caches the most recent
    transmission of each transmitter, indexed identically to the operators.
    """

    # Next signals to be transmitted, aligned with the operator indices of this slot
    __transmissions: List[Optional[Transmission]]

    def __init__(self, *args, **kwargs) -> None:
        """
        Args:

            *args:
                OperatorSlot base class initialization parameters.

            **kwargs:
                OperatorSlot base class initialization parameters.
        """

        self.__transmissions = []

        # Init base class
        OperatorSlot.__init__(self, *args, **kwargs)

    def add(self, operator: Transmitter) -> None:
        # Adding a registered operator is a no-op in the base class.
        # Returning early keeps the transmission cache aligned with the operator list.
        if self.registered(operator):
            return

        OperatorSlot.add(self, operator)
        self.__transmissions.append(None)

    def remove(self, operator: Transmitter) -> None:
        # Drop the operator's cache entry so the remaining entries stay aligned with their operator indices
        if self.registered(operator):
            index = self.operator_index(operator)
            if index < len(self.__transmissions):
                del self.__transmissions[index]

        OperatorSlot.remove(self, operator)

    def add_transmission(self, transmitter: Transmitter, transmission: Transmission) -> None:
        """Add a transmitter's most recent transmission.

        Args:

            transmitter (Transmitter):
                The transmitter emitting the signal.

            transmission (Transmission):
                The information to be transmitted.

        Raises:
            ValueError: If the transmitter is not registered at this slot
                or the transmitted signal's number of streams does not match
                the transmitter's number of transmit ports.
        """

        if not self.registered(transmitter):
            raise ValueError("Transmitter not registered at this slot")

        if transmission.signal.num_streams != transmitter.num_transmit_ports:
            raise ValueError(
                f"Transmitted signal has invalid number of streams ({transmission.signal.num_streams} instead of {transmitter.num_transmit_ports})"
            )

        self.__transmissions[transmitter.slot_index] = transmission

    def get_transmissions(self, clear_cache: bool = True) -> List[Transmission]:
        """Get recent transmissions.

        Args:

            clear_cache (bool, optional):
                Clear the cached transmissions after fetching them.
                Enabled by default.

        Returns:
            A list of recent operator transmissions.
            Entries of operators without a cached transmission are `None`.
        """

        transmissions: List[Transmission] = self.__transmissions.copy()

        # Clear the cache if the respective flag is enabled
        if clear_cache:
            self.clear_cache()

        return transmissions

    def clear_cache(self) -> None:
        """Clear the cached transmission of all registered transmit operators."""

        self.__transmissions = [None] * self.num_operators
class ReceiverSlot(OperatorSlot[Receiver]):
    """Slot collecting the receive operators of a device."""

    def __init__(self, *args, **kwargs) -> None:
        """
        Args:

            *args:
                Positional arguments forwarded to the OperatorSlot base class.

            **kwargs:
                Keyword arguments forwarded to the OperatorSlot base class.
        """

        # No additional state, only the base class is initialized
        OperatorSlot.__init__(self, *args, **kwargs)
class UnsupportedSlot(OperatorSlot):
    """Placeholder slot for operations a device does not support."""

    def add(self, operator: Operator) -> None:
        """Reject any attempt to add an operator.

        Args:

            operator (Operator):
                The operator that was attempted to be added.

        Raises:

            RuntimeError: Always.
        """

        raise RuntimeError("Slot not supported by this device")
[docs]
class Device(ABC, Transformable, RandomNode, Serializable):
"""Physical device representation within HermesPy.
It acts as the basis for all transmissions and receptions of sampled electromagnetic signals.
"""
transmitters: TransmitterSlot
"""Transmitters broadcasting signals over this device."""
receivers: ReceiverSlot
"""Receivers capturing signals from this device"""
__power: float # Average power of the transmitted signals
def __init__(
    self, power: float = 1.0, pose: Transformation | None = None, seed: Optional[int] = None
) -> None:
    """
    Args:

        power (float, optional):
            Average power of the transmitted signals in Watts.
            1.0 Watt by default.

        pose (Transformation, optional):
            Pose of the device with respect to its scenario coordinate system origin.

        seed (int, optional):
            Random seed used to initialize the pseudo-random number generator.
    """

    # Base classes are initialized explicitly, the random node first
    RandomNode.__init__(self, seed=seed)
    Transformable.__init__(self, pose=pose)

    # Each device owns one slot for transmitters and one for receivers
    self.transmitters = TransmitterSlot(self)
    self.receivers = ReceiverSlot(self)

    # Assigned through the `power` attribute rather than the private field
    self.power = power
@property
@abstractmethod
def antennas(self) -> AntennaArray:
    """Antenna array model of this device.

    Abstract, to be provided by the concrete device implementation.
    """
    ...  # pragma: no cover
@property
def num_transmit_ports(self) -> int:
    """Number of available transmit antenna ports.

    Shorthand for the `num_transmit_ports` attribute of :attr:`antennas`.
    """

    antenna_array = self.antennas
    return antenna_array.num_transmit_ports
@property
def num_receive_ports(self) -> int:
    """Number of available receive antenna ports.

    Shorthand for the `num_receive_ports` attribute of :attr:`antennas`.
    """

    antenna_array = self.antennas
    return antenna_array.num_receive_ports
@property
def num_transmit_antennas(self) -> int:
    """Number of available transmit antennas.

    Shorthand for the `num_transmit_antennas` attribute of :attr:`antennas`.
    """

    antenna_array = self.antennas
    return antenna_array.num_transmit_antennas
@property
def num_receive_antennas(self) -> int:
    """Number of available receive antennas.

    Shorthand for the `num_receive_antennas` attribute of :attr:`antennas`.
    """

    antenna_array = self.antennas
    return antenna_array.num_receive_antennas
@property
def power(self) -> float:
    """Average power of the transmitted signal.

    Returns:
        power (float): Power in Watt.
    """

    return self.__power
@power.setter
def power(self, value: float) -> None:
    """Set the average power of the transmitted signal.

    Args:

        value (float):
            The new average power.

    Raises:

        ValueError: If `value` is smaller than zero.
    """

    # Zero is a valid power, only negative values are rejected
    if value < 0.0:
        raise ValueError("Average signal power must be greater or equal to zero")

    self.__power = value
@property
def snr(self) -> float:
    """Signal to noise ratio of the received signal.

    Returns: Linear signal to noise ratio, infinite in this implementation.
    """

    # No noise is assumed here, which corresponds to an unbounded ratio
    noiseless_snr = float("inf")
    return noiseless_snr
@property
def num_antennas(self) -> int:
    """Number of antennas within this device's topology.

    Shorthand for the `num_antennas` attribute of :attr:`antennas`.
    """

    antenna_array = self.antennas
    return antenna_array.num_antennas
@property
def max_frame_duration(self) -> float:
    """Longest frame duration among all operators of this device.

    Considers the `frame_duration` of every transmitter and receiver.

    Returns:
        max_duration (float):
            The largest frame duration, 0.0 if no operator exceeds it.
    """

    operators = chain(self.transmitters, self.receivers)
    durations = (operator.frame_duration for operator in operators)  # type: ignore

    # Seeding the search with 0.0 also covers devices without operators
    return max(chain((0.0,), durations))
@property
@abstractmethod
def carrier_frequency(self) -> float:
    """Central frequency of the device's emissions in the RF-band.

    Abstract, to be provided by the concrete device implementation.

    Returns:
        frequency (float):
            Carrier frequency in Hz.

    Raises:

        ValueError: On negative carrier frequencies.
    """
    ...  # pragma: no cover
@property
def wavelength(self) -> float:
    """Central wavelength of the device's emissions in the RF-band.

    Computed as the speed of light divided by :attr:`carrier_frequency`.
    Note that a carrier frequency of zero leads to a division by zero.

    Returns:
        wavelength (float):
            Wavelength in m.
    """

    frequency = self.carrier_frequency
    return speed_of_light / frequency
@property
@abstractmethod
def sampling_rate(self) -> float:
    """Sampling rate at which the device's analog-to-digital converters operate.

    Abstract, to be provided by the concrete device implementation.

    Returns:
        sampling_rate (float): Sampling rate in Hz.

    Raises:

        ValueError: If the sampling rate is not greater than zero.
    """
    ...  # pragma: no cover
[docs]
def transmit_operators(self) -> List[Transmission]:
    """Generate transmitted information for all registered operators.

    Calls `transmit` on each operator in the device's transmitter slot,
    in the slot's iteration order.

    Returns: List of operator transmissions.
    """

    transmissions: List[Transmission] = []
    for transmitter in self.transmitters:
        transmissions.append(transmitter.transmit())

    return transmissions
[docs]
def generate_output(
    self, operator_transmissions: List[Transmission] | None = None, resample: bool = True
) -> DeviceOutput:
    """Generate the device's output.

    The signals of all operator transmissions are superimposed onto a single
    signal model spanning the device's transmit ports.
    Transmissions are paired with the registered transmitters in order,
    and entries that are `None` are skipped.

    Args:

        operator_transmissions (List[Transmissions], optional):
            List of operator transmissions from which to generate the output.
            If the `operator_transmissions` are not provided, the transmitter's caches will be queried.

        resample (bool, optional):
            Forwarded as `resample` argument when superimposing the operator signals.
            Enabled by default.

    Returns: The device's output.

    Raises:

        RuntimeError: If no `operator_transmissions` were provided and an operator has no cached transmission.
            NOTE(review): not raised by this method itself, presumably by the operator cache - confirm.
    """

    # Fall back to the transmissions cached at the operators
    if operator_transmissions is None:
        operator_transmissions = [operator.transmission for operator in self.transmitters]

    # Transmit ports each operator intends to feed
    port_selections = [operator.selected_transmit_ports for operator in self.transmitters]

    # Start from an empty signal matching the device's RF configuration
    output_signal = Signal.Empty(
        self.sampling_rate, self.num_transmit_ports, carrier_frequency=self.carrier_frequency
    )

    for transmission, ports in zip(operator_transmissions, port_selections):
        # Operators without a transmission do not contribute to the output
        if transmission is None:
            continue

        output_signal.superimpose(transmission.signal, resample=resample, stream_indices=ports)

    return DeviceOutput(output_signal)
[docs]
def transmit(self, clear_cache: bool = True) -> DeviceTransmission:
    """Transmit over this device.

    Internally calls :meth:`Device.transmit_operators` and :meth:`Device.generate_output`.

    Args:

        clear_cache (bool, optional):
            Clear the cache of operator signals to be transmitted.
            Enabled by default.
            Note that this flag is not read by this implementation.

    Returns: Information transmitted by this device.
    """

    transmissions = self.transmit_operators()
    output = self.generate_output(transmissions)

    return DeviceTransmission.From_Output(output, transmissions)
[docs]
def cache_transmission(self, transmission: DeviceTransmission) -> None:
    """Hand the operator transmissions of a device transmission to the transmitters.

    The operator transmissions are paired with the registered transmitters in
    order, and each transmitter's `cache_transmission` is called with its entry.
    Pairing stops at the shorter of the two sequences.

    Args:

        transmission (DeviceTransmission):
            The device transmission whose `operator_transmissions` are cached.
    """

    pairs = zip(self.transmitters, transmission.operator_transmissions)
    for operator, operator_transmission in pairs:
        operator.cache_transmission(operator_transmission)
[docs]
def receive_operators(
    self,
    operator_inputs: ProcessedDeviceInput | Sequence[Signal | None] | None = None,
    cache: bool = True,
) -> List[Reception]:
    """Receive over the registered operators.

    Args:

        operator_inputs (ProcessedDeviceInput | Sequence[Signal | None], optional):
            Signal models fed to the receive operators.
            If not provided, the `signal` attribute of each receive operator is used instead.

        cache (bool, optional):
            Cache the generated received information at the device's receive operators.
            Forwarded to each operator's `receive` call.
            Enabled by default.

    Returns: List of information generated by receiving over the device's operators.

    Raises:

        ValueError: If the number of operator inputs does not match the number of receive operators.
        RuntimeError: If no operator inputs were specified and an operator has no cached inputs.
            NOTE(review): not raised by this method itself, presumably by the operators - confirm.
    """

    _operator_inputs: Sequence[Signal | None]

    if operator_inputs is None:
        # Without explicit inputs, fall back to the signals held by the operators
        _operator_inputs = [receiver.signal for receiver in self.receivers]

    else:
        # Unwrap the object oriented input, plain sequences are used as they are
        is_processed_input = isinstance(operator_inputs, ProcessedDeviceInput)
        _operator_inputs = (
            operator_inputs.operator_inputs if is_processed_input else operator_inputs
        )

    # Each receive operator requires exactly one input entry
    if len(_operator_inputs) != self.receivers.num_operators:
        raise ValueError(
            f"Number of operator inputs ({len(_operator_inputs)}) does not match the number of receive operators ({self.receivers.num_operators})"
        )

    return [
        operator.receive(signal, cache)
        for operator, signal in zip(self.receivers, _operator_inputs)
    ]
[docs]
def receive(
    self, impinging_signals: DeviceInput | Signal | Sequence[Signal], cache: bool = True
) -> DeviceReception:
    """Receive over this device.

    Internally calls :meth:`Device.process_input` and :meth:`Device.receive_operators`.

    Args:

        impinging_signals (Union[DeviceInput, Signal, Sequence[Signal]]):
            The samples to be processed by the device.

        cache (bool, optional):
            Cache the received information.
            Forwarded to :meth:`Device.receive_operators`.
            Enabled by default.

    Returns: The received device information.
    """

    # Turn the impinging signals into inputs for the individual operators
    device_input = self.process_input(impinging_signals)

    # Let every receive operator process its respective input
    operator_receptions = self.receive_operators(device_input.operator_inputs, cache)

    # Combine the processed input and the operator receptions
    return DeviceReception.From_ProcessedDeviceInput(device_input, operator_receptions)