# -*- coding: utf-8 -*-
"""
=====================
Physical Device Dummy
=====================
Implements a physical device dummy for testing and demonstration purposes.
"""
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
from hermespy.core import DeviceInput, DeviceState, Serializable, Signal, Transmission
from hermespy.simulation import (
NoiseLevel,
NoiseModel,
ProcessedSimulatedDeviceInput,
SimulatedAntennaArray,
SimulatedDevice,
SimulatedDeviceOutput,
SimulatedDeviceReception,
SimulatedDeviceState,
SimulatedDeviceTransmission,
SimulationScenario,
TriggerRealization,
)
from .physical_device import PhysicalDevice, PhysicalDeviceState
from .scenario import PhysicalScenario
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class PhysicalDeviceDummyState(SimulatedDeviceState, PhysicalDeviceState):
... # pragma: no cover
[docs]
class PhysicalDeviceDummy(SimulatedDevice, PhysicalDevice, Serializable):
"""Physical device dummy for testing and demonstration.
The physical device dummy always receives back its most recent transmission.
"""
yaml_tag = "PhysicalDeviceDummy"
__receive_transmission: bool
__uploaded_signal: Signal
__downloaded_signal: Signal
def __init__(
self,
max_receive_delay: float = 0.0,
antennas: SimulatedAntennaArray | None = None,
noise_power: np.ndarray | None = None,
receive_transmission: bool = True,
**kwargs,
) -> None:
# Initialize base classes
SimulatedDevice.__init__(self, antennas=antennas, **kwargs)
PhysicalDevice.__init__(self, max_receive_delay=max_receive_delay, noise_power=noise_power)
# Initialize internal state
self.receive_transmission = receive_transmission
self.__uploaded_signal = Signal.Empty(1.0, self.num_antennas)
self.__downloaded_signal = Signal.Empty(1.0, self.num_antennas)
[docs]
def state(self, timestamp: float = 0.0) -> PhysicalDeviceDummyState:
trajectory_sample = self.trajectory.sample(timestamp)
return PhysicalDeviceDummyState(
id(self),
trajectory_sample,
self.carrier_frequency,
self.sampling_rate,
self.num_digital_transmit_ports,
self.num_digital_receive_ports,
self.antennas.state(trajectory_sample.pose),
)
@property
def receive_transmission(self) -> bool:
"""Whether the device receives back its own transmission."""
return self.__receive_transmission
@receive_transmission.setter
def receive_transmission(self, value: bool) -> None:
self.__receive_transmission = value
def _upload(self, signal: Signal) -> None:
self.__uploaded_signal = signal
def _download(self) -> Signal:
return self.__downloaded_signal
[docs]
def transmit(
self,
state: PhysicalDeviceDummyState | SimulatedDeviceState | None = None,
notify: bool = True,
trigger_realization: TriggerRealization | None = None,
) -> SimulatedDeviceTransmission:
# Generate device transmission
device_transmission = SimulatedDevice.transmit(self, state, notify, trigger_realization)
# Upload mixed signal
self._upload(device_transmission.mixed_signal)
return device_transmission
[docs]
def receive(
self,
impinging_signals: (
DeviceInput | Signal | Sequence[Signal] | SimulatedDeviceOutput | None
) = None,
state: PhysicalDeviceDummyState | SimulatedDeviceState | None = None,
notify: bool = True,
trigger_realization: TriggerRealization | None = None,
) -> SimulatedDeviceReception:
if impinging_signals is None:
impinging_signals = self._download()
return SimulatedDevice.receive(self, impinging_signals, state, notify, trigger_realization)
[docs]
def trigger(self) -> None:
if self.receive_transmission:
self.__downloaded_signal = self.__uploaded_signal
else:
samples = np.zeros(self.__uploaded_signal.shape)
self.__downloaded_signal = Signal.Create(
samples, self.sampling_rate, self.carrier_frequency
)
[docs]
def trigger_direct(self, signal: Signal, calibrate: bool = True) -> Signal:
if self.receive_transmission:
input = signal
else:
input = Signal.Create(
np.zeros(
(self.antennas.num_receive_antennas, signal.num_samples), dtype=np.complex128
),
self.sampling_rate,
self.carrier_frequency,
)
# Apply the simulation receive model
leaking_signal = self.isolation.leak(signal)
processed_input = self.process_input(input, leaking_signals=leaking_signal)
baseband_signal = processed_input.baseband_signal
# Apply correction routines if calibrations are available
corrected_signal = (
baseband_signal
if not calibrate or self.leakage_calibration is None
else self.leakage_calibration.remove_leakage(signal, baseband_signal)
)
return corrected_signal
@property
def max_sampling_rate(self) -> float:
return self.sampling_rate
[docs]
class PhysicalScenarioDummy(
SimulationScenario, PhysicalScenario[PhysicalDeviceDummy], Serializable
):
"""Physical scenario for testing and demonstration."""
yaml_tag = "PhysicalScenarioDummy"
def __init__(
self, seed: int | None = None, devices: Sequence[PhysicalDeviceDummy] | None = None
) -> None:
# Initialize base classes
SimulationScenario.__init__(self, seed=seed, devices=devices)
PhysicalScenario.__init__(self, seed=seed, devices=devices)
[docs]
def new_device(self, *args, **kwargs) -> PhysicalDeviceDummy:
device = PhysicalDeviceDummy(*args, **kwargs)
self.add_device(device)
return device
[docs]
def add_device(self, device: SimulatedDevice | PhysicalDeviceDummy) -> None:
# Adding a device resolves to the simulation scenario's add device method
SimulationScenario.add_device(self, device)
[docs]
def receive_devices(
self,
impinging_signals: (
Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]] | None
) = None,
states: Sequence[DeviceState] | Sequence[SimulatedDeviceState] | None = None,
notify: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
leaking_signals: Sequence[Signal] | Sequence[Sequence[Signal]] | None = None,
) -> Sequence[SimulatedDeviceReception]:
if impinging_signals is None:
physical_device_receptions = PhysicalScenario.receive_devices(
self, impinging_signals, states, False
)
impinging_signals = [r.impinging_signals for r in physical_device_receptions]
return SimulationScenario.receive_devices(
self, impinging_signals, states, notify, trigger_realizations, leaking_signals # type: ignore
)
def _trigger(self) -> None:
# Triggering is equivalent to generating a new simulation drop
SimulationScenario.drop(self) # type: ignore
def _trigger_direct(
self,
transmissions: list[Signal],
devices: list[PhysicalDeviceDummy],
calibrate: bool = True,
timestamp: float = 0.0,
) -> list[Signal]:
# Realize triggers
triggers = self.realize_triggers(devices)
# Generate transmissions considering the hardware models
device_outputs = [
device.generate_output(
[Transmission(transmission)],
None,
True,
trigger,
[np.arange(device.num_digital_transmit_ports).tolist()],
)
for device, transmission, trigger in zip(devices, transmissions, triggers)
]
# Realize all channels
channel_realizations = self.realize_channels()
# Propgate over all channels
propagated_signals = np.empty((len(devices), len(devices)), dtype=object)
for n, (alpha_device, alpha_transmission) in enumerate(zip(devices, device_outputs)):
for m, (beta_device, beta_transmission) in enumerate(
zip(devices[n:], device_outputs[n:]), n
):
# Select the correct channel and its respective realization
channel_index = self.channels.index(self.channel(alpha_device, beta_device))
channel_realization = channel_realizations[channel_index]
# Sample the propagations, optimizing for reciprocal channels
alpha_sample = channel_realization.sample(
alpha_device,
beta_device,
timestamp,
alpha_transmission.carrier_frequency,
alpha_transmission.sampling_rate,
)
beta_sample = channel_realization.reciprocal_sample(
alpha_sample,
beta_device,
alpha_device,
timestamp,
beta_transmission.carrier_frequency,
beta_transmission.sampling_rate,
)
# Propagate
beta_reception = alpha_sample.propagate(alpha_transmission)
alpha_reception = beta_sample.propagate(beta_transmission)
# Store the impinging signals
propagated_signals[m, n] = beta_reception
propagated_signals[n, m] = alpha_reception
# Receive devices
received_base_band_signals = [
device.process_input(impinging_signals.tolist(), None, trigger).baseband_signal
for impinging_signals, device, trigger in zip(propagated_signals, devices, triggers)
]
return received_base_band_signals