# -*- coding: utf-8 -*-
"""
================
Radar Evaluation
================
This module introduces several evaluators for performance indicators in radar detection.
Refer to the :doc:`PyMonte</api/core.monte_carlo>` documentation for a detailed introduction to the concept of
:class:`Evaluators<hermespy.core.monte_carlo.Evaluator>`.
.. autoclasstree:: hermespy.modem.evaluators
:alt: Communication Evaluator Class Tree
:strict:
:namespace: hermespy
The implemented :class:`RadarEvaluator<.RadarEvaluator>` all inherit from the identically named common
base which gets initialized by selecting one :class:`Modem<hermespy.modem.modem.Modem>`, whose performance will be
evaluated and one :class:`RadarChannel<hermespy.channel.radar_channel.RadarChannel>` instance, containing the ground
truth.
The currently considered performance indicators are
========================================= ================================ ============================================================
Evaluator Artifact Performance Indicator
========================================= ================================ ============================================================
:class:`.DetectionProbEvaluator` :class:`.DetectionProbArtifact` Probability of detecting the target at the right bin
:class:`.ReceiverOperatingCharacteristic` :class:`.RocArtifact` Probability of detection versus probability of false alarm
:class`.RootMeanSquareError` :class:`.RootMeanSquareArtifact` Root mean square error of point detections
========================================= ================================ ============================================================
Configuring :class:`RadarEvaluators<.RadarEvaluator>` to evaluate the radar detection of
:class:`Modem<hermespy.modem.modem.Modem>` instances is rather straightforward:
.. code-block:: python
# Create two separate modem instances
modem = Modem()
channel = RadarChannel()
# Create a radar evaluation as an evaluation example
radar_evaluator = DetectionProbEvaluator(modem, channel)
# Extract evaluation
radar_evaluation = radar_evaluator.evaluate()
# Visualize evaluation
radar_evaluation.plot()
"""
from __future__ import annotations
from abc import ABC
from collections.abc import Sequence
from itertools import product
from typing import Type, Union
import matplotlib.pyplot as plt
import numpy as np
from h5py import File
from rich import get_console
from rich.console import Console
from rich.progress import track
from scipy.stats import uniform
from hermespy.core import (
Executable,
Hook,
PlotVisualization,
ReplayScenario,
ScatterVisualization,
Scenario,
ScenarioMode,
Serializable,
VAT,
)
from hermespy.core.monte_carlo import (
Evaluator,
Evaluation,
EvaluationResult,
EvaluationTemplate,
GridDimension,
ArtifactTemplate,
Artifact,
ScalarEvaluationResult,
)
from hermespy.radar import Radar, RadarReception
from hermespy.channel.radar.radar import RadarChannelBase, RadarChannelSample
from hermespy.radar.cube import RadarCube
from hermespy.radar.detection import RadarPointCloud
from hermespy.simulation import (
SimulatedDevice,
ProcessedSimulatedDeviceInput,
SimulatedDeviceOutput,
)
__author__ = "André Noll Barreto"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "André Noll Barreto"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class RadarEvaluator(Evaluator, ABC):
"""Bastract base class for evaluating sensing performance.
Inherits from the abstract :class:`Evaluator<hermespy.core.monte_carlo.Evaluator>` base class.
Expects the abstract method :meth:`evaluate` as well as the abstract properties
:meth:`abbreviation<abbreviation>` and :meth:`title<title>` to be implemented.
There are currently three different :class:`RadarEvaluators<.RadarEvaluator>` implemented:
.. toctree::
radar.evaluators.DetectionProbEvaluator
radar.evaluators.ReceiverOperatingCharacteristic
radar.evaluators.RootMeanSquareError
"""
__receiving_radar: Radar # Handle to the radar receiver
__receiving_device: SimulatedDevice
__transmitting_device: SimulatedDevice
__radar_channel: RadarChannelBase
__reception: RadarReception | None
__receive_hook: Hook[RadarReception]
__channel_sample: RadarChannelSample | None
def __init__(
self,
receiving_radar: Radar,
transmitting_device: SimulatedDevice,
receiving_device: SimulatedDevice,
radar_channel: RadarChannelBase,
) -> None:
"""
Args:
receiving_radar (Radar):
Radar detector to be evaluated.
transmitting_device (SimulatedDevice):
Device transmitting into the evaluated channel.
receiving_device (SimulatedDevice):
Device receiving from the evaluated channel.
The `receiving_radar` must be a receive DSP algorithm of this device.
radar_channel (RadarChannelBase):
The radar channel containing the ground truth to be evaluated.
Raises:
ValueError: If the receiving radar is not an operator of the radar_channel receiver.
"""
# Initialize base class
Evaluator.__init__(self)
# Check if the receiving radar is a dsp algorithm of the receiving device
if receiving_radar not in receiving_device.receivers:
raise ValueError(
"Receiving radar not registered as DSP algorithm of the receiving device"
)
# Initialize class attributes
self.__transmitting_device = transmitting_device
self.__receiving_device = receiving_device
self.__receiving_radar = receiving_radar
self.__radar_channel = radar_channel
# Register receive callback
self.__reception = None
self.__receive_hook = receiving_radar.add_receive_callback(self.__receive_callback)
# Register sample callback
self.__channel_sample = None
radar_channel.add_sample_hook(
self.__sample_callback, self.__transmitting_device, self.__receiving_device
)
def __receive_callback(self, reception: RadarReception) -> None:
"""Callback for receiving radar receptions.
Args:
reception (RadarReception):
Received radar reception.
"""
self.__reception = reception
def __sample_callback(self, sample: RadarChannelSample) -> None:
"""Callback for sampling radar channel realizations.
Args:
sample (RadarChannelSample):
Sampled radar channel realization.
"""
self.__channel_sample = sample
def _fetch_reception(self) -> RadarReception:
"""Fetch the most recent radar reception.
Returns: The most recent radar reception.
Raises:
RuntimeError: If no reception is available.
"""
if self.__reception is None:
raise RuntimeError(
"No reception available. Has the radar's receive method been called?"
)
return self.__reception
def _fetch_pcl(self) -> RadarPointCloud:
"""Fetch the most recent radar point cloud.
Returns: The most recent radar point cloud.
Raises:
RunmtineError: If no reception is available or the reception does not contain a point cloud.
"""
reception = self._fetch_reception()
if reception.cloud is None:
raise RuntimeError(
"No point cloud available. Has the radar's receive method been called?"
)
return reception.cloud
def _fetch_channel(self) -> RadarChannelSample:
"""Fetch the most recent radar channel sample.
Returns: The most recent radar channel sample.
Raises:
RuntimeError: If no channel sample is available.
"""
if self.__channel_sample is None:
raise RuntimeError("No channel sample available. Has the radar channel been sampled?")
return self.__channel_sample
@property
def receiving_device(self) -> SimulatedDevice:
"""Device receiving from the evaluated channel."""
return self.__receiving_device
@property
def transmitting_device(self) -> SimulatedDevice:
"""Device transmitting into the evaluated channel."""
return self.__transmitting_device
@property
def receiving_radar(self) -> Radar:
"""Radar detector, the output of which is to be evaluated."""
return self.__receiving_radar
@property
def radar_channel(self) -> RadarChannelBase:
"""The considered radar channel."""
return self.__radar_channel
[docs]
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> EvaluationResult:
return ScalarEvaluationResult(grid, artifacts, self)
def __del__(self) -> None:
self.__receive_hook.remove()
[docs]
class DetectionProbArtifact(ArtifactTemplate[bool]):
"""Artifacto of the probability of detection for a radar detector.
Represents a boolean indicator of whether a target was detected or not.
Generated by the :class:`DetectionProbabilityEvaluation<.DetectionProbabilityEvaluation>`'s :meth:`artifact()<DetectionProbabilityEvaluation.artifact>` method.
"""
... # pragma: no cover
[docs]
class DetectionProbabilityEvaluation(EvaluationTemplate[bool, ScatterVisualization]):
"""Evaluation of the probability of detection for a radar detector.
Represents a boolean indicator of whether a target was detected or not.
Generated by the :class:`DetectionProbEvaluator<.DetectionProbEvaluator>`'s :meth:`evaluate()<DetectionProbEvaluator.evaluate>` method.
"""
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> ScatterVisualization: # pragma: no cover
raise NotImplementedError("Detection probability evaluation does not support visualization")
def _update_visualization(
self, visualization: ScatterVisualization, **kwargs
) -> None: # pragma: no cover
# ToDo: Implement updating the single-item scatter plot
raise NotImplementedError("Detection probability evaluation does not support visualization")
[docs]
def artifact(self) -> DetectionProbArtifact:
return DetectionProbArtifact(self.evaluation)
[docs]
class DetectionProbEvaluator(Evaluator, Serializable):
"""Estimates the probability of detection for a given radar detector.
Assumes a successful detection if the :class:`Radar's<hermespy.radar.radar.Radar>` :meth:`reception<hermespy.radar.radar.Radar.reception>` contains a non-empty point cloud.
This is the case if the configured :class:`RadarDetector<hermespy.radar.detection.RadarDetector>` made a positive decision
for any bin within the processed :class:`RadarCube<hermespy.radar.cube.RadarCube>`.
A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>`
evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be:
.. literalinclude:: ../scripts/examples/radar_evaluators_DetectionProbEvaluator.py
:language: python
:linenos:
:lines: 03-27
"""
yaml_tag = "DetectionProbEvaluator"
__radar: Radar
__cloud: RadarPointCloud | None
__hook: Hook[RadarReception]
def __init__(self, radar: Radar) -> None:
"""
Args:
radar (Radar):
Radar detector to be evaluated.
"""
# Initialize base class
Evaluator.__init__(self)
self.plot_scale = "log" # Plot logarithmically by default
# Initialize class attributes
self.__cloud = None
self.__radar = radar
self.__hook = radar.add_receive_callback(self.__receive_callback)
def __receive_callback(self, reception: RadarReception) -> None:
"""Callback for receiving radar receptions.
Args:
reception (RadarReception):
Received radar reception.
"""
self.__cloud = reception.cloud
@property
def radar(self) -> Radar:
"""Radar detector to be evaluated."""
return self.__radar
@property
def abbreviation(self) -> str:
return "PD"
@property
def title(self) -> str:
return "Probability of Detection Evaluation"
@staticmethod
def _scalar_cdf(scalar: float) -> float:
return uniform.cdf(scalar)
[docs]
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> ScalarEvaluationResult:
return ScalarEvaluationResult.From_Artifacts(grid, artifacts, self)
[docs]
def evaluate(self) -> DetectionProbabilityEvaluation:
if self.__cloud is None:
raise RuntimeError(
"Detection evaluation requires a detector to be configured at the radar"
)
# Verify if a target is detected in any bin
detection = self.__cloud.num_points > 0
return DetectionProbabilityEvaluation(detection)
def __del__(self) -> None:
self.__hook.remove()
[docs]
class RocArtifact(Artifact):
"""Artifact of receiver operating characteristics (ROC) evaluation"""
__h0_value: float
__h1_value: float
def __init__(self, h0_value: float, h1_value: float) -> None:
"""
Args:
h0_value (float):
Measured value for null-hypothesis (H0), i.e., noise only
h1_value (float):
Measured value for alternative hypothesis (H1)
"""
self.__h0_value = h0_value
self.__h1_value = h1_value
def __str__(self) -> str:
return f"({self.__h0_value:.4}, {self.__h1_value:.4})"
@property
def h0_value(self) -> float:
return self.__h0_value
@property
def h1_value(self) -> float:
return self.__h1_value
[docs]
def to_scalar(self) -> None:
return None
[docs]
class RocEvaluation(Evaluation[PlotVisualization]):
"""Evaluation of receiver operating characteristics (ROC)"""
__cube_h0: RadarCube
__cube_h1: RadarCube
def __init__(self, cube_h0: RadarCube, cube_h1: RadarCube) -> None:
"""
Args:
cube_h0 (RadarCube): H0 hypothesis radar cube.
cube_h1 (RadarCube): H1 hypothesis radar cube.
"""
# Initialize base class
super().__init__()
# Initialize class attributes
self.__cube_h0 = cube_h0
self.__cube_h1 = cube_h1
@property
def cube_h0(self) -> RadarCube:
"""H0 hypothesis radar cube."""
return self.__cube_h0
@property
def cube_h1(self) -> RadarCube:
"""H1 hypothesis radar cube."""
return self.__cube_h1
[docs]
def artifact(self) -> RocArtifact:
h0_value = self.__cube_h0.data.max()
h1_value = self.__cube_h1.data.max()
return RocArtifact(h0_value, h1_value)
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> PlotVisualization:
lines = np.empty_like(axes, dtype=np.object_)
h0_lines = axes[0, 0].plot(
self.__cube_h0.range_bins, np.zeros_like(self.__cube_h0.range_bins)
)
h1_lines = axes[0, 0].plot(
self.__cube_h1.range_bins, np.zeros_like(self.__cube_h1.range_bins)
)
lines[0, 0] = h0_lines + h1_lines
return PlotVisualization(figure, axes, lines)
def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None:
_lines = visualization.lines[0, 0]
_lines[0].set_ydata(self.__cube_h0.data)
_lines[1].set_ydata(self.__cube_h1.data)
[docs]
class RocEvaluationResult(EvaluationResult):
"""Final result of an receive operating characteristcs evaluation."""
__detection_probabilities: np.ndarray
__false_alarm_probabilities: np.ndarray
def __init__(
self,
detection_probabilities: np.ndarray,
false_alarm_probabilities: np.ndarray,
grid: Sequence[GridDimension],
evaluator: ReceiverOperatingCharacteristic | None = None,
) -> None:
"""
Args:
detection_probabilities (numpy.ndarray):
Detection probabilities for each grid point.
false_alarm_probabilities (numpy.ndarray):
False alarm probabilities for each grid point.
grid (Sequence[GridDimension]):
Grid dimensions of the evaluation result.
evaluator (ReceiverOperatingCharacteristic, optional):
Evaluator that generated the evaluation result.
"""
# Initialize base class
EvaluationResult.__init__(self, grid, evaluator)
# Initialize class attributes
self.__detection_probabilities = detection_probabilities
self.__false_alarm_probabilities = false_alarm_probabilities
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> PlotVisualization:
ax: plt.Axes = axes.flat[0]
# Configure axes labels
ax.set_xlabel("False Alarm Probability")
ax.set_ylabel("Detection Probability")
# Configure axes limits
ax.set_xlim(0.0, 1.1)
ax.set_ylim(0.0, 1.1)
line_list = []
section_magnitudes = tuple(s.num_sample_points for s in self.grid)
for section_indices in np.ndindex(section_magnitudes):
# Generate the graph line label
line_label = ""
for i, v in enumerate(section_indices):
line_label += f"{self.grid[i].title} = {self.grid[i].sample_points[v].title}, "
line_label = line_label[:-2]
line_list.extend(ax.plot([], [], label=line_label))
# Only plot the legend for an existing sweep grid.
if len(self.grid) > 0:
with Executable.style_context():
ax.legend()
lines = np.empty_like(axes, dtype=np.object_)
lines[0, 0] = line_list
return PlotVisualization(figure, axes, lines)
def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None:
section_magnitudes = tuple(s.num_sample_points for s in self.grid)
for section_indices, line in zip(np.ndindex(section_magnitudes), visualization.lines[0, 0]):
# Select the graph line scalars
x_axis = self.__false_alarm_probabilities[section_indices]
y_axis = self.__detection_probabilities[section_indices]
# Update the respective line
line.set_data(x_axis, y_axis)
[docs]
def to_array(self) -> np.ndarray:
return np.stack((self.__detection_probabilities, self.__false_alarm_probabilities), axis=-1)
[docs]
class ReceiverOperatingCharacteristic(RadarEvaluator, Serializable):
"""Evaluate the receiver operating characteristics for a radar operator.
The receiver operating characteristics (ROC) curve is a graphical plot that illustrates the performance of a detector
by visualizing the probability of false alarm versus the probability of detection for a given parameterization.
A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>`
evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be:
.. literalinclude:: ../scripts/examples/radar_evaluators_ReceiverOperatingCharacteristic.py
:language: python
:linenos:
:lines: 03-23
"""
yaml_tag = "ROC"
_title = "Receiver Operating Characteristics"
__num_thresholds: int
__output_hook: Hook[SimulatedDeviceOutput]
__input_hook: Hook[ProcessedSimulatedDeviceInput]
__output: SimulatedDeviceOutput | None
__input: ProcessedSimulatedDeviceInput | None
def __init__(
self,
receiving_radar: Radar,
transmitting_device: SimulatedDevice,
receiving_device: SimulatedDevice,
radar_channel: RadarChannelBase,
num_thresholds=101,
) -> None:
"""
Args:
radar (Radar):
Radar under test.
radar_channel (RadarChannelBase):
Radar channel containing a desired target.
num_thresholds (int, optional)
Number of different thresholds to be considered in ROC curve
"""
# Initialize base class
RadarEvaluator.__init__(
self, receiving_radar, transmitting_device, receiving_device, radar_channel
)
# Initialize class attributes
self.__num_thresholds = num_thresholds
self.__input_hook = receiving_device.simulated_input_callbacks.add_callback(
self.__input_callback
)
self.__output_hook = transmitting_device.simulated_output_callbacks.add_callback(
self.__output_callback
)
self.__input = None
self.__output = None
def __input_callback(self, input: ProcessedSimulatedDeviceInput) -> None:
"""Callback notificying the evaluator of newly generated device inputs.
Args:
input (ProcessedSimulatedDeviceInput):
Newly generated device input.
"""
self.__input = input
def __output_callback(self, output: SimulatedDeviceOutput) -> None:
"""Callback notificying the evaluator of newly generated device outputs.
Args:
output (SimulatedDeviceOutput):
Newly generated device output.
"""
self.__output = output
@staticmethod
def __evaluation_from_receptions(
h0_reception: RadarReception, h1_reception: RadarReception
) -> RocEvaluation:
"""Subroutine to generate an evaluation given two hypothesis radar receptions.
Args:
h0_reception (RadarReception):
Reception missing the target of interest.
h1_reception (RadarReception):
Reception containing the target of interest.
Returns: An initialized :class:`RocEvaluation`.
"""
# Retrieve radar cubes for both hypothesis
radar_cube_h0 = h0_reception.cube
radar_cube_h1 = h1_reception.cube
# Return resulting evaluation
return RocEvaluation(radar_cube_h0, radar_cube_h1)
[docs]
def evaluate(self) -> RocEvaluation:
reception = self._fetch_reception()
channel_sample = self._fetch_channel()
if self.__output is None:
raise RuntimeError("No device output available")
if self.__input is None:
raise RuntimeError("No device input available")
# Collect required information from the simulation
one_hypothesis_sample = channel_sample
device_index = self.radar_channel.scenario.device_index(self.transmitting_device)
operator_index = self.receiving_device.receivers.operator_index(self.receiving_radar)
# Generate the null hypothesis detection radar cube by re-running the radar detection routine
null_hypothesis_sample = one_hypothesis_sample.null_hypothesis()
# Propagate again over the radar channel
null_hypothesis_propagation = null_hypothesis_sample.propagate(self.__output)
# Exchange the respective propagated signal
impinging_signals = list(self.__input.impinging_signals).copy()
impinging_signals[device_index] = null_hypothesis_propagation
# Receive again
devic_state = self.receiving_device.state(channel_sample.time)
null_hypothesis_device_reception = self.receiving_device.process_from_realization(
impinging_signals,
self.__input,
self.__output.trigger_realization,
self.__input.leaking_signal,
devic_state,
)
null_hypothesis_radar_reception = self.receiving_radar.receive(
null_hypothesis_device_reception.operator_inputs[operator_index], devic_state, False
)
# Generate evaluation
return self.__evaluation_from_receptions(null_hypothesis_radar_reception, reception)
@property
def abbreviation(self) -> str:
return "ROC" # pragma: no cover
@property
def title(self) -> str:
return ReceiverOperatingCharacteristic._title # pragma: no cover
[docs]
@staticmethod
def GenerateResult(
grid: Sequence[GridDimension],
artifacts: np.ndarray,
num_thresholds: int = 101,
evaluator: ReceiverOperatingCharacteristic | None = None,
) -> RocEvaluationResult:
"""Generate a new receiver operating characteristics evaluation result.
Args:
grid (Sequence[GridDimension]):
Grid dimensions of the evaluation result.
artifacts (numpy.ndarray):
Artifacts of the evaluation result.
num_thresholds (int, optional):
Number of different thresholds to be considered in ROC curve
101 by default.
evaluator (ReceiverOperatingCharacteristic, optional):
Evaluator that generated the evaluation result.
Returns: The generated result.
"""
# Prepare result containers
if len(grid) > 0:
dimensions = tuple(g.num_sample_points for g in grid)
else:
dimensions = (1,)
artifacts = artifacts.reshape(dimensions)
detection_probabilities = np.empty((*dimensions, num_thresholds), dtype=float)
false_alarm_probabilities = np.empty((*dimensions, num_thresholds), dtype=float)
# Convert artifacts to raw data array
for grid_coordinates in np.ndindex(dimensions):
artifact_line = artifacts[grid_coordinates]
roc_data = np.array([[a.h0_value, a.h1_value] for a in artifact_line])
for t, threshold in enumerate(
np.linspace(roc_data.min(), roc_data.max(), num_thresholds, endpoint=True)
):
threshold_coordinates = grid_coordinates + (t,)
detection_probabilities[threshold_coordinates] = np.mean(
roc_data[:, 1] >= threshold
)
false_alarm_probabilities[threshold_coordinates] = np.mean(
roc_data[:, 0] >= threshold
)
return RocEvaluationResult(
detection_probabilities, false_alarm_probabilities, grid, evaluator
)
[docs]
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> RocEvaluationResult:
"""Generate a new receiver operating characteristics evaluation result.
Args:
grid (Sequence[GridDimension]):
Grid dimensions of the evaluation result.
artifacts (numpy.ndarray):
Artifacts of the evaluation result.
Returns: The generated result.
"""
return self.GenerateResult(grid, artifacts, self.__num_thresholds, self)
[docs]
@staticmethod
def FromScenarios(
h0_scenario: Scenario,
h1_scenario: Scenario,
h0_operator: Radar | None = None,
h1_operator: Radar | None = None,
num_thresholds: int = 101,
) -> RocEvaluationResult:
"""Compute an ROC evaluation result from two scenarios.
Args:
h0_scenario (Scenario):
Scenario of the null hypothesis.
h1_scenario (Scenario):
Scenario of the alternative hypothesis.
h0_operator (Radar, optional):
Radar operator of the null hypothesis.
If not provided, the first radar operator of the null hypothesis scenario will be used.
h1_operator (Radar, optional):
Radar operator of the alternative hypothesis.
If not provided, the first radar operator of the alternative hypothesis scenario will be used.
num_thresholds (int, optional):
Number of different thresholds to be considered in ROC curve
Returns: The ROC evaluation result.
Raises:
ValueError:
- If the scenarios are not in replay mode
- If the scenarios have no recorded drops available
- If the operators are not registered within the scenarios
- If, for any reason, the operated devices cannot be determined
"""
# Assert that both scenarios are in replay mode
if h0_scenario.mode != ScenarioMode.REPLAY:
raise ValueError("Null hypothesis scenario is not in replay mode")
if h1_scenario.mode != ScenarioMode.REPLAY:
raise ValueError("One hypothesis scenario is not in replay mode")
# Assert that both scenarios have at least a single drop recorded
if h0_scenario.num_drops < 1:
raise ValueError("Null hypothesis scenario has no recorded drops")
if h1_scenario.num_drops < 1:
raise ValueError("One hypothesis scenario has no recorded drops")
# Select operators if none were provided
if h0_operator:
if h0_operator not in h0_scenario.operators:
raise ValueError(
"Null hypthesis radar not an operator within the null hypothesis scenario"
)
else:
if h0_scenario.num_operators < 1:
raise ValueError("Null hypothesis radar has no registered operators")
inferred_h0_operator = None
for operator in h0_scenario.operators:
if isinstance(operator, Radar):
inferred_h0_operator = operator
break
if inferred_h0_operator is None:
raise ValueError("Could not infer radar operator from list of operators")
h0_operator = inferred_h0_operator
if h1_operator:
if h1_operator not in h1_scenario.operators:
raise ValueError(
"One hypthesis radar not an operator within the null hypothesis scenario"
)
else:
if h1_scenario.num_operators < 1:
raise ValueError("One hypothesis radar has no registered operators")
inferred_h1_operator = None
for operator in h1_scenario.operators:
if isinstance(operator, Radar):
inferred_h1_operator = operator
break
if inferred_h1_operator is None:
raise ValueError("Could not infer radar operator from list of operators")
h1_operator = inferred_h1_operator
# Find the indices of the operators within the scenarios
h0_device_index = -1
h1_device_index = -1
h0_operator_index = -1
h1_operator_index = -1
for d, device in enumerate(h0_scenario.devices):
if h0_operator in device.receivers:
h0_device_index = d
h0_operator_index = device.receivers.index(h0_operator)
for d, device in enumerate(h1_scenario.devices):
if h1_operator in device.receivers:
h1_device_index = d
h1_operator_index = device.receivers.index(h1_operator)
if (
h0_device_index < 0
or h1_device_index < 0
or h0_operator_index < 0
or h1_operator_index < 0
):
raise ValueError(
"Could not detect devices and operators within the provided scenarios."
)
# The overall number of considered drops is bounded by the H1 hypothesis
num_drops = h1_scenario.num_drops
artifacts = np.empty(1, dtype=object)
artifacts[0] = []
# Collect artifacts
for _ in range(num_drops):
h0_drop = h0_scenario.drop()
h1_drop = h1_scenario.drop()
h0_reception = h0_drop.device_receptions[h0_device_index].operator_receptions[
h0_operator_index
]
h1_reception = h1_drop.device_receptions[h1_device_index].operator_receptions[
h1_operator_index
]
evaluation = ReceiverOperatingCharacteristic.__evaluation_from_receptions(
h0_reception, h1_reception
)
artifacts[0].append(evaluation.artifact())
# Generate results
grid: Sequence[GridDimension] = []
result = ReceiverOperatingCharacteristic.GenerateResult(grid, artifacts, num_thresholds)
return result
[docs]
@staticmethod
def FromScenario(
scenario: Scenario,
operator: Radar,
h0_campaign: str = "h0",
h1_campaign: str = "h1",
console: Console | None = None,
) -> RocEvaluationResult:
_console = get_console() if console is None else console
# Check if the scenario is in replay mode
if scenario.mode != ScenarioMode.REPLAY:
raise ValueError("Scenario is not in replay mode")
null_receptions: list[RadarReception] = []
alt_receptions: list[RadarReception] = []
# Find the operator index within the scenario
device_index = -1
operator_index = -1
for d, device in enumerate(scenario.devices):
if operator in device.receivers:
operator_index = device.receivers.index(operator)
device_index = d
if device_index < 0 or operator_index < 0:
raise ValueError("Could not detect device and operator within the provided scenario.")
# Collect receptions for both hypothesis
for campaign, hypothesis, receptions in zip(
[h0_campaign, h1_campaign],
["null hypothesis", "alt hypothesis"],
[null_receptions, alt_receptions],
):
scenario.replay(campaign=campaign)
for _ in track(
range(scenario.num_drops), description="Replaying " + hypothesis, console=_console
):
drop = scenario.drop()
receptions.append(
drop.device_receptions[device_index].operator_receptions[operator_index]
)
# Generate the evaluation result
grid: Sequence[GridDimension] = []
artifacts = np.empty(1, dtype=object)
artifacts[0] = [
ReceiverOperatingCharacteristic.__evaluation_from_receptions(h0, h1).artifact()
for h0, h1 in zip(null_receptions, alt_receptions)
]
return ReceiverOperatingCharacteristic.GenerateResult(grid, artifacts)
[docs]
@classmethod
def From_HDF(
cls: Type[ReceiverOperatingCharacteristic],
file: Union[str, File],
h0_campaign="h0_measurements",
h1_campaign="h1_measurements",
num_thresholds: int = 101,
) -> RocEvaluationResult:
"""Compute an ROC evaluation result from a savefile.
Args:
file (Union[str, File]):
Savefile containing the measurements.
Either as file system location or h5py `File` handle.
h0_campaign (str, optional):
Campaign identifier of the h0 hypothesis measurements.
By default, `h0_measurements` is assumed.
h1_campaign (str, optional):
Campaign identifier of the h1 hypothesis measurements.
By default, `h1_measurements` is assumed.
num_thresholds (int, optional):
Number of different thresholds to be considered in ROC curve
Returns: The ROC evaluation result.
"""
# Load scenarios with the respective campaigns from the specified savefile
h0_scenario = ReplayScenario.Replay(file, h0_campaign)
h1_scenario = ReplayScenario.Replay(file, h1_campaign)
# Resort to the from scenarios routine for computing the evaluation result
result = cls.FromScenarios(h0_scenario, h1_scenario, num_thresholds=num_thresholds)
# Close the scenarios properly
h0_scenario.stop()
h1_scenario.stop()
return result
def __del__(self) -> None:
self.__input_hook.remove()
self.__output_hook.remove()
[docs]
class RootMeanSquareArtifact(Artifact):
"""Artifact of a root mean square evaluation"""
__num_errors: int
__cummulation: float
def __init__(self, num_errors: int, cummulation: float) -> None:
"""
Args:
num_errors (int):
Number of errros.
cummulation (float):
Sum of squared errors distances.
"""
self.__num_errors = num_errors
self.__cummulation = cummulation
[docs]
def to_scalar(self) -> float:
return np.sqrt(self.cummulation / self.num_errors)
def __str__(self) -> str:
return f"{self.to_scalar():4.0f}"
@property
def num_errors(self) -> int:
"""Number of cummulated errors"""
return self.__num_errors
@property
def cummulation(self) -> float:
"""Cummulated squared error"""
return self.__cummulation
[docs]
class RootMeanSquareEvaluation(Evaluation[ScatterVisualization]):
"""Result of a single root mean squre evaluation."""
__pcl: RadarPointCloud
__ground_truth: np.ndarray
def __init__(self, pcl: RadarPointCloud, ground_truth: np.ndarray) -> None:
self.__pcl = pcl
self.__ground_truth = ground_truth
[docs]
def artifact(self) -> RootMeanSquareArtifact:
num_errors = self.__pcl.num_points * self.__ground_truth.shape[0]
cummulative_square_error = 0.0
for point, truth in product(self.__pcl.points, self.__ground_truth):
cummulative_square_error += float(np.linalg.norm(point.position - truth)) ** 2
return RootMeanSquareArtifact(num_errors, cummulative_square_error)
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> ScatterVisualization:
return self.__pcl._prepare_visualization(figure, axes, **kwargs)
def _update_visualization(self, visualization: ScatterVisualization, **kwargs) -> None:
self.__pcl._update_visualization(visualization, **kwargs)
[docs]
class RootMeanSquareErrorResult(ScalarEvaluationResult):
"""Result of a root mean square error evaluation."""
... # pragma: no cover
[docs]
class RootMeanSquareError(RadarEvaluator):
"""Root mean square error of estimated points to ground truth.
Root mean square error (RMSE) is a widely used metric for evaluating the performance of a radar detector.
It estimates the average distance between the estimated and the ground truth position of a target.
A minimal example within the context of a :class:`Simulation<hermespy.simulation.simulation.Simulation>`
evaluating the probability of detection for a single radar target illuminated by an :class:`FMCW<hermespy.radar.fmcw.FMCW>` radar would be:
.. literalinclude:: ../scripts/examples/radar_evaluators_RootMeanSquareError.py
:language: python
:linenos:
:lines: 03-29
"""
[docs]
def evaluate(self) -> Evaluation:
point_cloud = self._fetch_pcl()
channel_sample = self._fetch_channel()
# Consolide the ground truth
ground_truth = np.array(
[p.ground_truth[0] for p in channel_sample.paths if p.ground_truth is not None]
)
return RootMeanSquareEvaluation(point_cloud, ground_truth)
@property
def title(self) -> str:
return "Root Mean Square Error"
@property
def abbreviation(self) -> str:
return "RMSE"
[docs]
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> RootMeanSquareErrorResult:
rmse_scalar_results = np.empty(artifacts.shape, dtype=float)
for coordinates, section_artifacts in np.ndenumerate(artifacts):
cummulative_errors = 0.0
error_count = 0
artifact: RootMeanSquareArtifact
for artifact in section_artifacts:
cummulative_errors += artifact.cummulation
error_count += artifact.num_errors
rmse = np.sqrt(cummulative_errors / error_count)
rmse_scalar_results[coordinates] = rmse
return RootMeanSquareErrorResult(grid, rmse_scalar_results, self)