# -*- coding: utf-8 -*-
"""
==========
Simulation
==========
"""
from __future__ import annotations
from collections.abc import Sequence
from sys import maxsize
from time import time
from typing import Any, Callable, Dict, List, Mapping, Optional, overload, Type, Union
import matplotlib.pyplot as plt
import numpy as np
from h5py import Group
from os import path
from ray import remote
from ruamel.yaml import SafeConstructor, SafeRepresenter, MappingNode, Node
from rich.console import Console
from hermespy.core import (
DeviceInput,
Drop,
Serializable,
Pipeline,
Verbosity,
Operator,
ConsoleMode,
Evaluator,
register,
MonteCarloActor,
MonteCarlo,
MonteCarloResult,
Scenario,
Signal,
DeviceOutput,
SNRType,
)
from hermespy.channel import (
Channel,
ChannelPropagation,
ChannelRealization,
DirectiveChannelRealization,
IdealChannel,
QuadrigaInterface,
)
from .simulated_device import (
TriggerModel,
TriggerRealization,
ProcessedSimulatedDeviceInput,
SimulatedDevice,
SimulatedDeviceTransmission,
SimulatedDeviceReception,
)
__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class SimulatedDrop(Drop):
"""Drop containing all information generated during a simulated wireless scenario transmission,
channel propagation and reception."""
__channel_realizations: Sequence[Sequence[DirectiveChannelRealization]]
def __init__(
self,
timestamp: float,
device_transmissions: Sequence[SimulatedDeviceTransmission],
channel_realizations: Sequence[Sequence[DirectiveChannelRealization]],
device_receptions: Sequence[SimulatedDeviceReception],
) -> None:
"""
Args:
timestamp (float):
Time at which the drop was generated.
device_transmissions (Sequence[DeviceTransmission]):
Transmitted device information.
channel_realizations (Sequence[Sequence[DirectiveChannelRealization]]):
Realizations of the wireless channels over which the simualation propagated device transmissions.
device_receptions (Sequence[ProcessedSimulatedDeviceReception]):
Received device information.
"""
self.__channel_realizations = channel_realizations
Drop.__init__(self, timestamp, device_transmissions, device_receptions)
@property
def channel_realizations(self) -> Sequence[Sequence[DirectiveChannelRealization]]:
"""Channel realizations over which signals were propagated.
Returns: Two-dimensional numpy matrix with each entry corresponding to the respective device link's channel.
"""
return self.__channel_realizations
def to_HDF(self, group: Group) -> None:
num_devices = self.num_device_transmissions
# Serialize attributes
group.attrs["timestamp"] = self.timestamp
group.attrs["num_transmissions"] = self.num_device_transmissions
group.attrs["num_receptions"] = self.num_device_receptions
group.attrs["num_devices"] = self.num_device_transmissions
# Serialize groups
for t, transmission in enumerate(self.device_transmissions):
transmission.to_HDF(self._create_group(group, f"transmission_{t:02d}"))
for r, reception in enumerate(self.device_receptions):
reception.to_HDF(self._create_group(group, f"reception_{r:02d}"))
for d_out in range(num_devices):
for d_in in range(d_out + 1):
realization_group = self._create_group(
group, f"channel_realization_{d_out:02d}_{d_in:02d}"
)
self.channel_realizations[d_out][d_in].realization.to_HDF(realization_group)
@classmethod
def from_HDF(
cls: Type[SimulatedDrop], group: Group, scenario: SimulationScenario | None = None
) -> SimulatedDrop:
# Require a scenario to be specified
# Maybe there is a workaround possible since this is validates object-oriented principles
if scenario is None:
raise ValueError("Simulation drops must be deserialized with a scenario instance")
# Recall attributes
timestamp = group.attrs.get("timestamp", 0.0)
num_transmissions = group.attrs.get("num_transmissions", 0)
num_receptions = group.attrs.get("num_receptions", 0)
num_devices = group.attrs.get("num_devices", 1)
# Assert that the scenario parameters match the serialization
if scenario.num_devices != num_devices:
raise ValueError(
f"Number of scenario devices does not match the serialization ({scenario.num_devices} != {num_devices})"
)
# Recall groups
transmissions = [
SimulatedDeviceTransmission.from_HDF(group[f"transmission_{t:02d}"])
for t in range(num_transmissions)
]
receptions = [
SimulatedDeviceReception.from_HDF(group[f"reception_{r:02d}"])
for r in range(num_receptions)
]
channel_realizations = np.empty((num_devices, num_devices), dtype=np.object_)
for device_beta_idx in range(num_devices):
beta_device = scenario.devices[device_beta_idx]
for device_alpha_idx in range(device_beta_idx + 1):
alpha_device = scenario.devices[device_alpha_idx]
# Recall the channel realization
channel = scenario.channels[device_beta_idx, device_alpha_idx]
realization = channel.recall_realization(
group[f"channel_realization_{device_beta_idx:02d}_{device_alpha_idx:02d}"]
)
# Place the channel realization into the channel realization matrix
channel_realizations[
device_beta_idx, device_alpha_idx
] = DirectiveChannelRealization(alpha_device, beta_device, realization)
channel_realizations[
device_alpha_idx, device_beta_idx
] = DirectiveChannelRealization(beta_device, alpha_device, realization)
return SimulatedDrop(timestamp, transmissions, channel_realizations.tolist(), receptions)
[docs]
class SimulationScenario(Scenario[SimulatedDevice]):
yaml_tag = "SimulationScenario"
__channels: np.ndarray # Channel matrix linking devices
__snr: Optional[float] # Signal to noise ratio at the receiver-side
__snr_type: SNRType # Global global type of signal to noise ratio.
def __init__(
self, snr: float = float("inf"), snr_type: str | SNRType = SNRType.PN0, *args, **kwargs
) -> None:
"""
Args:
snr (float, optional):
The assumed linear signal to noise ratio.
Infinite by default, i.e. no added noise during reception.
snr_type (Union[str, SNRType], optional):
The signal to noise ratio metric to be used.
By default, signal power to noise power is assumed.
"""
Scenario.__init__(self, *args, **kwargs)
self.snr = snr
self.snr_type = snr_type
self.__channels = np.ndarray((0, 0), dtype=object)
[docs]
def new_device(self, *args, **kwargs) -> SimulatedDevice:
"""Add a new device to the simulation scenario.
Returns:
SimulatedDevice: Newly added simulated device.
"""
device = SimulatedDevice(*args, **kwargs)
self.add_device(device)
return device
[docs]
def add_device(self, device: SimulatedDevice) -> None:
# Add the device to the scenario
Scenario.add_device(self, device)
device.scenario = self
if self.num_devices == 1:
self.__channels = np.array([[IdealChannel(device, device)]], dtype=object)
else:
# Create new channels from each existing device to the newly added device
new_channels = np.array([[IdealChannel(device, rx)] for rx in self.devices])
# Complete channel matrix by the newly created channels
self.__channels = np.append(self.__channels, new_channels[:-1], axis=1)
self.__channels = np.append(self.__channels, new_channels.T, axis=0)
@property
def channels(self) -> np.ndarray:
"""Channel matrix between devices.
Returns:
np.ndarray:
An `MxM` matrix of channels between devices.
"""
return self.__channels
[docs]
def channel(self, transmitter: SimulatedDevice, receiver: SimulatedDevice) -> Channel:
"""Access a specific channel between two devices.
Args:
transmitter (SimulatedDevice):
The device transmitting into the channel.
receiver (SimulatedDevice):
the device receiving from the channel
Returns:
Channel:
Channel between `transmitter` and `receiver`.
Raises:
ValueError:
Should `transmitter` or `receiver` not be registered with this scenario.
"""
devices = self.devices
if transmitter not in devices:
raise ValueError("Provided transmitter is not registered with this scenario")
if receiver not in devices:
raise ValueError("Provided receiver is not registered with this scenario")
index_transmitter = devices.index(transmitter)
index_receiver = devices.index(receiver)
return self.__channels[index_transmitter, index_receiver]
[docs]
def departing_channels(
self, transmitter: SimulatedDevice, active_only: bool = False
) -> List[Channel]:
"""Collect all channels departing from a transmitting device.
Args:
transmitter (SimulatedDevice):
The transmitting device.
active_only (bool, optional):
Consider only active channels.
A channel is considered active if its gain is greater than zero.
Returns: A list of departing channels.
Raises:
ValueError: Should `transmitter` not be registered with this scenario.
"""
devices = self.devices
if transmitter not in devices:
raise ValueError("The provided transmitter is not registered with this scenario.")
transmitter_index = devices.index(transmitter)
channels: List[Channel] = self.__channels[:, transmitter_index].tolist()
if active_only:
channels = [channel for channel in channels if channel.gain > 0.0]
return channels
[docs]
def arriving_channels(
self, receiver: SimulatedDevice, active_only: bool = False
) -> List[Channel]:
"""Collect all channels arriving at a device.
Args:
receiver (Receiver):
The receiving modem.
active_only (bool, optional):
Consider only active channels.
A channel is considered active if its gain is greater than zero.
Returns: A list of arriving channels.
Raises:
ValueError: Should `receiver` not be registered with this scenario.
"""
devices = self.devices
if receiver not in devices:
raise ValueError("The provided transmitter is not registered with this scenario.")
receiver_index = devices.index(receiver)
channels: List[Channel] = self.__channels[receiver_index,].tolist()
if active_only:
channels = [channel for channel in channels if channel.gain > 0.0]
return channels
[docs]
def set_channel(
self,
beta_device: int | SimulatedDevice,
alpha_device: int | SimulatedDevice,
channel: Channel | None,
) -> None:
"""Specify a channel within the channel matrix.
Args:
beta_device (int | SimulatedDevice):
Index of the receiver within the channel matrix.
alpha_device (int | SimulatedDevice):
Index of the transmitter within the channel matrix.
channel (Channel | None):
The channel instance to be set at position (`transmitter_index`, `receiver_index`).
Raises:
ValueError:
If `transmitter_index` or `receiver_index` are greater than the channel matrix dimensions.
"""
if isinstance(beta_device, SimulatedDevice):
beta_device = self.devices.index(beta_device)
if isinstance(alpha_device, SimulatedDevice):
alpha_device = self.devices.index(alpha_device)
if self.__channels.shape[0] <= alpha_device or 0 > alpha_device:
raise ValueError("Alpha device index greater than channel matrix dimension")
if self.__channels.shape[1] <= beta_device or 0 > beta_device:
raise ValueError("Beta Device index greater than channel matrix dimension")
# Update channel field within the matrix
self.__channels[alpha_device, beta_device] = channel
self.__channels[beta_device, alpha_device] = channel
if channel is not None:
# Set proper receiver and transmitter fields
channel.alpha_device = self.devices[alpha_device]
channel.beta_device = self.devices[beta_device]
channel.scenario = self
@register(first_impact="receive_devices", title="SNR") # type: ignore[misc]
@property
def snr(self) -> Optional[float]:
"""Ratio of signal energy to noise power at the receiver-side.
Returns:
Optional[float]:
Linear signal energy to noise power ratio.
`None` if not specified.
Raises:
ValueError: On ratios smaller or equal to zero.
"""
return self.__snr
@snr.setter
def snr(self, value: Optional[float]) -> None:
if value is None:
self.__snr = None
else:
if value <= 0.0:
raise ValueError("Signal to noise ratio must be greater than zero")
self.__snr = value
@register(first_impact="receive_devices", title="SNR Type") # type: ignore
@property
def snr_type(self) -> SNRType:
"""Type of signal-to-noise ratio.
Returns:
SNRType: The SNR type.
"""
return self.__snr_type
@snr_type.setter
def snr_type(self, snr_type: Union[str, int, SNRType]) -> None:
"""Modify the type of signal-to-noise ratio.
Args:
snr_type (Union[str, int, SNRType]):
The new type of signal to noise ratio, string or enum representation.
"""
if isinstance(snr_type, str):
snr_type = SNRType[snr_type]
elif isinstance(snr_type, int):
snr_type = SNRType(snr_type)
self.__snr_type = snr_type
[docs]
def transmit_devices(self, cache: bool = True) -> Sequence[SimulatedDeviceTransmission]:
"""Generate simulated device transmissions of all registered devices.
Devices sharing trigger models will be triggered simultaneously.
Args:
cache (bool, optional):
Cache the generated transmissions at the respective devices.
Enabled by default.
Returns:
Sequence of simulated simulated device transmissions.
"""
# Collect unique triggers
triggers: List[TriggerModel] = []
trigger_realizations: List[TriggerRealization] = []
transmissions: List[SimulatedDeviceTransmission] = []
for device in self.devices:
trigger_realization: TriggerRealization
if device.trigger_model not in triggers:
trigger_realization = device.trigger_model.realize()
triggers.append(device.trigger_model)
trigger_realizations.append(trigger_realization)
else:
trigger_realization = trigger_realizations[triggers.index(device.trigger_model)]
transmission = device.transmit(cache=cache, trigger_realization=trigger_realization)
transmissions.append(transmission)
return transmissions
[docs]
def propagate(self, transmissions: Sequence[DeviceOutput]) -> List[List[ChannelPropagation]]:
"""Propagate device transmissions over the scenario's channel instances.
Args:
transmissions (Sequence[DeviceOutput])
Sequence of device transmissisons.
Returns: Propagation matrix.
Raises:
ValueError: If the length of `transmissions` does not match the number of registered devices.
"""
if len(transmissions) != self.num_devices:
raise ValueError(
f"Number of transmit signals ({len(transmissions)}) does not match "
f"the number of registered devices ({self.num_devices})"
)
# Initialize the propagated signals
propagation_matrix = np.empty((self.num_devices, self.num_devices), dtype=object)
# Loop over each channel within the channel matrix and propagate the signals over the respective channel model
for device_alpha_idx, alpha_device in enumerate(self.devices):
for device_beta_idx, beta_device in enumerate(self.devices[: 1 + device_alpha_idx]):
# Select and realize the channel linking device alpha and device beta
channel: Channel = self.channels[device_alpha_idx, device_beta_idx]
channel_realization: ChannelRealization = channel.realize()
# Propagate signal emitted from device alpha to device beta over the linking channel
alpha_propagation = channel_realization.propagate(
transmissions[device_alpha_idx], alpha_device, beta_device
)
# Propagate signal emitted from device beta to device alpha over the linking channel
beta_propagation = channel_realization.propagate(
transmissions[device_beta_idx], beta_device, alpha_device
)
# Store propagtions in their respective coordinates within the propagation matrix
propagation_matrix[device_alpha_idx, device_beta_idx] = beta_propagation
propagation_matrix[device_beta_idx, device_alpha_idx] = alpha_propagation
return propagation_matrix.tolist()
@overload
def process_inputs(
self,
impinging_signals: Sequence[DeviceInput],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> List[ProcessedSimulatedDeviceInput]:
... # pragma: no cover
@overload
def process_inputs(
self,
impinging_signals: Sequence[Signal],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> List[ProcessedSimulatedDeviceInput]:
... # pragma: no cover
@overload
def process_inputs(
self,
impinging_signals: Sequence[Sequence[Signal]],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> List[ProcessedSimulatedDeviceInput]:
... # pragma: no cover
@overload
def process_inputs(
self,
impinging_signals: Sequence[Sequence[ChannelPropagation]],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> List[ProcessedSimulatedDeviceInput]:
... # pragma: no cover
@overload
def receive_devices(
self,
impinging_signals: Sequence[DeviceInput],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
... # pragma: no cover
@overload
def receive_devices(
self,
impinging_signals: Sequence[Signal],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
... # pragma: no cover
@overload
def receive_devices(
self,
impinging_signals: Sequence[Sequence[Signal]],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
... # pragma: no cover
@overload
def receive_devices(
self,
impinging_signals: Sequence[Sequence[ChannelPropagation]],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
... # pragma: no cover
[docs]
def receive_devices(
self,
impinging_signals: Sequence[DeviceInput]
| Sequence[Signal]
| Sequence[Sequence[Signal]]
| Sequence[Sequence[ChannelPropagation]],
cache: bool = True,
trigger_realizations: Sequence[TriggerRealization] | None = None,
) -> Sequence[SimulatedDeviceReception]:
"""Receive over all simulated scenario devices.
Internally calls :meth:`SimulationScenario.process_inputs` and :meth:`Scenario.receive_operators`.
Args:
impinging_signals (List[Union[DeviceInput, Signal, Iterable[Signal]]]):
List of signals impinging onto the devices.
cache (bool, optional):
Cache the operator inputs at the registered receive operators for further processing.
Enabled by default.
trigger_realizations (Sequence[TriggerRealization], optional):
Sequence of trigger realizations.
If not specified, ideal triggerings are assumed for all devices.
Returns: List of the processed device input information.
Raises:
ValueError: If the number of `impinging_signals` does not match the number of registered devices.
"""
# Generate inputs
processed_inputs = self.process_inputs(
impinging_signals, cache=cache, trigger_realizations=trigger_realizations
)
# Generate operator receptions
operator_receptions = self.receive_operators(
[i.operator_inputs for i in processed_inputs], cache=cache
)
# Generate device receptions
device_receptions = [SimulatedDeviceReception.From_ProcessedSimulatedDeviceInput(i, r) for i, r in zip(processed_inputs, operator_receptions)] # type: ignore
return device_receptions
def _drop(self) -> SimulatedDrop:
# Generate drop timestamp
timestamp = time()
# Generate device transmissions
device_transmissions = self.transmit_devices()
# Simulate channel propagation
channel_propagations = self.propagate(device_transmissions)
channel_realizations = [[tx.realization for tx in rx] for rx in channel_propagations]
# Process receptions
trigger_realizations = [t.trigger_realization for t in device_transmissions]
device_receptions = self.receive_devices(
channel_propagations, trigger_realizations=trigger_realizations
)
# Return finished drop
return SimulatedDrop(
timestamp, device_transmissions, channel_realizations, device_receptions
)
class SimulationRunner(object):
"""Runner remote thread deployed by Monte Carlo routines"""
__scenario: SimulationScenario # Scenario to be run
__propagation: Sequence[Sequence[ChannelPropagation]] | None
__processed_inputs: Sequence[ProcessedSimulatedDeviceInput]
def __init__(self, scenario: SimulationScenario) -> None:
"""
Args:
scenario(SimulationScenario):
Scenario to be run.
"""
self.__scenario = scenario
self.__propagation = None
self.__processed_inputs = []
def transmit_operators(self) -> None:
"""Generate base-band signal models emitted by all registered transmitting operators.
Internaly resolves to the scenario's transmit operators routine :meth:`SimulationScenario.transmit_operators`.
"""
# Resolve to the scenario transmit operators routine
_ = self.__scenario.transmit_operators()
def generate_outputs(self) -> None:
"""Generate radio-frequency band signal models emitted by devices.
Internally resolves to the scenario's generate outputs routine :meth:`SimulationScenario.generate_outputs`.
"""
# Resolve to the scenario output generation routine
_ = self.__scenario.generate_outputs()
def propagate(self) -> None:
"""Propagate the signals generated by registered transmitters over the channel model.
Signals receiving at each receive modem are a superposition of all transmit signals impinging
onto the receive modem over activated channels.
The signal stream matrices contain the number of antennas on the first dimension and the number of
signal samples on the second dimension.
Raises:
RuntimeError: If the propagation stage is called without prior device transmission.
RuntimeError: If the number of transmit signals does not match the number of registered devices.
"""
device_outputs = [device.output for device in self.__scenario.devices]
if any([t is None for t in device_outputs]):
raise RuntimeError(
"Propagation simulation stage called without prior device transmission"
)
# Propagate device outputs
self.__propagation = self.__scenario.propagate(device_outputs)
def process_inputs(self) -> None:
"""Process device inputs after channel propgation.
Raises:
RuntimeError: If the propagation stage is called without prior channel propagation.
RuntimeError: If the number of arriving signals does not match the number of registered devices.
"""
propagation_matrix = self.__propagation
if propagation_matrix is None:
raise RuntimeError(
"Receive device simulation stage called without prior channel propagation"
)
if len(propagation_matrix) != self.__scenario.num_devices:
raise RuntimeError(
f"Number of arriving signals ({len(propagation_matrix)}) does not match "
f"the number of receiving devices ({self.__scenario.num_devices})"
)
self.__processed_inputs: Sequence[ProcessedSimulatedDeviceInput] = []
for device, impinging_signals in zip(self.__scenario.devices, propagation_matrix):
self.__processed_inputs.append(
device.process_input(
impinging_signals=impinging_signals,
snr=self.__scenario.snr,
snr_type=self.__scenario.snr_type,
)
)
def receive_operators(self) -> None:
"""Demodulate base-band signal models received by all registered receiving operators.
Internally resolves to the scenario's receive operators routine :meth:`SimulationScenario.receive_operators`.
"""
# Resolve to the scenario's operator receive routine
_ = self.__scenario.receive_operators()
@remote(num_cpus=1)
class SimulationActor(MonteCarloActor[SimulationScenario], SimulationRunner):
"""Remote ray actor generated from the simulation runner class."""
def __init__(self, argument_tuple: Any, index: int, catch_exceptions: bool = True) -> None:
"""
Args:
argument_tuple (Any):
MonteCarloActor initialization arguments.
"""
MonteCarloActor.__init__(self, argument_tuple, index, catch_exceptions)
SimulationRunner.__init__(self, self._investigated_object)
# Update the internal random seed pseudo-deterministically for each actor instance
seed = self._investigated_object._rng.integers(0, maxsize)
individual_seed = seed + index * 12345678
self._investigated_object.seed = individual_seed
@staticmethod
def stage_identifiers() -> List[str]:
return [
"transmit_operators",
"generate_outputs",
"propagate",
"process_inputs",
"receive_operators",
]
def stage_executors(self) -> List[Callable]:
return [
self.transmit_operators,
self.generate_outputs,
self.propagate,
self.process_inputs,
self.receive_operators,
]
[docs]
class Simulation(
Serializable, Pipeline[SimulationScenario, SimulatedDevice], MonteCarlo[SimulationScenario]
):
"""HermesPy simulation configuration."""
yaml_tag = "Simulation"
property_blacklist = {"console", "console_mode", "scenario"}
plot_results: bool
"""Plot results after simulation runs"""
dump_results: bool
"""Dump results to files after simulation runs."""
def __init__(
self,
scenario: SimulationScenario | None = None,
num_samples: int = 100,
drop_duration: float = 0.0,
plot_results: bool = False,
dump_results: bool = True,
console_mode: ConsoleMode = ConsoleMode.INTERACTIVE,
ray_address: str | None = None,
results_dir: str | None = None,
verbosity: str | Verbosity = Verbosity.INFO,
seed: int | None = None,
num_actors: int | None = None,
) -> None:
"""Args:
scenario (SimulationScenario, optional):
The simulated scenario.
If none is provided, an empty one will be initialized.
num_samples (int, optional):
Number of drops generated per sweeping grid section.
100 by default.
drop_duration(float, optional):
Duration of simulation drops in seconds.
plot_results (bool, optional):
Plot results after simulation runs.
Disabled by default.
dump_results (bool, optional):
Dump results to files after simulation runs.
Enabled by default.
ray_address (str, optional):
The address of the ray head node.
If None is provided, the head node will be launched in this machine.
results_dir (str, optional):
Directory in which all simulation artifacts will be dropped.
verbosity (Union[str, Verbosity], optional):
Information output behaviour during execution.
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
"""
scenario = SimulationScenario() if scenario is None else scenario
if seed is not None:
scenario.seed = seed
# Initialize base classes
Pipeline.__init__(
self, scenario, results_dir=results_dir, verbosity=verbosity, console_mode=console_mode
)
MonteCarlo.__init__(
self,
self.scenario,
num_samples,
console=self.console,
console_mode=console_mode,
ray_address=ray_address,
num_actors=num_actors,
)
self.plot_results = plot_results
self.dump_results = dump_results
self.drop_duration = drop_duration
self.num_drops = num_samples
@property
def num_samples(self) -> int:
return self.num_drops
@num_samples.setter
def num_samples(self, value: int) -> None:
self.num_drops = value
@Pipeline.console_mode.setter # type: ignore
def console_mode(self, value: ConsoleMode) -> None: # type: ignore
Pipeline.console_mode.fset(self, value) # type: ignore
MonteCarlo.console_mode.fset(self, value) # type: ignore
@Pipeline.console.setter # type: ignore
def console(self, value: Console) -> None: # type: ignore
Pipeline.console.fset(self, value) # type: ignore
MonteCarlo.console.fset(self, value) # type: ignore
[docs]
def run(self) -> MonteCarloResult:
# Print indicator that the simulation is starting
if self.console_mode != ConsoleMode.SILENT:
self.console.print() # Just an empty line
self.console.rule("Simulation Campaign")
self.console.print() # Just an empty line
# Generate simulation result
result = self.simulate(SimulationActor)
# Visualize results if the flag respective is enabled
figures: List[plt.FigureBase] = []
if self.plot_results:
with self.style_context():
figures = result.plot()
# Dump results if the respective flag is enabled
if self.dump_results and self.results_dir is not None:
# Save figures to png files
for figure_idx, base_figure in enumerate(figures):
figure_instace = base_figure.get_figure()
if figure_instace is not None:
figure_instace.savefig(
path.join(self.results_dir, f"figure_{figure_idx}.png"), format="png"
)
# Save results to matlab file
result.save_to_matlab(path.join(self.results_dir, "results.mat"))
# Show plots if the flag is enabled
# if self.plot_results:
# plt.show()
# Return result object
return result
[docs]
def set_channel(
self, alpha: int | SimulatedDevice, beta: int | SimulatedDevice, channel: Channel | None
) -> None:
"""Specify a channel within the channel matrix.
Convenience method resolving to :meth:`.SimulationScenario.set_channel`.
Args:
receiver (int | SimulatedDevice):
Index of the receiver within the channel matrix.
transmitter (int | SimulatedDevice):
Index of the transmitter within the channel matrix.
channel (Channel | None):
The channel instance to be set at position (`transmitter_index`, `receiver_index`).
"""
self.scenario.set_channel(alpha, beta, channel)
[docs]
@classmethod
def to_yaml(
cls: Type[Simulation], representer: SafeRepresenter, node: Simulation
) -> MappingNode:
"""Serialize an `Simulation` object to YAML.
Args:
representer (SafeRepresenter):
A handle to a representer used to generate valid YAML code.
The representer gets passed down the serialization tree to each node.
node (Simulation):
The `Simulation` instance to be serialized.
Returns:
Node:
The serialized YAML node
"""
# Prepare dimensions
dimension_fields: List[Mapping[str, Any]] = []
for dimension in node.dimensions:
dimension_map = {
"property": dimension.dimension,
"points": [p.value for p in dimension.sample_points],
"title": dimension.title,
}
considered_objects = dimension.considered_objects
if considered_objects != (node.scenario,):
dimension_map["objects"] = considered_objects
dimension_fields.append(dimension_map)
additional_fields = {
"snr_type": node.scenario.snr_type,
"verbosity": node.verbosity.name,
"Devices": node.scenario.devices,
"Operators": node.scenario.operators,
"Evaluators": node.evaluators,
"Dimensions": dimension_fields,
"Channels": node.scenario.channels.flatten().tolist(),
}
return node._mapping_serialization_wrapper(representer, additional_fields=additional_fields)
[docs]
@classmethod
def from_yaml(cls: Type[Simulation], constructor: SafeConstructor, node: Node) -> Simulation:
"""Recall a new `Simulation` instance from YAML.
Args:
constructor (SafeConstructor):
A handle to the constructor extracting the YAML information.
node (Node):
YAML node representing the `Simulation` serialization.
Returns:
Simulation:
Newly created `Simulation` instance.
"""
state: dict = constructor.construct_mapping(node, deep=True)
# Launch a global quadriga instance
quadriga_interface: Optional[QuadrigaInterface] = state.pop(
QuadrigaInterface.yaml_tag, None
)
if quadriga_interface is not None: # pragma: no cover
QuadrigaInterface.SetGlobalInstance(quadriga_interface)
# Pop configuration sections for "special" treatment
devices: List[SimulatedDevice] = state.pop("Devices", [])
channels: List[Channel] = state.pop("Channels", [])
_: List[Operator] = state.pop("Operators", [])
evaluators: List[Evaluator] = state.pop("Evaluators", [])
dimensions: Dict[str, Any] | List[Mapping[str, Any]] = state.pop("Dimensions", {})
# Initialize simulation
state["scenario"] = SimulationScenario(
snr=state.pop("snr", float("inf")), snr_type=state.pop("snr_type", SNRType.EBN0)
)
simulation: Simulation = cls.InitializationWrapper(state)
# Add devices to the simulation
for device in devices:
simulation.scenario.add_device(device)
# Assign channel models
for channel in channels:
# If the scenario features just a single device, we can infer the transmitter and receiver easily
if channel.alpha_device is None or channel.beta_device is None:
if simulation.scenario.num_devices > 1:
raise RuntimeError(
"Please specifiy the transmitting and receiving device of each channel in a multi-device scenario"
)
channel.alpha_device = simulation.scenario.devices[0]
channel.beta_device = simulation.scenario.devices[0]
simulation.scenario.set_channel(channel.alpha_device, channel.beta_device, channel)
# Register evaluators
for evaluator in evaluators:
simulation.add_evaluator(evaluator)
# Add simulation dimensions
if isinstance(dimensions, list):
for dimension in dimensions:
considered_objects = dimension.get("objects", (simulation.scenario,))
new_dim = simulation.new_dimension(
dimension["property"], dimension["points"], *considered_objects
)
title = dimension.get("title", None)
if title is not None:
new_dim.title = title
else:
for property_name, property_values in dimensions.items():
simulation.new_dimension(property_name, property_values, simulation.scenario)
# Return simulation instance recovered from the serialization
return simulation
@staticmethod
def _pip_packages() -> List[str]:
return MonteCarlo._pip_packages() + ["sparse", "protobuf", "numba"]