Source code for hermespy.core.evaluators

# -*- coding: utf-8 -*-

from __future__ import annotations

from __future__ import annotations
from collections.abc import Sequence
import matplotlib.pyplot as plt

import numpy as np

from .device import Receiver, Reception, Transmitter, Transmission
from .hooks import Hook
from .monte_carlo import Artifact, Evaluation, ScalarEvaluationResult, Evaluator, GridDimension
from .visualize import StemVisualization, VAT

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class PowerResult(ScalarEvaluationResult): """The result of a received power evaluation. Final result of power evaluations after all evaluation artifacts have been collected. """ __average_powers: np.ndarray def __init__( self, average_powers: np.ndarray, grid: Sequence[GridDimension], evaluator: Evaluator, plot_surface: bool = False, ) -> None: """ Args: average_powers (numpy.ndarray): The received power in Watts for each antenna stream. grid (Sequence[GridDimension]): The grid dimensions of the evaluation. evaluator (Evaluator): The evaluator that generated this result. plot_surface (bool): Whether two-dimensional evaluations should be plotted as surface plots. """ # Initialize the base class scalars = np.sum(average_powers, axis=-1, keepdims=False) ScalarEvaluationResult.__init__(self, grid, scalars, evaluator, plot_surface) # Initialize class members self.__average_powers = average_powers @property def average_powers(self) -> np.ndarray: """The received power in Watts for each antenna stream.""" return self.__average_powers
[docs] def to_array(self) -> np.ndarray: return self.__average_powers
[docs] class PowerArtifact(Artifact): """The artifact of a received power evaluation. Generated by the :meth:`artifact<hermespy.core.evaluators.PowerEvaluation.artifact>` of :class:`PowerEvaluation<hermespy.core.evaluators.PowerEvaluation>`. """ __power: np.ndarray def __init__(self, power: np.ndarray) -> None: """ Args: power: The received power in Watts for each antenna stream. """ # Initialize the base class Artifact.__init__(self) # Initialize class members self.__power = power @property def power(self) -> np.ndarray: """The received power in Watts for each antenna stream.""" return self.__power def __str__(self) -> str: power_db = 10 * np.log10(np.sum(self.__power)) return f"{power_db:.2} dB"
[docs] def to_scalar(self) -> float: return float(np.sum(self.__power))
[docs] class PowerEvaluation(Evaluation[StemVisualization]): """The evaluation of a received power evaluation. Generated by evaluating power evaluators. """ __power: np.ndarray def __init__(self, power: np.ndarray) -> None: """ Args: power: The received power in Watts for each antenna stream. """ # Initialize the base class super().__init__() # Initialize class members self.__power = power @property def power(self) -> np.ndarray: """The received power indicator in V^2 for each antenna stream.""" return self.__power
[docs] def artifact(self) -> PowerArtifact: return PowerArtifact(self.__power)
@property def title(self) -> str: return "Received Power" def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> StemVisualization: ax: plt.Axes = axes.flat[0] ax.set_xlabel("Antenna Stream") ax.set_ylabel("Power [W]") container = ax.stem(np.zeros_like(self.__power)) return StemVisualization(figure, axes, container) def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None: visualization.container.markerline.set_ydata(self.__power)
[docs] class ReceivePowerEvaluator(Evaluator): """Estimates the signal power received receivers.""" __receive_hook: Hook[Reception] __reception: Reception | None def __init__(self, target: Receiver, plot_scale: str = "log") -> None: """ Args: target (Receiver): The device or receiver to measure the received power of. plot_scale (str): The scale of the plot. Can be ``'linear'`` or ``'log'``. """ # Initialize the base class Evaluator.__init__(self) # Initialize class members self.__receive_hook = target.add_receive_callback(self.__receive_callback) self.__reception = None self.plot_scale = plot_scale def __receive_callback(self, reception: Reception) -> None: """Callback function notifying the evaluator of a new reception.""" self.__reception = reception
[docs] def evaluate(self) -> PowerEvaluation: if self.__reception is None: raise RuntimeError( "Power evaluator could not fetch reception. Has the receiver received data?" ) power = self.__reception.signal.power return PowerEvaluation(power)
@property def abbreviation(self) -> str: return "RxPwr" @property def title(self) -> str: return "Receive Power"
[docs] def generate_result(self, grid: Sequence[GridDimension], artifacts: np.ndarray) -> PowerResult: # Find the maximum number of receive ports over all artifacts max_ports = max( max(artifact.power.size for artifact in artifacts) for artifacts in artifacts.flat ) average_powers = np.zeros((*artifacts.shape, max_ports), dtype=np.float64) for grid_index, artifacts in np.ndenumerate(artifacts): for artifact in artifacts: average_powers[grid_index] += artifact.power num_artifacts = len(artifacts) if num_artifacts > 0: average_powers[grid_index] /= len(artifacts) return PowerResult(average_powers, grid, self)
def __del__(self) -> None: self.__receive_hook.remove()
[docs] class TransmitPowerEvaluator(Evaluator): """Estimates the signal power transmitted by transmitters.""" __transmit_hook: Hook[Transmission] __transmission: Transmission | None def __init__(self, target: Transmitter, plot_scale: str = "log") -> None: """ Args: target (Transmitter): The device or receiver to measure the received power of. plot_scale (str): The scale of the plot. Can be ``'linear'`` or ``'log'``. """ # Initialize the base class Evaluator.__init__(self) # Initialize class members self.__transmit_hook = target.add_transmit_callback(self.__transmit_callback) self.__transmission = None self.plot_scale = plot_scale def __transmit_callback(self, transmission: Transmission) -> None: """Callback function notifying the evaluator of a new transmission.""" self.__transmission = transmission
[docs] def evaluate(self) -> PowerEvaluation: if self.__transmission is None: raise RuntimeError( "Power evaluator could not fetch transmission. Has the transmitter transmitted data?" ) return PowerEvaluation(self.__transmission.signal.power)
@property def abbreviation(self) -> str: return "TxPwr" @property def title(self) -> str: return "Transmit Power"
[docs] def generate_result(self, grid: Sequence[GridDimension], artifacts: np.ndarray) -> PowerResult: # Find the maximum number of receive ports over all artifacts max_ports = max( max(artifact.power.size for artifact in artifacts) for artifacts in artifacts.flat ) average_powers = np.zeros((*artifacts.shape, max_ports), dtype=np.float64) for grid_index, artifacts in np.ndenumerate(artifacts): for artifact in artifacts: average_powers[grid_index] += artifact.power num_artifacts = len(artifacts) if num_artifacts > 0: average_powers[grid_index] /= len(artifacts) return PowerResult(average_powers, grid, self)
def __del__(self) -> None: self.__transmit_hook.remove()