# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from typing import Optional, Type
import numpy as np
from h5py import Group
from scipy.constants import speed_of_light
from hermespy.beamforming import ReceiveBeamformer, TransmitBeamformer
from hermespy.core import (
Device,
DuplexOperator,
FloatingError,
Signal,
Serializable,
SNRType,
Transmission,
Reception,
)
from .cube import RadarCube
from .detection import RadarDetector, RadarPointCloud
__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "André Noll Barreto"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class RadarTransmission(Transmission):
"""Information generated by transmitting over a radar.
Generated by calling a :class:`.Radar`'s :meth:`transmit()<.Radar.transmit>` method.
"""
def __init__(self, signal: Signal) -> None:
"""
Args:
signal (Signal): Transmitted radar waveform.
"""
Transmission.__init__(self, signal)
[docs]
class RadarReception(Reception):
"""Information generated by receiving over a radar.
Generated by calling a :class:`.Radar`'s :meth:`receive()<.Radar.receive>` method.
"""
__cube: RadarCube # Processed raw radar data
__cloud: RadarPointCloud | None # Detected radar point cloud
def __init__(
self, signal: Signal, cube: RadarCube, cloud: RadarPointCloud | None = None
) -> None:
"""
Args:
signal (Signal): Received radar waveform.
cube (RadarCube): Processed raw radar data.
cloud (RadarPointCloud, optional): Radar point cloud. :obj:`None` if a point cloud was not generated.
"""
# Initialize base class
Reception.__init__(self, signal)
# Initialize class attributes
self.__cube = cube
self.__cloud = cloud
@property
def cube(self) -> RadarCube:
"""Cube of processed raw radar data."""
return self.__cube
@property
def cloud(self) -> RadarPointCloud | None:
"""Detected radar point cloud.
:obj:`None` if a point cloud was not generated."""
return self.__cloud
def to_HDF(self, group: Group) -> None:
# Serialize base class
Reception.to_HDF(self, group)
# Serialize class attributes
self.cube.to_HDF(self._create_group(group, "cube"))
return
@classmethod
def from_HDF(cls: Type[RadarReception], group: Group) -> RadarReception:
signal = Signal.from_HDF(group["signal"])
cube = RadarCube.from_HDF(group["cube"])
return RadarReception(signal, cube)
[docs]
class Radar(DuplexOperator[RadarTransmission, RadarReception], Serializable):
"""Signal processing pipeline of a monostatic radar sensing its environment.
The radar can be configured by assigning four composite objects to respective property attributes:
.. list-table::
:header-rows: 1
* - Property
- Type
* - :meth:`waveform<.waveform>`
- :class:`RadarWaveform`
* - :meth:`transmit_beamformer<.transmit_beamformer>`
- :class:`TransmitBeamformer<hermespy.beamforming.beamformer.TransmitBeamformer>`
* - :meth:`receive_beamformer<.receive_beamformer>`
- :class:`ReceiveBeamformer<hermespy.beamforming.beamformer.ReceiveBeamformer>`
* - :meth:`detector<.detector>`
- :class:`RadarDetector<hermespy.radar.detection.RadarDetector>`
Of those, only a :class:`RadarWaveform` is mandatory.
Beamformers, i.e. a :class:`TransmitBeamformer<hermespy.beamforming.beamformer.TransmitBeamformer>` and a :class:`ReceiveBeamformer<hermespy.beamforming.beamformer.ReceiveBeamformer>`,
are only required when the radar is assigned to a :class:`Device<hermespy.core.device.Device>` configured to multiple :meth:`antennas<hermespy.core.device.Device.antennas>`.
A :class:`RadarDetector<hermespy.radar.detection.RadarDetector>` is optional,
if not configured the radar's generated :class:`RadarReception` will not contain a :class:`RadarPointCloud<hermespy.radar.detection.RadarPointCloud>`.
When assigned to a :class:`Device<hermespy.core.device.Device>`,
device transmission will trigger the radar to generate a :class:`RadarTransmission` by executing the following sequence of calls:
.. mermaid::
sequenceDiagram
participant Device
participant Radar
participant RadarWaveform
participant TransmitBeamformer
Device ->> Radar: _transmit()
Radar ->> RadarWaveform: ping()
RadarWaveform -->> Radar: Signal
Radar ->> TransmitBeamformer: transmit(Signal)
TransmitBeamformer -->> Radar: Signal
Radar -->> Device: RadarTransmission
Initially, the :meth:`ping<RadarWaveform.ping>` method of the :class:`RadarWaveform` is called to generate the model
of a single-antenna radar frame.
For :class:`Devices<hermespy.core.device.Device>` configured to multiple :meth:`antennas<hermespy.core.device.Device.antennas>`,
the configured :class:`TransmitBeamformer<hermespy.beamforming.beamformer.TransmitBeamformer>` is called to encode the signal for each antenna.
The resulting multi-antenna frame, contained within the return :class:`RadarTransmission`, is cached at the assigned :class:`Device<hermespy.core.device.Device>`.
When assigned to a :class:`Device<hermespy.core.device.Device>`,
device reception will trigger the radar to generate a :class:`RadarReception` by executing the following sequence of calls:
.. mermaid::
sequenceDiagram
participant Device
participant Radar
participant ReceiveBeamformer
participant RadarWaveform
participant RadarDetector
Device ->> Radar: _receive(Signal)
Radar ->> ReceiveBeamformer: probe(Signal)
ReceiveBeamformer -->> Radar: line_signals
loop
Radar ->> RadarWaveform: estimate(line_signal)
RadarWaveform -->> Radar: line_estimate
end
Radar ->> RadarDetector: detect(line_estimates)
RadarDetector -->> Radar: RadarPointCloud
Radar -->> Device: RadarReception
Initially, the :meth:`probe<hermespy.beamforming.beamformer.ReceiveBeamformer.probe>` method of the :class:`ReceiveBeamformer<hermespy.beamforming.beamformer.ReceiveBeamformer>` is called to generate a sequence of
line signals from each direction of interest.
We refer to them as *line signals* as they are the result of an antenna arrays beamforing towards a single direction of interest,
so the signal can be though of as having propagated along a single line pointing towards the direction of interest.
This step is only executed for :class:`Devices<hermespy.core.device.Device>` configured to multiple :meth:`antennas<hermespy.core.device.Device.antennas>`.
The sequence of line signals are then indiviually processed by the :meth:`estimate<RadarWaveform.estimate>` method of the :class:`RadarWaveform`,
resulting in a line estimate representing a range-doppler map for each direction of interest.
This sequence line estimates is combined to a single :class:`RadarCube<hermespy.radar.cube.RadarCube>`.
If a :class:`RadarDetector<hermespy.radar.detection.RadarDetector>` is configured, the :meth:`detect<hermespy.radar.detection.RadarDetector.detect>` method is called to generate a :class:`RadarPointCloud<hermespy.radar.detection.RadarPointCloud>`
from the :class:`RadarCube<hermespy.radar.cube.RadarCube>`.
The resulting information is cached as a :class:`RadarReception` at the assigned :class:`Device<hermespy.core.device.Device>`.
"""
yaml_tag = "Radar"
property_blacklist = {"slot"}
__transmit_beamformer: Optional[TransmitBeamformer]
__receive_beamformer: Optional[ReceiveBeamformer]
__waveform: Optional[RadarWaveform]
__detector: Optional[RadarDetector]
def __init__(self, device: Device | None = None, seed: int | None = None) -> None:
"""
Args:
device (Device, optional):
The device the radar is assigned to.
seed (int, optional):
Random seed used to generate the radar's internal state.
"""
self.waveform = None
self.receive_beamformer = None
self.transmit_beamformer = None
self.__waveform = None
self.detector = None
DuplexOperator.__init__(self, device, device, seed)
@property
def transmit_beamformer(self) -> TransmitBeamformer | None:
"""Beamforming applied during signal transmission.
The :class:`TransmitBeamformer<hermespy.beamforming.beamformer.TransmitBeamformer>`'s :meth:`transmit<hermespy.beamforming.beamformer.TransmitBeamformer.transmit>`
method is called as a subroutine of :meth:`Transmitter.transmit()<hermespy.core.device.Transmitter.transmit>`.
Configuration is required for if the assigned :class:`Device<hermespy.core.device.Device>` features multiple :meth:`antennas<hermespy.core.device.Device.antennas>`.
"""
return self.__transmit_beamformer
@transmit_beamformer.setter
def transmit_beamformer(self, value: TransmitBeamformer | None) -> None:
if value is None:
self.__transmit_beamformer = None
else:
value.operator = self
self.__transmit_beamformer = value
@property
def receive_beamformer(self) -> ReceiveBeamformer | None:
"""Beamforming applied during signal reception.
The :class:`TransmitBeamformer<hermespy.beamforming.beamformer.ReceiveBeamformer>`'s :meth:`receive<hermespy.beamforming.beamformer.ReceiveBeamformer.receive>`
method is called as a subroutine of :meth:`Receiver.receive()<hermespy.core.device.Receiver.receive>`.
Configuration is required for if the assigned :class:`Device<hermespy.core.device.Device>` features multiple :meth:`antennas<hermespy.core.device.Device.antennas>`.
"""
return self.__receive_beamformer
@receive_beamformer.setter
def receive_beamformer(self, value: Optional[ReceiveBeamformer]) -> None:
if value is None:
self.__receive_beamformer = None
else:
value.operator = self
self.__receive_beamformer = value
@property
def sampling_rate(self) -> float:
return self.waveform.sampling_rate
@property
def frame_duration(self) -> float:
if self.waveform is None:
return 0.0
return self.waveform.frame_duration
def _noise_power(self, strength: float, snr_type=SNRType) -> float:
# No waveform configured equals no noise required
if self.waveform is None:
return 0.0
if snr_type == SNRType.EN0:
return self.waveform.energy / strength
if snr_type == SNRType.PN0:
return self.waveform.power / strength
raise ValueError(f"SNR of type '{snr_type}' is not supported by radar operators")
@property
def waveform(self) -> RadarWaveform | None:
"""Description of the waveform to be transmitted and received by this radar.
:obj:`None` if no waveform is configured.
During :meth:`transmit<Radar.transmit>` / :meth:`_transmit<Radar._transmit>`,
the :class:`.RadarWaveform`'s :meth:`ping()<.RadarWaveform.ping>` method is called
to generate a signal to be transmitted by the radar.
During :meth:`receive<Radar.receive>` / :meth:`_receive<Radar._receive>`, the :class:`.RadarWaveform`'s :meth:`estimate()<.RadarWaveform.estimate>` method is called
multiple times to generate range-doppler line estimates for each direction of interest.
"""
return self.__waveform
@waveform.setter
def waveform(self, value: RadarWaveform | None) -> None:
self.__waveform = value
@property
def max_range(self) -> float:
"""The radar's maximum detectable range in meters.
Denoted by :math:`R_{\\mathrm{Max}}` of unit :math:`\\left[ R_{\\mathrm{Max}} \\right] = \\mathrm{m}` in literature.
Convenience property that resolves to the configured :class:`.RadarWaveform`'s :meth:`max_range<.RadarWaveform.max_range>` property.
Returns :math:`R_{\\mathrm{Max}} = 0` if no waveform is configured.
"""
return 0.0 if self.waveform is None else self.waveform.max_range
@property
def velocity_resolution(self) -> float:
"""The radar's velocity resolution in meters per second.
Denoted by :math:`\\Delta v` of unit :math:`\\left[ \\Delta v \\right] = \\frac{\\mathrm{m}}{\\mathrm{s}}` in literature.
Computed as
.. math::
\\Delta v = \\frac{c_0}{f_{\\mathrm{c}}} \\Delta f_{\\mathrm{Res}}
querying the configured :class:`.RadarWaveform`'s :meth:`relative_doppler_resolution<.RadarWaveform.relative_doppler_resolution>` property :math:`\\Delta f_{\\mathrm{Res}}`.
"""
if self.waveform is None:
raise FloatingError("Cannot compute velocity resolution without a waveform")
if self.carrier_frequency == 0.0:
raise RuntimeError("Cannot compute velocity resolution in base-band carrier frequency")
return (
0.5
* self.waveform.relative_doppler_resolution
* speed_of_light
/ self.carrier_frequency
)
@property
def detector(self) -> RadarDetector | None:
"""Detector routine configured to generate point clouds from radar cubes.
If configured, during :meth:`_receive<Radar._receive>` / :meth:`receive<Receiver.receive>`,
the detector's :meth:`detect<RadarDetector.detect>` method is called to generate a :class:`RadarPointCloud<hermespy.radar.detection.RadarPointCloud>`.
If not configured, i.e. :obj:`None`, the generated :class:`.RadarReception`'s :attr:`cloud<.RadarReception.cloud>` property will be :obj:`None`.
"""
return self.__detector
@detector.setter
def detector(self, value: Optional[RadarDetector]) -> None:
self.__detector = value
[docs]
def _transmit(self, duration: float = 0.0) -> RadarTransmission:
if not self.__waveform:
raise RuntimeError("Radar waveform not specified")
if not self.device:
raise RuntimeError("Error attempting to transmit over a floating radar operator")
# Generate the radar waveform
signal = self.waveform.ping()
# If the device has more than one antenna, a beamforming strategy is required
if self.device.antennas.num_antennas > 1:
# If no beamformer is configured, only the first antenna will transmit the ping
if self.transmit_beamformer is None:
additional_streams = Signal(
np.zeros(
(
self.device.antennas.num_antennas - signal.num_streams,
signal.num_samples,
),
dtype=complex,
),
signal.sampling_rate,
)
signal.append_streams(additional_streams)
elif self.transmit_beamformer.num_transmit_input_streams != 1:
raise RuntimeError(
"Only transmit beamformers requiring a single input stream are supported by radar operators"
)
elif (
self.transmit_beamformer.num_transmit_output_streams
!= self.device.antennas.num_antennas
):
raise RuntimeError(
"Radar operator transmit beamformers are required to consider the full number of antennas"
)
else:
signal = self.transmit_beamformer.transmit(signal)
# Prepare transmission
signal.carrier_frequency = self.carrier_frequency
transmission = RadarTransmission(signal)
return transmission
[docs]
def _receive(self, signal: Signal) -> RadarReception:
if not self.waveform:
raise RuntimeError("Radar waveform not specified")
if not self.device:
raise RuntimeError("Error attempting to receive over a floating radar operator")
# Resample signal properly
signal = signal.resample(self.__waveform.sampling_rate)
# If the device has more than one antenna, a beamforming strategy is required
if self.device.antennas.num_antennas > 1:
if self.receive_beamformer is None:
raise RuntimeError(
"Receiving over a device with more than one antenna requires a beamforming configuration"
)
if self.receive_beamformer.num_receive_output_streams != 1:
raise RuntimeError(
"Only receive beamformers generating a single output stream are supported by radar operators"
)
if (
self.receive_beamformer.num_receive_input_streams
!= self.device.antennas.num_antennas
):
raise RuntimeError(
"Radar operator receive beamformers are required to consider the full number of antenna streams"
)
beamformed_samples = self.receive_beamformer.probe(signal)[:, 0, :]
else:
beamformed_samples = signal.samples
# Build the radar cube by generating a beam-forming line over all angles of interest
angles_of_interest = (
np.array([[0.0, 0.0]], dtype=float)
if self.receive_beamformer is None
else self.receive_beamformer.probe_focus_points[:, 0, :]
)
range_bins = self.waveform.range_bins
doppler_bins = self.waveform.relative_doppler_bins
cube_data = np.empty(
(len(angles_of_interest), len(doppler_bins), len(range_bins)), dtype=float
)
for angle_idx, line in enumerate(beamformed_samples):
# Process the single angular line by the waveform generator
line_signal = Signal(
line, signal.sampling_rate, carrier_frequency=signal.carrier_frequency
)
line_estimate = self.waveform.estimate(line_signal)
cube_data[angle_idx, ::] = line_estimate
# Create radar cube object
cube = RadarCube(
cube_data, angles_of_interest, doppler_bins, range_bins, self.carrier_frequency
)
# Infer the point cloud, if a detector has been configured
cloud = None if self.detector is None else self.detector.detect(cube)
reception = RadarReception(signal, cube, cloud)
return reception
def _recall_transmission(self, group: Group) -> RadarTransmission:
return RadarTransmission.from_HDF(group)
def _recall_reception(self, group: Group) -> RadarReception:
return RadarReception.from_HDF(group)