# -*- coding: utf-8 -*-
"""
=============
Hardware Loop
=============
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from contextlib import ExitStack
from os import path
from typing import Any, Generic, List, Mapping, Sequence, Tuple, Type
import matplotlib.pyplot as plt
import numpy as np
from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn
from rich.prompt import Confirm
from ruamel.yaml import SafeConstructor, Node, SafeRepresenter, MappingNode
from hermespy.core import (
Artifact,
ConsoleMode,
Drop,
Evaluation,
EvaluationResult,
Evaluator,
MonteCarloResult,
Pipeline,
Serializable,
SerializableEnum,
VAT,
Verbosity,
)
from hermespy.core.monte_carlo import GridDimension, SampleGrid, MonteCarloSample
from hermespy.tools import tile_figures
from .physical_device import PDT
from .physical_device_dummy import PhysicalScenarioDummy
from .scenario import PhysicalScenarioType
__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class IterationPriority(SerializableEnum):
"""Iteration priority of the hardware loop.
Used by the :meth:`HardwareLoop.run` method.
"""
DROPS = 0
"""Iterate over drops first before iterating over the parameter grid."""
GRID = 1
"""Iterate over the parameter grid first before iterating over drops."""
class EvaluatorPlotMode(SerializableEnum):
"""Evalution plot mode during hardware loop runtime."""
HIDE = 0
"""Do not plot evaluation results during hardware loop runtime."""
EVALUATION = 1
"""Plot the evaluation during hardware loop runtime."""
ARTIFACTS = 2
"""Plot the series of generated scalar artifacts during hardware loop runtime."""
class EvaluatorRegistration(Evaluator):
"""Evaluator registration for the hardware loop.
Created by the :meth:`HardwareLoop.add_evaluator` method.
"""
__evaluator: Evaluator
__plot_mode: EvaluatorPlotMode
def __init__(self, evaluator: Evaluator, plot_mode: EvaluatorPlotMode) -> None:
"""
Args:
evaluator (Evaluator):
Registered evaluator.
plot_mode (EvaluatorPlotMode):
Plot mode of the registered evaluator.
"""
# Initialize class attributes
self.__evaluator = evaluator
self.__plot_mode = plot_mode
@property
def evaluator(self) -> Evaluator:
"""Registered evaluator."""
return self.__evaluator
@property
def plot_mode(self) -> EvaluatorPlotMode:
"""Plot mode of the registered evaluator."""
return self.__plot_mode
def evaluate(self) -> Evaluation:
return self.__evaluator.evaluate()
@property
def abbreviation(self) -> str:
return self.__evaluator.abbreviation
@property
def title(self) -> str:
return self.__evaluator.title
@property
def confidence(self) -> float:
return self.__evaluator.confidence
@confidence.setter
def confidence(self, value: float) -> None:
self.__evaluator.confidence = value
@property
def tolerance(self) -> float:
return self.__evaluator.tolerance
@tolerance.setter
def tolerance(self, value: float) -> None:
self.__evaluator.tolerance = value
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> EvaluationResult:
return self.__evaluator.generate_result(grid, artifacts)
class HardwareLoopSample(object):
"""Sample of the hardware loop.
Generated during :meth:`HardwareLoop.run`.
"""
__drop: Drop
__evaluations: Sequence[Evaluation]
__artifacts: Sequence[Artifact]
def __init__(
self, drop: Drop, evaluations: Sequence[Evaluation], artifacts: Sequence[Artifact]
) -> None:
# Initialize class attributes
self.__drop = drop
self.__evaluations = evaluations
self.__artifacts = artifacts
@property
def drop(self) -> Drop:
"""Drop of the hardware loop sample."""
return self.__drop
@property
def evaluations(self) -> Sequence[Evaluation]:
"""Evaluations of the hardware loop sample."""
return self.__evaluations
@property
def artifacts(self) -> Sequence[Artifact]:
"""Artifacts of the hardware loop sample."""
return self.__artifacts
[docs]
class HardwareLoopPlot(ABC):
__hardware_loop: HardwareLoop | None
__title: str
__figure: plt.Figure | None
__axes: VAT | None
def __init__(self, title: str = "") -> None:
# Initialize class attributes
self.__hardware_loop = None
self.__title = title
self.__figure = None
self.__axes = None
@property
def hardware_loop(self) -> HardwareLoop | None:
"""Hardware loop this plot is attached to."""
return self.__hardware_loop
@hardware_loop.setter
def hardware_loop(self, value: HardwareLoop) -> None:
"""Hardware loop this plot is attached to."""
if self.__hardware_loop is not None:
raise RuntimeError("Plot already assigned to a hardware loop.")
self.__hardware_loop = value
@property
def title(self) -> str:
"""Title of the hardware loop plot."""
return self.__title
@property
def figure(self) -> plt.Figure:
"""Figure of the hardware loop plot."""
return self.__figure
@property
def axes(self) -> VAT:
"""Axes of the hardware loop plot."""
return self.__axes
@abstractmethod
def _prepare_figure(self) -> Tuple[plt.Figure, VAT]:
"""Prepare the figure for the hardware loop plot.
Returns:
Tuple[plt.Figure, VAT]:
Figure and axes of the hardware loop plot.
"""
... # pragma: no cover
[docs]
def update_plot(self, sample: HardwareLoopSample) -> None:
"""Update the hardware loop plot.
Internally calls the abstract :meth:`_update_plot` method.
Args:
sample (HardwareLoopSample):
Hardware loop sample to be plotted.
Raises:
RuntimeError: If the hardware loop is not set.
"""
# Assert correct state
if self.hardware_loop is None:
raise RuntimeError("Unable to update transmitted signal plot without device.")
self._update_plot(sample)
@abstractmethod
def _update_plot(self, sample: HardwareLoopSample) -> None:
"""Update the hardware loop plot.
Abstract subroutine of :meth:`HardwareLoopPlot.update_plot`.
Args:
sample (HardwareLoopSample):
Hardware loop sample to be plotted.
"""
... # pragma: no cover
[docs]
class HardwareLoop(
Serializable, Generic[PhysicalScenarioType, PDT], Pipeline[PhysicalScenarioType, PDT]
):
"""Hermespy hardware loop configuration."""
yaml_tag = "HardwareLoop"
"""YAML serialization tag"""
property_blacklist = {"console"}
serialized_attributes = {"scenario", "manual_triggering", "plot_information", "record_drops"}
manual_triggering: bool
"""Require a user input to trigger each drop manually"""
plot_information: bool
"""Plot information during loop runtime"""
record_drops: bool
"""Record drops during loop runtime"""
__dimensions: List[GridDimension] # Parameter dimension over which a run sweeps
__evaluators: List[Evaluator] # Evaluators further processing drop information
__plots: List[HardwareLoopPlot]
__iteration_priority: IterationPriority
def __init__(
self,
scenario: PhysicalScenarioType,
manual_triggering: bool = False,
plot_information: bool = True,
iteration_priority: IterationPriority = IterationPriority.DROPS,
record_drops: bool = True,
**kwargs,
) -> None:
"""
Args:
scenario (PhysicalScenarioType):
The physical scenario being controlled by the hardware loop.
manual_triggering (bool, optional):
Require a keyboard user input to trigger each drop manually.
Disabled by default.
plot_information (bool, optional):
Plot information during loop runtime.
Enabled by default.
iteration_priority (IterationPriority, optional):
Which dimension to iterate over first.
Defaults to :attr:`IterationPriority.DROPS`.
record_drops (bool, optional):
Record drops during loop runtime.
Enabled by default.
"""
# Initialize base classes
Pipeline.__init__(self, scenario=scenario, **kwargs)
# Initialize class attributes
self.manual_triggering = manual_triggering
self.plot_information = plot_information
self.iteration_priority = iteration_priority
self.record_drops = record_drops
self.__dimensions = []
self.__evaluators = []
self.__plots = []
[docs]
def new_dimension(
self, dimension: str, sample_points: List[Any], *args: Tuple[Any]
) -> GridDimension:
"""Add a dimension to the sweep grid.
Must be a property of the :meth:`HardwareLoop.scenario`.
Args:
dimension (str):
String representation of dimension location relative to the investigated object.
sample_points (List[Any]):
List points at which the dimension will be sampled into a grid.
The type of points must be identical to the grid arguments / type.
*args (Tuple[Any], optional):
References to the object the imension belongs to.
Resolved to the investigated object by default,
but may be an attribute or sub-attribute of the investigated object.
Returns: The newly created dimension object.
"""
considered_objects = (self.__scenario) if len(args) < 1 else args
grid_dimension = GridDimension(considered_objects, dimension, sample_points)
self.add_dimension(grid_dimension)
return grid_dimension
[docs]
def add_dimension(self, dimension: GridDimension) -> None:
"""Add a new dimension to the simulation grid.
Args:
dimension (GridDimension):
Dimension to be added.
Raises:
ValueError:
If the `dimension` already exists within the grid.
"""
if dimension in self.__dimensions:
raise ValueError("Dimension instance already registered within the grid")
self.__dimensions.append(dimension)
[docs]
def add_evaluator(self, evaluator: Evaluator) -> None:
"""Add new evaluator to the hardware loop.
Args:
evaluator (Evaluator):
The evaluator to be added.
"""
self.__evaluators.append(evaluator)
[docs]
def add_plot(self, plot: HardwareLoopPlot) -> None:
"""Add a new plot to be visualized by the hardware loop during runtime.
Args:
plot (HardwareLoopPlot):
The plot to be added.
"""
plot.hardware_loop = self
self.__plots.append(plot)
@property
def evaluators(self) -> List[Evaluator]:
"""List of registered evaluators."""
return self.__evaluators
@property
def num_evaluators(self) -> int:
"""Number of registered evaluators.
Returns: The number of evaluators.
"""
return len(self.__evaluators)
[docs]
def evaluator_index(self, evaluator: Evaluator) -> int:
"""Index of the given evaluator.
Args:
evaluator (Evaluator):
The evaluator to be searched for.
Returns: The index of the evaluator.
"""
return self.__evaluators.index(evaluator)
@property
def iteration_priority(self) -> IterationPriority:
"""Iteration priority of the hardware loop."""
return self.__iteration_priority
@iteration_priority.setter
def iteration_priority(self, value: IterationPriority) -> None:
self.__iteration_priority = value
[docs]
def run(self, overwrite=True, campaign: str = "default") -> None:
"""Run the hardware loop configuration.
Args:
overwrite (bool, optional):
Allow the replacement of an already existing savefile.
campaing (str, optional):
Name of the measurement campaign.
"""
# Prepare the results file
# Only required if the results directory is set and drops are recorded
if self.results_dir:
if self.record_drops:
file_location = path.join(self.results_dir, "drops.h5")
# Workaround for scenarios which might not be serializable:
# Create a dummy scenario and patch the operators for recording
if not isinstance(self.scenario, Serializable):
state = PhysicalScenarioDummy()
for device in self.scenario.devices:
state.new_device(
carrier_frequency=device.carrier_frequency,
sampling_rate=device.sampling_rate,
)
# Patch operators for recording
operator_devices = [
(operator.device, self.scenario.device_index(operator.device))
for operator in self.scenario.operators
]
for (_, device_idx), operator in zip(operator_devices, self.scenario.operators):
operator.device = state.devices[device_idx] # type: ignore
self.scenario.record(
file_location, overwrite=overwrite, campaign=campaign, state=state
)
for (_, device_idx), operator in zip(operator_devices, state.operators):
operator.device = self.scenario.devices[device_idx] # type: ignore
else:
self.scenario.record(file_location, overwrite=overwrite, campaign=campaign)
else:
if (
self.console_mode is not ConsoleMode.SILENT
and self.verbosity.value >= Verbosity.INFO.value
):
self.console.print("Skipping drop recording", style="bright_yellow")
# Run internally
self.__run()
# Save results and close file streams
self.scenario.stop()
[docs]
def replay(self, file_location: str) -> None:
"""Replay a stored pipeline run.
Args:
file_location (str):
File system location of the replay.
"""
# Start the scenario replay
self.scenario.replay(file_location)
# Run internally
self.__run()
# Stop the scenario replay
self.scenario.stop()
def __generate_sample(
self, section_indices: Tuple[int, ...], sample_index: int
) -> HardwareLoopSample:
"""Generate a sample from the grid.
Args:
section_indices (Tuple[int, ...]):
The indices of the section within the sample grid.
sample_index (int):
Index of the sample within the section.
Returns: The generated sample.
"""
# Configure the parameters according to the grid indces
for dim, p in zip(self.__dimensions, section_indices):
dim.configure_point(p)
# Generate a new drop (triggers the hardware)
drop = self.scenario.drop()
# Generate evaluations and artifacts
evaluations = [e.evaluate() for e in self.__evaluators]
artifacts = [e.artifact() for e in evaluations]
# Return sample
return HardwareLoopSample(drop, evaluations, artifacts)
def __run(self) -> None:
"""Internal run method executing drops"""
# Initialize plots
if self.plot_information:
with plt.ion() and self.style_context(): # pragma: no cover
for plot in self.__plots:
plot.prepare_figure()
# Tile the generated figures
tile_figures(2, 4)
# runtime = HardwareLoopRuntime(self.__devices, self.__dimensions, self.__evaluators, self.plot_information)
sample_grid = SampleGrid(self.__dimensions, self.__evaluators)
# Print indicator that the simulation is starting
if (
self.console_mode is not ConsoleMode.SILENT
and self.verbosity.value >= Verbosity.INFO.value
): # pragma: no cover
self.console.print() # Just an empty line
self.console.rule("Hardware Loop")
self.console.print() # Just an empty line
# Prepare the console
progress = Progress(
SpinnerColumn(),
*Progress.get_default_columns(),
TimeElapsedColumn(),
transient=True,
console=self.console,
)
confirm = Confirm(console=self.console)
num_total_drops = self.num_drops
grid_tasks = []
for grid_dimension in self.__dimensions:
grid_tasks.append(
progress.add_task(
"[cyan]" + grid_dimension.title, total=grid_dimension.num_sample_points
)
)
num_total_drops *= grid_dimension.num_sample_points
loop_drop_progress = progress.add_task("[green]Drops", total=self.num_drops)
total_progress = progress.add_task("[red]Progress", total=num_total_drops)
with ExitStack() as stack:
# Add the progress bar to the context stack
if self.console_mode == ConsoleMode.INTERACTIVE:
stack.enter_context(progress)
# If the plot information is enabled,
# add the interactive plot to the context stack to enable live updates
if self.plot_information:
stack.enter_context(plt.ion())
# Start counting the total number of completed drops
total = 0
# Generate the number of required drops
index_grid: Tuple[int, ...] = tuple()
if self.iteration_priority is IterationPriority.GRID:
index_grid = (self.num_drops, *[d.num_sample_points for d in self.__dimensions])
grid_selector = slice(1, 1 + len(self.__dimensions))
drop_selector = 0
elif self.iteration_priority is IterationPriority.DROPS:
index_grid = (*[d.num_sample_points for d in self.__dimensions], self.num_drops)
grid_selector = slice(0, len(self.__dimensions))
drop_selector = len(self.__dimensions)
else:
# This should never happen
raise RuntimeError(f"Invalid iteration priority: {self.iteration_priority}")
for indices in np.ndindex(index_grid):
sample_index = indices[drop_selector]
section_indices = indices[grid_selector]
# Ask for a trigger input if manual mode is enabled
# Abort execution if user denies
if self.manual_triggering: # pragma: no cover
progress.live.stop()
if not confirm.ask(f"Trigger next drop ({total+1})?"):
break
progress.live.start()
# Generate the next drop
try:
# Generate a new samples
loop_sample = self.__generate_sample(section_indices, sample_index)
grid_sample = MonteCarloSample(
section_indices, sample_index, loop_sample.artifacts
)
# Save sample
sample_grid[section_indices].add_samples(grid_sample, self.__evaluators)
# Print results
if (
self.console_mode is not ConsoleMode.SILENT
and self.verbosity.value <= Verbosity.INFO.value
):
result_str = f"# {total:<5}"
for dimesion, i in zip(self.__dimensions, indices[grid_selector]):
result_str += f" {dimesion.title} = {dimesion.sample_points[i].title}"
for evaluator, artifact in zip(self.__evaluators, loop_sample.artifacts):
result_str += f" {evaluator.abbreviation}: {str(artifact):5}"
if self.console_mode == ConsoleMode.INTERACTIVE:
progress.log(result_str)
elif self.console_mode != ConsoleMode.SILENT:
self.console.log(result_str)
# Update plots
if self.plot_information:
for plot in self.__plots:
plot.update_plot(loop_sample)
except Exception as e:
self._handle_exception(e, confirm=False)
# Update progress
total += 1
for task, completed in zip(grid_tasks, indices[grid_selector]):
progress.update(task, completed=completed)
progress.update(loop_drop_progress, completed=indices[drop_selector])
progress.update(total_progress, completed=total)
# Compute the evaluation results
result: MonteCarloResult = MonteCarloResult(
self.__dimensions, self.__evaluators, sample_grid, 0.0
)
# Generate result plots
result_figures = result.plot()
# Save results if a directory was provided
if self.results_dir:
result.save_to_matlab(path.join(self.results_dir, "results.mat"))
for idx, (figure_base, evaluator) in enumerate(zip(result_figures, self.__evaluators)):
figure = figure_base.get_figure()
if figure is not None:
figure.savefig(
path.join(self.results_dir, f"result_{idx}_{evaluator.abbreviation}.png"),
format="png",
)
if self.plot_information:
plt.show()
@classmethod
def to_yaml(
cls: Type[HardwareLoop], representer: SafeRepresenter, node: HardwareLoop
) -> MappingNode:
# Prepare dimensions
dimension_fields: List[Mapping[str, Any]] = []
for dimension in node.__dimensions:
dimension_fields.append(
{
"objects": dimension.considered_objects,
"property": dimension.dimension,
"points": [p.value for p in dimension.sample_points],
"title": dimension.title,
}
)
additional_fields = {"Evaluators": node.__evaluators, "Dimensions": dimension_fields}
return node._mapping_serialization_wrapper(representer, additional_fields=additional_fields)
@classmethod
def from_yaml(
cls: Type[HardwareLoop], constructor: SafeConstructor, node: Node
) -> HardwareLoop:
state = constructor.construct_mapping(node, deep=True)
state.pop("Operators", [])
evaluators: List[Evaluator] = state.pop("Evaluators", [])
dimensions: List[Mapping[str, Any]] = state.pop("Dimensions", [])
# Initialize the hardware loop
hardware_loop: HardwareLoop = cls.InitializationWrapper(state)
# Register evaluators
for evaluator in evaluators:
hardware_loop.add_evaluator(evaluator)
# Add sweeping dimensions
for dimension in dimensions:
new_dim = hardware_loop.new_dimension(
dimension["property"], dimension["points"], *dimension["objects"]
)
title = dimension.get("title", None)
if title is not None:
new_dim.title = title
# Return fully configured hardware loop
return hardware_loop