# -*- coding: utf-8 -*-
"""
=====================
Physical Device Dummy
=====================
Implements a physical device dummy for testing and demonstration purposes.
"""
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
from hermespy.core import DeviceInput, Serializable, Signal
from hermespy.simulation import (
NoiseLevel,
NoiseModel,
ProcessedSimulatedDeviceInput,
SimulatedAntennaArray,
SimulatedDevice,
SimulatedDeviceOutput,
SimulatedDeviceReception,
SimulatedDeviceTransmission,
SimulationScenario,
TriggerRealization,
)
from .physical_device import PhysicalDevice
from .scenario import PhysicalScenario
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class PhysicalDeviceDummy(SimulatedDevice, PhysicalDevice, Serializable):
"""Physical device dummy for testing and demonstration.
The physical device dummy always receives back its most recent transmission.
"""
yaml_tag = "PhysicalDeviceDummy"
__receive_transmission: bool
__uploaded_signal: Signal
__downloaded_signal: Signal
def __init__(
self,
max_receive_delay: float = 0.0,
antennas: SimulatedAntennaArray | None = None,
noise_power: np.ndarray | None = None,
receive_transmission: bool = True,
**kwargs,
) -> None:
# Initialize base classes
SimulatedDevice.__init__(self, antennas=antennas, **kwargs)
PhysicalDevice.__init__(self, max_receive_delay=max_receive_delay, noise_power=noise_power)
# Initialize internal state
self.receive_transmission = receive_transmission
self.__uploaded_signal = Signal.Empty(1.0, self.num_antennas)
self.__downloaded_signal = Signal.Empty(1.0, self.num_antennas)
@property
def receive_transmission(self) -> bool:
"""Whether the device receives back its own transmission."""
return self.__receive_transmission
@receive_transmission.setter
def receive_transmission(self, value: bool) -> None:
self.__receive_transmission = value
def _upload(self, signal: Signal) -> None:
self.__uploaded_signal = signal
def _download(self) -> Signal:
return self.__downloaded_signal
[docs]
def transmit(
self, cache: bool = True, trigger_realization: TriggerRealization | None = None
) -> SimulatedDeviceTransmission:
# Generate device transmission
device_transmission = SimulatedDevice.transmit(self, cache, trigger_realization)
# Upload mixed signal
self._upload(device_transmission.mixed_signal)
return device_transmission
[docs]
def receive(
self,
impinging_signals: (
DeviceInput | Signal | Sequence[Signal] | SimulatedDeviceOutput | None
) = None,
*args,
**kwargs,
) -> SimulatedDeviceReception:
if impinging_signals is None:
impinging_signals = self._download()
return SimulatedDevice.receive(self, impinging_signals, *args, **kwargs)
[docs]
def trigger(self) -> None:
if self.receive_transmission:
self.__downloaded_signal = self.__uploaded_signal
else:
samples = np.zeros(self.__uploaded_signal.shape)
self.__downloaded_signal = Signal.Create(
samples, self.sampling_rate, self.carrier_frequency
)
[docs]
def trigger_direct(self, signal: Signal, calibrate: bool = True) -> Signal:
if self.receive_transmission:
input = signal
else:
input = Signal.Create(
np.zeros(
(self.antennas.num_receive_antennas, signal.num_samples), dtype=np.complex_
),
self.sampling_rate,
self.carrier_frequency,
)
# Apply the simulation receive model
leaking_signal = self.isolation.leak(signal)
processed_input = self.process_input(input, False, leaking_signal=leaking_signal)
baseband_signal = processed_input.baseband_signal
# Apply correction routines if calibrations are available
corrected_signal = (
baseband_signal
if not calibrate or self.leakage_calibration is None
else self.leakage_calibration.remove_leakage(
signal, baseband_signal, self.delay_calibration.delay
)
)
return corrected_signal
@property
def max_sampling_rate(self) -> float:
return self.sampling_rate
[docs]
class PhysicalScenarioDummy(
SimulationScenario, PhysicalScenario[PhysicalDeviceDummy], Serializable
):
"""Physical scenario for testing and demonstration."""
yaml_tag = "PhysicalScenarioDummy"
def __init__(
self, seed: int | None = None, devices: Sequence[PhysicalDeviceDummy] | None = None
) -> None:
# Initialize base classes
SimulationScenario.__init__(self, seed=seed, devices=devices)
PhysicalScenario.__init__(self, seed=seed, devices=devices)
[docs]
def new_device(self, *args, **kwargs) -> PhysicalDeviceDummy:
device = PhysicalDeviceDummy(*args, **kwargs)
self.add_device(device)
return device
[docs]
def add_device(self, device: SimulatedDevice | PhysicalDeviceDummy) -> None:
# Adding a device resolves to the simulation scenario's add device method
SimulationScenario.add_device(self, device)
[docs]
def receive_devices(
self,
impinging_signals: (
Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]] | None
) = None,
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
if impinging_signals is None:
physical_device_receptions = PhysicalScenario.receive_devices(self, None, cache)
impinging_signals = [r.impinging_signals for r in physical_device_receptions]
return SimulationScenario.receive_devices(
self, impinging_signals, cache, trigger_realizations
)
def _trigger(self) -> None:
# Triggering is equivalent to generating a new simulation drop
SimulationScenario.drop(self) # type: ignore