Source code for hermespy.radar.detection

# -*- coding: utf-8 -*-
"""
===============
Radar Detection
===============
"""

from __future__ import annotations
from abc import abstractmethod
from math import cos, sin
from typing import List, Tuple, Type

import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D  # type: ignore
from scipy.ndimage import generate_binary_structure, maximum_filter
from scipy.signal import convolve

from hermespy.core import Serializable, VAT, Visualizable
from .cube import RadarCube

__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Egor Achkasov"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class PointDetection(object): """A single radar point detection.""" __position: np.ndarray # Cartesian position of the detection in m __velocity: np.ndarray # Velocity of the detection in m/s __power: float # Power of the detection def __init__(self, position: np.ndarray, velocity: np.ndarray, power: float) -> None: """ Args: position (np.ndarray): Cartesian position of the detection in cartesian coordinates. velocity (np.ndarray): Velocity vector of the detection in m/s power (float): Power of the detection. Raises: ValueError: If `position` is not three-dimensional. If `velocity` is not three-dimensional. If `power` is smaller or equal to zero. """ if position.ndim != 1 or len(position) != 3: raise ValueError("Position must be a three-dimensional vector.") if velocity.ndim != 1 or len(velocity) != 3: raise ValueError("Velocity must be a three-dimensional vector.") if power <= 0.0: raise ValueError("Detected power must be greater than zero") self.__position = position self.__velocity = velocity self.__power = power
[docs] @classmethod def FromSpherical( cls: Type[PointDetection], zenith: float, azimuth: float, range: float, velocity: float, power: float, ) -> PointDetection: """Generate a point detection from radar cube spherical coordinates. Args: zenith (float): Zenith angle in Radians. azimuth (float): Azimuth angle in Radians. range (float): Point distance to coordiante system origin in m/s. velocity (float): Velocity from / to the coordinate system origin in m/s. power (float): Point power indicator. """ normal_vector = np.array( [cos(azimuth) * sin(zenith), sin(azimuth) * sin(zenith), cos(zenith)], dtype=float ) position = range * normal_vector velocity_vector = velocity * normal_vector return cls(position=position, velocity=velocity_vector, power=power)
@property def position(self) -> np.ndarray: """Position of the detection. Returns: np.ndarray: Cartesian position in m. """ return self.__position @property def velocity(self) -> np.ndarray: """Velocity of the detection. Returns: np.ndarray: Velocity vector in m/s. """ return self.__velocity @property def power(self) -> float: """Detected power. Returns: float: Power. """ return self.__power def __eq__(self, __value: object) -> bool: if not isinstance(__value, PointDetection): return NotImplemented return ( bool(np.all(self.position == __value.position)) and bool(np.all(self.velocity == __value.velocity)) and self.power == __value.power )
[docs] class RadarPointCloud(Visualizable): """A sparse radar point cloud.""" __points: List[PointDetection] __max_range: float def __init__(self, max_range: float = 0.0) -> None: """ Args: max_range (float, optional): Maximal represented range. Used for visualization. Raises: ValueError: For `max_range` smaller than zero. """ if max_range <= 0.0: raise ValueError( f"Maximal represented range must be greater than zero (not {max_range})" ) # Initialize base class Visualizable.__init__(self) # Initialize class attributes self.__points = [] self.__max_range = max_range @property def max_range(self) -> float: """Maximal represented range. Returns: Maximal range in m. Zero if unknown. """ return self.__max_range @property def points(self) -> List[PointDetection]: """Points contained within the represented point cloud. Returns: List of represented points. """ return self.__points @property def num_points(self) -> int: """Number of points contained within this point cloud. Returns: The number of points. """ return len(self.__points)
[docs] def add_point(self, point: PointDetection) -> None: """Add a new detection to the point cloud. Args: point (PointDetection): The detection to be added. """ self.__points.append(point)
@property def title(self) -> str: return "Radar Point Coud" def _new_axes(self, **kwargs) -> Tuple[plt.Figure, VAT]: figure, axes = plt.subplots(1, 1, squeeze=False, subplot_kw={"projection": "3d"}) return figure, axes def _plot(self, axes: VAT) -> None: ax: Axes3D = axes.flat[0] # type: ignore for point in self.points: position = point.position ax.scatter( position[0], position[2], position[1], marker="o", c=point.power, cmap="Greens" ) # Configure axes ax.set_xlim((-self.max_range, self.max_range)) ax.set_ylim((0, self.max_range)) ax.set_zlim((-self.max_range, self.max_range)) ax.set_xlabel("X [m]") ax.set_ylabel("Z [m]") ax.set_zlabel("Y [m]")
[docs] class RadarDetector(object): """Base class for radar detection algorithms. Radar detection algorithms convert a radar cube to a radar point cloud. """
[docs] @abstractmethod def detect(self, cube: RadarCube) -> RadarPointCloud: """Generate a point cloud from a radar cube. Args: cube (RadarCube): The radar cube to be processed. Returns: The resulting (usually sparse) point cloud. """ ... # pragma: no cover
[docs] class ThresholdDetector(RadarDetector, Serializable): """Extract points by a power threshold.""" yaml_tag = "Threshold" __min_power: float # Minmally required point power __normalize: bool __peak_detection: bool def __init__( self, min_power: float, normalize: bool = True, peak_detection: bool = True ) -> None: """ Args: min_power (float): Minmally required point power. normalize (bool, optional): Normalize the power during detection, so that min_power becomes a relative value betwee zero and one. Enabled by default. peak_detection (bool, optional): Run a peak detection algorithm to only extract points at power peaks. Enabled by default. """ self.min_power = min_power self.__normalize = normalize self.__peak_detection = peak_detection @property def min_power(self) -> float: """Minimally required point power. Returns: Power (linear). Raises: ValueError: On powers smaller or equal to zero. """ return self.__min_power @min_power.setter def min_power(self, value: float) -> None: if value <= 0.0: raise ValueError("Point power threshold must be greater than zero") self.__min_power = value @property def normalize(self) -> bool: """Normalize cube power before detection. Returns: Enabled flag. """ return self.__normalize @property def peak_detection(self) -> bool: """Run a peak search before detection. Returns: Enabled flag. """ return self.__peak_detection @peak_detection.setter def peak_detection(self, value: bool) -> None: self.__peak_detection = value
[docs] def detect(self, cube: RadarCube) -> RadarPointCloud: # Extract cube data and normalize if the respective flag is enabled cube_max = cube.data.max() cube_data: np.ndarray = ( (cube.data - cube.data.min()) / cube_max if self.normalize and cube_max > 0.0 else cube.data ) # Filter the raw cube data for peaks if self.peak_detection: footprint = generate_binary_structure(3, 1) candidates = maximum_filter(cube_data, footprint=footprint) == cube_data detection_point_indices = np.argwhere( np.logical_and(candidates, cube_data >= self.min_power) ) else: # Apply the thershold and extract the coordinates matching the requirements detection_point_indices = np.argwhere(cube_data >= self.min_power) # Transform the detections to a point cloud cloud = RadarPointCloud(cube.range_bins.max()) for point_indices in detection_point_indices: azimuth, zenith = cube.angle_bins[point_indices[0]] velocity = cube.velocity_bins[point_indices[1]] range = cube.range_bins[point_indices[2]] power_indicator = cube.data[tuple(point_indices)] cloud.add_point( PointDetection.FromSpherical(zenith, azimuth, range, velocity, power_indicator) ) return cloud
[docs] class MaxDetector(RadarDetector, Serializable): """Extracts the maximum point from the radar cube.""" yaml_tag = "Max" """YAML serialization tag."""
[docs] def detect(self, cube: RadarCube) -> RadarPointCloud: # Find the maximum point point_index = np.unravel_index(np.argmax(cube.data), cube.data.shape) point_power = cube.data[point_index] # Return an empty point cloud if no maximum was found if point_power <= 0.0: return RadarPointCloud(cube.range_bins.max()) angles_of_arrival = cube.angle_bins[point_index[0], :] velocity = cube.velocity_bins[point_index[1]] range = cube.range_bins[point_index[2]] cloud = RadarPointCloud(cube.range_bins.max()) cloud.add_point( PointDetection.FromSpherical( angles_of_arrival[0], angles_of_arrival[1], range, velocity, point_power ) ) return cloud
class CFARDetector(RadarDetector, Serializable): """Constant False Alarm Rate Detector. Performs a two-dimensional detection over a :class:`RadarCube<hermespy.radar.cube.RadarCube>`'s range and doppler-frequency domain. Refer to :footcite:t:`1983:Rohling` for more information. The implemented detection kernel is of size .. math:: \\left(1 + 2 N_{\\mathrm{T, R}} + 2 N_{\\mathrm{G, R}}\\right) \\times \\left(1 + 2 N_{\\mathrm{T, D}} + 2 N_{\\mathrm{G, D}}\\right) as depicted in the following figure .. figure:: /images/cfardetector_kernel_example.png :alt: CFAR Detector kernel window example :scale: 45% where `c` indicates the cell under test, `g` the guard cells and `t` the training cells. For each cell under test, the detector calculates the noise threshold over the available training cells .. math:: P_{r,d} = \sum_{\\substack{ i \\in \\lbrace \\mathbb{Z} | N_\\mathrm{G, R} < \\lVert r + i \\rVert \\leq N_{\\mathrm{T, R}} \\rbrace \\\\ j \\in \\lbrace \\mathbb{Z} | N_\\mathrm{G, D} < \\lVert d + j \\rVert \\leq N_{\\mathrm{T, D}} \\rbrace }} x_{r+i,d+j} and makes a detection descision .. math:: x_{r,d} > \\left( P_{\\mathrm{Fa}}^{\\frac{-1}{2 N_{\\mathrm{T, R}}+ 2 N_{\\mathrm{T, D}}}} - 1 \\right) P_{r,d} based on the configured probability of false alarm :math:`P_{\\mathrm{Fa}}`. """ yaml_tag = "CFAR" """YAML serialization tag.""" def __init__(self, num_training_cells: tuple, num_guard_cells: tuple, pfa: float) -> None: """ CFAR Detector over the radar cube. Args: num_training_cells (tuple): Number of training cells in a kernel slice. Must be a pair of even ints. num_guard_cells (int): Number of guard cells in a kernel slice. Must be a pair of even ints. pfa (float): Probability of False Alarm. Must be in the interval of [0, 1]. """ self.num_training_cells = num_training_cells self.num_guard_cells = num_guard_cells self.pfa = pfa @property def num_training_cells(self) -> tuple: """Number of training cells in the detection kernel slice. Denoted by :math:`N_{\\mathrm{T, R}}` and :math:`N_{\\mathrm{T, D}}`. """ return self.__num_training_cells @num_training_cells.setter def num_training_cells(self, value: tuple) -> None: if ( not isinstance(value, tuple) or (len(value) != 2) or not isinstance(value[0], int) or not isinstance(value[1], int) ): raise ValueError("num_training_cells must be a tuple of two ints") if (value[0] <= 0) or (value[1] <= 0): raise ValueError("Number of training cells must be greater than zero") self.__num_training_cells = value @property def num_guard_cells(self) -> tuple: """Number of guard cells in the detection kernel slice. Denoted by :math:`N_{\\mathrm{G, R}}` and :math:`N_{\\mathrm{G, D}}`. """ return self.__num_guard_cells @num_guard_cells.setter def num_guard_cells(self, value: tuple) -> None: if ( not isinstance(value, tuple) or (len(value) != 2) or not isinstance(value[0], int) or not isinstance(value[1], int) ): raise ValueError("num_guard_cells must be a tuple of two ints") if (value[0] < 0) or (value[1] < 0): raise ValueError("Number of training cells must be non-negative") self.__num_guard_cells = value @property def pfa(self) -> float: """Probability of False Alarm. Denoted by :math:`P_{\\mathrm{Fa}}`. """ return self.__pfa @pfa.setter def pfa(self, value: float) -> None: if (value < 0) or (value > 1): raise ValueError("Probability of false alarm must be in [0, 1]") self.__pfa = value def detect(self, cube: RadarCube) -> RadarPointCloud: # window is # [half of training cells][half of guard cells][CUT][half of guard cells][half of training cells] window_size_x = 2 * self.num_training_cells[0] + 2 * self.num_guard_cells[0] + 1 window_size_y = 2 * self.num_training_cells[1] + 2 * self.num_guard_cells[1] + 1 # check if data is bigger then the window if (cube.data.shape[1] < window_size_x) or (cube.data.shape[2] < window_size_y): raise ValueError( "Radar cube 2nd and 3rd dimensions must be bigger then num_training_cells + num_guard_cells + 1" ) # prepare convolution kernel kernel = np.ones((1, window_size_x, window_size_y)) kernel[ :, self.num_training_cells[0] : -self.num_training_cells[0], self.num_training_cells[1] : -self.num_training_cells[1], ] = 0 # calculate noise threshold matrix noise_threshold = convolve(cube.data, kernel, mode="same", method="direct") threshold_normalization = convolve( np.ones(cube.data.shape), kernel, mode="same", method="direct" ) # detect threshold_factor = self.pfa ** (-1 / threshold_normalization) - 1 detection_point_indices = np.argwhere( np.abs(cube.data) > np.abs(noise_threshold) * threshold_factor ) # Transform the detections to a point cloud cloud = RadarPointCloud(cube.range_bins.max()) for point_indices in detection_point_indices: azimuth, zenith = cube.angle_bins[point_indices[0]] velocity = cube.velocity_bins[point_indices[1]] range = cube.range_bins[point_indices[2]] power_indicator = cube.data[tuple(point_indices)] cloud.add_point( PointDetection.FromSpherical(zenith, azimuth, velocity, range, power_indicator) ) return cloud