Source code for hermespy.core.device

# -*- coding: utf-8 -*-
"""
===============
Device Modeling
===============

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   classDiagram-v2

   direction RL

   class Device {

       topology
       carrier_frequency
       transmit()
       receive()
   }

   class SimulatedDevice {

       position
       orientation
       velocity
       acceleration
   }

   class HardwareDevice {

       trigger()
   }

   class Operator {

       transmit()
       receive()
   }

   class Modem {

       tx_bits
       rx_bits
       ...
       waveform
       transmit()
       receive()
   }

   class Radar {

       waveform
       ...
       target_estimates
       transmit()
       receive()
   }

   Operator <|-- Modem
   Operator <|-- Radar
   Device *-- Operator
   Device <|-- SimulatedDevice
   Device <|-- HardwareDevice


.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   channel{Real World}

   subgraph devicea[HardwareDevice]

       direction TB
       deviceatx>Tx Slot] --> deviceabinding[Hardware Binding] --> devicearx>Rx Slot]

   end

   subgraph deviceb[HardwareDevice]

       direction TB
       devicebtx>Tx Slot] --> devicebbinding[Hardware Binding] --> devicebrx>Rx Slot]
   end

   deviceabinding <--> channel
   devicebbinding <--> channel

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   channel{Channel Modeling}

   subgraph devicea[SimulatedDevice]

       direction TB
       deviceatx>Tx Slot]
       devicearx>Rx Slot]

   end

   subgraph deviceb[SimulatedDevice]

       direction TB
       devicebtx>Tx Slot]
       devicebrx>Rx Slot]
   end

   deviceatx --> channel --> devicearx
   devicebtx --> channel --> devicebrx

"""

from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from itertools import chain
from math import ceil
from typing import Generic, Iterator, List, Optional, overload, Type, TypeVar

import numpy as np
from h5py import Group
from scipy.constants import speed_of_light

from .antennas import AntennaArray
from .definitions import SNRType
from .factory import HDFSerializable, Serializable
from .random_node import RandomNode
from .signal_model import Signal
from .transformation import Transformable, Transformation


__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class FloatingError(RuntimeError):
    """Exception raised if an operation fails because an operator is currently considered floating.

    An operator is considered floating while it is not attached to a device slot.
    """


# Bound to OperationResult so that classmethods such as `from_HDF` can be
# annotated to return the concrete subclass they were called on.
OperationResultType = TypeVar("OperationResultType", bound="OperationResult")
"""Type of OperationResult"""


# Element type of an OperatorSlot; restricted to Operator subclasses.
OperatorType = TypeVar("OperatorType", bound="Operator")
"""Type of operator."""


# Generic placeholder restricted to Device subclasses.
DeviceType = TypeVar("DeviceType", bound="Device")
"""Type of device."""


class OperationResult(HDFSerializable):
    """Common base of everything device operators generate while transmitting or receiving."""

    signal: Signal
    """Base-band model of the processed signal."""

    def __init__(self, signal: Signal) -> None:
        """
        Args:

            signal (Signal):
                Base-band waveform model.
        """

        self.signal = signal

    @classmethod
    def from_HDF(cls: Type[OperationResultType], group: Group) -> OperationResultType:
        """Recall a result from the `signal` subgroup of an HDF group."""

        return cls(signal=Signal.from_HDF(group["signal"]))

    def to_HDF(self, group: Group) -> None:
        """Serialize the result's signal into a `signal` subgroup of an HDF group."""

        signal_group = self._create_group(group, "signal")
        self.signal.to_HDF(signal_group)
class Transmission(OperationResult):
    """Information produced by a transmit operator during a transmission."""
class Reception(OperationResult):
    """Information produced by a receive operator during a reception."""
ReceptionType = TypeVar("ReceptionType", bound=Reception)
"""Type variable of a :class:`Reception`."""


SlotType = TypeVar("SlotType", bound="OperatorSlot")
"""Type of slot."""


class Operator(Generic[SlotType], Serializable):
    """Base class for operators of devices.

    In HermesPy, operators may configure devices, broadcast signals over them or capture signals from them.
    Each operator is attached to a single device instance it operates on.
    The attachment is represented by the device slot the operator is registered at.
    """

    property_blacklist = {"slot"}
    __slot: Optional[SlotType]  # Slot within a device this operator 'operates'

    def __init__(self, slot: Optional[SlotType] = None) -> None:
        """
        Args:

            slot (OperatorSlot, optional):
                Device slot this operator is attached to.
                By default, the operator is not attached and considered floating.
        """

        # Routed through the property setter, which also registers the operator at the slot
        self.slot = slot

    @property
    def slot(self) -> Optional[SlotType]:
        """Device slot this operator operates.

        Assigning a slot registers the operator at that slot.
        Assigning `None` removes the operator from its current slot.

        Returns:
            Handle to the device slot.
            `None` if the operator is currently considered floating.

        :meta private:
        """

        return self.__slot

    @slot.setter
    def slot(self, value: Optional[SlotType]) -> None:
        # A None argument indicates the slot should be unbound
        if value is None:
            # The hasattr check covers the very first assignment during __init__,
            # when the private attribute does not exist yet
            if hasattr(self, "_Operator__slot") and self.__slot is not None:
                # This is necessary to prevent event-loops. Just ignore it.
                # The reference is cleared before calling remove(), since remove() may assign
                # this property again and must find the operator already detached.
                slot = self.__slot
                self.__slot = None

                if slot.registered(self):
                    slot.remove(self)

            else:
                self.__slot = None

        # First assignment of an actual slot: store it and register if required
        elif not hasattr(self, "_Operator__slot"):
            self.__slot = value

            if not self.__slot.registered(self):
                self.__slot.add(self)

        # Re-assignment to a different slot
        elif self.__slot is not value:
            # The previous slot is not deregistered here (the respective code is disabled).
            # NOTE(review): presumably so that duplex operators can stay registered at more than one slot - confirm
            # if self.__slot is not None and self.__slot.registered(self):
            #    self.__slot.remove(self)

            self.__slot = value

            if not self.__slot.registered(self):
                self.__slot.add(self)

    @property
    def slot_index(self) -> int | None:
        """Index of the operator within its slot.

        Returns:
            Index of the operator.
            `None` if the operator is currently considered floating.

        :meta private:
        """

        if self.__slot is None:
            return None

        return self.__slot.operator_index(self)

    @property
    def device(self) -> Device | None:
        """Device this object is assigned to.

        :obj:`None` if this object is currently considered floating / unassigned.
        """

        if self.__slot is None:
            return None

        return self.__slot.device

    @property
    def attached(self) -> bool:
        """Attachment state of the operator.

        Indicates whether this object is currently assigned a :class:`.Device` instance.
        """

        return self.__slot is not None

    @property
    @abstractmethod
    def frame_duration(self) -> float:
        """Duration of a single sample frame in seconds.

        Denoted as :math:`T_{\\mathrm{F}}` of unit :math:`\\left[ T_{\\mathrm{F}} \\right] = \\mathrm{s}` in literature.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def sampling_rate(self) -> float:
        """The operator's preferred sampling rate in Hz.

        Denoted as :math:`f_{\\mathrm{S}}` of unit :math:`\\left[ f_{\\mathrm{S}} \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in literature.
        """
        ...  # pragma: no cover
class DeviceOutput(HDFSerializable):
    """Base class for the output of a device.

    Wraps the single mixed signal emerging from the device and exposes its basic parameters.
    """

    __mixed_signal: Signal | None

    def __init__(self, mixed_signal: Signal) -> None:
        """
        Args:

            mixed_signal (Signal):
                Mixed signal emerging from the device.
        """

        self.__mixed_signal = mixed_signal

    @property
    def sampling_rate(self) -> float:
        """Sampling rate of the mixed signal in Hz."""

        return self.__mixed_signal.sampling_rate

    @property
    def num_antennas(self) -> int:
        """Number of transmitting device antennas, i.e. the stream count of the mixed signal."""

        return self.__mixed_signal.num_streams

    @property
    def carrier_frequency(self) -> float:
        """Carrier frequency of the mixed signal in Hz."""

        return self.__mixed_signal.carrier_frequency

    @property
    def emerging_signals(self) -> Sequence[Signal]:
        """Models of electromagnetic signals generated by the device.

        Returns: List containing the mixed signal as its only element.
        """

        return [self.__mixed_signal]

    @property
    def num_emerging_signals(self) -> int:
        """Number of emerging signal models."""

        signals = self.emerging_signals
        return len(signals)

    @property
    def mixed_signal(self) -> Signal:
        """Combined signal of the device's transmissions."""

        return self.__mixed_signal

    @classmethod
    def from_HDF(cls: Type[DeviceOutput], group: Group) -> DeviceOutput:
        """Recall a device output from the `mixed_signal` subgroup of an HDF group."""

        return cls(Signal.from_HDF(group["mixed_signal"]))

    def to_HDF(self, group: Group) -> None:
        """Serialize the mixed signal into a `mixed_signal` subgroup of an HDF group."""

        signal_group = self._create_group(group, "mixed_signal")
        self.mixed_signal.to_HDF(signal_group)
class DeviceTransmission(DeviceOutput):
    """Information generated by transmitting over a device.

    Extends the device output by the transmissions of the individual transmit operators.
    """

    __operator_transmissions: Sequence[Transmission]

    def __init__(
        self, operator_transmissions: Sequence[Transmission], mixed_signal: Signal
    ) -> None:
        """
        Args:

            operator_transmissions (Sequence[Transmission]):
                Information generated by the transmit operators.

            mixed_signal (Signal):
                Mixed signal transmitted by the device.
        """

        DeviceOutput.__init__(self, mixed_signal)
        self.__operator_transmissions = operator_transmissions

    @classmethod
    def From_Output(
        cls: Type[DeviceTransmission],
        output: DeviceOutput,
        operator_transmissions: Sequence[Transmission],
    ) -> DeviceTransmission:
        """Initialize a device transmission from a device output.

        Args:

            output (DeviceOutput):
                Device output providing the mixed signal.

            operator_transmissions (Sequence[Transmission]):
                Information generated by the transmit operators.
        """

        return cls(operator_transmissions, output.mixed_signal)

    @property
    def operator_transmissions(self) -> Sequence[Transmission]:
        """Information transmitted by device operators."""

        return self.__operator_transmissions

    @property
    def num_operator_transmissions(self) -> int:
        """Number of operator transmissions."""

        return len(self.__operator_transmissions)

    @classmethod
    def from_HDF(cls: Type[DeviceTransmission], group: Group) -> DeviceTransmission:
        """Recall a device transmission, deserializing operator transmissions as plain :class:`Transmission`."""

        output = DeviceOutput.from_HDF(group)
        count = group.attrs.get("num_transmissions", 1)

        recalled = []
        for index in range(count):
            recalled.append(Transmission.from_HDF(group[f"transmission_{index:02d}"]))

        return cls.From_Output(output, recalled)

    @classmethod
    def Recall(cls: Type[DeviceTransmission], group: Group, device: Device) -> DeviceTransmission:
        """Recall a device transmission, letting the device's transmitters deserialize their own transmissions."""

        output = DeviceOutput.from_HDF(group)
        count = group.attrs.get("num_transmissions", 1)

        recalled = []
        for index, transmitter in zip(range(count), device.transmitters):
            recalled.append(transmitter.recall_transmission(group[f"transmission_{index:02d}"]))

        return cls.From_Output(output, recalled)

    def to_HDF(self, group: Group) -> None:
        """Serialize the mixed signal, the transmission count and each operator transmission."""

        DeviceOutput.to_HDF(self, group)
        group.attrs["num_transmissions"] = self.num_operator_transmissions

        for index, transmission in enumerate(self.operator_transmissions):
            transmission.to_HDF(self._create_group(group, f"transmission_{index:02d}"))


class DeviceInput(HDFSerializable):
    """Receive information required by devices."""

    __impinging_signals: Sequence[Signal]

    def __init__(self, impinging_signals: Signal | Sequence[Signal]) -> None:
        """
        Args:

            impinging_signals (Signal | Sequence[Signal]):
                Signals to be processed by the device.
                A single signal is wrapped into a list.
        """

        if isinstance(impinging_signals, Signal):
            self.__impinging_signals = [impinging_signals]
        else:
            self.__impinging_signals = impinging_signals

    @property
    def impinging_signals(self) -> Sequence[Signal]:
        """Signals to be processed by the device."""

        return self.__impinging_signals

    @property
    def num_impinging_signals(self) -> int:
        """Number of signals impinging onto the device."""

        return len(self.__impinging_signals)

    @classmethod
    def from_HDF(cls: Type[DeviceInput], group: Group) -> DeviceInput:
        """Recall a device input from the `impinging_signal_XX` subgroups of an HDF group."""

        count = group.attrs.get("num_impinging_signals", 1)

        signals = []
        for index in range(count):
            signals.append(Signal.from_HDF(group[f"impinging_signal_{index:02d}"]))

        return cls(signals)

    def to_HDF(self, group: Group) -> None:
        """Serialize each impinging signal followed by the signal count."""

        for index, signal in enumerate(self.impinging_signals):
            signal.to_HDF(self._create_group(group, f"impinging_signal_{index:02d}"))

        group.attrs["num_impinging_signals"] = self.num_impinging_signals


class ProcessedDeviceInput(DeviceInput):
    """Information generated by receiving over a device."""

    __operator_inputs: Sequence[Signal]

    def __init__(
        self,
        impinging_signals: DeviceInput | Signal | Sequence[Signal],
        operator_inputs: Sequence[Signal],
    ) -> None:
        """
        Args:

            impinging_signals (DeviceInput | Signal | Sequence[Signal]):
                Signals impinging onto the device.
                A :class:`DeviceInput` is unpacked to the signals it contains.

            operator_inputs (Sequence[Signal]):
                Information cached by the device operators.
        """

        signals = impinging_signals
        if isinstance(signals, DeviceInput):
            signals = signals.impinging_signals

        DeviceInput.__init__(self, signals)
        self.__operator_inputs = operator_inputs

    @property
    def operator_inputs(self) -> Sequence[Signal]:
        """Signals provided to the individual device operators."""

        return self.__operator_inputs

    @property
    def num_operator_inputs(self) -> int:
        """Number of operator inputs."""

        return len(self.__operator_inputs)

    @classmethod
    def from_HDF(cls: Type[ProcessedDeviceInput], group: Group) -> ProcessedDeviceInput:
        """Recall the impinging signals and the `operator_input_XX` subgroups of an HDF group."""

        device_input = DeviceInput.from_HDF(group)
        count = group.attrs.get("num_operator_inputs", 0)

        inputs = []
        for index in range(count):
            inputs.append(Signal.from_HDF(group[f"operator_input_{index:02d}"]))

        return cls(device_input, inputs)

    def to_HDF(self, group: Group) -> None:
        """Serialize the impinging signals, each operator input and the operator input count."""

        DeviceInput.to_HDF(self, group)

        for index, operator_input in enumerate(self.operator_inputs):
            operator_input.to_HDF(self._create_group(group, f"operator_input_{index:02d}"))

        group.attrs["num_operator_inputs"] = self.num_operator_inputs


DRT = TypeVar("DRT", bound="DeviceReception")
"""Type of device reception."""


class DeviceReception(ProcessedDeviceInput):
    """Information generated by receiving over a device and its operators."""

    __operator_receptions: Sequence[Reception]

    def __init__(
        self,
        impinging_signals: Signal | Sequence[Signal],
        operator_inputs: Sequence[Signal],
        operator_receptions: Sequence[Reception],
    ) -> None:
        """
        Args:

            impinging_signals (Signal | Sequence[Signal]):
                Signals impinging onto the device.

            operator_inputs (Sequence[Signal]):
                Information cached by the device operators.

            operator_receptions (Sequence[Reception]):
                Information inferred from receive operators.
        """

        ProcessedDeviceInput.__init__(self, impinging_signals, operator_inputs)
        self.__operator_receptions = operator_receptions

    @classmethod
    def From_ProcessedDeviceInput(
        cls: Type[DRT], device_input: ProcessedDeviceInput, operator_receptions: Sequence[Reception]
    ) -> DRT:
        """Initialize an instance from a processed device input.

        Args:

            device_input (ProcessedDeviceInput):
                The processed device input.

            operator_receptions (Sequence[Reception]):
                Receptions of the device's receive operators.

        Returns: The initialized object.
        """

        signals = device_input.impinging_signals
        inputs = device_input.operator_inputs
        return cls(signals, inputs, operator_receptions)

    @property
    def operator_receptions(self) -> Sequence[Reception]:
        """Information received by device operators."""

        return self.__operator_receptions

    @property
    def num_operator_receptions(self) -> int:
        """Number of operator receptions."""

        return len(self.__operator_receptions)

    @classmethod
    def from_HDF(cls: Type[DRT], group: Group) -> DRT:
        """Recall a device reception, deserializing operator receptions as plain :class:`Reception`."""

        device_input = ProcessedDeviceInput.from_HDF(group)
        count = group.attrs.get("num_operator_receptions", 0)

        recalled = []
        for index in range(count):
            recalled.append(Reception.from_HDF(group[f"reception_{index:02d}"]))

        return cls.From_ProcessedDeviceInput(device_input, recalled)

    @classmethod
    def Recall(cls: Type[DRT], group: Group, device: Device) -> DRT:
        """Recall a device reception, letting the device's receivers deserialize their own receptions."""

        device_input = ProcessedDeviceInput.from_HDF(group)
        count = group.attrs.get("num_operator_receptions", 0)

        recalled = []
        for index, receiver in zip(range(count), device.receivers):
            recalled.append(receiver.recall_reception(group[f"reception_{index:02d}"]))

        return cls.From_ProcessedDeviceInput(device_input, recalled)

    def to_HDF(self, group: Group) -> None:
        """Serialize the processed input, each operator reception and the reception count."""

        ProcessedDeviceInput.to_HDF(self, group)

        for index, reception in enumerate(self.operator_receptions):
            reception.to_HDF(self._create_group(group, f"reception_{index:02d}"))

        group.attrs["num_operator_receptions"] = self.num_operator_receptions


class MixingOperator(Generic[SlotType], Operator[SlotType], ABC):
    """Base class for operators performing mixing operations."""

    __carrier_frequency: Optional[float]  # Carrier frequency

    def __init__(
        self, carrier_frequency: Optional[float] = None, slot: Optional[SlotType] = None
    ) -> None:
        """
        Args:

            carrier_frequency (float, optional):
                Central frequency of the mixed signal in radio-frequency transmission band.

            slot (SlotType, optional):
                Device slot this operator operates.
        """

        self.carrier_frequency = carrier_frequency
        Operator.__init__(self, slot)

    @property
    def carrier_frequency(self) -> float:
        """Central frequency of the mixed signal in radio-frequency transmission band in Hz.

        Denoted by :math:`f_c` with unit :math:`\\left[ f_c \\right] = \\mathrm{Hz} = \\tfrac{1}{\\mathrm{s}}` in the literature.

        By default, the carrier frequency of the assigned :class:`.Device` is returned.
        If no device is assigned, we assume a base band, i.e. :math:`f_c = 0`.

        Raises:
            ValueError: If the carrier frequency is smaller than zero.
        """

        # An explicitly configured frequency takes precedence over the device's
        if self.__carrier_frequency is not None:
            return self.__carrier_frequency

        if self.device is None:
            return 0.0

        return self.device.carrier_frequency

    @carrier_frequency.setter
    def carrier_frequency(self, value: Optional[float]) -> None:
        if value is not None and value < 0.0:
            raise ValueError("Carrier frequency must be greater or equal to zero")

        self.__carrier_frequency = value
class Receiver(RandomNode, MixingOperator["ReceiverSlot"], Generic[ReceptionType]):
    """Operator receiving from a device."""

    __reference: Device | None
    __signal: Signal | None
    __reception: ReceptionType | None
    __selected_receive_ports: Sequence[int] | None

    def __init__(
        self,
        seed: int | None = None,
        reference: Device | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Args:

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.

            reference (Device, optional):
                Reference device to which the channel shall be estimated.

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            *args:
                Operator base class initialization parameters.

            \\**kwargs:
                Operator base class initialization parameters.
        """

        # Initialize base classes
        RandomNode.__init__(self, seed=seed)
        MixingOperator.__init__(self, *args, **kwargs)

        # Initialize class attributes
        self.reference = reference
        self.selected_receive_ports = selected_receive_ports
        self.__signal = None
        self.__reception = None

    @Operator.device.setter  # type: ignore
    def device(self, value: Device | None) -> None:  # type: ignore
        # Assigning None detaches the receiver from its current slot
        # instead of failing on the attribute access below
        if value is None:
            self.slot = None
            return

        value.receivers.add(self)

    @Operator.slot.setter  # type: ignore
    def slot(self, value: Optional[ReceiverSlot]) -> None:
        Operator.slot.fset(self, value)  # type: ignore

        # The random mother follows the device of the assigned slot
        self.random_mother = None if value is None else value.device

    @property
    def reference(self) -> Optional[Device]:
        """Reference transmitter for this receiver.

        Returns:
            A handle to the referenced device.
            `None` if the device was not specified.
        """

        return self.__reference

    @reference.setter
    def reference(self, value: Optional[Device]) -> None:
        self.__reference = value

    @property
    def selected_receive_ports(self) -> Sequence[int] | None:
        """Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.

        If :py:obj:`None`, all available antenna ports will be considered.

        Raises:
            ValueError: If the selected ports don't match the configured device's receive antenna array configuration.
        """

        return self.__selected_receive_ports

    @selected_receive_ports.setter
    def selected_receive_ports(self, value: Sequence[int] | None) -> None:
        if value is None:
            self.__selected_receive_ports = None
            return

        # Make sure the provided indices match the underlying device's antenna array configuration
        # The check is skipped while the receiver is floating, since no array is known then
        if self.device is not None:
            if max(value) > (self.device.antennas.num_receive_ports - 1):
                raise ValueError(
                    "Receive antenna selection indices don't match the device's receive antenna configuration"
                )

        self.__selected_receive_ports = value

    @property
    def num_receive_ports(self) -> int:
        """Number of considered receiving ports.

        Zero if no ports are selected and the receiver is floating.
        """

        if self.selected_receive_ports is None:
            if self.device is None:
                return 0

            return self.device.antennas.num_receive_ports

        return len(self.selected_receive_ports)

    @property
    def num_receive_antennas(self) -> int:
        """Number of considered receiving antennas.

        Zero if the receiver is floating.
        """

        if self.device is None:
            return 0

        if self.selected_receive_ports is None:
            return self.device.antennas.num_receive_antennas

        return self.device.antennas.count_receive_antennas(self.selected_receive_ports)

    @property
    def reception(self) -> ReceptionType | None:
        """Information inferred from the most recent reception.

        Updated during the :meth:`receive<receive>` routine.
        :py:obj:`None` if the cache has been cleared or :meth:`receive<receive>` has not been called yet.
        """

        return self.__reception

    def receive(self, signal: Signal | None = None, cache: bool = True) -> ReceptionType:
        """Process a received signal by the receiver.

        Wrapper around the abstract :meth:`_receive<Receiver._receive>` method.

        Args:

            signal (Signal, optional):
                Signal model to be processed.
                If not provided, the signal cached by :meth:`cache_reception` is used.

            cache (bool, optional):
                Cache the received information.
                Enabled by default.

        Returns: Information received by this operator.

        Raises:

            RuntimeError: If signal model was not provided and no signal is cached.
            ValueError: If the number of signal streams does not match the number of receive ports.
        """

        if signal is None:
            if self.signal is None:
                raise RuntimeError("Error attempting to fetch a cached receiver signal")

            signal = self.signal

        # A port count of zero skips the stream validation
        if signal.num_streams != self.num_receive_ports and self.num_receive_ports > 0:
            raise ValueError(
                f"Number of signal streams does not match the number of receive ports ({signal.num_streams} != {self.num_receive_ports})"
            )

        # Generate received information
        reception = self._receive(signal)

        # Cache the reception if the respective flag is enabled
        if cache:
            self.__reception = reception

        # Return received information
        return reception

    @abstractmethod
    def _receive(self, signal: Signal) -> ReceptionType:
        """Process a received signal by the receiver.

        Subroutine of the public :meth:`receive<hermespy.core.device.Receiver.receive>` method
        that performs the pipeline-specific receive processing and consolidates the inferred
        information into a single :class:`Reception<hermespy.core.device.Reception>` object.

        Args:

            signal (Signal):
                Multi-stream signal model to be processed.

        Returns: Information inferred from the received signal.
        """
        ...  # pragma: no cover

    @property
    def signal(self) -> Signal | None:
        """Cached signal model to be received.

        Signal model assumed by :meth:`receive` if no signal model is provided.
        Updated with :meth:`cache_reception`.

        :obj:`None` if no signal has been cached yet or the cache has been cleared.
        """

        return self.__signal

    def cache_reception(self, signal: Signal | None) -> None:
        """Cache recent reception at this receiver.

        Args:

            signal (Signal | None):
                Signal model to be cached for the next reception.
                :obj:`None` clears the cache.
        """

        self.__signal = signal

    def recall_reception(self, group: Group, cache: bool = True) -> ReceptionType:
        """Recall a specific reception from a serialization.

        Internally calls the abstract method :meth:`_recall_reception<Receiver._recall_reception>`.
        Caches the reception if the respective flag is enabled.

        Args:

            group (Group):
                HDF group containing the reception.

            cache (bool, optional):
                Cache the reception.
                Enabled by default.

        Returns: The recalled reception.

        :meta private:
        """

        recalled_reception = self._recall_reception(group)

        if cache:
            self.__reception = recalled_reception

        return recalled_reception

    @abstractmethod
    def _recall_reception(self, group: Group) -> ReceptionType:
        """Recall a specific reception from a serialization.

        Args:

            group (Group):
                HDF group containing the reception.

        Returns: The recalled reception.
        """
        ...  # pragma: no cover

    def noise_power(self, strength: float, snr_type: SNRType) -> float:
        """Compute noise power for a given signal strength.

        Internally calls :meth:`_noise_power` for some :class:`SNRTypes<hermespy.core.definitions.SNRType>`.

        Args:

            strength (float):
                Signal strength indicator.
                The unit depends on the `snr_type`.

            snr_type (SNRType):
                The considered signal to noise ratio type.

        Returns: The `strength` itself for `SNRType.N0`, otherwise the result of :meth:`_noise_power`.

        Raises:
            ValueError: If the receiver does not support the required snr type.
        """

        # For the N0 snr_type the receiver implementation must not be queried
        if snr_type is SNRType.N0:
            return strength

        return self._noise_power(strength, snr_type)

    @abstractmethod
    def _noise_power(self, strength: float, snr_type: SNRType) -> float:
        """Compute noise power for a given signal strength.

        Args:

            strength (float):
                Signal strength indicator.
                The unit depends on the `snr_type`.

            snr_type (SNRType):
                The considered signal to noise ratio type.

        Raises:
            ValueError: If the receiver does not support the required snr type.
        """
        ...  # pragma: no cover
class OperatorSlot(Generic[OperatorType], Sequence[OperatorType]):
    """Slot list for operators of a single device.

    Behaves like a read-only sequence of the registered operators.
    Operators are registered and deregistered with :meth:`add` and :meth:`remove`.
    """

    __device: Device  # Device this operator belongs to
    __operators: List[OperatorType]  # List of registered operators

    def __init__(self, device: Device) -> None:
        """
        Args:

            device (Device):
                Device this slot belongs to.
        """

        self.__device = device
        self.__operators = []

    @property
    def device(self) -> Device:
        """Device this operator slot belongs to.

        Returns:
            Device: Handle to the device.
        """

        return self.__device

    def operator_index(self, operator: OperatorType) -> int:
        """Index of an operator within this slot.

        Returns:
            index (int): The `operator`'s index.

        Raises:
            ValueError: If the `operator` is not registered at this slot.
        """

        return self.__operators.index(operator)

    def add(self, operator: OperatorType) -> None:
        """Add a new operator to this slot.

        Adding an operator that is already registered has no effect.
        After registration, the operator's `slot` attribute is pointed to this slot.

        Args:
            operator (OperatorType):
                Operator to be added to this slot.
        """

        # Do nothing if the operator is already registered
        if operator in self.__operators:
            return

        # Register first, so that the operator's slot setter finds itself registered
        self.__operators.append(operator)
        operator.slot = self

    def remove(self, operator: OperatorType) -> None:
        """Remove an operator from this slot.

        Args:
            operator (OperatorType):
                Handle to the operator to be removed.
        """

        if operator in self.__operators:
            self.__operators.remove(operator)

        # Only detach from the operator's slot if it matches to this signal
        # This is required to prevent duplex operators from accidentally detaching themselves
        # NOTE(review): the check compares slot types, not slot identity - confirm this is intended
        # NOTE(review): source formatting was lost; confirm this check is not meant to be nested in the branch above
        if isinstance(operator.slot, type(self)):
            operator.slot = None

    def registered(self, operator: OperatorType) -> bool:
        """Check if an operator is registered at this slot.

        Returns:
            bool: Boolean indicating the registration status.
        """

        return operator in self.__operators

    @property
    def num_operators(self) -> int:
        """Number of operators on this slot.

        Returns:
            int: Number of operators.
        """

        return len(self.__operators)

    @property
    def max_sampling_rate(self) -> float:
        """Maximum requested sampling rate of all operators.

        Returns:
            The maximum sampling rate in Hz.
            Zero if no operator is registered.
        """

        sampling_rate = 0.0

        for operator in self.__operators:
            sampling_rate = max(sampling_rate, operator.sampling_rate)

        return sampling_rate

    @property
    def min_frame_duration(self) -> float:
        """The minimum frame duration required to process at least a single frame for each operator.

        This is the longest frame duration among all registered operators.

        Returns:
            The minimum frame duration in seconds.
            Zero if no operator is registered.
        """

        frame_duration = 0.0

        for operator in self.__operators:
            frame_duration = max(frame_duration, operator.frame_duration)

        return frame_duration

    @property
    def min_num_samples_per_frame(self) -> int:
        """The number of samples required to collect at least a single frame for each operator.

        Computed as the product of :attr:`min_frame_duration` and :attr:`max_sampling_rate`, rounded up.

        Returns:
            Number of samples.
        """

        sampling_rate = self.max_sampling_rate
        frame_duration = self.min_frame_duration

        return ceil(frame_duration * sampling_rate)

    @overload
    def __getitem__(self, item: int) -> OperatorType:
        ...  # pragma: no cover

    @overload
    def __getitem__(self, item: slice) -> Sequence[OperatorType]:
        ...  # pragma: no cover

    def __getitem__(self, item: int | slice) -> OperatorType | Sequence[OperatorType]:
        # Indexing and slicing are forwarded to the internal operator list
        return self.__operators[item]

    def __iter__(self) -> Iterator[OperatorType]:
        """Iterating over an operator slot iterates over its registered operators."""

        return self.__operators.__iter__()

    def __contains__(self, operator: object) -> bool:
        """Contains is just a convenient mask for registered.

        Returns: Registration indicator.
        """

        return operator in self.__operators

    def __len__(self) -> int:
        return self.num_operators


TransmissionType = TypeVar("TransmissionType", bound="Transmission")
"""Type variable of a :class:`Transmission`."""
class Transmitter(Generic[TransmissionType], RandomNode, MixingOperator["TransmitterSlot"]):
    """Operator transmitting over a device.

    Concrete transmitters implement :meth:`_transmit` and :meth:`_recall_transmission`.
    The public wrappers :meth:`transmit` and :meth:`recall_transmission` add
    optional caching of the most recent :class:`Transmission`.
    """

    __transmission: TransmissionType | None  # Most recently cached transmission
    __selected_transmit_ports: Sequence[int] | None  # None selects all available ports

    def __init__(
        self,
        seed: int | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        *args,
        **kwargs,
    ) -> None:
        r"""
        Args:

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            *args:
                Operator base class initialization parameters.

            \**kwargs:
                Operator base class initialization parameters.
        """

        # Initialize operator base class
        RandomNode.__init__(self, seed=seed)
        MixingOperator.__init__(self, *args, **kwargs)

        # Initialize class attributes
        self.__selected_transmit_ports = selected_transmit_ports
        self.__transmission = None

    def transmit(self, duration: float = 0.0, cache: bool = True) -> TransmissionType:
        """Transmit a signal.

        Wrapper around the abstract :meth:`_transmit<Transmitter._transmit>` method
        that optionally caches the generated transmission.

        Args:

            duration (float, optional):
                Duration of the transmitted signal.
                If not specified, the duration will be inferred by the transmitter.

            cache (bool, optional):
                Cache the transmission by the transmitter.
                Enabled by default.

        Returns: Information generated while transmitting over this operator.

        Raises:

            FloatingError: If the transmitter is currently considered floating
                (presumably raised by the :meth:`_transmit` implementation).
        """

        transmission = self._transmit(duration)

        if cache:
            self.__transmission = transmission

        return transmission

    @abstractmethod
    def _transmit(self, duration: float) -> TransmissionType:
        """Generate information to be transmitted.

        Subroutine of the public :meth:`transmit<Transmitter.transmit>` method that performs the pipeline-specific transmit-processing
        and consolidates the generated information into a single :class:`Transmission` object.

        Args:

            duration (float, optional):
                Duration of the transmitted signal in seconds.
                If not specified, the duration of a single frame will be assumed.

        Returns: Information to be transmitted.
        """
        ...  # pragma: no cover

    @property
    def transmission(self) -> TransmissionType | None:
        """Recent transmission of the transmitter.

        Updated during the :meth:`transmit<Transmitter.transmit>` routine.
        :py:obj:`None` if the cache has been cleared or :meth:`transmit<Transmitter.transmit>` has not been called yet.
        """

        return self.__transmission

    def cache_transmission(self, transmission: TransmissionType) -> None:
        """Update the information resulting from the latest transmission.

        Overwrites the value reported by :attr:`transmission`.

        Args:

            transmission (TransmissionType):
                The transmission to be cached.
        """

        self.__transmission = transmission

    def recall_transmission(self, group: Group, cache: bool = True) -> TransmissionType:
        """Recall a specific transmission from a serialization.

        Internally calls the abstract method :meth:`Transmitter._recall_transmission`.
        Caches the transmission if the respective flag is enabled.

        Args:

            group (Group):
                HDF group containing the transmission.

            cache (bool, optional):
                Cache the transmission.
                Enabled by default.

        Returns: The recalled transmission

        :meta private:
        """

        recalled_transmission = self._recall_transmission(group)

        if cache:
            self.cache_transmission(recalled_transmission)

        return recalled_transmission

    @abstractmethod
    def _recall_transmission(self, group: Group) -> TransmissionType:
        """Recall a specific transmission from a serialization.

        Args:

            group (Group):
                HDF group containing the transmission.

        Returns: The recalled transmission
        """
        ...  # pragma: no cover

    @property
    def selected_transmit_ports(self) -> Sequence[int] | None:
        """Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.

        If `None`, all available transmit ports will be considered.

        Raises:

            ValueError: If the selected ports don't match the configured device's transmit antenna array configuration.
        """

        return self.__selected_transmit_ports

    @selected_transmit_ports.setter
    def selected_transmit_ports(self, value: Sequence[int] | None) -> None:
        if value is None:
            self.__selected_transmit_ports = None
            return

        # Make sure the provided indices match the underlying device's antenna array configuration.
        # The check is only possible while the operator is assigned to a device.
        if self.device is not None:
            if max(value) > (self.device.antennas.num_transmit_ports - 1):
                raise ValueError(
                    "Transmit antenna selection indices don't match the device's transmit antenna configuration"
                )

        self.__selected_transmit_ports = value

    @property
    def num_transmit_ports(self) -> int:
        """Number of considered transmitting ports.

        Zero if no ports are selected and no device is assigned.
        """

        if self.selected_transmit_ports is None:
            if self.device is None:
                return 0

            return self.device.antennas.num_transmit_ports

        return len(self.selected_transmit_ports)

    @property
    def num_transmit_antennas(self) -> int:
        """Number of considered transmitting antennas.

        Zero if no device is assigned.
        """

        if self.device is None:
            return 0

        # Without an explicit port selection, all of the device's transmit antennas are considered
        if self.selected_transmit_ports is None:
            return self.device.antennas.num_transmit_antennas

        return self.device.antennas.count_transmit_antennas(self.selected_transmit_ports)

    @Operator.device.setter  # type: ignore
    def device(self, value: Device) -> None:  # type: ignore
        # Assigning a device registers this operator at the device's transmitter slot
        value.transmitters.add(self)

    @Operator.slot.setter  # type: ignore
    def slot(self, value: Optional[TransmitterSlot]) -> None:
        Operator.slot.fset(self, value)  # type: ignore

        # The slot's device becomes the mother node of this operator's random generator
        self.random_mother = None if value is None else value.device
class TransmitterSlot(OperatorSlot[Transmitter]):
    """Device slot holding transmit operators together with their pending transmissions."""

    # One cache entry per added transmitter; None marks an entry without a transmission
    __transmissions: List[Optional[Transmission]]

    def __init__(self, *args, **kwargs) -> None:
        """
        Args:

            *args:
                Positional parameters forwarded to the OperatorSlot base class.

            **kwargs:
                Keyword parameters forwarded to the OperatorSlot base class.
        """

        # The transmission cache is created ahead of the base class initialization
        self.__transmissions = []
        OperatorSlot.__init__(self, *args, **kwargs)

    def add(self, operator: Transmitter) -> None:
        # Register the operator at the base slot, then reserve its (empty) cache entry
        OperatorSlot.add(self, operator)
        self.__transmissions += [None]

    def add_transmission(self, transmitter: Transmitter, transmission: Transmission) -> None:
        """Store the most recent transmission of a registered transmitter.

        Args:

            transmitter (Transmitter):
                The transmitter emitting the signal.

            transmission (Transmission):
                The information to be transmitted.

        Raises:

            ValueError: If the transmitter is not registered at this slot.
            ValueError: If the signal's stream count differs from the transmitter's port count.
        """

        is_registered = self.registered(transmitter)
        if not is_registered:
            raise ValueError("Transmitter not registered at this slot")

        num_streams = transmission.signal.num_streams
        num_ports = transmitter.num_transmit_ports
        if num_streams != num_ports:
            raise ValueError(
                f"Transmitted signal has invalid number of streams ({num_streams} instead of {num_ports})"
            )

        # The cache is indexed by the transmitter's position within this slot
        index = transmitter.slot_index
        self.__transmissions[index] = transmission

    def get_transmissions(self, clear_cache: bool = True) -> List[Transmission]:
        """Query the cached operator transmissions.

        Args:

            clear_cache (bool, optional):
                Reset the cache after reading it.
                Enabled by default.

        Returns: A shallow copy of the cached transmissions.
        """

        # Copy first so that clearing the cache does not affect the returned list
        snapshot: List[Transmission] = list(self.__transmissions)

        if clear_cache:
            self.clear_cache()

        return snapshot

    def clear_cache(self) -> None:
        """Reset the cached transmissions to one empty entry per registered operator."""

        self.__transmissions = [None for _ in range(self.num_operators)]


class ReceiverSlot(OperatorSlot[Receiver]):
    """Device slot holding receive operators."""

    def __init__(self, *args, **kwargs) -> None:
        """
        Args:

            *args:
                Positional parameters forwarded to the OperatorSlot base class.

            **kwargs:
                Keyword parameters forwarded to the OperatorSlot base class.
        """

        # Nothing to set up besides the base class
        OperatorSlot.__init__(self, *args, **kwargs)


class UnsupportedSlot(OperatorSlot):
    """Placeholder slot for operations a device does not support."""

    def add(self, operator: Operator) -> None:
        # Every registration attempt is rejected
        raise RuntimeError("Slot not supported by this device")
class Device(ABC, Transformable, RandomNode, Serializable):
    """Physical device representation within HermesPy.

    It acts as the basis for all transmissions and receptions of sampled electromagnetic signals.
    """

    transmitters: TransmitterSlot
    """Transmitters broadcasting signals over this device."""

    receivers: ReceiverSlot
    """Receivers capturing signals from this device"""

    __power: float  # Average power of the transmitted signals

    def __init__(
        self, power: float = 1.0, pose: Transformation | None = None, seed: Optional[int] = None
    ) -> None:
        """
        Args:

            power (float, optional):
                Average power of the transmitted signals in Watts.
                1.0 Watt by default.

            pose (Transformation, optional):
                Pose of the device with respect to its scenario coordinate system origin.

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.
        """

        # Initialize base classes
        RandomNode.__init__(self, seed=seed)
        Transformable.__init__(self, pose=pose)

        # Initialize device attributes and properties
        self.transmitters = TransmitterSlot(self)
        self.receivers = ReceiverSlot(self)
        self.power = power

    @property
    @abstractmethod
    def antennas(self) -> AntennaArray:
        """Model of the device's antenna array."""
        ...  # pragma: no cover

    @property
    def num_transmit_ports(self) -> int:
        """Number of available transmit antenna ports.

        Shorthand to :attr:`antennas'<.antennas>` :attr:`num_transmit_ports<AntennaArray.num_transmit_ports>`.
        """

        return self.antennas.num_transmit_ports

    @property
    def num_receive_ports(self) -> int:
        """Number of available receive antenna ports.

        Shorthand to :attr:`antennas'<.antennas>` :attr:`num_receive_ports<AntennaArray.num_receive_ports>`.
        """

        return self.antennas.num_receive_ports

    @property
    def num_transmit_antennas(self) -> int:
        """Number of available transmit antennas.

        Shorthand to :attr:`antennas'<antennas>` :meth:`num_transmit_antennas<AntennaArrayBase.num_transmit_antennas>`.
        """

        return self.antennas.num_transmit_antennas

    @property
    def num_receive_antennas(self) -> int:
        """Number of available receive antennas.

        Shorthand to :attr:`antennas'<antennas>` :meth:`num_receive_antennas<AntennaArrayBase.num_receive_antennas>`.
        """

        return self.antennas.num_receive_antennas

    @property
    def power(self) -> float:
        """Average power of the transmitted signal.

        Returns:
            power (float): Power in Watt.

        Raises:
            ValueError: If `value` is smaller than zero.
        """

        return self.__power

    @power.setter
    def power(self, value: float) -> None:
        """Set the average power of the transmitted signal."""

        if value < 0.0:
            raise ValueError("Average signal power must be greater or equal to zero")

        self.__power = value

    @property
    def snr(self) -> float:
        """Signal to noise ratio of the received signal.

        Returns: Linear signal to noise ratio
        """

        # By default, we have no knowledge about the snr and assume no noise
        return float("inf")

    @property
    def num_antennas(self) -> int:
        """Number of antennas within this device's topology.

        Returns:
            int: Number of antennas, as reported by the antenna array model.
        """

        return self.antennas.num_antennas

    @property
    def max_frame_duration(self) -> float:
        """Maximum frame duration over all registered transmit and receive operators.

        Returns:
            max_duration (float): Maximum duration in seconds. Zero if no operator is registered.
        """

        max_duration = 0.0

        for operator in chain(self.transmitters, self.receivers):
            max_duration = max(max_duration, operator.frame_duration)  # type: ignore

        return max_duration

    @property
    @abstractmethod
    def carrier_frequency(self) -> float:
        """Central frequency of the device's emissions in the RF-band.

        Returns:
            frequency (float): Carrier frequency in Hz.

        Raises:
            ValueError: On negative carrier frequencies.
        """
        ...  # pragma: no cover

    @property
    def wavelength(self) -> float:
        """Central wavelength of the device's emissions in the RF-band.

        Computed as the speed of light divided by :attr:`carrier_frequency`.

        Returns:
            wavelength (float): Wavelength in m.
        """

        return speed_of_light / self.carrier_frequency

    @property
    @abstractmethod
    def sampling_rate(self) -> float:
        """Sampling rate at which the device's analog-to-digital converters operate.

        Returns:
            sampling_rate (float): Sampling rate in Hz.

        Raises:
            ValueError: If the sampling rate is not greater than zero.
        """
        ...  # pragma: no cover

    @property
    @abstractmethod
    def velocity(self) -> np.ndarray:
        """Cartesian device velocity vector.

        Returns:
            np.ndarray: Velocity vector.

        Raises:
            ValueError: If `velocity` is not three-dimensional.
            NotImplementedError: If `velocity` is unknown.
        """
        ...  # pragma: no cover

    def transmit_operators(self) -> List[Transmission]:
        """Generate transmitted information for all registered operators.

        Calls :meth:`Transmitter.transmit` for each transmit operator.

        Returns: List of operator transmissions.
        """

        return [transmitter.transmit() for transmitter in self.transmitters]

    def generate_output(
        self, operator_transmissions: Optional[List[Transmission]] = None, resample: bool = True
    ) -> DeviceOutput:
        """Generate the device's output.

        Args:

            operator_transmissions (List[Transmissions], optional):
                List of operator transmissions from which to generate the output.
                If the `operator_transmissions` are not provided, the transmitter's caches will be queried.

            resample (bool, optional):
                Forwarded to the signal superposition of each operator transmission.
                Enabled by default.

        Returns: The device's output.

        Raises:

            RuntimeError: If no `operator_transmissions` were provided and an operator has no cached transmission.
        """

        # Generate operator transmissions if None were provided:
        operator_transmissions = (
            [o.transmission for o in self.transmitters]
            if operator_transmissions is None
            else operator_transmissions
        )
        operator_streams = [o.selected_transmit_ports for o in self.transmitters]

        if any(t is None for t in operator_transmissions):
            raise RuntimeError(
                "Error trying to generate outputs without specifying transmissions, caches are empty"
            )

        # Superimpose the operator transmissions to the device's RF configuration
        superimposed_signal = Signal.empty(
            self.sampling_rate, self.num_transmit_ports, carrier_frequency=self.carrier_frequency
        )

        for transmission, indices in zip(operator_transmissions, operator_streams):
            # Always true after the check above; kept to narrow the optional type
            if transmission is not None:
                superimposed_signal.superimpose(
                    transmission.signal, resample=resample, stream_indices=indices
                )

        return DeviceOutput(superimposed_signal)

    def transmit(self, clear_cache: bool = True) -> DeviceTransmission:
        """Transmit over this device.

        Internally calls :meth:`Device.transmit_operators` and :meth:`Device.generate_output`.

        Args:

            clear_cache (bool, optional):
                Clear the cache of operator signals to be transmitted.
                Enabled by default.
                This base implementation does not read the flag.

        Returns: Information transmitted by this device.
        """

        operator_transmissions = self.transmit_operators()
        device_output = self.generate_output(operator_transmissions)

        return DeviceTransmission.From_Output(device_output, operator_transmissions)

    def cache_transmission(self, transmission: DeviceTransmission) -> None:
        """Cache a device transmission's operator transmissions at the registered transmitters.

        Transmitters and operator transmissions are paired by position.

        Args:

            transmission (DeviceTransmission):
                The device transmission whose operator transmissions are cached.
        """

        for transmitter, operator_transmission in zip(
            self.transmitters, transmission.operator_transmissions
        ):
            transmitter.cache_transmission(operator_transmission)

    def process_input(
        self, impinging_signals: DeviceInput | Signal | Sequence[Signal], cache: bool = True
    ) -> ProcessedDeviceInput:
        """Process input signals impinging onto this device.

        Args:

            impinging_signals (Union[DeviceInput, Signal, Sequence[Signal]]):
                The samples to be processed by the device.

            cache (bool, optional):
                Cache the operator inputs at the registered receive operators for further processing.
                Enabled by default.

        Returns: The processed device input information.

        Raises:

            ValueError: If the number of signal streams does not match the device configuration expectations.
        """

        # Convert argument
        impinging_signals = (
            [impinging_signals] if isinstance(impinging_signals, Signal) else impinging_signals
        )
        impinging_signals = (
            impinging_signals.impinging_signals
            if isinstance(impinging_signals, DeviceInput)
            else impinging_signals
        )

        # Superimpose the impinging signal models; a single signal is used directly
        if len(impinging_signals) != 1:
            superimposed_signal = Signal.empty(
                self.sampling_rate,
                self.antennas.num_receive_antennas,
                carrier_frequency=self.carrier_frequency,
            )

            for signal in impinging_signals:
                superimposed_signal.superimpose(signal)

        else:
            superimposed_signal = impinging_signals[0]

        # Each operator is fed the superimposed signal by default
        operator_inputs: List[Signal] = []
        for receiver in self.receivers:
            selected_receive_antennas = receiver.selected_receive_ports

            # Without an explicit selection the receiver is fed all streams
            stream_selector = (
                slice(None) if selected_receive_antennas is None else selected_receive_antennas
            )
            stream_samples = superimposed_signal.samples[stream_selector, :]  # type: ignore
            operator_input = Signal(
                stream_samples,
                superimposed_signal.sampling_rate,
                superimposed_signal.carrier_frequency,
            )

            operator_inputs.append(operator_input)

        # Cache the operator inputs if the respective flag is enabled
        if cache:
            for receiver, cached_input in zip(self.receivers, operator_inputs):
                receiver.cache_reception(cached_input)

        return ProcessedDeviceInput(impinging_signals, operator_inputs)

    def receive_operators(
        self,
        operator_inputs: ProcessedDeviceInput | Sequence[Signal | None] | None = None,
        cache: bool = True,
    ) -> List[Reception]:
        """Receive over the registered operators.

        Args:

            operator_inputs (ProcessedDeviceInput | Sequence[Signal | None], optional):
                Signal models fed to the receive operators.
                If not provided, the operators are expected to have inputs cached.

            cache (bool, optional):
                Cache the generated received information at the device's receive operators.
                Enabled by default.

        Returns: List of information generated by receiving over the device's operators.

        Raises:

            ValueError: If the number of operator inputs does not match the number of receive operators.
            RuntimeError: If no operator inputs were specified and an operator has no cached inputs.
        """

        _operator_inputs: Sequence[Signal | None]

        # Refer to the operator caches if no inputs were specified
        if operator_inputs is None:
            _operator_inputs = [o.signal for o in self.receivers]

        # Convert object oriented input
        elif isinstance(operator_inputs, ProcessedDeviceInput):
            _operator_inputs = operator_inputs.operator_inputs

        else:
            _operator_inputs = operator_inputs

        if len(_operator_inputs) != self.receivers.num_operators:
            raise ValueError(
                f"Number of operator inputs ({len(_operator_inputs)}) does not match the number of receive operators ({self.receivers.num_operators})"
            )

        # Generate receive information, one reception per registered operator
        return [
            operator.receive(signal, cache)
            for operator, signal in zip(self.receivers, _operator_inputs)
        ]

    def receive(
        self, impinging_signals: DeviceInput | Signal | Sequence[Signal], cache: bool = True
    ) -> DeviceReception:
        """Receive over this device.

        Internally calls :meth:`Device.process_input` and :meth:`Device.receive_operators`.

        Args:

            impinging_signals (Union[DeviceInput, Signal, Sequence[Signal]]):
                The samples to be processed by the device.

            cache (bool, optional):
                Cache the received information.
                Applies to both the operator inputs and the generated receptions.
                Enabled by default.

        Returns: The received device information.
        """

        # Process input, honoring the caching flag for the operator inputs as well
        processed_input = self.process_input(impinging_signals, cache)

        # Generate receptions
        receptions = self.receive_operators(processed_input.operator_inputs, cache)

        # Generate device reception
        return DeviceReception.From_ProcessedDeviceInput(processed_input, receptions)