# -*- coding: utf-8 -*-
"""
===============
Device Modeling
===============
.. mermaid::
%%{init: {'theme': 'dark'}}%%
classDiagram-v2
direction RL
class Device {
topology
carrier_frequency
transmit()
receive()
}
class SimulatedDevice {
position
orientation
velocity
acceleration
}
class HardwareDevice {
trigger()
}
class Operator {
transmit()
receive()
}
class Modem {
tx_bits
rx_bits
...
waveform
transmit()
receive()
}
class Radar {
waveform
...
target_estimates
transmit()
receive()
}
Operator <|-- Modem
Operator <|-- Radar
Device *-- Operator
SimulatedDevice <|-- Device
HardwareDevice <|-- Device
.. mermaid::
%%{init: {'theme': 'dark'}}%%
flowchart LR
channel{Real World}
subgraph devicea[HardwareDevice]
direction TB
deviceatx>Tx Slot] --> deviceabinding[Hardware Binding] --> devicearx>Rx Slot]
end
subgraph deviceb[HardwareDevice]
direction TB
devicebtx>Tx Slot] --> devicebbinding[Hardware Binding] --> devicebrx>Rx Slot]
end
deviceabinding <--> channel
devicebbinding <--> channel
.. mermaid::
%%{init: {'theme': 'dark'}}%%
flowchart LR
channel{Channel Modeling}
subgraph devicea[SimulatedDevice]
direction TB
deviceatx>Tx Slot]
devicearx>Rx Slot]
end
subgraph deviceb[SimulatedDevice]
direction TB
devicebtx>Tx Slot]
devicebrx>Rx Slot]
end
deviceatx --> channel --> devicearx
devicebtx --> channel --> devicebrx
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from itertools import chain
from math import ceil
from typing import Callable, Generic, Iterator, overload, Type, TypeVar
from h5py import Group
from ruamel.yaml import MappingNode, Node, SafeConstructor, SafeRepresenter, ScalarNode
from scipy.constants import speed_of_light
from .antennas import AntennaArray
from .factory import HDFSerializable, Serializable
from .hooks import Hook, Hookable
from .precoding import TransmitSignalCoding, ReceiveSignalCoding
from .random_node import RandomNode
from .signal_model import Signal
from .state import DeviceState, DST, ReceiveState, TransmitState
from .transformation import Transformable, Transformation
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class FloatingError(RuntimeError):
"""Exception raised if an operation fails due to a currently being considered floating."""
... # pragma: no cover
ORT = TypeVar("ORT", bound="OperationResult")
"""Type of OperationResult"""
OperatorType = TypeVar("OperatorType", bound="Operator")
"""Type of operator."""
DeviceType = TypeVar("DeviceType", bound="Device")
"""Type of device."""
[docs]
class OperationResult(HDFSerializable):
"""Base class for outputs of device operators."""
signal: Signal
"""Base-band model of the processed signal."""
def __init__(self, signal: Signal) -> None:
"""
Args:
signal (signal):
Base-band waveform model.
"""
self.signal = signal
@classmethod
def from_HDF(cls: Type[ORT], group: Group) -> ORT:
signal = Signal.from_HDF(group["signal"])
return cls(signal=signal)
def to_HDF(self, group: Group) -> None:
self.signal.to_HDF(self._create_group(group, "signal"))
[docs]
class Transmission(OperationResult):
"""Information generated by transmitting over a transmit operator."""
... # pragma: no cover
[docs]
class Reception(OperationResult):
"""Information generated by receiving over a receive operator."""
... # pragma: no cover
ReceptionType = TypeVar("ReceptionType", bound=Reception)
"""Type variable of a :class:`Reception`."""
SlotType = TypeVar("SlotType", bound="OperatorSlot")
"""Type of slot."""
class Operator(ABC):
"""Base class for operators of devices.
In HermesPy, operators may configure devices, broadcast signals over them or capture signals from them.
Each operator is attached to a single device instance it operates on.
"""
@property
@abstractmethod
def frame_duration(self) -> float:
"""Duration of a single sample frame in seconds.
Denoted as :math:`T_{\\mathrm{F}}` of unit :math:`\\left[ T_{\\mathrm{F}} \\right] = \\mathrm{s}` in literature.
"""
... # pragma: no cover
@property
@abstractmethod
def sampling_rate(self) -> float:
"""The operator's preferred sampling rate in Hz.
Denoted as :math:`f_{\\mathrm{S}}` of unit :math:`\\left[ f_{\\mathrm{S}} \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in literature.
"""
... # pragma: no cover
[docs]
class DeviceOutput(HDFSerializable):
"""Base class for the output of a device."""
__mixed_signal: Signal | None
def __init__(self, mixed_signal: Signal) -> None:
"""
Args:
mixed_signal (Signal): Mixed signal emerging from the device.
"""
self.__mixed_signal = mixed_signal
@property
def sampling_rate(self) -> float:
"""Sampling rate at which the device operates.
Returns: Sampling rate in Hz.
"""
return self.__mixed_signal.sampling_rate
@property
def num_antennas(self) -> int:
"""Number of transmitting device antennas.
Returns: Number of antennas.
"""
return self.__mixed_signal.num_streams
@property
def carrier_frequency(self) -> float:
"""Carrier frequency at which the device operates.
Returns: Carrier frequency in Hz.
"""
return self.__mixed_signal.carrier_frequency
@property
def emerging_signals(self) -> Sequence[Signal]:
"""Models of electromagnetic signals generated by the device.
Returns: List of signal models.
"""
return [self.__mixed_signal]
@property
def num_emerging_signals(self) -> int:
"""Number of emerging signal models.
Returns: Count of models.
"""
return len(self.emerging_signals)
@property
def mixed_signal(self) -> Signal:
"""Combined signal of the device's transmissions.
Returns: Signal model of the device's electromagnetic transmission.
"""
return self.__mixed_signal
@classmethod
def from_HDF(cls: Type[DeviceOutput], group: Group) -> DeviceOutput:
# Recall serialized mixed signal group
mixed_signal = Signal.from_HDF(group["mixed_signal"])
# Initialize object
return cls(mixed_signal)
def to_HDF(self, group: Group) -> None:
# Serialize groups
self.mixed_signal.to_HDF(self._create_group(group, "mixed_signal"))
DTT = TypeVar("DTT", bound="DeviceTransmission")
"""Type of device transmission."""
[docs]
class DeviceTransmission(DeviceOutput):
"""Information generated by transmitting over a device."""
__operator_transmissions: Sequence[Transmission]
def __init__(
self, operator_transmissions: Sequence[Transmission], mixed_signal: Signal
) -> None:
"""
Args:
operator_transmissions (Sequence[Transmission]):
List of information generated by transmit operators.
mixed_signal (Signal):
Mixed signal transmitted by the device.
"""
# Initialize base class
DeviceOutput.__init__(self, mixed_signal)
# Initialze class attributes
self.__operator_transmissions = operator_transmissions
[docs]
@classmethod
def From_Output(
cls: Type[DeviceTransmission],
output: DeviceOutput,
operator_transmissions: Sequence[Transmission],
) -> DeviceTransmission:
"""Initialize a device transmission from a device output.
Args:
output (DeviceOutput):
Device output object.
operator_transmissions (Sequence[Transmission]):
List of information generated by transmit operators.
"""
return cls(operator_transmissions, output.mixed_signal)
@property
def operator_transmissions(self) -> Sequence[Transmission]:
"""Information transmitted by device operators.
Returns: Operator information.
"""
return self.__operator_transmissions
@property
def num_operator_transmissions(self) -> int:
"""Number of operator transmissions."""
return len(self.__operator_transmissions)
[docs]
@classmethod
def from_HDF(
cls: Type[DeviceTransmission], group: Group, operators: Sequence[Transmitter] | None = None
) -> DeviceTransmission:
"""Recall a device transmission from a serialization.
Args:
group (Group):
HDF5 group containing the serialized device transmission.
operators (Sequence[Transmitter], optional):
List of device transmitters to recall the specific transmissions.
If not specified, the transmissions are recalled as their base class.
"""
# Recall base class
device_output = DeviceOutput.from_HDF(group)
# Recall attributes
num_transmissions = group.attrs.get("num_transmissions", 1)
# Recall transmissions
if operators is None:
transmissions = [
Transmission.from_HDF(group[f"transmission_{t:02d}"])
for t in range(num_transmissions)
]
else:
transmissions = [
operator.recall_transmission(group[f"transmission_{t:02d}"])
for t, operator in zip(range(num_transmissions), operators)
]
# Initialize object
return cls.From_Output(device_output, transmissions)
[docs]
@classmethod
def Recall(cls: Type[DeviceTransmission], group: Group, device: Device) -> DeviceTransmission:
# Recall base class
device_output = DeviceOutput.from_HDF(group)
# Recall attributes
num_transmissions = group.attrs.get("num_transmissions", 1)
transmissions = [
transmitter.recall_transmission(group[f"transmission_{t:02d}"])
for t, transmitter in zip(range(num_transmissions), device.transmitters)
]
# Initialize object
return cls.From_Output(device_output, transmissions)
def to_HDF(self, group: Group) -> None:
# Serialize base class
DeviceOutput.to_HDF(self, group)
# Serialize attributes
group.attrs["num_transmissions"] = self.num_operator_transmissions
# Serialize groups
for t, transmission in enumerate(self.operator_transmissions):
transmission.to_HDF(self._create_group(group, f"transmission_{t:02d}"))
DRT = TypeVar("DRT", bound="DeviceReception")
"""Type of device reception."""
[docs]
class DeviceReception(ProcessedDeviceInput):
"""Information generated by receiving over a device and its operators."""
__operator_receptions: Sequence[Reception]
def __init__(
self,
impinging_signals: Signal | Sequence[Signal],
operator_inputs: Sequence[Signal],
operator_receptions: Sequence[Reception],
) -> None:
"""
Args:
impinging_signals (Signal | Sequence[Signal]):
Numpy vector containing lists of signals impinging onto the device.
operator_inputs (Sequence[Signal]):
Signal models to be processed by receive DSP algorithms.
operator_separation (bool):
Is the operator separation flag enabled?
operator_receptions (Sequence[Reception]):
Information inferred from receive operators.
"""
# Initialize base class
ProcessedDeviceInput.__init__(self, impinging_signals, operator_inputs)
# Initialize attributes
self.__operator_receptions = operator_receptions
@property
def operator_receptions(self) -> Sequence[Reception]:
"""Information transmitted by device operators.
Returns: Operator information.
"""
return self.__operator_receptions
@property
def num_operator_receptions(self) -> int:
"""Number of operator receptions.
Returns: Number of receptions.
"""
return len(self.__operator_receptions)
@classmethod
def from_HDF(cls: Type[DRT], group: Group, operators: Sequence[Receiver] | None = None) -> DRT:
# Recall base class
device_input = ProcessedDeviceInput.from_HDF(group)
# Recall individual operator receptions
num_receptions = group.attrs.get("num_operator_receptions", 0)
# Recall receptions
if operators is None:
receptions = [
Reception.from_HDF(group[f"reception_{r:02d}"]) for r in range(num_receptions)
]
else:
receptions = [
operator.recall_reception(group[f"reception_{r:02d}"])
for r, operator in zip(range(num_receptions), operators)
]
# Initialize object
return cls.From_ProcessedDeviceInput(device_input, receptions)
[docs]
@classmethod
def Recall(cls: Type[DRT], group: Group, device: Device) -> DRT:
# Recall base class
device_input = ProcessedDeviceInput.from_HDF(group)
# Recall individual operator receptions
num_receptions = group.attrs.get("num_operator_receptions", 0)
# recall operator receptions
receptions = [
receiver.recall_reception(group[f"reception_{r:02d}"])
for r, receiver in zip(range(num_receptions), device.receivers)
]
# Initialize object
return cls.From_ProcessedDeviceInput(device_input, receptions)
def to_HDF(self, group: Group) -> None:
# Serialize base class
ProcessedDeviceInput.to_HDF(self, group)
# Serialize groups
for t, reception in enumerate(self.operator_receptions):
reception.to_HDF(self._create_group(group, f"reception_{t:02d}"))
# Serialize attributes
group.attrs["num_operator_receptions"] = self.num_operator_receptions
class MixingOperator(Operator):
"""Base class for operators performing mixing operations."""
__carrier_frequency: float | None # Carrier frequency
def __init__(self, carrier_frequency: float | None = None) -> None:
"""
Args:
carrier_frequency (float, optional):
Central frequency of the mixed signal in radio-frequency transmission band.
"""
# Initialize base class
Operator.__init__(self)
# Initialize class attributes
self.carrier_frequency = carrier_frequency
@property
def carrier_frequency(self) -> float | None:
"""Central frequency of the mixed signal in radio-frequency transmission band in Hz.
Denoted by :math:`f_c` with unit :math:`\\left[ f_c \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in the literature.
If not specified, i.e. :py:obj:`None`, the operated device's default carrier frequency will be assumed
during signal processing.
Raises:
ValueError: If the carrier frequency is smaller than zero.
"""
return self.__carrier_frequency
@carrier_frequency.setter
def carrier_frequency(self, value: float | None) -> None:
if value is None:
self.__carrier_frequency = None
return
if value < 0.0:
raise ValueError("Carrier frequency must be greater or equal to zero")
self.__carrier_frequency = value
[docs]
class Receiver(Generic[ReceptionType], MixingOperator, RandomNode):
"""Operator receiving from a device."""
__reference: Device | None
__selected_receive_ports: Sequence[int]
__receive_callbacks: Hookable[ReceptionType]
def __init__(
self,
seed: int | None = None,
reference: Device | None = None,
selected_receive_ports: Sequence[int] | None = None,
carrier_frequency: float | None = None,
) -> None:
"""
Args:
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
reference (Device, optional):
Reference device to which the channel shall be estimated.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available antenna ports will be considered.
carrier_frequency (float, optional):
Central frequency of the mixed signal in radio-frequency transmission band.
If not specified, the operated device's default carrier frequency will be assumed during signal processing.
"""
# Initialize base classes
RandomNode.__init__(self, seed=seed)
MixingOperator.__init__(self, carrier_frequency)
# Initialize class attributes
self.__receive_callbacks = Hookable()
self.reference = reference
self.selected_receive_ports = selected_receive_ports
[docs]
def add_receive_callback(
self, callback: Callable[[ReceptionType], None]
) -> Hook[ReceptionType]:
"""Add a callback to be called after processing the receive DSP algorithm.
Args:
callback (Callable[[ReceptionType], None]):
Function to be called after processing the receive DSP algorithm.
The receive output is passed as the only argument.
Returns: Hook to be used for removal.
"""
return self.__receive_callbacks.add_callback(callback)
@property
def reference(self) -> Device | None:
"""Reference transmitter for this receiver.
Returns:
A handle to the referenced device.
:py:obj:`None` if the device was not specified.
"""
return self.__reference
@reference.setter
def reference(self, value: Device | None) -> None:
self.__reference = value
@property
def selected_receive_ports(self) -> Sequence[int] | None:
"""Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If :py:obj:`None`, all available antenna ports will be considered.
"""
return self.__selected_receive_ports
@selected_receive_ports.setter
def selected_receive_ports(self, value: Sequence[int] | None) -> None:
self.__selected_receive_ports = value
[docs]
def receive(self, signal: Signal, state: DeviceState, notify: bool = True) -> ReceptionType:
"""Process a received signal by the receiver.
Wrapper around the abstract :meth:`_receive<Receiver._receive>` method.
Args:
signal (Signal):
Model of the signal samples to be processed by the DSP algorithm.
state (DeviceState):
State of the receiving device to be conmsidered by the DSP algorithm.
notify (bool, optional):
Notify the receiver's callbacks.
Enabled by default.
Returns: Information received by this operator.
Raises:
ValueError: If the number of signal streams does not match the number of receive ports.
"""
# Generate the state for this DSP algorithm's assigned ports
receive_state = state.receive_state(self.selected_receive_ports)
# Assert the number of signal streams
if signal.num_streams != receive_state.num_digital_receive_ports:
raise ValueError(
f"Number of signal streams does not match the number of receive ports ({signal.num_streams} != {receive_state.num_digital_receive_ports})"
)
# Run the receive DSP routine
reception = self._receive(signal, receive_state)
# Notify the receiver's callbacks
if notify:
self.__receive_callbacks.notify(reception)
# Return received information
return reception
@abstractmethod
def _receive(self, signal: Signal, state: ReceiveState) -> ReceptionType:
"""Process a received signal by the receiver.
Subroutine of the public :meth:`receive<hermespy.core.device.Receiver.receive>`
method that performs the pipeline-specific receive processing
and consolidates the inferred information into a single
:class:`Reception<hermespy.core.device.Reception>` object.
Args:
signal (Signal): Multi-stream signal model to be processed.
state (ReceiveState): Device state to be considered during reception.
Returns:
Information inferred from the received signal.
"""
... # pragma: no cover
def recall_reception(self, group: Group, notify: bool = True) -> ReceptionType:
"""Recall a specific reception from a serialization.
Internally calls the abstract method
:meth:`_recall_reception<Receiver._recall_reception>`.
Caches the reception if the respective flag is enabled.
Args:
group (Group):
HDF group containing the reception.
notify (bool, optional):
Notify the receiver's callbacks.
Enabled by default.
Returns: The recalled reception.
:meta private:
"""
# Recall the reception
recalled_reception = self._recall_reception(group)
# Notify the receiver's callbacks
if notify:
self.__receive_callbacks.notify(recalled_reception)
return recalled_reception
@abstractmethod
def _recall_reception(self, group: Group) -> ReceptionType:
"""Recall a specific reception from a serialization.
Args:
group (Group):
HDF group containing the reception.
Returns: The recalled reception.
"""
... # pragma: no cover
@property
@abstractmethod
def power(self) -> float:
"""Expected power of the received signal in Watts.
.. note::
Applies only to the signal-carrying parts of the transmission,
silent parts shuch as guard intervals should not be considered.
"""
... # pragma: no cover
class OperatorSlot(Generic[OperatorType], Sequence[OperatorType]):
"""Slot list for operators of a single device."""
__device: Device # Device this operator belongs to
__operators: list[OperatorType] # List of registered operators
def __init__(self, device: Device) -> None:
"""
Args:
device (Device):
Device this slot belongs to.
"""
self.__device = device
self.__operators = []
@property
def device(self) -> Device:
"""Device this operator slot belongs to.
Returns:
Device: Handle to the device.
"""
return self.__device
def operator_index(self, operator: OperatorType) -> int:
"""Index of an operator within this slot.
Returns:
index (int): The `operator`'s index.
Raises:
ValueError: If the `operator` is not registered at this slot.
"""
return self.__operators.index(operator)
def add(self, operator: OperatorType) -> None:
"""Add a new operator to this slot.
Args:
operator (OperatorType):
Operator to be added to this slot.
Raises:
RuntimeError: If the `operator` is already registered at this slot.
"""
# Do nothing if the operator is already registered
if operator in self.__operators:
return
self.__operators.append(operator)
def remove(self, operator: OperatorType) -> None:
"""Remove an operator from this slot.
Args:
operator (OperatorType):
Handle to the operator to be removed.
"""
if operator in self.__operators:
self.__operators.remove(operator)
def registered(self, operator: OperatorType) -> bool:
"""Check if an operator is registered at this slot.
Returns:
bool: Boolean indicating the registration status.
"""
return operator in self.__operators
@property
def num_operators(self) -> int:
"""Number of operators on this slot.
Returns:
int: Number of operators.
"""
return len(self.__operators)
@property
def max_sampling_rate(self) -> float:
"""Maximum requested sampling rate of all operators.
Returns:
The maximum sampling rate in Hz.
"""
sampling_rate = 0.0
for operator in self.__operators:
sampling_rate = max(sampling_rate, operator.sampling_rate)
return sampling_rate
@property
def min_frame_duration(self) -> float:
"""The mininum frame duration requried to process at least a single frame for each operator.
Returns:
The minimum frame duration in seconds.
"""
frame_duration = 0.0
for operator in self.__operators:
frame_duration = max(frame_duration, operator.frame_duration)
return frame_duration
@property
def min_num_samples_per_frame(self) -> int:
"""The number of samples required to collect at last a single frame for each operator.
Returns:
Number of samples.
"""
sampling_rate = self.max_sampling_rate
frame_duration = self.min_frame_duration
return ceil(frame_duration * sampling_rate)
@overload
def __getitem__(self, item: int) -> OperatorType: ... # pragma: no cover
@overload
def __getitem__(self, item: slice) -> Sequence[OperatorType]: ... # pragma: no cover
def __getitem__(self, item: int | slice) -> OperatorType | Sequence[OperatorType]:
return self.__operators[item]
def __iter__(self) -> Iterator[OperatorType]:
"""Iterating over operator slots returns the iterator over the slots."""
return self.__operators.__iter__()
def __contains__(self, operator: object) -> bool:
"""Contains is just a convenient mask for registered.
Returns: Registration indicator.
"""
return operator in self.__operators
def __len__(self) -> int:
return self.num_operators
TransmissionType = TypeVar("TransmissionType", bound="Transmission")
"""Type variable of a :class:`Transmission`."""
[docs]
class Transmitter(Generic[TransmissionType], MixingOperator, RandomNode):
"""Operator transmitting over a device."""
__transmit_callbacks: Hookable[TransmissionType]
__selected_transmit_ports: Sequence[int] | None
def __init__(
self,
seed: int | None = None,
selected_transmit_ports: Sequence[int] | None = None,
carrier_frequency: float | None = None,
) -> None:
"""
Args:
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
carrier_frequency (float, optional):
Central frequency of the mixed signal in radio-frequency transmission band.
If not specified, the operated device's default carrier frequency will be assumed during signal processing.
"""
# Initialize operator base class
RandomNode.__init__(self, seed=seed)
MixingOperator.__init__(self, carrier_frequency)
# Initialize class attributes
self.__transmit_callbacks = Hookable()
self.__selected_transmit_ports = selected_transmit_ports
[docs]
def add_transmit_callback(
self, callback: Callable[[TransmissionType], None]
) -> Hook[TransmissionType]:
"""Add a callback to be called after processing the transmit DSP algorithm.
Args:
callback (Callable[[TransmissionType], None]):
Function to be called after processing the transmit DSP algorithm.
The transmit output is passed as the only argument.
Returns: Hook to be used for removal.
"""
return self.__transmit_callbacks.add_callback(callback)
[docs]
def transmit(
self, state: DeviceState, duration: float = 0.0, notify: bool = True
) -> TransmissionType:
"""Transmit a signal.
Registers the signal samples to be transmitted by the underlying device.
Wrapper around the abstract :meth:`_transmit<Transmitter._transmit>` method.
Args:
state (DeviceState, optional):
State of the device at the time of transmission.
If not specified, the device's current state will be queried by calling :meth:`hermespy.core.device.Device.state`.
duration (float, optional):
Duration of the transmitted signal.
If not specified, i.e. zero, the duration will be inferred by the transmitter.
notify (bool, optional):
Notify the transmitter's callbacks about the generated transmission.
Returns:
Information generated while transmitting over this operator.
Raises:
FloatingError: If the transmitter is currently considered floating.
"""
# Generate the state for this DSP algorithm's assigned ports
# Run the transmit DSP routine
transmission = self._transmit(state.transmit_state(self.selected_transmit_ports), duration)
# Notify the registered callbacks if the respective flag is enabled
if notify:
self.__transmit_callbacks.notify(transmission)
return transmission
@abstractmethod
def _transmit(self, state: TransmitState, duration: float) -> TransmissionType:
"""Generate information to be transmitted.
Subroutine of the public :meth:`transmit<Transmitter.transmit>` method that performs the pipeline-specific transmit-processing
and consolidates the generated information into a single :class:`Transmission` object.
Args:
state (TransmitState):
State of the device at the time of transmission.
duration (float, optional):
Duration of the transmitted signal in seconds.
If not specified, the duration of a single frame will be assumed.
Returns: Information to be transmitted.
"""
... # pragma: no cover
def recall_transmission(self, group: Group, notify: bool = True) -> TransmissionType:
"""Recall a specific transmission from a serialization.
Internally calls the abstract method :meth:`Transmitter._recall_transmission`.
Caches the transmission if the respective flag is enabled.
Args:
group (Group):
HDF group containing the transmission.
notify (bool, optional):
Notify the transmitter's callbacks about the generated transmission.
Enabled by default.
Returns: The recalled transmission
:meta private:
"""
# Recall the transmission from HDF
recalled_transmission = self._recall_transmission(group)
# Notify the registered callbacks if the respective flag is enabled
if notify:
self.__transmit_callbacks.notify(recalled_transmission)
return recalled_transmission
@abstractmethod
def _recall_transmission(self, group: Group) -> TransmissionType:
"""Recall a specific transmission from a serialization.
Args:
group (Group):
HDF group containing the transmission.
Returns: The recalled transmission
"""
... # pragma: no cover
@property
@abstractmethod
def power(self) -> float:
"""Expected power of the transmitted signal in Watts.
.. note::
Applies only to the signal-carrying parts of the transmission,
silent parts shuch as guard intervals should not be considered.
"""
... # pragma: no cover
@property
def selected_transmit_ports(self) -> Sequence[int] | None:
"""Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If :py:obj:`None`, all available transmit ports will be considered.
"""
return self.__selected_transmit_ports
@selected_transmit_ports.setter
def selected_transmit_ports(self, value: Sequence[int] | None) -> None:
self.__selected_transmit_ports = value
class TransmitterSlot(OperatorSlot[Transmitter]):
"""Slot for transmitting operators within devices."""
... # pragma: no cover
class ReceiverSlot(OperatorSlot[Receiver]):
"""Slot for receiving operators within devices."""
... # pragma: no cover
class UnsupportedSlot(OperatorSlot):
"""Slot for unsupported operations within devices."""
def add(self, operator: Operator) -> None:
raise RuntimeError("Slot not supported by this device")
[docs]
class Device(ABC, Generic[DST], Transformable, RandomNode, Serializable):
"""Physical device representation within HermesPy.
It acts as the basis for all transmissions and receptions of sampled electromagnetic signals.
"""
transmitters: TransmitterSlot
"""Transmitters broadcasting signals over this device."""
receivers: ReceiverSlot
"""Receivers capturing signals from this device"""
__transmit_coding: TransmitSignalCoding # Codings applied to transmitted samples before upload
__receive_coding: (
ReceiveSignalCoding # Codings applied to received samples directly after download
)
__power: float # Average power of the transmitted signals
__output_callbacks: Hookable[DeviceOutput] # Callbacks notified about generated outputs
__input_callbacks: Hookable[ProcessedDeviceInput] # Callbacks notified about processed inputs
def __init__(
self, power: float = 1.0, pose: Transformation | None = None, seed: int | None = None
) -> None:
"""
Args:
power (float, optional):
Average power of the transmitted signals in Watts.
1.0 Watt by default.
pose (Transformation, optional):
Pose of the device with respect to its scenario coordinate system origin.
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
"""
# Initialize base classes
RandomNode.__init__(self, seed=seed)
Transformable.__init__(self, pose=pose)
# Initalize device attributes and properties
self.transmitters = TransmitterSlot(self)
self.receivers = ReceiverSlot(self)
self.__transmit_coding = TransmitSignalCoding()
self.__receive_coding = ReceiveSignalCoding()
self.__input_callbacks = Hookable()
self.__output_callbacks = Hookable()
self.power = power
[docs]
@abstractmethod
def state(self) -> DST:
"""Query the immutable physical state of the device.
Returns: The physical device state.
"""
... # pragma: no cover
@property
@abstractmethod
def antennas(self) -> AntennaArray:
"""Model of the device's antenna array."""
... # pragma: no cover
@property
def transmit_coding(self) -> TransmitSignalCoding:
"""Digital coding applied to transmitted samples before digital-to-analog conversion."""
return self.__transmit_coding
@property
def receive_coding(self) -> ReceiveSignalCoding:
"""Digital coding applied to received samples after analog-to-digital conversion."""
return self.__receive_coding
@property
def output_callbacks(self) -> Hookable[DeviceOutput]:
"""Callbacks notified about generated outputs."""
return self.__output_callbacks
@property
def input_callbacks(self) -> Hookable[ProcessedDeviceInput]:
"""Callbacks notified about processed inputs."""
return self.__input_callbacks
@property
def num_digital_transmit_ports(self) -> int:
"""Number of available digital signal stream ports for transmission.
If no transmit coding was configured, this value is equal to the number of transmit antenna ports.
"""
return self.__transmit_coding.num_transmit_input_streams(self.num_transmit_antenna_ports)
@property
def num_digital_receive_ports(self) -> int:
"""Number of available digital signal stream ports for reception.
If no receive coding was configured, this value is equal to the number of receive antenna ports.
"""
return self.__receive_coding.num_receive_output_streams(self.num_receive_antenna_ports)
@property
def num_transmit_antenna_ports(self) -> int:
"""Number of available transmit antenna ports.
Shorthand to :attr:`antennas'<.antennas>` :attr:`num_transmit_ports<AntennaArray.num_transmit_ports>`.
"""
return self.antennas.num_transmit_ports
@property
def num_receive_antenna_ports(self) -> int:
"""Number of available receive antenna ports.
Short hand to :attr:`antennas'<.antennas>` :attr:`num_receive_ports<AntennaArray.num_receive_ports>`.
"""
return self.antennas.num_receive_ports
@property
def num_transmit_antennas(self) -> int:
"""Number of available transmit antennas.
Shorthand to :attr:`antennas'<antennas>` :meth:`num_transmit_antennas<AntennaArrayBase.num_transmit_antennas>`.
"""
return self.antennas.num_transmit_antennas
@property
def num_receive_antennas(self) -> int:
"""Number of available receive antennas.
Shorthand to :attr:`antennas'<antennas>` :meth:`num_receive_antennas<AntennaArrayBase.num_receive_antennas>`.
"""
return self.antennas.num_receive_antennas
@property
def power(self) -> float:
"""Average power of the transmitted signal signal.
Returns:
power (float): Power in Watt.
Raises:
ValueError: If `value` is smaller than zero.
"""
return self.__power
@power.setter
def power(self, value: float) -> None:
"""Set the average power of the transmitted signal."""
if value < 0.0:
raise ValueError("Average signal power must be greater or equal to zero")
self.__power = value
@property
def snr(self) -> float:
"""Signal to noise ratio of the received signal.
Returns: Linear signal to noise ratio
"""
# By default, we have no knowledge about the snr and assum no noise
return float("inf")
@property
def num_antennas(self) -> int:
"""Number of antennas within this device's topology.
Returns:
int: Number of antennas, greater or equal to one.
"""
return self.antennas.num_antennas
@property
def max_frame_duration(self) -> float:
"""Maximum frame duration transmitted by this device.
Returns:
max_duration (float): Maximum duration in seconds.
"""
max_duration = 0.0
for operator in chain(self.transmitters, self.receivers):
max_duration = max(max_duration, operator.frame_duration) # type: ignore
return max_duration
@property
@abstractmethod
def carrier_frequency(self) -> float:
"""Central frequency of the device's emissions in the RF-band.
Returns:
frequency (float):
Carrier frequency in Hz.
Raises:
ValueError: On negative carrier frequencies.
"""
... # pragma: no cover
@property
def wavelength(self) -> float:
"""Central wavelength of the device's emmissions in the RF-band.
Returns:
wavelength (float):
Wavelength in m.
Raises:
ValueError:
On wavelengths smaller or equal to zero.
"""
return speed_of_light / self.carrier_frequency
@property
@abstractmethod
def sampling_rate(self) -> float:
"""Sampling rate at which the device's analog-to-digital converters operate.
Returns:
sampling_rate (float): Sampling rate in Hz.
Raises:
ValueError: If the sampling rate is not greater than zero.
"""
... # pragma: no cover
[docs]
def add_dsp(self, dsp: Transmitter | Receiver) -> None:
"""Add a DSP algorithm to this device.
Args:
dsp (Transmitter | Receiver):
DSP algorithm to be added to the device.
If the DSP algorithm is a transmitter, it will be added to the transmit layer.
If the DSP algorithm is a receiver, it will be added to the receive layer.
If the DSP is both, it will be added to both layers.
"""
if isinstance(dsp, Transmitter):
self.transmitters.add(dsp)
if isinstance(dsp, Receiver):
self.receivers.add(dsp)
[docs]
def transmit_operators(
self, state: DST | None = None, notify: bool = True
) -> list[Transmission]:
"""Generate transmitted information for all registered operators.
Calls :meth:`Transmitter.transmit` for each transmit operator.
Args:
state (DST, optional):
Device state to be used for the transmission.
If not provided, query the current device state by calling :meth:`state`.
notify (bool, optional):
Notify the transmitter's callbacks about the generated transmission.
Enabled by default.
Returns: List of operator transmisisons.
"""
_state = self.state() if state is None else state
return [transmitter.transmit(_state, 0.0, notify) for transmitter in self.transmitters]
[docs]
def generate_output(
self,
operator_transmissions: Sequence[Transmission],
state: DST | None = None,
resample: bool = True,
) -> DeviceOutput:
"""Generate the device's output.
Args:
operator_transmissions (Sequence[Transmissions]):
List of operator transmissions from which to generate the output.
state (DST, optional):
Device state to be used for the output generation.
If not provided, query the current device state
resample (bool, optional):
Resample the output signal to the device's sampling rate.
Enabled by default.
Returns: The device's output.
Raises:
RuntimeError: If the transmit coding is incompatible with the number of transmit antenna ports.
"""
_state = self.state() if state is None else state
# Ensure that the precoded signal will match the number of antenna ports
if self.num_digital_transmit_ports != self.num_transmit_antenna_ports:
raise RuntimeError(
f"Number of streams after transmit coding does not match the number of transmit antenna ports ({self.num_digital_transmit_ports} != {self.num_transmit_antenna_ports})"
)
# Query the intended transmission ports of each operator
operator_streams = [o.selected_transmit_ports for o in self.transmitters]
# Superimpose the operator transmissions to the device's RF configuration
superimposed_signal = Signal.Empty(
_state.sampling_rate,
self.num_digital_transmit_ports,
carrier_frequency=_state.carrier_frequency,
)
for transmission, indices in zip(operator_transmissions, operator_streams):
if transmission is not None:
superimposed_signal.superimpose(
transmission.signal, resample=resample, stream_indices=indices
)
# Apply the transmit coding
encoded_signal = self.transmit_coding.encode_streams(
superimposed_signal, _state.transmit_state(list(range(self.num_transmit_antennas)))
)
# Generate the output
output = DeviceOutput(encoded_signal)
# Notify the output callbacks
self.__output_callbacks.notify(output)
return output
[docs]
def transmit(self, state: DST | None = None, notify: bool = True) -> DeviceTransmission:
"""Transmit over this device.
Args:
state (DST, optional):
Device state to be used for the transmission.
If not provided, query the current device state by calling :meth:`state`.
notify (bool, optional):
Notify the transmitter's callbacks about the generated transmission.
Enabled by default.
Returns: Information transmitted by this device.
"""
_state = self.state() if state is None else state
operator_transmissions = self.transmit_operators(_state, notify)
device_output = self.generate_output(operator_transmissions, _state)
return DeviceTransmission.From_Output(device_output, operator_transmissions)
[docs]
def recall_transmission(self, group: Group) -> DeviceTransmission:
"""Recall a specific transmission from a HDF5 serialization.
Args:
group (Group):
HDF group containing the transmission.
Returns: The recalled transmission.
"""
# Recall the specific operator transmissions
return DeviceTransmission.from_HDF(group, list(self.transmitters))
[docs]
def receive_operators(
self,
operator_inputs: ProcessedDeviceInput | Sequence[Signal],
state: DST | None = None,
notify: bool = True,
) -> list[Reception]:
"""Receive over the registered operators.
Args:
operator_inputs (ProcessedDeviceInput | Sequence[Signal]):
The signal samples to be processed by the device's operators.
If a :class:`ProcessedDeviceInput` is provided, the operator inputs will be extracted from it.
Otherwise, the operator inputs are assumed to be provided directly.
state (DST, optional):
Assumed state of the device during reception.
If not provided, the current device state will be queried by calling :meth:`state`.
notfiy (bool, optional):
Notify all registered callbacks within the involved DSP algorithms.
Enabled by default.
Returns: List of information generated by receiving over the device's operators.
Raises:
ValueError: If the number of operator inputs does not match the number of receive operators.
"""
_operator_inputs = (
operator_inputs.operator_inputs
if isinstance(operator_inputs, ProcessedDeviceInput)
else operator_inputs
)
_state = self.state() if state is None else state
if len(_operator_inputs) != self.receivers.num_operators:
raise ValueError(
f"Number of operator inputs ({len(_operator_inputs)}) does not match the number of receive operators ({self.receivers.num_operators})"
)
# Generate receive information
receptions: list[Reception] = []
for operator, signal in zip(self.receivers, _operator_inputs):
receptions.append(operator.receive(signal, _state, notify))
return receptions
[docs]
def receive(
self,
impinging_signals: DeviceInput | Signal | Sequence[Signal],
state: DST | None = None,
notify: bool = True,
) -> DeviceReception:
"""Receive over this device.
Internally calls :meth:`Device.process_input` and :meth:`Device.receive_operators`.
Args:
impinging_signals (DeviceInput | Signal | Sequence[Signal]):
The samples to be processed by the device.
state (DST, optional):
Device state to be used for the processing.
If not provided, query the current device state by calling :meth:`state`.
notify (bool, optional):
Notify all registered callbacks within the involved DSP algorithms.
Enabled by default.
Returns: The received device information.
"""
_state = self.state() if state is None else state
# Process input
processed_input = self.process_input(impinging_signals, _state)
# Generate receptions
receptions = self.receive_operators(processed_input.operator_inputs, _state, notify)
# Generate device reception
return DeviceReception.From_ProcessedDeviceInput(processed_input, receptions)
[docs]
def recall_reception(self, group: Group) -> DeviceReception:
"""Recall a specific reception from a HDF5 serialization.
Args:
group (Group):
HDF group containing the reception.
Returns: The recalled reception.
"""
return DeviceReception.Recall(group, self)
@classmethod
def to_yaml(cls: Type[Device], representer: SafeRepresenter, node: Device) -> MappingNode:
return node._mapping_serialization_wrapper(
representer,
None,
{"transmitters": list(node.transmitters), "receivers": list(node.receivers)},
)
@classmethod
def from_yaml(cls: Type[Device], constructor: SafeConstructor, node: Node) -> Device:
# Abort if the node is scalar
if isinstance(node, ScalarNode):
return cls()
device_mapping: dict = constructor.construct_mapping(node, deep=True)
# Pop transmitter and receiver lists
transmitters = device_mapping.pop("transmitters", list())
receivers = device_mapping.pop("receivers", list())
# Construct the device
device = cls.InitializationWrapper(device_mapping)
# Add transmitters and receivers
for transmitter in transmitters:
device.transmitters.add(transmitter)
for receiver in receivers:
device.receivers.add(receiver)
return device