Source code for hermespy.simulation.simulation

# -*- coding: utf-8 -*-

from __future__ import annotations
from collections.abc import Sequence
from itertools import product
from sys import maxsize
from typing import Any, Callable, Dict, List, Mapping, Type

import numpy as np
from os import path
from ray import remote
from ruamel.yaml import SafeConstructor, SafeRepresenter, MappingNode, Node
from rich.console import Console

from hermespy.core import (
    Serializable,
    Pipeline,
    Verbosity,
    Operator,
    ConsoleMode,
    Evaluator,
    MonteCarloActor,
    MonteCarlo,
    MonteCarloResult,
    Signal,
    Visualization,
)
from hermespy.channel import Channel, ChannelRealization
from .scenario import SimulationScenario
from .simulated_device import (
    DeviceState,
    TriggerRealization,
    ProcessedSimulatedDeviceInput,
    SimulatedDevice,
)

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class SimulationRunner(object):
    """Runner remote thread deployed by Monte Carlo routines"""

    __scenario: SimulationScenario  # Scenario to be run
    __trigger_realizations: Sequence[TriggerRealization]
    __propagation: Sequence[Sequence[Signal]] | None
    __processed_inputs: Sequence[ProcessedSimulatedDeviceInput]
    __device_states: Sequence[DeviceState] | None
    __channel_realizations: Sequence[ChannelRealization] | None

    def __init__(self, scenario: SimulationScenario) -> None:
        """
        Args:

            scenario(SimulationScenario):
                Scenario to be run.
        """

        self.__scenario = scenario
        self.__channel_realizations = None
        self.__device_states = None
        self.__device_states = None
        self.__trigger_realizations = None
        self.__propagation = None
        self.__processed_inputs = []

    def realize_channels(self) -> None:
        self.__channel_realizations = self.__scenario.realize_channels()

    def sample_trajectories(self, timestamp: float = 0.0) -> None:
        self.__device_states = [d.state(timestamp) for d in self.__scenario.devices]

    def transmit_operators(self) -> None:
        """Generate base-band signal models emitted by all registered transmitting operators.

        Internaly resolves to the scenario's transmit operators routine :meth:`SimulationScenario.transmit_operators`.
        """

        # Resolve to the scenario transmit operators routine
        _ = self.__scenario.transmit_operators()

    def generate_outputs(self) -> None:
        """Generate radio-frequency band signal models emitted by devices.

        Internally resolves to the scenario's generate outputs routine :meth:`SimulationScenario.generate_outputs`.
        """

        self.__trigger_realizations = self.__scenario.realize_triggers()
        _ = self.__scenario.generate_outputs(None, self.__trigger_realizations)

    def propagate(self) -> None:
        """Propagate the signals generated by registered transmitters over the channel model.

        Signals receiving at each receive modem are a superposition of all transmit signals impinging
        onto the receive modem over activated channels.

        The signal stream matrices contain the number of antennas on the first dimension and the number of
        signal samples on the second dimension.

        Raises:

            RuntimeError: If the propagation stage is called without prior device transmission.
            RuntimeError: If the number of transmit signals does not match the number of registered devices.
        """

        if self.__device_states is None or self.__channel_realizations is None:
            raise RuntimeError(
                "Propagation simulation stage called without prior channel or device realization"
            )

        device_outputs = [device.output for device in self.__scenario.devices]
        if any([t is None for t in device_outputs]):
            raise RuntimeError(
                "Propagation simulation stage called without prior device transmission"
            )

        # Propagate device outputs
        self.__propagation, _ = self.__scenario.propagate(
            device_outputs, self.__device_states, self.__channel_realizations
        )

    def process_inputs(self) -> None:
        """Process device inputs after channel propgation.

        Raises:

            RuntimeError: If the propagation stage is called without prior channel propagation.
            RuntimeError: If the number of arriving signals does not match the number of registered devices.
        """

        propagation_matrix = self.__propagation

        if self.__trigger_realizations is None:
            raise RuntimeError(
                "Process inputs simulation stage without prior call to generate outputs"
            )

        if len(self.__trigger_realizations) != self.__scenario.num_devices:
            raise RuntimeError(
                "Number of trigger realizations does not match the number of registered devices"
            )

        if propagation_matrix is None:
            raise RuntimeError(
                "Process inputs simulation stage called without prior channel propagation"
            )

        if len(propagation_matrix) != self.__scenario.num_devices:
            raise RuntimeError(
                f"Number of arriving signals ({len(propagation_matrix)}) does not match "
                f"the number of receiving devices ({self.__scenario.num_devices})"
            )

        self.__processed_inputs: Sequence[ProcessedSimulatedDeviceInput] = []
        for device, impinging_signals, trigger_realization in zip(
            self.__scenario.devices, propagation_matrix, self.__trigger_realizations
        ):
            self.__processed_inputs.append(
                device.process_input(
                    impinging_signals=impinging_signals,
                    noise_level=self.__scenario.noise_level,  # type: ignore[operator]
                    noise_model=self.__scenario.noise_model,  # type: ignore[operator]
                    trigger_realization=trigger_realization,
                )
            )

    def receive_operators(self) -> None:
        """Demodulate base-band signal models received by all registered receiving operators.

        Internally resolves to the scenario's receive operators routine :meth:`SimulationScenario.receive_operators`.
        """

        # Resolve to the scenario's operator receive routine
        _ = self.__scenario.receive_operators()


@remote(num_cpus=1)
class SimulationActor(MonteCarloActor[SimulationScenario], SimulationRunner):
    """Remote ray actor generated from the simulation runner class."""

    def __init__(
        self,
        argument_tuple: Any,
        index: int,
        stage_arguments: Mapping[str, Sequence[tuple]] | None = None,
        catch_exceptions: bool = True,
    ) -> None:
        """
        Args:

            argument_tuple:
                Object to be investigated during the simulation runtime.
                Dimensions over which the simulation will iterate.
                Evaluators used to process the investigated object sample state.

            index (int):
                Global index of the actor.

            stage_arguments (Mapping[str, Sequence[Tuple]], optional):
                Arguments for the simulation stages.

            catch_exceptions (bool, optional):
                Catch exceptions during run.
                Enabled by default.
        """

        # Initialize base classes
        MonteCarloActor.__init__(self, argument_tuple, index, stage_arguments, catch_exceptions)
        SimulationRunner.__init__(self, self._investigated_object)

        # Update the internal random seed pseudo-deterministically for each actor instance
        seed = self._investigated_object._rng.integers(0, maxsize)
        individual_seed = seed + index * 12345678
        self._investigated_object.seed = individual_seed

    @staticmethod
    def stage_identifiers() -> List[str]:
        return [
            "realize_channels",
            "sample_trajectories",
            "transmit_operators",
            "generate_outputs",
            "propagate",
            "process_inputs",
            "receive_operators",
        ]

    def stage_executors(self) -> List[Callable]:
        return [
            self.realize_channels,
            self.sample_trajectories,
            self.transmit_operators,
            self.generate_outputs,
            self.propagate,
            self.process_inputs,
            self.receive_operators,
        ]


[docs] class Simulation( Serializable, Pipeline[SimulationScenario, SimulatedDevice], MonteCarlo[SimulationScenario] ): """Executable HermesPy simulation configuration.""" yaml_tag = "Simulation" property_blacklist = {"console", "console_mode", "scenario"} plot_results: bool """Plot results after simulation runs""" dump_results: bool """Dump results to files after simulation runs.""" def __init__( self, scenario: SimulationScenario | None = None, num_samples: int = 100, drop_duration: float = 0.0, drop_interval: float = float("inf"), plot_results: bool = False, dump_results: bool = True, console_mode: ConsoleMode = ConsoleMode.INTERACTIVE, ray_address: str | None = None, results_dir: str | None = None, verbosity: str | Verbosity = Verbosity.INFO, seed: int | None = None, num_actors: int | None = None, ) -> None: """ Args: scenario (SimulationScenario, optional): The simulated scenario. If none is provided, an empty one will be initialized. num_samples (int, optional): Number of drops generated per sweeping grid section. 100 by default. drop_duration(float, optional): Duration of simulation drops in seconds. drop_interval(float, optional): Interval at which drops are being generated in seconds. If not specified, only a single drop is generated at the beginning of the simulation. plot_results (bool, optional): Plot results after simulation runs. Disabled by default. dump_results (bool, optional): Dump results to files after simulation runs. Enabled by default. console_mode (ConsoleMode, optional): Console output behaviour during execution. ray_address (str, optional): The address of the ray head node. If None is provided, the head node will be launched in this machine. results_dir (str, optional): Directory in which all simulation artifacts will be dropped. verbosity (Union[str, Verbosity], optional): Information output behaviour during execution. seed (int, optional): Random seed used to initialize the pseudo-random number generator. num_actors (int, optional): Number of actors to be deployed for parallel execution. If None is provided, the number of actors will be set to the number of available CPU cores. """ scenario = SimulationScenario() if scenario is None else scenario if seed is not None: scenario.seed = seed # Initialize base classes Pipeline.__init__( self, scenario, results_dir=results_dir, verbosity=verbosity, console_mode=console_mode ) MonteCarlo.__init__( self, self.scenario, num_samples, console=self.console, console_mode=console_mode, ray_address=ray_address, num_actors=num_actors, ) # Initialize class attributes self.plot_results = plot_results self.dump_results = dump_results self.drop_duration = drop_duration self.num_drops = num_samples self.drop_interval = drop_interval @property def num_samples(self) -> int: return self.num_drops @num_samples.setter def num_samples(self, value: int) -> None: self.num_drops = value @property def drop_interval(self) -> float: """Interval at which drops are being generated in seconds. Raises: ValueError for values smaller or equal to zero. """ return self.__drop_interval @drop_interval.setter def drop_interval(self, value: float) -> None: if value <= 0: raise ValueError("Drop interval must be greater than zero.") self.__drop_interval = value @Pipeline.console_mode.setter # type: ignore def console_mode(self, value: ConsoleMode) -> None: # type: ignore Pipeline.console_mode.fset(self, value) # type: ignore MonteCarlo.console_mode.fset(self, value) # type: ignore @Pipeline.console.setter # type: ignore def console(self, value: Console) -> None: # type: ignore Pipeline.console.fset(self, value) # type: ignore MonteCarlo.console.fset(self, value) # type: ignore
[docs] def run(self) -> MonteCarloResult: # Print indicator that the simulation is starting if self.console_mode != ConsoleMode.SILENT: self.console.print() # Just an empty line self.console.rule("Simulation Campaign") self.console.print() # Just an empty line # Generate timestamps at which drops are generated max_timestamp = max(d.trajectory.max_timestamp for d in self.scenario.devices) timestamps = ( np.arange(0, max_timestamp, self.drop_interval, np.float_) if max_timestamp > 0.0 else np.zeros(1, np.float_) ) stage_arguments = dict() if timestamps[-1] > 0.0: stage_arguments["sample_trajectories"] = [(t,) for t in timestamps] if self.console_mode != ConsoleMode.SILENT: self.console.log( f"Generating {len(timestamps)} drops at an interval of {self.drop_interval} seconds along the trajectories of moveable objects" ) # Generate simulation result result = self.simulate(SimulationActor) # Visualize results if the flag respective is enabled visualizations: Sequence[Visualization] = [] if self.plot_results: with self.style_context(): visualizations = result.plot() # Dump results if the respective flag is enabled if self.dump_results and self.results_dir is not None: # Save figures to png files for figure_idx, visualization in enumerate(visualizations): if visualization.figure is not None: visualization.figure.savefig( path.join(self.results_dir, f"figure_{figure_idx}.png"), format="png" ) # Save results to matlab file result.save_to_matlab(path.join(self.results_dir, "results.mat")) # Show plots if the flag is enabled # if self.plot_results: # plt.show() # Return result object return result
[docs] def set_channel( self, alpha: SimulatedDevice, beta: SimulatedDevice, channel: Channel | None ) -> None: """Specify a channel within the channel matrix. Convenience method resolving to the :meth:`set_channel<SimulationScenario.set_channel>` method of the managed :class:`SimulationScenario` instance, which can be accessed via the :attr:`scenario<hermespy.core.monte_carlo.MonteCarlo.scenario>` property. Args: receiver (SimulatedDevice): Index of the receiver within the channel matrix. transmitter (SimulatedDevice): Index of the transmitter within the channel matrix. channel (Channel | None): The channel instance to be set at position (`transmitter_index`, `receiver_index`). """ self.scenario.set_channel(alpha, beta, channel)
[docs] @classmethod def to_yaml( cls: Type[Simulation], representer: SafeRepresenter, node: Simulation ) -> MappingNode: """Serialize an `Simulation` object to YAML. Args: representer (SafeRepresenter): A handle to a representer used to generate valid YAML code. The representer gets passed down the serialization tree to each node. node (Simulation): The `Simulation` instance to be serialized. Returns: Node: The serialized YAML node """ # Prepare dimensions dimension_fields: List[Mapping[str, Any]] = [] for dimension in node.dimensions: dimension_map = { "property": dimension.dimension, "points": [p.value for p in dimension.sample_points], "title": dimension.title, } considered_objects = dimension.considered_objects if considered_objects != (node.scenario,): dimension_map["objects"] = considered_objects dimension_fields.append(dimension_map) # Collection channel models channels = [] for device_alpha, device_beta in product(node.scenario.devices, node.scenario.devices): channel = node.scenario.channel(device_alpha, device_beta) if channel is not None: channels.append((device_alpha, device_beta, channel)) additional_fields = { "noise_model": node.scenario.noise_model, # type: ignore[operator] "noise_level": node.scenario.noise_level, # type: ignore[operator] "verbosity": node.verbosity.name, "Devices": node.scenario.devices, "Operators": node.scenario.operators, "Evaluators": node.evaluators, "Dimensions": dimension_fields, "Channels": channels, } return node._mapping_serialization_wrapper(representer, additional_fields=additional_fields)
[docs] @classmethod def from_yaml(cls: Type[Simulation], constructor: SafeConstructor, node: Node) -> Simulation: """Recall a new `Simulation` instance from YAML. Args: constructor (SafeConstructor): A handle to the constructor extracting the YAML information. node (Node): YAML node representing the `Simulation` serialization. Returns: Simulation: Newly created `Simulation` instance. """ state: dict = constructor.construct_mapping(node, deep=True) # Pop configuration sections for "special" treatment devices: List[SimulatedDevice] = state.pop("Devices", []) channels: list[tuple[SimulatedDevice, SimulatedDevice, Channel]] = state.pop("Channels", []) _: List[Operator] = state.pop("Operators", []) evaluators: List[Evaluator] = state.pop("Evaluators", []) dimensions: Dict[str, Any] | List[Mapping[str, Any]] = state.pop("Dimensions", {}) # Initialize simulation state["scenario"] = SimulationScenario( noise_level=state.pop("noise_level", None), noise_model=state.pop("noise_model", None) ) simulation: Simulation = cls.InitializationWrapper(state) # Add devices to the simulation for device in devices: simulation.scenario.add_device(device) # Assign channel models for device_alpha, device_beta, channel in channels: simulation.scenario.set_channel(device_alpha, device_beta, channel) # Register evaluators for evaluator in evaluators: simulation.add_evaluator(evaluator) # Add simulation dimensions if isinstance(dimensions, list): for dimension in dimensions: considered_objects = dimension.get("objects", (simulation.scenario,)) new_dim = simulation.new_dimension( dimension["property"], dimension["points"], *considered_objects ) title = dimension.get("title", None) if title is not None: new_dim.title = title else: for property_name, property_values in dimensions.items(): simulation.new_dimension(property_name, property_values, simulation.scenario) # Return simulation instance recovered from the serialization return simulation
@staticmethod def _pip_packages() -> List[str]: return MonteCarlo._pip_packages() + ["sparse", "protobuf", "numba"]