# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence
from enum import IntEnum
from itertools import chain
from os import path, remove
from typing import Generic, Type, TypeVar, Union
from h5py import File, Group
from .device import (
Device,
DeviceInput,
DeviceOutput,
DeviceReception,
DeviceState,
DeviceTransmission,
DeviceType,
DST,
ProcessedDeviceInput,
Reception,
Transmission,
Transmitter,
Receiver,
Operator,
)
from .drop import Drop, DropType
from .factory import Factory
from .random_node import RandomNode
from .signal_model import Signal
from .transformation import TransformableBase
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class ScenarioMode(IntEnum):
"""Current scenario mode."""
DEFAULT = 0
"""Default scenario state.
For configuration and generating drops.
"""
RECORD = 1
"""Recording scenario state.
For recording datasets.
"""
REPLAY = 2
"""Replay scenario state.
For replaying already recorded datasets.
"""
[docs]
class Scenario(ABC, RandomNode, TransformableBase, Generic[DeviceType, DST, DropType]):
"""A wireless scenario.
Scenarios consist of several devices transmitting and receiving electromagnetic signals.
Each device can be operated by multiple operators simultaneously.
"""
yaml_tag = "Scenario"
serialized_attributes = {"devices"}
@classmethod
def _arg_signature(cls: Type[Scenario]) -> set[str]:
return {"seed", "devices"}
__mode: ScenarioMode # Current scenario operating mode
__devices: list[DeviceType] # Registered devices within this scenario.
__drop_duration: float # Drop duration in seconds.
__file: File | None # HDF5 file handle
__drop_counter: int # Internal drop counter
__campaign: str # Measurement campaign name
__num_replayed_drops: int # Number of replayed drops
def __init__(
self, seed: int | None = None, devices: Sequence[DeviceType] | None = None
) -> None:
"""
Args:
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
devices (Sequence[Device], optional):
Devices to be added to the scenario during initialization.
"""
# Initialize base classes
RandomNode.__init__(self, seed=seed)
TransformableBase.__init__(self)
# Initialize attributes
self.__mode = ScenarioMode.DEFAULT
self.__devices = list()
self.drop_duration = 0.0
self.__file = None
self.__drop_counter = 0
self.__campaign = "default"
self.__num_replayed_drops = 0
# Add devices if specified
if devices is not None:
for device in devices:
self.add_device(device)
@property
def mode(self) -> ScenarioMode:
"""Current operating mode of the scenario.
Returns: Operating mode flag.
"""
return self.__mode
[docs]
def add_device(self, device: DeviceType) -> None:
"""Add a new device to the scenario.
Args:
device (Device):
New device to be added to the scenario.
Raises:
ValueError: If the device already exists.
RuntimeError: If the scenario is not in default mode.
"""
if self.device_registered(device):
raise ValueError("Error trying to add an already registered device to a scenario")
if self.mode != ScenarioMode.DEFAULT:
raise RuntimeError("Modifying a scenario is only allowed in default mode")
# Add device to internal device list
self.__devices.append(device)
# Register scenario at the device
device.random_mother = self
# Assign the scenario as the device's coordinate system base
device.set_base(self)
[docs]
def new_device(self, *args, **kwargs) -> DeviceType:
"""Add a new device to the scenario.
Convenience function pointing to :meth:`hermespy.core.scenario.Scenario.new_device`.
Returns: Handle to the created device.
Raises:
ValueError: If the device already exists.
RuntimeError: If the scenario is not in default mode.
RuntimeError: If the scenario does not allow for the creation or addition of new devices.
"""
raise RuntimeError(
"Error trying to create a new device within a scenario not supporting the operation"
)
[docs]
def device_registered(self, device: DeviceType) -> bool:
"""Check if an device is registered in this scenario.
Args:
device (Device): The device to be checked.
Returns:
bool: The device's registration status.
"""
return device in self.__devices
[docs]
def device_index(self, device: DeviceType) -> int:
"""Index of device
Args:
device (Device): Device for which to lookup the index.
Returns: The device index.
Raises:
ValueError: If `device` is not registered in this scenario.
"""
if not self.device_registered(device):
raise ValueError("Device not registered")
return self.devices.index(device)
@property
def devices(self) -> list[DeviceType]:
"""Devices registered in this scenario.
Returns: list of devices.
"""
return self.__devices.copy()
@property
def num_devices(self) -> int:
"""Number of devices in this scenario.
Returns:
int: Number of devices
"""
return len(self.__devices)
@property
def transmitters(self) -> list[Transmitter]:
"""All transmitting operators within this scenario.
Returns:
list[Transmitter]: list of all transmitting operators.
"""
transmitters: list[Transmitter] = []
for device in self.__devices:
transmitters.extend(device.transmitters)
return transmitters
@property
def receivers(self) -> list[Receiver]:
"""All receiving operators within this scenario.
Returns:
list[Receiver]: list of all transmitting operators.
"""
receivers: list[Receiver] = []
for device in self.__devices:
receivers.extend(device.receivers)
return receivers
@property
def num_receivers(self) -> int:
"""Number of receiving operators within this scenario.
Returns:
int: The number of receivers.
"""
num = 0
for device in self.__devices:
num += device.receivers.num_operators
return num
@property
def num_transmitters(self) -> int:
"""Number of transmitting operators within this scenario.
Returns:
int: The number of transmitters.
"""
num = 0
for device in self.__devices:
num += device.transmitters.num_operators
return num
@property
def operators(self) -> set[Operator]:
"""All operators within this scenario.
Returns: A set containing all unique operators within this scenario
"""
operators: set[Operator] = set()
# Iterate over all devices and collect operators
for device in self.devices:
for operator in chain(device.transmitters, device.receivers):
operators.add(operator) # type: ignore
return operators
@property
def num_operators(self) -> int:
"""Number of operators within this scenario.
Returns:
int: The number of operators.
"""
num = 0
for device in self.__devices:
num += device.transmitters.num_operators + device.receivers.num_operators
return num
@property
def drop_duration(self) -> float:
"""The scenario's default drop duration in seconds.
If the drop duration is set to zero, the property will return the maximum frame duration
over all registered transmitting modems as drop duration!
Returns:
float: The default drop duration in seconds.
Raises:
ValueError: For durations smaller than zero.
RuntimeError: If the scenario is not in default mode.
"""
# Return the largest frame length as default drop duration
if self.__drop_duration == 0.0:
duration = 0.0
for device in self.__devices:
duration = max(duration, device.max_frame_duration)
return duration
else:
return self.__drop_duration
@drop_duration.setter
def drop_duration(self, value: float) -> None:
if value < 0.0:
raise ValueError("Drop duration must be greater or equal to zero")
if self.mode != ScenarioMode.DEFAULT:
raise RuntimeError("Modifying scenario parameters is only allowed in default mode")
self.__drop_duration = value
@property
def campaign(self) -> str:
"""Measurement campaign identifier.
If not specified, the scenario will assume the campaign name to be `default`.
Returns:
Name of the current measurement campaign.
Raises:
ValueError: If in replay mode and the requested campaign name is not available.
"""
return self.__campaign
@campaign.setter
def campaign(self, value: str) -> None:
# Do nothing if value matches the current campaign
if value == self.__campaign:
return
# If in replay mode, make sure the campaign exists
if self.mode == ScenarioMode.REPLAY:
if not self.__campaign_exists(value, self.__file):
raise ValueError(
f"The requested measurement campaign '{value}' does not exists within the currently replayed savefile"
)
self.__drop_counter = 0
elif self.mode == ScenarioMode.RECORD:
# Create the campaign if it doesn't exists
if not self.__campaign_exists(value, self.__file):
self.__file.create_group("/campaigns/" + value)
self.__drop_counter = self.__file["/campaigns/" + value].len
# Update the campaign identifier
self.__campaign = value
def _state_to_HDF(self, factory: Factory, group: Group) -> None:
"""Serialize the scenario's state to an HDF5 group.
Args:
factory (Factory):
Reference to the serialization factory.
group (Group):
Reference to an empty HDF5 group.
"""
# Serialize required attributes
group.attrs["num_devices"] = self.num_devices
group.attrs["num_operators"] = self.num_operators
# Serialize device states
for d, device in enumerate(self.devices):
group.attrs[f"device_{d:02d}"] = factory.to_str(device)
# Serialize operator states
for o, operator in enumerate(self.operators):
group.attrs[f"operator_{o:02d}"] = factory.to_str(operator)
# Serialize full state
group.attrs["state"] = factory.to_str(
{"devices": self.devices, "operators": self.operators}
)
@classmethod
def _state_from_HDF(cls: Type[Scenario], factory: Factory, group: Group) -> Scenario:
# Initialize class
scenario = cls()
# Recall serialization
state: dict = factory.from_str(group.attrs["state"]) # type: ignore
# Add devices to the scenario
device: DeviceType
for device in state["devices"]:
scenario.add_device(device)
# Return initialize scenario
return scenario
[docs]
def record(
self,
file: str,
overwrite: bool = False,
campaign: str = "default",
state: Scenario | None = None,
serialize_state: bool = True,
) -> None:
"""Start recording drop information generated from this scenario.
After the scenario starts recording, changing the device and operator configuration
is not permitted.
Args:
file (str):
The system path where to store the generated recording data.
overwrite (bool, optional):
Overwrite the file if a recording already exists.
Disabled by default.
campaign (str, optional):
Name of the measurement campaign.
state (scenario, optional):
Scenario to be used for state serialization.
By default, this scenario is assumed.
serialize_state (bool, optional):
Serialize the scenario state to the recording.
Enabled by default.
Raises:
RuntimeError: If the scenario is not in default mode.
"""
if self.mode != ScenarioMode.DEFAULT:
raise RuntimeError(
"Initialize a recording is only possible in default mode. Please stop before starting a new recording."
)
# Check wether the specified file already exists within the filesystem
file_exists = path.exists(file)
# Remove the existing file if the overwrite flag is enabled
if overwrite and file_exists:
remove(file)
file_exists = False
# Compute drop duration
drop_duration = self.drop_duration
# Initialize dataset
file_mode = "w-" if overwrite else "a"
self.__file = File(file, file_mode)
self.__drop_counter = 0
self.__campaign = campaign
# Switch mode
self.__mode = ScenarioMode.RECORD
# Write required attributes
self.__file.attrs["drop_duration"] = drop_duration
# Write required groups
if "/campaigns" not in self.__file:
self.__file.create_group("/campaigns")
# Write scenario state to the dataset for easy recollection
# Future feature: Write a locking mechanism during recording
if serialize_state:
if "/state" not in self.__file:
self.__file.create_group("state")
factory = Factory()
if state is None:
self._state_to_HDF(factory, self.__file["/state"])
else:
state._state_to_HDF(factory, self.__file["/state"])
# Write meta-information
self.__file.attrs["hermes_version"] = __version__
self.__file.attrs["hermes_status"] = __status__
# Update the campaign, will create the respective group if it doesn't exist yet
self.campaign = campaign
def __campaign_exists(self, campaign: str, file: File) -> bool:
"""Check whether a campaign identifier exists within the current dataset.
Args:
campaign (str):
The campaign identifier string.
file (File):
The HDF5 file to check for campaign existence.
Returns: Boolean indicator.
Raises:
RuntimeError: If the scenario is currently in default mode and `file` was not specified.
"""
return "/campaigns/" + campaign in file
[docs]
def replay(self, file: str | File | None = None, campaign: str = "default") -> None:
"""Replay the scenario from and HDF5 savefile.
Args:
file (Union[None, str, File], optional):
File from which the scenario should be replayed.
May be a file system location or an HDF5 `File` handle.
campaign (str, optional):
Identifier of the campaign to replay.
If not specified, the assumed campaign name is `default`.
Raises:
RuntimeError: If `file` is not specified and can't be inferred from previous record executions.
ValueError: If `campaign` is specified and is not contained within the savefile.
"""
if file is None:
if self.__file is None:
raise ValueError(
"A file location must be specified or the scenario most be in record or replay mode"
)
file = self.__file.filename
# If only a file system location was specified, open the file
_file = File(file, "r") if isinstance(file, str) else file
# Check if the campaign is available (if a campaign was specified)
if not self.__campaign_exists(campaign, _file):
filename = _file.filename
_file.close()
raise ValueError(
f"The requested measurement campaign '{campaign}' does not exists within the savefile '{filename}'"
)
# Stop any action and close file handles if required
self.stop()
# Initialize dataset
self.__file = _file
self.__drop_counter = 0
self.__campaign = campaign
self.__num_replayed_drops = len(_file["/campaigns/" + campaign])
# Switch mode
self.__mode = ScenarioMode.REPLAY
[docs]
@classmethod
def Replay(cls: Type[Scenario], file: Union[str, File], campaign: str = "default") -> Scenario:
"""Replay a scenario from an HDF5 save file.
Args:
file (str):
File system location of the HDF5 save file.
campaign (str, optional):
Identifier of the campaign to replay.
If not specified, the assumed campaign name is `default`.
"""
# Load the dataset
if isinstance(file, str):
file = File(file, "r")
# Recall the class state from the respective HDF5 group
factory = Factory()
scenario = cls._state_from_HDF(factory, file["state"])
# Enable the replay mode
scenario.replay(file, campaign)
# Return the scenario (initialized and in replay mode)
return scenario
[docs]
def stop(self) -> None:
"""Stop a running recording / playback session."""
# In default mode, nothing needs to be done
if self.mode == ScenarioMode.DEFAULT:
return
# Save the overall number of recorder drops if in record mode
if self.mode == ScenarioMode.RECORD:
self.__file.attrs["num_drops"] = self.__drop_counter
# Close HDF5 file handle properly
self.__file.close()
self.__file = None
self.__drop_counter = 0
# Reset the mode
self.__mode = ScenarioMode.DEFAULT
[docs]
def transmit_operators(
self, states: Sequence[DST] = None, notify: bool = True
) -> Sequence[Sequence[Transmission]]:
"""Generate information transmitted by all registered device operators.
Args:
states (Sequence[DST], optional):
States of the transmitting devices.
If not specified, the current device states will be queried by calling :meth:`Device.state`.
notify (bool, optional):
Notify the DSP layer's callbacks about the transmission results.
Enabled by default.
Returns:
The generated information sorted into devices and their respective operators.
"""
_states = [d.state() for d in self.devices] if states is None else states
transmissions = [
[o.transmit(s, 0.0, notify) for o in d.transmitters]
for d, s in zip(self.devices, _states)
]
return transmissions
[docs]
def generate_outputs(
self, transmissions: list[list[Transmission]], states: Sequence[DST | None] | None = None
) -> Sequence[DeviceOutput]:
"""Generate signals emitted by devices.
Args:
transmissions (list[list[Transmission]]):
Results of all transmitting DSP algorithms.
states (Sequence[DST | None], optional):
States of the transmitting devices.
If not specified, the current device states will be queried by calling :meth:`Device.state`.
Returns: List of device outputs.
"""
_states = [None] * self.num_devices if states is None else states
if len(transmissions) != self.num_devices:
raise ValueError(
f"Number of device transmissions ({len(transmissions)}) does not match number of registered devices ({self.num_devices}"
)
outputs = [d.generate_output(t, s) for d, s, t in zip(self.devices, _states, transmissions)]
return outputs
[docs]
def transmit_devices(
self, states: Sequence[DST | None] | None = None, notify: bool = True
) -> Sequence[DeviceTransmission]:
"""Generated information transmitted by all registered devices.
Args:
states (Sequence[DST | None], optional):
States of the transmitting devices.
If not specified, the current device states will be queried by calling :meth:`Device.state`.
notify (bool, optional):
Notify the transmit DSP layer's callbacks about the transmission results.
Enabled by default.
Returns: List of generated information transmitted by each device.
"""
_states = [None] * self.num_devices if states is None else states
transmissions = [d.transmit(s, notify) for d, s in zip(self.devices, _states)]
return transmissions
[docs]
def receive_operators(
self,
operator_inputs: Sequence[ProcessedDeviceInput] | Sequence[Sequence[Signal]],
states: Sequence[DST | None] | None = None,
notify: bool = True,
) -> Sequence[Sequence[Reception]]:
"""Receive over the registered operators.
Args:
operator_inputs (Sequence[Sequence[Signal]] | ProcessedDeviceInput):
Signal models to be processed by the receive DSP algorithms.
Two-dimensional sequence where the first dimension corresponds to the devices and the second to the operators.
states (Sequence[DST | None], optional):
States of the transmitting devices.
If not specified, the current device states will be queried by calling :meth:`Device.state`.
notify (bool, optional):
Notify the receive DSP layer's callbacks about the reception results.
Enabled by default.
Returns: list of information generated by receiving over the device's operators.
Raises:
ValueError: If the number of operator inputs does not match the number of receive devices.
"""
_states = [None] * self.num_devices if states is None else states
if len(operator_inputs) != self.num_devices:
raise ValueError(
f"Number of operator inputs ({len(operator_inputs)}) does not match the number of registered scenario devices ({self.num_devices})"
)
# Generate receptions
receptions = [d.receive_operators(i, s, notify) for d, i, s in zip(self.devices, operator_inputs, _states)] # type: ignore
return receptions
[docs]
def receive_devices(
self,
impinging_signals: Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]],
states: Sequence[DST | None] | None = None,
notify: bool = True,
) -> Sequence[DeviceReception]:
"""Receive over all scenario devices.
Internally calls :meth:`Scenario.process_inputs` and :meth:`Scenario.receive_operators`.
Args:
impinging_signals (Sequence[DeviceInput] | Sequence[Signal] | Sequence[Sequence[Signal]):
list of signals impinging onto the devices.
states (Sequence[DST | None], optional):
States of the transmitting devices.
If not specified, the current device states will be queried by calling :meth:`Device.state
notify (bool, optional):
Notify the receiving DSP layer's callbacks about the reception results.
Enabled by default.
Returns: list of the processed device input information.
Raises:
ValueError: If the number of `impinging_signals` does not match the number of registered devices.
"""
_states = [d.state() for d in self.devices] if states is None else states
# Generate inputs
processed_inputs = self.process_inputs(impinging_signals, _states)
# Generate operator receptions
operator_receptions = self.receive_operators(
[i.operator_inputs for i in processed_inputs], _states, notify
)
# Generate device receptions
device_receptions = [
DeviceReception.From_ProcessedDeviceInput(i, r)
for i, r in zip(processed_inputs, operator_receptions)
]
return device_receptions
@property
def num_drops(self) -> int | None:
"""Number of drops within the scenario.
If the scenario is in replay mode, this property represents the
recorded number of drops
If the scenario is in record mode, this property represnts the
current number of recorded drops.
Returns: Number of drops. `None` if not applicable.
"""
if self.mode == ScenarioMode.RECORD:
return self.__drop_counter
if self.mode == ScenarioMode.REPLAY:
return self.__num_replayed_drops
return None
@abstractmethod
def _drop(self) -> DropType:
"""Generate a single scenario drop.
Wrapped by the scenario base class :meth:`.drop` method.
Returns:
The drop object containing all information.
"""
... # pragma no cover
@abstractmethod
def _recall_drop(self, group: Group) -> DropType:
"""Recall a recorded drop from a HDF5 group.
Args:
group (Group):
HDF5 group containing the drop information.
Returns: The recalled drop.
"""
... # pragma no cover
[docs]
def drop(self) -> DropType:
"""Generate a single data drop from all scenario devices.
Return: The generated drop information.
"""
if self.mode == ScenarioMode.REPLAY:
# Recall the drop from the savefile
for _ in range(self.__num_replayed_drops):
drop_path = f"/campaigns/{self.__campaign}/drop_{self.__drop_counter:02d}"
self.__drop_counter = (self.__drop_counter + 1) % self.__num_replayed_drops
if drop_path in self.__file:
drop = self._recall_drop(self.__file[drop_path])
break
# ToDo: Notify the transmit callbacks about the replayed results
# Replay device operator receptions
# This will simulatenously notify the receive operator callbacks
_ = self.receive_operators(drop.operator_inputs)
else:
# Generate a new drop
drop = self._drop()
# Serialize the drop to HDF if in record mode
if self.mode == ScenarioMode.RECORD:
drop.to_HDF(
self.__file.create_group(
f"campaigns/{self.__campaign}/drop_{self.__drop_counter:02d}"
)
)
self.__drop_counter += 1
return drop
ScenarioType = TypeVar("ScenarioType", bound="Scenario")
"""Type of scenario."""
[docs]
class ReplayScenario(Scenario[Device, DeviceState, Drop]):
"""Scenario which is unable to generate drops."""
def _drop(self) -> Drop:
raise RuntimeError("Replay scenario may not generate data drops.")
def _recall_drop(self, group: Group) -> Drop:
return Drop.from_HDF(group, self.devices)