Source code for hermespy.core.device

# -*- coding: utf-8 -*-
"""
===============
Device Modeling
===============

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   classDiagram-v2

   direction RL

   class Device {

       topology
       carrier_frequency
       transmit()
       receive()
   }

   class SimulatedDevice {

       position
       orientation
       velocity
       acceleration
   }

   class HardwareDevice {

       trigger()
   }

   class Operator {

       transmit()
       receive()
   }

   class Modem {

       tx_bits
       rx_bits
       ...
       waveform
       transmit()
       receive()
   }

   class Radar {

       waveform
       ...
       target_estimates
       transmit()
       receive()
   }

   Operator <|-- Modem
   Operator <|-- Radar
   Device *-- Operator
   SimulatedDevice <|-- Device
   HardwareDevice <|-- Device


.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   channel{Real World}

   subgraph devicea[HardwareDevice]

       direction TB
       deviceatx>Tx Slot] --> deviceabinding[Hardware Binding] --> devicearx>Rx Slot]

   end

   subgraph deviceb[HardwareDevice]

       direction TB
       devicebtx>Tx Slot] --> devicebbinding[Hardware Binding] --> devicebrx>Rx Slot]
   end

   deviceabinding <--> channel
   devicebbinding <--> channel

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   channel{Channel Modeling}

   subgraph devicea[SimulatedDevice]

       direction TB
       deviceatx>Tx Slot]
       devicearx>Rx Slot]

   end

   subgraph deviceb[SimulatedDevice]

       direction TB
       devicebtx>Tx Slot]
       devicebrx>Rx Slot]
   end

   deviceatx --> channel --> devicearx
   devicebtx --> channel --> devicebrx

"""

from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from itertools import chain
from math import ceil
from typing import Callable, Generic, Iterator, overload, Type, TypeVar

from h5py import Group
from ruamel.yaml import MappingNode, Node, SafeConstructor, SafeRepresenter, ScalarNode
from scipy.constants import speed_of_light

from .antennas import AntennaArray
from .factory import HDFSerializable, Serializable
from .hooks import Hook, Hookable
from .precoding import TransmitSignalCoding, ReceiveSignalCoding
from .random_node import RandomNode
from .signal_model import Signal
from .state import DeviceState, DST, ReceiveState, TransmitState
from .transformation import Transformable, Transformation

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class FloatingError(RuntimeError):
    """Exception raised if an operation fails due to a currently being considered floating."""

    ...  # pragma: no cover


ORT = TypeVar("ORT", bound="OperationResult")
"""Type of OperationResult"""


OperatorType = TypeVar("OperatorType", bound="Operator")
"""Type of operator."""


DeviceType = TypeVar("DeviceType", bound="Device")
"""Type of device."""


[docs] class OperationResult(HDFSerializable): """Base class for outputs of device operators.""" signal: Signal """Base-band model of the processed signal.""" def __init__(self, signal: Signal) -> None: """ Args: signal (signal): Base-band waveform model. """ self.signal = signal @classmethod def from_HDF(cls: Type[ORT], group: Group) -> ORT: signal = Signal.from_HDF(group["signal"]) return cls(signal=signal) def to_HDF(self, group: Group) -> None: self.signal.to_HDF(self._create_group(group, "signal"))
[docs] class Transmission(OperationResult): """Information generated by transmitting over a transmit operator.""" ... # pragma: no cover
[docs] class Reception(OperationResult): """Information generated by receiving over a receive operator.""" ... # pragma: no cover
ReceptionType = TypeVar("ReceptionType", bound=Reception) """Type variable of a :class:`Reception`.""" SlotType = TypeVar("SlotType", bound="OperatorSlot") """Type of slot.""" class Operator(ABC): """Base class for operators of devices. In HermesPy, operators may configure devices, broadcast signals over them or capture signals from them. Each operator is attached to a single device instance it operates on. """ @property @abstractmethod def frame_duration(self) -> float: """Duration of a single sample frame in seconds. Denoted as :math:`T_{\\mathrm{F}}` of unit :math:`\\left[ T_{\\mathrm{F}} \\right] = \\mathrm{s}` in literature. """ ... # pragma: no cover @property @abstractmethod def sampling_rate(self) -> float: """The operator's preferred sampling rate in Hz. Denoted as :math:`f_{\\mathrm{S}}` of unit :math:`\\left[ f_{\\mathrm{S}} \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in literature. """ ... # pragma: no cover
[docs] class DeviceOutput(HDFSerializable): """Base class for the output of a device.""" __mixed_signal: Signal | None def __init__(self, mixed_signal: Signal) -> None: """ Args: mixed_signal (Signal): Mixed signal emerging from the device. """ self.__mixed_signal = mixed_signal @property def sampling_rate(self) -> float: """Sampling rate at which the device operates. Returns: Sampling rate in Hz. """ return self.__mixed_signal.sampling_rate @property def num_antennas(self) -> int: """Number of transmitting device antennas. Returns: Number of antennas. """ return self.__mixed_signal.num_streams @property def carrier_frequency(self) -> float: """Carrier frequency at which the device operates. Returns: Carrier frequency in Hz. """ return self.__mixed_signal.carrier_frequency @property def emerging_signals(self) -> Sequence[Signal]: """Models of electromagnetic signals generated by the device. Returns: List of signal models. """ return [self.__mixed_signal] @property def num_emerging_signals(self) -> int: """Number of emerging signal models. Returns: Count of models. """ return len(self.emerging_signals) @property def mixed_signal(self) -> Signal: """Combined signal of the device's transmissions. Returns: Signal model of the device's electromagnetic transmission. """ return self.__mixed_signal @classmethod def from_HDF(cls: Type[DeviceOutput], group: Group) -> DeviceOutput: # Recall serialized mixed signal group mixed_signal = Signal.from_HDF(group["mixed_signal"]) # Initialize object return cls(mixed_signal) def to_HDF(self, group: Group) -> None: # Serialize groups self.mixed_signal.to_HDF(self._create_group(group, "mixed_signal"))
DTT = TypeVar("DTT", bound="DeviceTransmission") """Type of device transmission."""
[docs] class DeviceTransmission(DeviceOutput): """Information generated by transmitting over a device.""" __operator_transmissions: Sequence[Transmission] def __init__( self, operator_transmissions: Sequence[Transmission], mixed_signal: Signal ) -> None: """ Args: operator_transmissions (Sequence[Transmission]): List of information generated by transmit operators. mixed_signal (Signal): Mixed signal transmitted by the device. """ # Initialize base class DeviceOutput.__init__(self, mixed_signal) # Initialze class attributes self.__operator_transmissions = operator_transmissions
[docs] @classmethod def From_Output( cls: Type[DeviceTransmission], output: DeviceOutput, operator_transmissions: Sequence[Transmission], ) -> DeviceTransmission: """Initialize a device transmission from a device output. Args: output (DeviceOutput): Device output object. operator_transmissions (Sequence[Transmission]): List of information generated by transmit operators. """ return cls(operator_transmissions, output.mixed_signal)
@property def operator_transmissions(self) -> Sequence[Transmission]: """Information transmitted by device operators. Returns: Operator information. """ return self.__operator_transmissions @property def num_operator_transmissions(self) -> int: """Number of operator transmissions.""" return len(self.__operator_transmissions)
[docs] @classmethod def from_HDF( cls: Type[DeviceTransmission], group: Group, operators: Sequence[Transmitter] | None = None ) -> DeviceTransmission: """Recall a device transmission from a serialization. Args: group (Group): HDF5 group containing the serialized device transmission. operators (Sequence[Transmitter], optional): List of device transmitters to recall the specific transmissions. If not specified, the transmissions are recalled as their base class. """ # Recall base class device_output = DeviceOutput.from_HDF(group) # Recall attributes num_transmissions = group.attrs.get("num_transmissions", 1) # Recall transmissions if operators is None: transmissions = [ Transmission.from_HDF(group[f"transmission_{t:02d}"]) for t in range(num_transmissions) ] else: transmissions = [ operator.recall_transmission(group[f"transmission_{t:02d}"]) for t, operator in zip(range(num_transmissions), operators) ] # Initialize object return cls.From_Output(device_output, transmissions)
[docs] @classmethod def Recall(cls: Type[DeviceTransmission], group: Group, device: Device) -> DeviceTransmission: # Recall base class device_output = DeviceOutput.from_HDF(group) # Recall attributes num_transmissions = group.attrs.get("num_transmissions", 1) transmissions = [ transmitter.recall_transmission(group[f"transmission_{t:02d}"]) for t, transmitter in zip(range(num_transmissions), device.transmitters) ] # Initialize object return cls.From_Output(device_output, transmissions)
def to_HDF(self, group: Group) -> None: # Serialize base class DeviceOutput.to_HDF(self, group) # Serialize attributes group.attrs["num_transmissions"] = self.num_operator_transmissions # Serialize groups for t, transmission in enumerate(self.operator_transmissions): transmission.to_HDF(self._create_group(group, f"transmission_{t:02d}"))
[docs] class DeviceInput(HDFSerializable): """Receive information required by devices.""" __impinging_signals: Sequence[Signal] def __init__(self, impinging_signals: Signal | Sequence[Signal]) -> None: """ Args: impinging_signals (Union[Signal, Sequence[Signal]]): Signals to be processed by the device. """ impinging_signals = ( [impinging_signals] if isinstance(impinging_signals, Signal) else impinging_signals ) self.__impinging_signals = impinging_signals @property def impinging_signals(self) -> Sequence[Signal]: """Signals to be processed by the device. Returns: List of signal models. """ return self.__impinging_signals @property def num_impinging_signals(self) -> int: """Number of signals impinging onto the device. Returns: Signal model count. """ return len(self.__impinging_signals) @classmethod def from_HDF(cls: Type[DeviceInput], group: Group) -> DeviceInput: num_impinging_signals = group.attrs.get("num_impinging_signals", 1) impinging_signals = [ Signal.from_HDF(group[f"impinging_signal_{s:02d}"]) for s in range(num_impinging_signals) ] return cls(impinging_signals) def to_HDF(self, group: Group) -> None: # Serialize groups for s, signal in enumerate(self.impinging_signals): signal.to_HDF(self._create_group(group, f"impinging_signal_{s:02d}")) # Serialize attributes group.attrs["num_impinging_signals"] = self.num_impinging_signals
[docs] class ProcessedDeviceInput(DeviceInput): """Information generated by receiving over a device.""" __operator_inputs: Sequence[Signal] def __init__( self, impinging_signals: DeviceInput | Signal | Sequence[Signal], operator_inputs: Sequence[Signal], ) -> None: """ Args: impinging_signals (DeviceInput | Signal | Sequence[Signal]): Numpy vector containing lists of signals impinging onto the device. operator_inputs (Sequence[Signal]): Signal models to be processed by receive DSP algorithms. """ if isinstance(impinging_signals, DeviceInput): impinging_signals = impinging_signals.impinging_signals # Initialize base class DeviceInput.__init__(self, impinging_signals) # Initialize attributes self.__operator_inputs = operator_inputs @property def operator_inputs(self) -> Sequence[Signal]: return self.__operator_inputs @property def num_operator_inputs(self) -> int: """Number of operator inputs. Returns: Input count. """ return len(self.__operator_inputs) @classmethod def from_HDF(cls: Type[ProcessedDeviceInput], group: Group) -> ProcessedDeviceInput: # Recall base class device_input = DeviceInput.from_HDF(group) # Recall individual detected frames num_inputs = group.attrs.get("num_operator_inputs", 0) operator_inputs = [ Signal.from_HDF(group[f"operator_input_{i:02d}"]) for i in range(num_inputs) ] return cls(device_input, operator_inputs) def to_HDF(self, group: Group) -> None: # Serialize base class DeviceInput.to_HDF(self, group) # Serialize groups for i, input_signal in enumerate(self.operator_inputs): input_signal.to_HDF(self._create_group(group, f"operator_input_{i:02d}")) # Serialize attributes group.attrs["num_operator_inputs"] = self.num_operator_inputs
DRT = TypeVar("DRT", bound="DeviceReception") """Type of device reception."""
[docs] class DeviceReception(ProcessedDeviceInput): """Information generated by receiving over a device and its operators.""" __operator_receptions: Sequence[Reception] def __init__( self, impinging_signals: Signal | Sequence[Signal], operator_inputs: Sequence[Signal], operator_receptions: Sequence[Reception], ) -> None: """ Args: impinging_signals (Signal | Sequence[Signal]): Numpy vector containing lists of signals impinging onto the device. operator_inputs (Sequence[Signal]): Signal models to be processed by receive DSP algorithms. operator_separation (bool): Is the operator separation flag enabled? operator_receptions (Sequence[Reception]): Information inferred from receive operators. """ # Initialize base class ProcessedDeviceInput.__init__(self, impinging_signals, operator_inputs) # Initialize attributes self.__operator_receptions = operator_receptions
[docs] @classmethod def From_ProcessedDeviceInput( cls: Type[DRT], device_input: ProcessedDeviceInput, operator_receptions: Sequence[Reception] ) -> DRT: """Initialize an instance from a processed device input. Args: device_input (ProcessedDeviceInput): The processed device output. operator_receptions (Sequence[OperatorReceptions]): List of operator receptions. Returns: The initialized object. """ return cls( device_input.impinging_signals, device_input.operator_inputs, operator_receptions )
@property def operator_receptions(self) -> Sequence[Reception]: """Information transmitted by device operators. Returns: Operator information. """ return self.__operator_receptions @property def num_operator_receptions(self) -> int: """Number of operator receptions. Returns: Number of receptions. """ return len(self.__operator_receptions) @classmethod def from_HDF(cls: Type[DRT], group: Group, operators: Sequence[Receiver] | None = None) -> DRT: # Recall base class device_input = ProcessedDeviceInput.from_HDF(group) # Recall individual operator receptions num_receptions = group.attrs.get("num_operator_receptions", 0) # Recall receptions if operators is None: receptions = [ Reception.from_HDF(group[f"reception_{r:02d}"]) for r in range(num_receptions) ] else: receptions = [ operator.recall_reception(group[f"reception_{r:02d}"]) for r, operator in zip(range(num_receptions), operators) ] # Initialize object return cls.From_ProcessedDeviceInput(device_input, receptions)
[docs] @classmethod def Recall(cls: Type[DRT], group: Group, device: Device) -> DRT: # Recall base class device_input = ProcessedDeviceInput.from_HDF(group) # Recall individual operator receptions num_receptions = group.attrs.get("num_operator_receptions", 0) # recall operator receptions receptions = [ receiver.recall_reception(group[f"reception_{r:02d}"]) for r, receiver in zip(range(num_receptions), device.receivers) ] # Initialize object return cls.From_ProcessedDeviceInput(device_input, receptions)
def to_HDF(self, group: Group) -> None: # Serialize base class ProcessedDeviceInput.to_HDF(self, group) # Serialize groups for t, reception in enumerate(self.operator_receptions): reception.to_HDF(self._create_group(group, f"reception_{t:02d}")) # Serialize attributes group.attrs["num_operator_receptions"] = self.num_operator_receptions
class MixingOperator(Operator): """Base class for operators performing mixing operations.""" __carrier_frequency: float | None # Carrier frequency def __init__(self, carrier_frequency: float | None = None) -> None: """ Args: carrier_frequency (float, optional): Central frequency of the mixed signal in radio-frequency transmission band. """ # Initialize base class Operator.__init__(self) # Initialize class attributes self.carrier_frequency = carrier_frequency @property def carrier_frequency(self) -> float | None: """Central frequency of the mixed signal in radio-frequency transmission band in Hz. Denoted by :math:`f_c` with unit :math:`\\left[ f_c \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in the literature. If not specified, i.e. :py:obj:`None`, the operated device's default carrier frequency will be assumed during signal processing. Raises: ValueError: If the carrier frequency is smaller than zero. """ return self.__carrier_frequency @carrier_frequency.setter def carrier_frequency(self, value: float | None) -> None: if value is None: self.__carrier_frequency = None return if value < 0.0: raise ValueError("Carrier frequency must be greater or equal to zero") self.__carrier_frequency = value
[docs] class Receiver(Generic[ReceptionType], MixingOperator, RandomNode): """Operator receiving from a device.""" __reference: Device | None __selected_receive_ports: Sequence[int] __receive_callbacks: Hookable[ReceptionType] def __init__( self, seed: int | None = None, reference: Device | None = None, selected_receive_ports: Sequence[int] | None = None, carrier_frequency: float | None = None, ) -> None: """ Args: seed (int, optional): Random seed used to initialize the pseudo-random number generator. reference (Device, optional): Reference device to which the channel shall be estimated. selected_receive_ports (Sequence[int] | None): Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array. If not specified, all available antenna ports will be considered. carrier_frequency (float, optional): Central frequency of the mixed signal in radio-frequency transmission band. If not specified, the operated device's default carrier frequency will be assumed during signal processing. """ # Initialize base classes RandomNode.__init__(self, seed=seed) MixingOperator.__init__(self, carrier_frequency) # Initialize class attributes self.__receive_callbacks = Hookable() self.reference = reference self.selected_receive_ports = selected_receive_ports
[docs] def add_receive_callback( self, callback: Callable[[ReceptionType], None] ) -> Hook[ReceptionType]: """Add a callback to be called after processing the receive DSP algorithm. Args: callback (Callable[[ReceptionType], None]): Function to be called after processing the receive DSP algorithm. The receive output is passed as the only argument. Returns: Hook to be used for removal. """ return self.__receive_callbacks.add_callback(callback)
@property def reference(self) -> Device | None: """Reference transmitter for this receiver. Returns: A handle to the referenced device. :py:obj:`None` if the device was not specified. """ return self.__reference @reference.setter def reference(self, value: Device | None) -> None: self.__reference = value @property def selected_receive_ports(self) -> Sequence[int] | None: """Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array. If :py:obj:`None`, all available antenna ports will be considered. """ return self.__selected_receive_ports @selected_receive_ports.setter def selected_receive_ports(self, value: Sequence[int] | None) -> None: self.__selected_receive_ports = value
[docs] def receive(self, signal: Signal, state: DeviceState, notify: bool = True) -> ReceptionType: """Process a received signal by the receiver. Wrapper around the abstract :meth:`_receive<Receiver._receive>` method. Args: signal (Signal): Model of the signal samples to be processed by the DSP algorithm. state (DeviceState): State of the receiving device to be conmsidered by the DSP algorithm. notify (bool, optional): Notify the receiver's callbacks. Enabled by default. Returns: Information received by this operator. Raises: ValueError: If the number of signal streams does not match the number of receive ports. """ # Generate the state for this DSP algorithm's assigned ports receive_state = state.receive_state(self.selected_receive_ports) # Assert the number of signal streams if signal.num_streams != receive_state.num_digital_receive_ports: raise ValueError( f"Number of signal streams does not match the number of receive ports ({signal.num_streams} != {receive_state.num_digital_receive_ports})" ) # Run the receive DSP routine reception = self._receive(signal, receive_state) # Notify the receiver's callbacks if notify: self.__receive_callbacks.notify(reception) # Return received information return reception
@abstractmethod def _receive(self, signal: Signal, state: ReceiveState) -> ReceptionType: """Process a received signal by the receiver. Subroutine of the public :meth:`receive<hermespy.core.device.Receiver.receive>` method that performs the pipeline-specific receive processing and consolidates the inferred information into a single :class:`Reception<hermespy.core.device.Reception>` object. Args: signal (Signal): Multi-stream signal model to be processed. state (ReceiveState): Device state to be considered during reception. Returns: Information inferred from the received signal. """ ... # pragma: no cover def recall_reception(self, group: Group, notify: bool = True) -> ReceptionType: """Recall a specific reception from a serialization. Internally calls the abstract method :meth:`_recall_reception<Receiver._recall_reception>`. Caches the reception if the respective flag is enabled. Args: group (Group): HDF group containing the reception. notify (bool, optional): Notify the receiver's callbacks. Enabled by default. Returns: The recalled reception. :meta private: """ # Recall the reception recalled_reception = self._recall_reception(group) # Notify the receiver's callbacks if notify: self.__receive_callbacks.notify(recalled_reception) return recalled_reception @abstractmethod def _recall_reception(self, group: Group) -> ReceptionType: """Recall a specific reception from a serialization. Args: group (Group): HDF group containing the reception. Returns: The recalled reception. """ ... # pragma: no cover @property @abstractmethod def power(self) -> float: """Expected power of the received signal in Watts. .. note:: Applies only to the signal-carrying parts of the transmission, silent parts shuch as guard intervals should not be considered. """ ... # pragma: no cover
class OperatorSlot(Generic[OperatorType], Sequence[OperatorType]): """Slot list for operators of a single device.""" __device: Device # Device this operator belongs to __operators: list[OperatorType] # List of registered operators def __init__(self, device: Device) -> None: """ Args: device (Device): Device this slot belongs to. """ self.__device = device self.__operators = [] @property def device(self) -> Device: """Device this operator slot belongs to. Returns: Device: Handle to the device. """ return self.__device def operator_index(self, operator: OperatorType) -> int: """Index of an operator within this slot. Returns: index (int): The `operator`'s index. Raises: ValueError: If the `operator` is not registered at this slot. """ return self.__operators.index(operator) def add(self, operator: OperatorType) -> None: """Add a new operator to this slot. Args: operator (OperatorType): Operator to be added to this slot. Raises: RuntimeError: If the `operator` is already registered at this slot. """ # Do nothing if the operator is already registered if operator in self.__operators: return self.__operators.append(operator) def remove(self, operator: OperatorType) -> None: """Remove an operator from this slot. Args: operator (OperatorType): Handle to the operator to be removed. """ if operator in self.__operators: self.__operators.remove(operator) def registered(self, operator: OperatorType) -> bool: """Check if an operator is registered at this slot. Returns: bool: Boolean indicating the registration status. """ return operator in self.__operators @property def num_operators(self) -> int: """Number of operators on this slot. Returns: int: Number of operators. """ return len(self.__operators) @property def max_sampling_rate(self) -> float: """Maximum requested sampling rate of all operators. Returns: The maximum sampling rate in Hz. """ sampling_rate = 0.0 for operator in self.__operators: sampling_rate = max(sampling_rate, operator.sampling_rate) return sampling_rate @property def min_frame_duration(self) -> float: """The mininum frame duration requried to process at least a single frame for each operator. Returns: The minimum frame duration in seconds. """ frame_duration = 0.0 for operator in self.__operators: frame_duration = max(frame_duration, operator.frame_duration) return frame_duration @property def min_num_samples_per_frame(self) -> int: """The number of samples required to collect at last a single frame for each operator. Returns: Number of samples. """ sampling_rate = self.max_sampling_rate frame_duration = self.min_frame_duration return ceil(frame_duration * sampling_rate) @overload def __getitem__(self, item: int) -> OperatorType: ... # pragma: no cover @overload def __getitem__(self, item: slice) -> Sequence[OperatorType]: ... # pragma: no cover def __getitem__(self, item: int | slice) -> OperatorType | Sequence[OperatorType]: return self.__operators[item] def __iter__(self) -> Iterator[OperatorType]: """Iterating over operator slots returns the iterator over the slots.""" return self.__operators.__iter__() def __contains__(self, operator: object) -> bool: """Contains is just a convenient mask for registered. Returns: Registration indicator. """ return operator in self.__operators def __len__(self) -> int: return self.num_operators TransmissionType = TypeVar("TransmissionType", bound="Transmission") """Type variable of a :class:`Transmission`."""
[docs] class Transmitter(Generic[TransmissionType], MixingOperator, RandomNode): """Operator transmitting over a device.""" __transmit_callbacks: Hookable[TransmissionType] __selected_transmit_ports: Sequence[int] | None def __init__( self, seed: int | None = None, selected_transmit_ports: Sequence[int] | None = None, carrier_frequency: float | None = None, ) -> None: """ Args: seed (int, optional): Random seed used to initialize the pseudo-random number generator. selected_transmit_ports (Sequence[int] | None): Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array. If not specified, all available ports will be considered. carrier_frequency (float, optional): Central frequency of the mixed signal in radio-frequency transmission band. If not specified, the operated device's default carrier frequency will be assumed during signal processing. """ # Initialize operator base class RandomNode.__init__(self, seed=seed) MixingOperator.__init__(self, carrier_frequency) # Initialize class attributes self.__transmit_callbacks = Hookable() self.__selected_transmit_ports = selected_transmit_ports
[docs] def add_transmit_callback( self, callback: Callable[[TransmissionType], None] ) -> Hook[TransmissionType]: """Add a callback to be called after processing the transmit DSP algorithm. Args: callback (Callable[[TransmissionType], None]): Function to be called after processing the transmit DSP algorithm. The transmit output is passed as the only argument. Returns: Hook to be used for removal. """ return self.__transmit_callbacks.add_callback(callback)
[docs] def transmit( self, state: DeviceState, duration: float = 0.0, notify: bool = True ) -> TransmissionType: """Transmit a signal. Registers the signal samples to be transmitted by the underlying device. Wrapper around the abstract :meth:`_transmit<Transmitter._transmit>` method. Args: state (DeviceState, optional): State of the device at the time of transmission. If not specified, the device's current state will be queried by calling :meth:`hermespy.core.device.Device.state`. duration (float, optional): Duration of the transmitted signal. If not specified, i.e. zero, the duration will be inferred by the transmitter. notify (bool, optional): Notify the transmitter's callbacks about the generated transmission. Returns: Information generated while transmitting over this operator. Raises: FloatingError: If the transmitter is currently considered floating. """ # Generate the state for this DSP algorithm's assigned ports # Run the transmit DSP routine transmission = self._transmit(state.transmit_state(self.selected_transmit_ports), duration) # Notify the registered callbacks if the respective flag is enabled if notify: self.__transmit_callbacks.notify(transmission) return transmission
@abstractmethod def _transmit(self, state: TransmitState, duration: float) -> TransmissionType: """Generate information to be transmitted. Subroutine of the public :meth:`transmit<Transmitter.transmit>` method that performs the pipeline-specific transmit-processing and consolidates the generated information into a single :class:`Transmission` object. Args: state (TransmitState): State of the device at the time of transmission. duration (float, optional): Duration of the transmitted signal in seconds. If not specified, the duration of a single frame will be assumed. Returns: Information to be transmitted. """ ... # pragma: no cover def recall_transmission(self, group: Group, notify: bool = True) -> TransmissionType: """Recall a specific transmission from a serialization. Internally calls the abstract method :meth:`Transmitter._recall_transmission`. Caches the transmission if the respective flag is enabled. Args: group (Group): HDF group containing the transmission. notify (bool, optional): Notify the transmitter's callbacks about the generated transmission. Enabled by default. Returns: The recalled transmission :meta private: """ # Recall the transmission from HDF recalled_transmission = self._recall_transmission(group) # Notify the registered callbacks if the respective flag is enabled if notify: self.__transmit_callbacks.notify(recalled_transmission) return recalled_transmission @abstractmethod def _recall_transmission(self, group: Group) -> TransmissionType: """Recall a specific transmission from a serialization. Args: group (Group): HDF group containing the transmission. Returns: The recalled transmission """ ... # pragma: no cover @property @abstractmethod def power(self) -> float: """Expected power of the transmitted signal in Watts. .. note:: Applies only to the signal-carrying parts of the transmission, silent parts shuch as guard intervals should not be considered. """ ... # pragma: no cover @property def selected_transmit_ports(self) -> Sequence[int] | None: """Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array. If :py:obj:`None`, all available transmit ports will be considered. """ return self.__selected_transmit_ports @selected_transmit_ports.setter def selected_transmit_ports(self, value: Sequence[int] | None) -> None: self.__selected_transmit_ports = value
class TransmitterSlot(OperatorSlot[Transmitter]): """Slot for transmitting operators within devices.""" ... # pragma: no cover class ReceiverSlot(OperatorSlot[Receiver]): """Slot for receiving operators within devices.""" ... # pragma: no cover class UnsupportedSlot(OperatorSlot): """Slot for unsupported operations within devices.""" def add(self, operator: Operator) -> None: raise RuntimeError("Slot not supported by this device")
[docs] class Device(ABC, Generic[DST], Transformable, RandomNode, Serializable): """Physical device representation within HermesPy. It acts as the basis for all transmissions and receptions of sampled electromagnetic signals. """ transmitters: TransmitterSlot """Transmitters broadcasting signals over this device.""" receivers: ReceiverSlot """Receivers capturing signals from this device""" __transmit_coding: TransmitSignalCoding # Codings applied to transmitted samples before upload __receive_coding: ( ReceiveSignalCoding # Codings applied to received samples directly after download ) __power: float # Average power of the transmitted signals __output_callbacks: Hookable[DeviceOutput] # Callbacks notified about generated outputs __input_callbacks: Hookable[ProcessedDeviceInput] # Callbacks notified about processed inputs def __init__( self, power: float = 1.0, pose: Transformation | None = None, seed: int | None = None ) -> None: """ Args: power (float, optional): Average power of the transmitted signals in Watts. 1.0 Watt by default. pose (Transformation, optional): Pose of the device with respect to its scenario coordinate system origin. seed (int, optional): Random seed used to initialize the pseudo-random number generator. """ # Initialize base classes RandomNode.__init__(self, seed=seed) Transformable.__init__(self, pose=pose) # Initalize device attributes and properties self.transmitters = TransmitterSlot(self) self.receivers = ReceiverSlot(self) self.__transmit_coding = TransmitSignalCoding() self.__receive_coding = ReceiveSignalCoding() self.__input_callbacks = Hookable() self.__output_callbacks = Hookable() self.power = power
[docs] @abstractmethod def state(self) -> DST: """Query the immutable physical state of the device. Returns: The physical device state. """ ... # pragma: no cover
@property @abstractmethod def antennas(self) -> AntennaArray: """Model of the device's antenna array.""" ... # pragma: no cover @property def transmit_coding(self) -> TransmitSignalCoding: """Digital coding applied to transmitted samples before digital-to-analog conversion.""" return self.__transmit_coding @property def receive_coding(self) -> ReceiveSignalCoding: """Digital coding applied to received samples after analog-to-digital conversion.""" return self.__receive_coding @property def output_callbacks(self) -> Hookable[DeviceOutput]: """Callbacks notified about generated outputs.""" return self.__output_callbacks @property def input_callbacks(self) -> Hookable[ProcessedDeviceInput]: """Callbacks notified about processed inputs.""" return self.__input_callbacks @property def num_digital_transmit_ports(self) -> int: """Number of available digital signal stream ports for transmission. If no transmit coding was configured, this value is equal to the number of transmit antenna ports. """ return self.__transmit_coding.num_transmit_input_streams(self.num_transmit_antenna_ports) @property def num_digital_receive_ports(self) -> int: """Number of available digital signal stream ports for reception. If no receive coding was configured, this value is equal to the number of receive antenna ports. """ return self.__receive_coding.num_receive_output_streams(self.num_receive_antenna_ports) @property def num_transmit_antenna_ports(self) -> int: """Number of available transmit antenna ports. Shorthand to :attr:`antennas'<.antennas>` :attr:`num_transmit_ports<AntennaArray.num_transmit_ports>`. """ return self.antennas.num_transmit_ports @property def num_receive_antenna_ports(self) -> int: """Number of available receive antenna ports. Short hand to :attr:`antennas'<.antennas>` :attr:`num_receive_ports<AntennaArray.num_receive_ports>`. """ return self.antennas.num_receive_ports @property def num_transmit_antennas(self) -> int: """Number of available transmit antennas. Shorthand to :attr:`antennas'<antennas>` :meth:`num_transmit_antennas<AntennaArrayBase.num_transmit_antennas>`. """ return self.antennas.num_transmit_antennas @property def num_receive_antennas(self) -> int: """Number of available receive antennas. Shorthand to :attr:`antennas'<antennas>` :meth:`num_receive_antennas<AntennaArrayBase.num_receive_antennas>`. """ return self.antennas.num_receive_antennas @property def power(self) -> float: """Average power of the transmitted signal signal. Returns: power (float): Power in Watt. Raises: ValueError: If `value` is smaller than zero. """ return self.__power @power.setter def power(self, value: float) -> None: """Set the average power of the transmitted signal.""" if value < 0.0: raise ValueError("Average signal power must be greater or equal to zero") self.__power = value @property def snr(self) -> float: """Signal to noise ratio of the received signal. Returns: Linear signal to noise ratio """ # By default, we have no knowledge about the snr and assum no noise return float("inf") @property def num_antennas(self) -> int: """Number of antennas within this device's topology. Returns: int: Number of antennas, greater or equal to one. """ return self.antennas.num_antennas @property def max_frame_duration(self) -> float: """Maximum frame duration transmitted by this device. Returns: max_duration (float): Maximum duration in seconds. """ max_duration = 0.0 for operator in chain(self.transmitters, self.receivers): max_duration = max(max_duration, operator.frame_duration) # type: ignore return max_duration @property @abstractmethod def carrier_frequency(self) -> float: """Central frequency of the device's emissions in the RF-band. Returns: frequency (float): Carrier frequency in Hz. Raises: ValueError: On negative carrier frequencies. """ ... # pragma: no cover @property def wavelength(self) -> float: """Central wavelength of the device's emmissions in the RF-band. Returns: wavelength (float): Wavelength in m. Raises: ValueError: On wavelengths smaller or equal to zero. """ return speed_of_light / self.carrier_frequency @property @abstractmethod def sampling_rate(self) -> float: """Sampling rate at which the device's analog-to-digital converters operate. Returns: sampling_rate (float): Sampling rate in Hz. Raises: ValueError: If the sampling rate is not greater than zero. """ ... # pragma: no cover
[docs] def add_dsp(self, dsp: Transmitter | Receiver) -> None: """Add a DSP algorithm to this device. Args: dsp (Transmitter | Receiver): DSP algorithm to be added to the device. If the DSP algorithm is a transmitter, it will be added to the transmit layer. If the DSP algorithm is a receiver, it will be added to the receive layer. If the DSP is both, it will be added to both layers. """ if isinstance(dsp, Transmitter): self.transmitters.add(dsp) if isinstance(dsp, Receiver): self.receivers.add(dsp)
[docs] def transmit_operators( self, state: DST | None = None, notify: bool = True ) -> list[Transmission]: """Generate transmitted information for all registered operators. Calls :meth:`Transmitter.transmit` for each transmit operator. Args: state (DST, optional): Device state to be used for the transmission. If not provided, query the current device state by calling :meth:`state`. notify (bool, optional): Notify the transmitter's callbacks about the generated transmission. Enabled by default. Returns: List of operator transmisisons. """ _state = self.state() if state is None else state return [transmitter.transmit(_state, 0.0, notify) for transmitter in self.transmitters]
[docs] def generate_output( self, operator_transmissions: Sequence[Transmission], state: DST | None = None, resample: bool = True, ) -> DeviceOutput: """Generate the device's output. Args: operator_transmissions (Sequence[Transmissions]): List of operator transmissions from which to generate the output. state (DST, optional): Device state to be used for the output generation. If not provided, query the current device state resample (bool, optional): Resample the output signal to the device's sampling rate. Enabled by default. Returns: The device's output. Raises: RuntimeError: If the transmit coding is incompatible with the number of transmit antenna ports. """ _state = self.state() if state is None else state # Ensure that the precoded signal will match the number of antenna ports if self.num_digital_transmit_ports != self.num_transmit_antenna_ports: raise RuntimeError( f"Number of streams after transmit coding does not match the number of transmit antenna ports ({self.num_digital_transmit_ports} != {self.num_transmit_antenna_ports})" ) # Query the intended transmission ports of each operator operator_streams = [o.selected_transmit_ports for o in self.transmitters] # Superimpose the operator transmissions to the device's RF configuration superimposed_signal = Signal.Empty( _state.sampling_rate, self.num_digital_transmit_ports, carrier_frequency=_state.carrier_frequency, ) for transmission, indices in zip(operator_transmissions, operator_streams): if transmission is not None: superimposed_signal.superimpose( transmission.signal, resample=resample, stream_indices=indices ) # Apply the transmit coding encoded_signal = self.transmit_coding.encode_streams( superimposed_signal, _state.transmit_state(list(range(self.num_transmit_antennas))) ) # Generate the output output = DeviceOutput(encoded_signal) # Notify the output callbacks self.__output_callbacks.notify(output) return output
[docs] def transmit(self, state: DST | None = None, notify: bool = True) -> DeviceTransmission: """Transmit over this device. Args: state (DST, optional): Device state to be used for the transmission. If not provided, query the current device state by calling :meth:`state`. notify (bool, optional): Notify the transmitter's callbacks about the generated transmission. Enabled by default. Returns: Information transmitted by this device. """ _state = self.state() if state is None else state operator_transmissions = self.transmit_operators(_state, notify) device_output = self.generate_output(operator_transmissions, _state) return DeviceTransmission.From_Output(device_output, operator_transmissions)
[docs] def recall_transmission(self, group: Group) -> DeviceTransmission: """Recall a specific transmission from a HDF5 serialization. Args: group (Group): HDF group containing the transmission. Returns: The recalled transmission. """ # Recall the specific operator transmissions return DeviceTransmission.from_HDF(group, list(self.transmitters))
[docs] def process_input( self, impinging_signals: DeviceInput | Signal | Sequence[Signal], state: DST | None = None ) -> ProcessedDeviceInput: """Process input signals impinging onto this device. Args: impinging_signals (Union[DeviceInput, Signal, Sequence[Signal]]): The samples to be processed by the device. state (DST, optional): Device state to be used for the processing. If not provided, query the current device state. Returns: The processed device input information. Raises: ValueError: If the number of signal streams does not match the device configuration expectations. """ # Query the current device state if none was provided _state = self.state() if state is None else state # Convert argument impinging_signals = ( [impinging_signals] if isinstance(impinging_signals, Signal) else impinging_signals ) impinging_signals = ( impinging_signals.impinging_signals if isinstance(impinging_signals, DeviceInput) else impinging_signals ) # Superimpose the impinging signal models if len(impinging_signals) != 1: superimposed_signal = Signal.Empty( self.sampling_rate, self.antennas.num_receive_antennas, carrier_frequency=self.carrier_frequency, ) for signal in impinging_signals: superimposed_signal.superimpose(signal) else: superimposed_signal = impinging_signals[0] # Ensure that the number of antenna streams to be decoded matches the number of receive antenna ports if superimposed_signal.num_streams != self.num_receive_antenna_ports: raise RuntimeError( f"Number of antenna streams does not match the required number of receive coding streams ({superimposed_signal.num_streams} != {self.num_digital_receive_ports})" ) # Apply the receive coding decoded_signal = ( self.receive_coding.decode_streams( superimposed_signal, _state.receive_state(list(range(_state.antennas.num_receive_ports))), ) if _state.antennas.num_receive_ports > 0 else superimposed_signal ) # Each operator is fed the decoded signal by default operator_inputs: list[Signal] = [] for receiver in self.receivers: selected_receive_antennas = receiver.selected_receive_ports stream_selector = ( slice(None) if selected_receive_antennas is None else selected_receive_antennas ) # type: slice | Sequence[int] operator_inputs.append(decoded_signal.getstreams(stream_selector)) # Generate the input input = ProcessedDeviceInput(impinging_signals, operator_inputs) # Notify the input callbacks self.__input_callbacks.notify(input) return input
[docs] def receive_operators( self, operator_inputs: ProcessedDeviceInput | Sequence[Signal], state: DST | None = None, notify: bool = True, ) -> list[Reception]: """Receive over the registered operators. Args: operator_inputs (ProcessedDeviceInput | Sequence[Signal]): The signal samples to be processed by the device's operators. If a :class:`ProcessedDeviceInput` is provided, the operator inputs will be extracted from it. Otherwise, the operator inputs are assumed to be provided directly. state (DST, optional): Assumed state of the device during reception. If not provided, the current device state will be queried by calling :meth:`state`. notfiy (bool, optional): Notify all registered callbacks within the involved DSP algorithms. Enabled by default. Returns: List of information generated by receiving over the device's operators. Raises: ValueError: If the number of operator inputs does not match the number of receive operators. """ _operator_inputs = ( operator_inputs.operator_inputs if isinstance(operator_inputs, ProcessedDeviceInput) else operator_inputs ) _state = self.state() if state is None else state if len(_operator_inputs) != self.receivers.num_operators: raise ValueError( f"Number of operator inputs ({len(_operator_inputs)}) does not match the number of receive operators ({self.receivers.num_operators})" ) # Generate receive information receptions: list[Reception] = [] for operator, signal in zip(self.receivers, _operator_inputs): receptions.append(operator.receive(signal, _state, notify)) return receptions
[docs] def receive( self, impinging_signals: DeviceInput | Signal | Sequence[Signal], state: DST | None = None, notify: bool = True, ) -> DeviceReception: """Receive over this device. Internally calls :meth:`Device.process_input` and :meth:`Device.receive_operators`. Args: impinging_signals (DeviceInput | Signal | Sequence[Signal]): The samples to be processed by the device. state (DST, optional): Device state to be used for the processing. If not provided, query the current device state by calling :meth:`state`. notify (bool, optional): Notify all registered callbacks within the involved DSP algorithms. Enabled by default. Returns: The received device information. """ _state = self.state() if state is None else state # Process input processed_input = self.process_input(impinging_signals, _state) # Generate receptions receptions = self.receive_operators(processed_input.operator_inputs, _state, notify) # Generate device reception return DeviceReception.From_ProcessedDeviceInput(processed_input, receptions)
[docs] def recall_reception(self, group: Group) -> DeviceReception: """Recall a specific reception from a HDF5 serialization. Args: group (Group): HDF group containing the reception. Returns: The recalled reception. """ return DeviceReception.Recall(group, self)
@classmethod def to_yaml(cls: Type[Device], representer: SafeRepresenter, node: Device) -> MappingNode: return node._mapping_serialization_wrapper( representer, None, {"transmitters": list(node.transmitters), "receivers": list(node.receivers)}, ) @classmethod def from_yaml(cls: Type[Device], constructor: SafeConstructor, node: Node) -> Device: # Abort if the node is scalar if isinstance(node, ScalarNode): return cls() device_mapping: dict = constructor.construct_mapping(node, deep=True) # Pop transmitter and receiver lists transmitters = device_mapping.pop("transmitters", list()) receivers = device_mapping.pop("receivers", list()) # Construct the device device = cls.InitializationWrapper(device_mapping) # Add transmitters and receivers for transmitter in transmitters: device.transmitters.add(transmitter) for receiver in receivers: device.receivers.add(receiver) return device