Source code for hermespy.radar.evaluators

# -*- coding: utf-8 -*-
"""
================
Radar Evaluation
================

This module introduces several evaluators for performance indicators in radar detection.
Refer to the :doc:`PyMonte</api/core.monte_carlo>` documentation for a detailed introduction to the concept of
:class:`Evaluators<hermespy.core.monte_carlo.Evaluator>`.

.. autoclasstree:: hermespy.modem.evaluators
   :alt: Communication Evaluator Class Tree
   :strict:
   :namespace: hermespy

The implemented :class:`RadarEvaluator<.RadarEvaluator>` all inherit from the identically named common
base which gets initialized by selecting one :class:`Modem<hermespy.modem.modem.Modem>`, whose performance will be
evaluated and one :class:`RadarChannel<hermespy.channel.radar_channel.RadarChannel>` instance, containing the ground
truth.
The currently considered performance indicators are

========================================= ================================ ============================================================
Evaluator                                 Artifact                         Performance Indicator
========================================= ================================ ============================================================
:class:`.DetectionProbEvaluator`          :class:`.DetectionProbArtifact`  Probability of detecting the target at the right bin
:class:`.ReceiverOperatingCharacteristic` :class:`.RocArtifact`            Probability of detection versus probability of false alarm
:class`.RootMeanSquareError`              :class:`.RootMeanSquareArtifact` Root mean square error of point detections
========================================= ================================ ============================================================

Configuring :class:`RadarEvaluators<.RadarEvaluator>` to evaluate the radar detection of
:class:`Modem<hermespy.modem.modem.Modem>` instances is rather straightforward:

.. code-block:: python

   # Create two separate modem instances
   modem = Modem()
   channel = RadarChannel()

   # Create a radar evaluation as an evaluation example
   radar_evaluator = DetectionProbEvaluator(modem, channel)

   # Extract evaluation
   radar_evaluation = radar_evaluator.evaluate()

   # Visualize evaluation
   radar_evaluation.plot()

"""

from __future__ import annotations
from abc import ABC
from collections.abc import Sequence
from itertools import product
from typing import Type, Union

import matplotlib.pyplot as plt
import numpy as np
from h5py import File
from rich import get_console
from rich.console import Console
from rich.progress import track
from scipy.stats import uniform

from hermespy.core import (
    Executable,
    Hook,
    PlotVisualization,
    ReplayScenario,
    ScatterVisualization,
    Scenario,
    ScenarioMode,
    Serializable,
    VAT,
)
from hermespy.core.monte_carlo import (
    Evaluator,
    Evaluation,
    EvaluationResult,
    EvaluationTemplate,
    GridDimension,
    ArtifactTemplate,
    Artifact,
    ScalarEvaluationResult,
)
from hermespy.radar import Radar, RadarReception
from hermespy.channel.radar.radar import RadarChannelBase, RadarChannelSample
from hermespy.radar.cube import RadarCube
from hermespy.radar.detection import RadarPointCloud
from hermespy.simulation import (
    SimulatedDevice,
    ProcessedSimulatedDeviceInput,
    SimulatedDeviceOutput,
)


__author__ = "André Noll Barreto"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "André Noll Barreto"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class RadarEvaluator(Evaluator, ABC): """Bastract base class for evaluating sensing performance. Inherits from the abstract :class:`Evaluator<hermespy.core.monte_carlo.Evaluator>` base class. Expects the abstract method :meth:`evaluate` as well as the abstract properties :meth:`abbreviation<abbreviation>` and :meth:`title<title>` to be implemented. There are currently three different :class:`RadarEvaluators<.RadarEvaluator>` implemented: .. toctree:: radar.evaluators.DetectionProbEvaluator radar.evaluators.ReceiverOperatingCharacteristic radar.evaluators.RootMeanSquareError """ __receiving_radar: Radar # Handle to the radar receiver __receiving_device: SimulatedDevice __transmitting_device: SimulatedDevice __radar_channel: RadarChannelBase __reception: RadarReception | None __receive_hook: Hook[RadarReception] __channel_sample: RadarChannelSample | None def __init__( self, receiving_radar: Radar, transmitting_device: SimulatedDevice, receiving_device: SimulatedDevice, radar_channel: RadarChannelBase, ) -> None: """ Args: receiving_radar (Radar): Radar detector to be evaluated. transmitting_device (SimulatedDevice): Device transmitting into the evaluated channel. receiving_device (SimulatedDevice): Device receiving from the evaluated channel. The `receiving_radar` must be a receive DSP algorithm of this device. radar_channel (RadarChannelBase): The radar channel containing the ground truth to be evaluated. Raises: ValueError: If the receiving radar is not an operator of the radar_channel receiver. """ # Initialize base class Evaluator.__init__(self) # Check if the receiving radar is a dsp algorithm of the receiving device if receiving_radar not in receiving_device.receivers: raise ValueError( "Receiving radar not registered as DSP algorithm of the receiving device" ) # Initialize class attributes self.__transmitting_device = transmitting_device self.__receiving_device = receiving_device self.__receiving_radar = receiving_radar self.__radar_channel = radar_channel # Register receive callback self.__reception = None self.__receive_hook = receiving_radar.add_receive_callback(self.__receive_callback) # Register sample callback self.__channel_sample = None radar_channel.add_sample_hook( self.__sample_callback, self.__transmitting_device, self.__receiving_device ) def __receive_callback(self, reception: RadarReception) -> None: """Callback for receiving radar receptions. Args: reception (RadarReception): Received radar reception. """ self.__reception = reception def __sample_callback(self, sample: RadarChannelSample) -> None: """Callback for sampling radar channel realizations. Args: sample (RadarChannelSample): Sampled radar channel realization. """ self.__channel_sample = sample def _fetch_reception(self) -> RadarReception: """Fetch the most recent radar reception. Returns: The most recent radar reception. Raises: RuntimeError: If no reception is available. """ if self.__reception is None: raise RuntimeError( "No reception available. Has the radar's receive method been called?" ) return self.__reception def _fetch_pcl(self) -> RadarPointCloud: """Fetch the most recent radar point cloud. Returns: The most recent radar point cloud. Raises: RunmtineError: If no reception is available or the reception does not contain a point cloud. """ reception = self._fetch_reception() if reception.cloud is None: raise RuntimeError( "No point cloud available. Has the radar's receive method been called?" ) return reception.cloud def _fetch_channel(self) -> RadarChannelSample: """Fetch the most recent radar channel sample. Returns: The most recent radar channel sample. Raises: RuntimeError: If no channel sample is available. """ if self.__channel_sample is None: raise RuntimeError("No channel sample available. Has the radar channel been sampled?") return self.__channel_sample @property def receiving_device(self) -> SimulatedDevice: """Device receiving from the evaluated channel.""" return self.__receiving_device @property def transmitting_device(self) -> SimulatedDevice: """Device transmitting into the evaluated channel.""" return self.__transmitting_device @property def receiving_radar(self) -> Radar: """Radar detector, the output of which is to be evaluated.""" return self.__receiving_radar @property def radar_channel(self) -> RadarChannelBase: """The considered radar channel.""" return self.__radar_channel
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> EvaluationResult: return ScalarEvaluationResult(grid, artifacts, self)
def __del__(self) -> None: self.__receive_hook.remove()
[docs] class DetectionProbArtifact(ArtifactTemplate[bool]): """Artifacto of the probability of detection for a radar detector. Represents a boolean indicator of whether a target was detected or not. Generated by the :class:`DetectionProbabilityEvaluation<.DetectionProbabilityEvaluation>`'s :meth:`artifact()<DetectionProbabilityEvaluation.artifact>` method. """ ... # pragma: no cover
[docs] class DetectionProbabilityEvaluation(EvaluationTemplate[bool, ScatterVisualization]): """Evaluation of the probability of detection for a radar detector. Represents a boolean indicator of whether a target was detected or not. Generated by the :class:`DetectionProbEvaluator<.DetectionProbEvaluator>`'s :meth:`evaluate()<DetectionProbEvaluator.evaluate>` method. """ def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> ScatterVisualization: # pragma: no cover raise NotImplementedError("Detection probability evaluation does not support visualization") def _update_visualization( self, visualization: ScatterVisualization, **kwargs ) -> None: # pragma: no cover # ToDo: Implement updating the single-item scatter plot raise NotImplementedError("Detection probability evaluation does not support visualization")
[docs] def artifact(self) -> DetectionProbArtifact: return DetectionProbArtifact(self.evaluation)
[docs] class DetectionProbEvaluator(Evaluator, Serializable): """Estimates the probability of detection for a given radar detector. Assumes a successful detection if the :class:`Radar's<hermespy.radar.radar.Radar>` :meth:`reception<hermespy.radar.radar.Radar.reception>` contains a non-empty point cloud. This is the case if the configured :class:`RadarDetector<hermespy.radar.detection.RadarDetector>` made a positive decision for any bin within the processed :class:`RadarCube<hermespy.radar.cube.RadarCube>`. A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>` evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be: .. literalinclude:: ../scripts/examples/radar_evaluators_DetectionProbEvaluator.py :language: python :linenos: :lines: 03-27 """ yaml_tag = "DetectionProbEvaluator" __radar: Radar __cloud: RadarPointCloud | None __hook: Hook[RadarReception] def __init__(self, radar: Radar) -> None: """ Args: radar (Radar): Radar detector to be evaluated. """ # Initialize base class Evaluator.__init__(self) self.plot_scale = "log" # Plot logarithmically by default # Initialize class attributes self.__cloud = None self.__radar = radar self.__hook = radar.add_receive_callback(self.__receive_callback) def __receive_callback(self, reception: RadarReception) -> None: """Callback for receiving radar receptions. Args: reception (RadarReception): Received radar reception. """ self.__cloud = reception.cloud @property def radar(self) -> Radar: """Radar detector to be evaluated.""" return self.__radar @property def abbreviation(self) -> str: return "PD" @property def title(self) -> str: return "Probability of Detection Evaluation" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> ScalarEvaluationResult: return ScalarEvaluationResult.From_Artifacts(grid, artifacts, self)
[docs] def evaluate(self) -> DetectionProbabilityEvaluation: if self.__cloud is None: raise RuntimeError( "Detection evaluation requires a detector to be configured at the radar" ) # Verify if a target is detected in any bin detection = self.__cloud.num_points > 0 return DetectionProbabilityEvaluation(detection)
def __del__(self) -> None: self.__hook.remove()
[docs] class RocArtifact(Artifact): """Artifact of receiver operating characteristics (ROC) evaluation""" __h0_value: float __h1_value: float def __init__(self, h0_value: float, h1_value: float) -> None: """ Args: h0_value (float): Measured value for null-hypothesis (H0), i.e., noise only h1_value (float): Measured value for alternative hypothesis (H1) """ self.__h0_value = h0_value self.__h1_value = h1_value def __str__(self) -> str: return f"({self.__h0_value:.4}, {self.__h1_value:.4})" @property def h0_value(self) -> float: return self.__h0_value @property def h1_value(self) -> float: return self.__h1_value
[docs] def to_scalar(self) -> None: return None
[docs] class RocEvaluation(Evaluation[PlotVisualization]): """Evaluation of receiver operating characteristics (ROC)""" __cube_h0: RadarCube __cube_h1: RadarCube def __init__(self, cube_h0: RadarCube, cube_h1: RadarCube) -> None: """ Args: cube_h0 (RadarCube): H0 hypothesis radar cube. cube_h1 (RadarCube): H1 hypothesis radar cube. """ # Initialize base class super().__init__() # Initialize class attributes self.__cube_h0 = cube_h0 self.__cube_h1 = cube_h1 @property def cube_h0(self) -> RadarCube: """H0 hypothesis radar cube.""" return self.__cube_h0 @property def cube_h1(self) -> RadarCube: """H1 hypothesis radar cube.""" return self.__cube_h1
[docs] def artifact(self) -> RocArtifact: h0_value = self.__cube_h0.data.max() h1_value = self.__cube_h1.data.max() return RocArtifact(h0_value, h1_value)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: lines = np.empty_like(axes, dtype=np.object_) h0_lines = axes[0, 0].plot( self.__cube_h0.range_bins, np.zeros_like(self.__cube_h0.range_bins) ) h1_lines = axes[0, 0].plot( self.__cube_h1.range_bins, np.zeros_like(self.__cube_h1.range_bins) ) lines[0, 0] = h0_lines + h1_lines return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: _lines = visualization.lines[0, 0] _lines[0].set_ydata(self.__cube_h0.data) _lines[1].set_ydata(self.__cube_h1.data)
[docs] class RocEvaluationResult(EvaluationResult): """Final result of an receive operating characteristcs evaluation.""" __detection_probabilities: np.ndarray __false_alarm_probabilities: np.ndarray def __init__( self, detection_probabilities: np.ndarray, false_alarm_probabilities: np.ndarray, grid: Sequence[GridDimension], evaluator: ReceiverOperatingCharacteristic | None = None, ) -> None: """ Args: detection_probabilities (numpy.ndarray): Detection probabilities for each grid point. false_alarm_probabilities (numpy.ndarray): False alarm probabilities for each grid point. grid (Sequence[GridDimension]): Grid dimensions of the evaluation result. evaluator (ReceiverOperatingCharacteristic, optional): Evaluator that generated the evaluation result. """ # Initialize base class EvaluationResult.__init__(self, grid, evaluator) # Initialize class attributes self.__detection_probabilities = detection_probabilities self.__false_alarm_probabilities = false_alarm_probabilities def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: ax: plt.Axes = axes.flat[0] # Configure axes labels ax.set_xlabel("False Alarm Probability") ax.set_ylabel("Detection Probability") # Configure axes limits ax.set_xlim(0.0, 1.1) ax.set_ylim(0.0, 1.1) line_list = [] section_magnitudes = tuple(s.num_sample_points for s in self.grid) for section_indices in np.ndindex(section_magnitudes): # Generate the graph line label line_label = "" for i, v in enumerate(section_indices): line_label += f"{self.grid[i].title} = {self.grid[i].sample_points[v].title}, " line_label = line_label[:-2] line_list.extend(ax.plot([], [], label=line_label)) # Only plot the legend for an existing sweep grid. if len(self.grid) > 0: with Executable.style_context(): ax.legend() lines = np.empty_like(axes, dtype=np.object_) lines[0, 0] = line_list return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: section_magnitudes = tuple(s.num_sample_points for s in self.grid) for section_indices, line in zip(np.ndindex(section_magnitudes), visualization.lines[0, 0]): # Select the graph line scalars x_axis = self.__false_alarm_probabilities[section_indices] y_axis = self.__detection_probabilities[section_indices] # Update the respective line line.set_data(x_axis, y_axis)
[docs] def to_array(self) -> np.ndarray: return np.stack((self.__detection_probabilities, self.__false_alarm_probabilities), axis=-1)
[docs] class ReceiverOperatingCharacteristic(RadarEvaluator, Serializable): """Evaluate the receiver operating characteristics for a radar operator. The receiver operating characteristics (ROC) curve is a graphical plot that illustrates the performance of a detector by visualizing the probability of false alarm versus the probability of detection for a given parameterization. A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>` evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be: .. literalinclude:: ../scripts/examples/radar_evaluators_ReceiverOperatingCharacteristic.py :language: python :linenos: :lines: 03-23 """ yaml_tag = "ROC" _title = "Receiver Operating Characteristics" __num_thresholds: int __output_hook: Hook[SimulatedDeviceOutput] __input_hook: Hook[ProcessedSimulatedDeviceInput] __output: SimulatedDeviceOutput | None __input: ProcessedSimulatedDeviceInput | None def __init__( self, receiving_radar: Radar, transmitting_device: SimulatedDevice, receiving_device: SimulatedDevice, radar_channel: RadarChannelBase, num_thresholds=101, ) -> None: """ Args: radar (Radar): Radar under test. radar_channel (RadarChannelBase): Radar channel containing a desired target. num_thresholds (int, optional) Number of different thresholds to be considered in ROC curve """ # Initialize base class RadarEvaluator.__init__( self, receiving_radar, transmitting_device, receiving_device, radar_channel ) # Initialize class attributes self.__num_thresholds = num_thresholds self.__input_hook = receiving_device.simulated_input_callbacks.add_callback( self.__input_callback ) self.__output_hook = transmitting_device.simulated_output_callbacks.add_callback( self.__output_callback ) self.__input = None self.__output = None def __input_callback(self, input: ProcessedSimulatedDeviceInput) -> None: """Callback notificying the evaluator of newly generated device inputs. Args: input (ProcessedSimulatedDeviceInput): Newly generated device input. """ self.__input = input def __output_callback(self, output: SimulatedDeviceOutput) -> None: """Callback notificying the evaluator of newly generated device outputs. Args: output (SimulatedDeviceOutput): Newly generated device output. """ self.__output = output @staticmethod def __evaluation_from_receptions( h0_reception: RadarReception, h1_reception: RadarReception ) -> RocEvaluation: """Subroutine to generate an evaluation given two hypothesis radar receptions. Args: h0_reception (RadarReception): Reception missing the target of interest. h1_reception (RadarReception): Reception containing the target of interest. Returns: An initialized :class:`RocEvaluation`. """ # Retrieve radar cubes for both hypothesis radar_cube_h0 = h0_reception.cube radar_cube_h1 = h1_reception.cube # Return resulting evaluation return RocEvaluation(radar_cube_h0, radar_cube_h1)
[docs] def evaluate(self) -> RocEvaluation: reception = self._fetch_reception() channel_sample = self._fetch_channel() if self.__output is None: raise RuntimeError("No device output available") if self.__input is None: raise RuntimeError("No device input available") # Collect required information from the simulation one_hypothesis_sample = channel_sample device_index = self.radar_channel.scenario.device_index(self.transmitting_device) operator_index = self.receiving_device.receivers.operator_index(self.receiving_radar) # Generate the null hypothesis detection radar cube by re-running the radar detection routine null_hypothesis_sample = one_hypothesis_sample.null_hypothesis() # Propagate again over the radar channel null_hypothesis_propagation = null_hypothesis_sample.propagate(self.__output) # Exchange the respective propagated signal impinging_signals = list(self.__input.impinging_signals).copy() impinging_signals[device_index] = null_hypothesis_propagation # Receive again devic_state = self.receiving_device.state(channel_sample.time) null_hypothesis_device_reception = self.receiving_device.process_from_realization( impinging_signals, self.__input, self.__output.trigger_realization, self.__input.leaking_signal, devic_state, ) null_hypothesis_radar_reception = self.receiving_radar.receive( null_hypothesis_device_reception.operator_inputs[operator_index], devic_state, False ) # Generate evaluation return self.__evaluation_from_receptions(null_hypothesis_radar_reception, reception)
@property def abbreviation(self) -> str: return "ROC" # pragma: no cover @property def title(self) -> str: return ReceiverOperatingCharacteristic._title # pragma: no cover
[docs] @staticmethod def GenerateResult( grid: Sequence[GridDimension], artifacts: np.ndarray, num_thresholds: int = 101, evaluator: ReceiverOperatingCharacteristic | None = None, ) -> RocEvaluationResult: """Generate a new receiver operating characteristics evaluation result. Args: grid (Sequence[GridDimension]): Grid dimensions of the evaluation result. artifacts (numpy.ndarray): Artifacts of the evaluation result. num_thresholds (int, optional): Number of different thresholds to be considered in ROC curve 101 by default. evaluator (ReceiverOperatingCharacteristic, optional): Evaluator that generated the evaluation result. Returns: The generated result. """ # Prepare result containers if len(grid) > 0: dimensions = tuple(g.num_sample_points for g in grid) else: dimensions = (1,) artifacts = artifacts.reshape(dimensions) detection_probabilities = np.empty((*dimensions, num_thresholds), dtype=float) false_alarm_probabilities = np.empty((*dimensions, num_thresholds), dtype=float) # Convert artifacts to raw data array for grid_coordinates in np.ndindex(dimensions): artifact_line = artifacts[grid_coordinates] roc_data = np.array([[a.h0_value, a.h1_value] for a in artifact_line]) for t, threshold in enumerate( np.linspace(roc_data.min(), roc_data.max(), num_thresholds, endpoint=True) ): threshold_coordinates = grid_coordinates + (t,) detection_probabilities[threshold_coordinates] = np.mean( roc_data[:, 1] >= threshold ) false_alarm_probabilities[threshold_coordinates] = np.mean( roc_data[:, 0] >= threshold ) return RocEvaluationResult( detection_probabilities, false_alarm_probabilities, grid, evaluator )
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> RocEvaluationResult: """Generate a new receiver operating characteristics evaluation result. Args: grid (Sequence[GridDimension]): Grid dimensions of the evaluation result. artifacts (numpy.ndarray): Artifacts of the evaluation result. Returns: The generated result. """ return self.GenerateResult(grid, artifacts, self.__num_thresholds, self)
[docs] @staticmethod def FromScenarios( h0_scenario: Scenario, h1_scenario: Scenario, h0_operator: Radar | None = None, h1_operator: Radar | None = None, num_thresholds: int = 101, ) -> RocEvaluationResult: """Compute an ROC evaluation result from two scenarios. Args: h0_scenario (Scenario): Scenario of the null hypothesis. h1_scenario (Scenario): Scenario of the alternative hypothesis. h0_operator (Radar, optional): Radar operator of the null hypothesis. If not provided, the first radar operator of the null hypothesis scenario will be used. h1_operator (Radar, optional): Radar operator of the alternative hypothesis. If not provided, the first radar operator of the alternative hypothesis scenario will be used. num_thresholds (int, optional): Number of different thresholds to be considered in ROC curve Returns: The ROC evaluation result. Raises: ValueError: - If the scenarios are not in replay mode - If the scenarios have no recorded drops available - If the operators are not registered within the scenarios - If, for any reason, the operated devices cannot be determined """ # Assert that both scenarios are in replay mode if h0_scenario.mode != ScenarioMode.REPLAY: raise ValueError("Null hypothesis scenario is not in replay mode") if h1_scenario.mode != ScenarioMode.REPLAY: raise ValueError("One hypothesis scenario is not in replay mode") # Assert that both scenarios have at least a single drop recorded if h0_scenario.num_drops < 1: raise ValueError("Null hypothesis scenario has no recorded drops") if h1_scenario.num_drops < 1: raise ValueError("One hypothesis scenario has no recorded drops") # Select operators if none were provided if h0_operator: if h0_operator not in h0_scenario.operators: raise ValueError( "Null hypthesis radar not an operator within the null hypothesis scenario" ) else: if h0_scenario.num_operators < 1: raise ValueError("Null hypothesis radar has no registered operators") inferred_h0_operator = None for operator in h0_scenario.operators: if isinstance(operator, Radar): inferred_h0_operator = operator break if inferred_h0_operator is None: raise ValueError("Could not infer radar operator from list of operators") h0_operator = inferred_h0_operator if h1_operator: if h1_operator not in h1_scenario.operators: raise ValueError( "One hypthesis radar not an operator within the null hypothesis scenario" ) else: if h1_scenario.num_operators < 1: raise ValueError("One hypothesis radar has no registered operators") inferred_h1_operator = None for operator in h1_scenario.operators: if isinstance(operator, Radar): inferred_h1_operator = operator break if inferred_h1_operator is None: raise ValueError("Could not infer radar operator from list of operators") h1_operator = inferred_h1_operator # Find the indices of the operators within the scenarios h0_device_index = -1 h1_device_index = -1 h0_operator_index = -1 h1_operator_index = -1 for d, device in enumerate(h0_scenario.devices): if h0_operator in device.receivers: h0_device_index = d h0_operator_index = device.receivers.index(h0_operator) for d, device in enumerate(h1_scenario.devices): if h1_operator in device.receivers: h1_device_index = d h1_operator_index = device.receivers.index(h1_operator) if ( h0_device_index < 0 or h1_device_index < 0 or h0_operator_index < 0 or h1_operator_index < 0 ): raise ValueError( "Could not detect devices and operators within the provided scenarios." ) # The overall number of considered drops is bounded by the H1 hypothesis num_drops = h1_scenario.num_drops artifacts = np.empty(1, dtype=object) artifacts[0] = [] # Collect artifacts for _ in range(num_drops): h0_drop = h0_scenario.drop() h1_drop = h1_scenario.drop() h0_reception = h0_drop.device_receptions[h0_device_index].operator_receptions[ h0_operator_index ] h1_reception = h1_drop.device_receptions[h1_device_index].operator_receptions[ h1_operator_index ] evaluation = ReceiverOperatingCharacteristic.__evaluation_from_receptions( h0_reception, h1_reception ) artifacts[0].append(evaluation.artifact()) # Generate results grid: Sequence[GridDimension] = [] result = ReceiverOperatingCharacteristic.GenerateResult(grid, artifacts, num_thresholds) return result
[docs] @staticmethod def FromScenario( scenario: Scenario, operator: Radar, h0_campaign: str = "h0", h1_campaign: str = "h1", console: Console | None = None, ) -> RocEvaluationResult: _console = get_console() if console is None else console # Check if the scenario is in replay mode if scenario.mode != ScenarioMode.REPLAY: raise ValueError("Scenario is not in replay mode") null_receptions: list[RadarReception] = [] alt_receptions: list[RadarReception] = [] # Find the operator index within the scenario device_index = -1 operator_index = -1 for d, device in enumerate(scenario.devices): if operator in device.receivers: operator_index = device.receivers.index(operator) device_index = d if device_index < 0 or operator_index < 0: raise ValueError("Could not detect device and operator within the provided scenario.") # Collect receptions for both hypothesis for campaign, hypothesis, receptions in zip( [h0_campaign, h1_campaign], ["null hypothesis", "alt hypothesis"], [null_receptions, alt_receptions], ): scenario.replay(campaign=campaign) for _ in track( range(scenario.num_drops), description="Replaying " + hypothesis, console=_console ): drop = scenario.drop() receptions.append( drop.device_receptions[device_index].operator_receptions[operator_index] ) # Generate the evaluation result grid: Sequence[GridDimension] = [] artifacts = np.empty(1, dtype=object) artifacts[0] = [ ReceiverOperatingCharacteristic.__evaluation_from_receptions(h0, h1).artifact() for h0, h1 in zip(null_receptions, alt_receptions) ] return ReceiverOperatingCharacteristic.GenerateResult(grid, artifacts)
[docs] @classmethod def From_HDF( cls: Type[ReceiverOperatingCharacteristic], file: Union[str, File], h0_campaign="h0_measurements", h1_campaign="h1_measurements", num_thresholds: int = 101, ) -> RocEvaluationResult: """Compute an ROC evaluation result from a savefile. Args: file (Union[str, File]): Savefile containing the measurements. Either as file system location or h5py `File` handle. h0_campaign (str, optional): Campaign identifier of the h0 hypothesis measurements. By default, `h0_measurements` is assumed. h1_campaign (str, optional): Campaign identifier of the h1 hypothesis measurements. By default, `h1_measurements` is assumed. num_thresholds (int, optional): Number of different thresholds to be considered in ROC curve Returns: The ROC evaluation result. """ # Load scenarios with the respective campaigns from the specified savefile h0_scenario = ReplayScenario.Replay(file, h0_campaign) h1_scenario = ReplayScenario.Replay(file, h1_campaign) # Resort to the from scenarios routine for computing the evaluation result result = cls.FromScenarios(h0_scenario, h1_scenario, num_thresholds=num_thresholds) # Close the scenarios properly h0_scenario.stop() h1_scenario.stop() return result
def __del__(self) -> None: self.__input_hook.remove() self.__output_hook.remove()
[docs] class RootMeanSquareArtifact(Artifact): """Artifact of a root mean square evaluation""" __num_errors: int __cummulation: float def __init__(self, num_errors: int, cummulation: float) -> None: """ Args: num_errors (int): Number of errros. cummulation (float): Sum of squared errors distances. """ self.__num_errors = num_errors self.__cummulation = cummulation
[docs] def to_scalar(self) -> float: return np.sqrt(self.cummulation / self.num_errors)
def __str__(self) -> str: return f"{self.to_scalar():4.0f}" @property def num_errors(self) -> int: """Number of cummulated errors""" return self.__num_errors @property def cummulation(self) -> float: """Cummulated squared error""" return self.__cummulation
[docs] class RootMeanSquareEvaluation(Evaluation[ScatterVisualization]): """Result of a single root mean squre evaluation.""" __pcl: RadarPointCloud __ground_truth: np.ndarray def __init__(self, pcl: RadarPointCloud, ground_truth: np.ndarray) -> None: self.__pcl = pcl self.__ground_truth = ground_truth
[docs] def artifact(self) -> RootMeanSquareArtifact: num_errors = self.__pcl.num_points * self.__ground_truth.shape[0] cummulative_square_error = 0.0 for point, truth in product(self.__pcl.points, self.__ground_truth): cummulative_square_error += float(np.linalg.norm(point.position - truth)) ** 2 return RootMeanSquareArtifact(num_errors, cummulative_square_error)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> ScatterVisualization: return self.__pcl._prepare_visualization(figure, axes, **kwargs) def _update_visualization(self, visualization: ScatterVisualization, **kwargs) -> None: self.__pcl._update_visualization(visualization, **kwargs)
[docs] class RootMeanSquareErrorResult(ScalarEvaluationResult): """Result of a root mean square error evaluation.""" ... # pragma: no cover
[docs] class RootMeanSquareError(RadarEvaluator): """Root mean square error of estimated points to ground truth. Root mean square error (RMSE) is a widely used metric for evaluating the performance of a radar detector. It estimates the average distance between the estimated and the ground truth position of a target. A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>` evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be: .. literalinclude:: ../scripts/examples/radar_evaluators_RootMeanSquareError.py :language: python :linenos: :lines: 03-29 """
[docs] def evaluate(self) -> Evaluation: point_cloud = self._fetch_pcl() channel_sample = self._fetch_channel() # Consolide the ground truth ground_truth = np.array( [p.ground_truth[0] for p in channel_sample.paths if p.ground_truth is not None] ) return RootMeanSquareEvaluation(point_cloud, ground_truth)
@property def title(self) -> str: return "Root Mean Square Error" @property def abbreviation(self) -> str: return "RMSE"
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> RootMeanSquareErrorResult: rmse_scalar_results = np.empty(artifacts.shape, dtype=float) for coordinates, section_artifacts in np.ndenumerate(artifacts): cummulative_errors = 0.0 error_count = 0 artifact: RootMeanSquareArtifact for artifact in section_artifacts: cummulative_errors += artifact.cummulation error_count += artifact.num_errors rmse = np.sqrt(cummulative_errors / error_count) rmse_scalar_results[coordinates] = rmse return RootMeanSquareErrorResult(grid, rmse_scalar_results, self)