Source code for hermespy.hardware_loop.visualizers

# -*- coding: utf-8 -*-
"""
=============
Visualizers
=============
"""

from __future__ import annotations
from abc import ABC
from typing import Generic, Literal, Tuple

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np

from hermespy.core import (
    Evaluator,
    Hook,
    Signal,
    VAT,
    VT,
    PlotVisualization,
    ScatterVisualization,
    ValueType,
)
from hermespy.modem import CommunicationReception, ReceivingModem
from hermespy.radar import Radar, RadarCube, RadarReception
from hermespy.tools import lin2db
from .hardware_loop import HardwareLoopPlot, HardwareLoopSample
from .physical_device import PhysicalDevice

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class SignalPlot(HardwareLoopPlot[PlotVisualization], ABC): """Base class of hardware loop plots visualizing signal models.""" __space: Literal["time", "frequency", "both"] def __init__( self, title: str = "", space: Literal["time", "frequency", "both"] = "time" ) -> None: # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__space = space @property def space(self) -> Literal["time", "frequency", "both"]: """Space in which the signal is to be plotted.""" return self.__space def _initialize_subplots(self, num_streams: int) -> Tuple[plt.Figure, VAT]: """Initialize the suplots for a signal model visualization. Subroutine of :meth:`HardwareLoopPlot._prepare_plot` for classes inheriting from :class:`SignalPlot`. Args: num_streams (int): Then number of expected streams. Returns: The figure and axes of the subplots. """ figure, axes = plt.subplots(num_streams, 2 if self.space == "both" else 1, squeeze=False) return figure, axes def _plot_initial_signal(self, signal: Signal, axes: VAT) -> PlotVisualization: # Plot transmitted signal return signal.plot.visualize(axes, space=self.__space) def _update_signal_plot(self, signal: Signal, visualization: PlotVisualization) -> None: """Update the plot of a signal model. Subroutine of :meth:`HardwareLoopPlot._update_plot` for classes inheriting from :class:`SignalPlot`. Args: signal (Signal): Signal model to be plotted. visualization (PlotVisualization): Visualization to be updated. """ signal.plot.update_visualization(visualization, space=self.__space)
[docs] class HardwareLoopDevicePlot(Generic[VT], HardwareLoopPlot[VT], ABC): """Base class for plots of information generated by a device.""" __device: PhysicalDevice def __init__(self, device: PhysicalDevice, title: str | None = None) -> None: """ Args: device (PhysicalDevice): Physical device of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. """ # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__device = device @property def device(self) -> PhysicalDevice: """Physical device of which information is to be plotted.""" return self.__device
[docs] class DeviceTransmissionPlot(HardwareLoopDevicePlot[PlotVisualization], SignalPlot): """Plot base-band signals transmitted by a device.""" def __init__( self, device: PhysicalDevice, title: str | None = None, space: Literal["time", "frequency", "both"] = "time", ) -> None: """ Args: device (PhysicalDevice): Physical device of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. space (Literal["time", "frequency", "both"], optional): Space in which the signal is to be plotted. By the default, the signal is plotted in the time domain. """ # Initialize base classes HardwareLoopDevicePlot.__init__(self, device, title) SignalPlot.__init__(self, title, space) @property def _default_title(self) -> str: return "Device Transmission" def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = self._initialize_subplots(self.device.num_transmit_antennas) return figure, axes def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> PlotVisualization: return self._plot_initial_signal( sample.drop.device_transmissions[ self.hardware_loop.device_index(self.device) ].mixed_signal, axes, ) def _update_plot(self, sample: HardwareLoopSample, visualization: PlotVisualization) -> None: self._update_signal_plot( sample.drop.device_transmissions[ self.hardware_loop.device_index(self.device) ].mixed_signal, visualization, )
[docs] class DeviceReceptionPlot(HardwareLoopDevicePlot[PlotVisualization], SignalPlot): """Plot base-band signals received by a device.""" def __init__( self, device: PhysicalDevice, title: str | None = None, space: Literal["time", "frequency"] = "time", ) -> None: """ Args: device (PhysicalDevice): Physical device of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. space (Literal["time", "frequency", "both"], optional): Space in which the signal is to be plotted. By the default, the signal is plotted in the time domain. """ # Initialize base classes HardwareLoopDevicePlot.__init__(self, device, title) SignalPlot.__init__(self, title, space) @property def _default_title(self) -> str: return "Device Reception" def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = self._initialize_subplots(self.device.num_receive_antenna_ports) return figure, axes def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> PlotVisualization: return self._plot_initial_signal( sample.drop.device_receptions[ self.hardware_loop.device_index(self.device) ].impinging_signals[0], axes, ) def _update_plot(self, sample: HardwareLoopSample, visualization: PlotVisualization) -> None: self._update_signal_plot( sample.drop.device_receptions[ self.hardware_loop.device_index(self.device) ].impinging_signals[0], visualization, )
[docs] class EyePlot(HardwareLoopPlot[PlotVisualization]): """Plot eye diagrams of a received signal.""" __hook: Hook[CommunicationReception] __reception: CommunicationReception def __init__(self, modem: ReceivingModem, title: str | None = None) -> None: """ Args: modem (ReceivingModem): Modem of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. """ # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__hook = modem.add_receive_callback(self.__update_reception) self.__reception = CommunicationReception(Signal.Empty(1.0, 1), []) def __update_reception(self, reception: CommunicationReception) -> None: """Callback envoked by receiving modem to notify the plot about new receptions. Args: reception (CommunicationReception): The most recent reception. """ self.__reception = reception @property def _default_title(self) -> str: return "Eye Plot" def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = plt.subplots(1, 1, squeeze=False) return figure, axes def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> PlotVisualization: if self.__reception.num_frames < 1: raise RuntimeError("No frames received yet") symbol_duration = ( self.__reception.frames[0].signal.duration / self.__reception.frames[0].symbols.raw.size ) return self.__reception.frames[0].signal.eye.visualize( ymbol_duration=symbol_duration, axes=axes ) def _update_plot(self, sample: HardwareLoopSample, visualization: PlotVisualization) -> None: if self.__reception.num_frames < 1: raise RuntimeError("No frames received yet") symbol_duration = ( self.__reception.frames[0].signal.duration / self.__reception.frames[0].symbols.raw.size ) self.__reception.frames[0].signal.eye.update_visualization( symbol_duration=symbol_duration, visualization=visualization ) def __del__(self) -> None: self.__hook.remove()
[docs] class ReceivedConstellationPlot(HardwareLoopPlot[ScatterVisualization]): """Plot the constellation diagram of a received signal.""" __reception: CommunicationReception __hook: Hook[CommunicationReception] def __init__(self, modem: ReceivingModem, title: str | None = None) -> None: """ Args: modem (ReceivingModem): Modem of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. """ # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__reception = CommunicationReception(Signal.Empty(1.0, 1), []) self.__hook = modem.add_receive_callback(self.__update_reception) def __update_reception(self, reception: CommunicationReception) -> None: """Callback invoked by receiving modem to notify the plot about new receptions. Args: reception (CommunicationReception): The most recent reception. """ self.__reception = reception @property def _default_title(self) -> str: return "Received Symbol Constellation" def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = plt.subplots(1, 1, squeeze=False) return figure, axes def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> ScatterVisualization: return self.__reception.equalized_symbols.plot_constellation.visualize(axes=axes) def _update_plot(self, sample: HardwareLoopSample, visualization: ScatterVisualization) -> None: self.__reception.equalized_symbols.plot_constellation.update_visualization(visualization) def __del__(self) -> None: self.__hook.remove()
[docs] class RadarRangePlot(HardwareLoopPlot[PlotVisualization]): """Plot of a radar's range-power profile.""" "" __reception: RadarReception __hook: Hook[RadarReception] __radar: Radar def __init__(self, radar: Radar, title: str | None = None) -> None: """ Args: radar (Radar): Radar of which information is to be plotted. title (str, optional): Title of the hardware loop plot. If not specified, resorts to the default title of the plot. """ # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__reception = None self.__hook = radar.add_receive_callback(self.__update_reception) self.__radar = radar def __update_reception(self, reception: RadarReception) -> None: """Callback invoked by radar to notify the plot about new receptions. Args: reception (RadarReception): The most recent reception. """ self.__reception = reception @property def radar(self) -> Radar: """Radar of which information is to be plotted.""" return self.__radar @property def _default_title(self) -> str: return "Range-Power Profile" def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = plt.subplots(1, 1, squeeze=False) return figure, axes def __get_cube(self) -> RadarCube: """Fetch the radar cube from the radar reception. Returns: The cube. """ if not self.__reception: raise RuntimeError("Radar reception is not available") cube = self.__reception.cube cube.normalize_power() # This might be a problem since the normalization is in-place return self.__reception.cube def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> PlotVisualization: cube = self.__get_cube() plot = cube.plot_range(axes=axes) # axes[0, 0].set_ylim((0.0, 1.1)) return plot def _update_plot(self, sample: HardwareLoopSample, visualization: PlotVisualization) -> None: self.__get_cube().plot_range.update_visualization(visualization) def __del__(self) -> None: self.__hook.remove()
[docs] class HardwareLoopEvaluatorPlot(Generic[VT], HardwareLoopPlot[VT], ABC): """Common base class for plots of information generated by an evaluator.""" __evaluator: Evaluator def __init__(self, evaluator: Evaluator, title: str = "") -> None: # Initialize base class HardwareLoopPlot.__init__(self, title) # Initialize class attributes self.__evaluator = evaluator @property def _default_title(self) -> str: return self.evaluator.title @property def evaluator(self) -> Evaluator: """Evaluator of which information is to be plotted.""" return self.__evaluator @property def evaluator_index(self) -> int: """Index of the evaluator within the HardwareLoop's configuration.""" return self.hardware_loop.evaluator_index(self.evaluator) def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = plt.subplots(1, 1, squeeze=False) return figure, axes
[docs] class EvaluationPlot(Generic[VT], HardwareLoopEvaluatorPlot[VT]): """Plot of an evaluation.""" def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> VT: return sample.evaluations[self.evaluator_index].visualize(axes) def _update_plot(self, sample: HardwareLoopSample, visualization: VT) -> None: sample.evaluations[self.evaluator_index].update_visualization(visualization)
[docs] class ArtifactPlot(HardwareLoopEvaluatorPlot[PlotVisualization]): """Plot of an evaluation's scalar artifact.""" __artifact_queue: np.ndarray __y_axis_limits: Tuple[float, float] | None def __init__( self, evaluator: Evaluator, title: str | None = None, queue_length: int = 20, y_axis_limits: Tuple[float, float] | None = None, ) -> None: # Initialize base class HardwareLoopEvaluatorPlot.__init__(self, evaluator, title) # Initialize class attributes self.__artifact_queue = np.nan * np.ones(queue_length, dtype=float) self.__artifact_indices = np.arange(queue_length) self.__y_axis_limits = y_axis_limits def _prepare_plot(self) -> Tuple[plt.Figure, VAT]: figure, axes = HardwareLoopEvaluatorPlot._prepare_plot(self) ax: plt.Axes = axes.flat[0] ax.set_xlabel("Drop Index") ax.set_ylabel(self.evaluator.abbreviation) ax.set_xlim(0, self.__artifact_queue.size - 1) if self.__y_axis_limits: ax.set_ylim(self.__y_axis_limits) if self.evaluator.tick_format is ValueType.DB: ax.xaxis.set_major_formatter(ticker.FuncFormatter(lambda x, _: f"{lin2db(x):.2g}dB")) return figure, axes def __update_artifact_queue(self, sample: HardwareLoopSample) -> None: """Update the artifact queue with the most recent artifact. Subroutine of :meth:`ArtifactPlot._update_plot` and :meth:`ArtifactPlot._initial_plot`. Args: sample (HardwareLoopSample): The most recent sample. """ self.__artifact_queue = np.roll(self.__artifact_queue, 1) self.__artifact_queue[0] = sample.artifacts[ self.hardware_loop.evaluator_index(self.evaluator) ].to_scalar() def _initial_plot(self, sample: HardwareLoopSample, axes: VAT) -> PlotVisualization: # Update artifact queue self.__update_artifact_queue(sample) # Plot artifact queue ax: plt.Axes = axes.flat[0] lines = np.empty_like(axes, dtype=np.object_) lines[0, 0] = ax.plot(self.__artifact_indices, self.__artifact_queue) ax.set_yscale(self.evaluator.plot_scale) return PlotVisualization(axes[0, 0].get_figure(), axes, lines) def _update_plot(self, sample: HardwareLoopSample, visualization: PlotVisualization) -> None: # Update artifact queue self.__update_artifact_queue(sample) # Update visualization visualization.lines[0, 0][0].set_ydata(self.__artifact_queue) # Rescale the visualization visualization.axes[0, 0].relim() visualization.axes[0, 0].autoscale_view(True, True, True)