Source code for hermespy.core.operators

# -*- coding: utf-8 -*-

from __future__ import annotations

import numpy as np
from h5py import Group

from .device import Transmission, Transmitter, Receiver, Reception
from .factory import Serializable
from .signal_model import Signal
from .state import ReceiveState, TransmitState

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class StaticOperator(object): """Base class for static device operators""" __num_samples: int # Number of samples per transmission __sampling_rate: float # Sampling rate of transmission def __init__(self, num_samples: int, sampling_rate: float) -> None: """ Args: num_samples (int): Number of samples per transmission. sampling_rate (float): Sampling rate of transmission. """ self.__num_samples = num_samples self.sampling_rate = sampling_rate @property def num_samples(self) -> int: """Number of samples per transmission. Returns: Number of samples. """ return self.__num_samples @property def sampling_rate(self) -> float: return self.__sampling_rate @sampling_rate.setter def sampling_rate(self, value: float) -> None: if value <= 0: raise ValueError(f"Sampling rate must be positive (not {value})") self.__sampling_rate = value @property def frame_duration(self) -> float: return self.__num_samples / self.sampling_rate
[docs] class SilentTransmitter(StaticOperator, Transmitter[Transmission], Serializable): """Silent transmitter mock.""" yaml_tag = "SilentTransmitter" serialized_attributes = {"num_samples", "sampling_rate"} def __init__(self, num_samples: int, sampling_rate: float, *args, **kwargs) -> None: """ Args: num_samples (int): Number of samples per transmission. sampling_rate (float): Sampling rate of transmission. """ # Init base classes StaticOperator.__init__(self, num_samples, sampling_rate) Transmitter.__init__(self, *args, **kwargs) @property def power(self) -> float: return 0.0 def _transmit(self, device: TransmitState, duration: float) -> Transmission: # Compute the number of samples to be transmitted num_samples = self.num_samples if duration <= 0.0 else int(duration * self.sampling_rate) silence = Signal.Create( np.zeros((device.num_digital_transmit_ports, num_samples), dtype=complex), sampling_rate=self.sampling_rate, carrier_frequency=device.carrier_frequency, ) return Transmission(silence) def _recall_transmission(self, group: Group) -> Transmission: return Transmission.from_HDF(group)
[docs] class SignalTransmitter(StaticOperator, Transmitter[Transmission], Serializable): """Custom signal transmitter.""" yaml_tag = "SignalTransmitter" __signal: Signal def __init__(self, signal: Signal, *args, **kwargs) -> None: """ Args: signal (Signal): Signal to be transmittered by the static operator for each transmission. """ # Init base classes StaticOperator.__init__(self, signal.num_samples, signal.sampling_rate) Transmitter.__init__(self, *args, **kwargs) # Init class attributes self.__signal = signal @property def power(self) -> float: return np.mean(self.signal.power) @property def signal(self) -> Signal: """Signal to be transmitted by the static operator for each transmission.""" return self.__signal @signal.setter def signal(self, value: Signal) -> None: self.__signal = value def _transmit(self, device: TransmitState, duration) -> Transmission: transmitted_signal = self.__signal.copy() # Update the transmitted signal's carrier frequency if it is specified as base-band if transmitted_signal.carrier_frequency == 0.0: transmitted_signal.carrier_frequency = device.carrier_frequency transmission = Transmission(transmitted_signal) return transmission def _recall_transmission(self, group: Group) -> Transmission: return Transmission.from_HDF(group)
[docs] class SignalReceiver(StaticOperator, Receiver[Reception], Serializable): """Custom signal receiver.""" yaml_tag = "SignalReceiver" serialized_attributes = {"num_samples", "sampling_rate"} __expected_power: float def __init__( self, num_samples: int, sampling_rate: float, expected_power: float = 0.0, *args, **kwargs ) -> None: # Initialize base classes StaticOperator.__init__(self, num_samples, sampling_rate) Receiver.__init__(self, *args, **kwargs) # Initialize class attributes if expected_power < 0.0: raise ValueError(f"Expected power must be non-negative (not {expected_power})") self.__expected_power = expected_power @property def energy(self) -> float: return self.__expected_power * self.num_samples @property def power(self) -> float: return self.__expected_power def _receive(self, signal: Signal, device: ReceiveState) -> Reception: received_signal = signal.resample(self.sampling_rate) return Reception(received_signal) def _recall_reception(self, group: Group) -> Reception: return Reception.from_HDF(group)