# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from math import ceil
from typing import Any, Generic, Mapping, Set, Sequence, Tuple, Type, TypeVar
import numpy as np
from h5py import Group
from scipy.constants import pi, speed_of_light
from sparse import GCXS # type: ignore
from hermespy.core import (
AntennaMode,
ChannelStateInformation,
ChannelStateFormat,
DeviceState,
Direction,
HDFSerializable,
SignalBlock,
)
from ..channel import (
Channel,
ChannelSample,
LinkState,
ChannelSampleHook,
ChannelRealization,
InterpolationMode,
)
__author__ = "Andre Noll Barreto"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Andre Noll Barreto", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
RCST = TypeVar("RCST", bound="RadarChannelSample")
"""Type of radar channel sample."""
[docs]
class RadarChannelSample(ChannelSample):
"""Realization of a radar channel.
Generated by :meth:`RadarChannelBase.realize` and :meth:`RadarChannelBase.realize_interference`.
"""
__paths: Sequence[RadarPath]
__gain: float
def __init__(self, paths: Sequence[RadarPath], gain: float, state: LinkState) -> None:
"""
Args:
paths (Sequence[RadarPath]):
Sequence of realized radar propagation paths.
gain (float):
Channel gain in linear scale.
state (ChannelState):
State of the channel at the time of sampling.
"""
# Initialize base class
ChannelSample.__init__(self, state)
# Initialize class attributes
self.__paths = paths
self.__gain = gain
@property
def paths(self) -> Sequence[RadarPath]:
"""Sequence of realized radar propagation paths."""
return self.__paths
@property
def gain(self) -> float:
"""Channel gain in linear scale."""
return self.__gain
@property
def expected_energy_scale(self) -> float:
return self.__gain # ToDo: Consider the energy scale of the paths
def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
delays = np.array(
[
path.propagation_delay(self.transmitter_state, self.receiver_state)
for path in self.paths
]
)
velocities = np.array(
[
path.relative_velocity(self.transmitter_state, self.receiver_state)
for path in self.paths
]
)
# Compute the expected sample overhead of the propagated sample resultin from propagtion delays
if delays.size > 0:
max_delay_in_samples = ceil(
delays.max() * self.bandwidth
+ 2 * velocities.max() * signal.num_samples / (self.bandwidth * speed_of_light)
)
else:
max_delay_in_samples = 0
propagated_samples = np.zeros(
(self.num_receive_antennas, signal.num_samples + max_delay_in_samples),
dtype=np.complex128,
)
# Compute the signal propagated along each respective path realization
for path, delay, velocity in zip(self.paths, delays, velocities):
path.add_propagation(
self.transmitter_state,
self.receiver_state,
signal,
self.bandwidth,
self.carrier_frequency,
propagated_samples,
delay,
velocity,
)
# Apply the channel gain
propagated_samples *= self.gain**0.5
return SignalBlock(propagated_samples, signal._offset)
[docs]
def state(
self,
num_samples: int,
max_num_taps: int,
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
) -> ChannelStateInformation:
raw_state = np.zeros(
(
self.receiver_state.antennas.num_receive_antennas,
self.transmitter_state.antennas.num_transmit_antennas,
num_samples,
max_num_taps,
),
dtype=np.complex128,
)
for path in self.paths:
path.add_state(
self.transmitter_state,
self.receiver_state,
self.bandwidth,
self.carrier_frequency,
0.0,
raw_state,
)
# Apply the channel gain
raw_state *= self.gain**0.5
return ChannelStateInformation(
ChannelStateFormat.IMPULSE_RESPONSE, GCXS.from_numpy(raw_state)
)
[docs]
def null_hypothesis(self) -> RadarChannelSample:
"""Generate a null hypothesis channel sample rom a given channel sample.
Null hypothesis sample will remove non-static propagation components from the channel model.
This function is, for example, accessed to evaluate a radar link's receiver operating characteristics.
Returns: The null hypothesis radar channel realization.
"""
# Remove non-static paths
static_paths = [path for path in self.paths if path.static]
return RadarChannelSample(
static_paths,
self.gain,
LinkState(
self.transmitter_state,
self.receiver_state,
self.carrier_frequency,
self.bandwidth,
0.0,
),
)
[docs]
class RadarChannelRealization(ChannelRealization[RadarChannelSample]):
"""Realization of a radar channel."""
def __init__(
self, sample_hooks: Set[ChannelSampleHook[RadarChannelSample]], gain: float
) -> None:
"""_summary_
Args:
sample_hooks (Set[ChannelSampleHook[CST]], optional):
Hooks to be called after the channel is sampled.
gain (float):
Linear power gain factor a signal experiences when being propagated over this realization.
"""
# Initialize base class
ChannelRealization.__init__(self, sample_hooks, gain)
@abstractmethod
def _generate_paths(self, state: LinkState) -> Sequence[RadarPath]:
"""Generate the sequence of realized radar propagation paths.
Subroutine of :meth:`RadarChannelRealization._sample`.
Args:
state (ChannelState):
State of the channel at the time of sampling.
Returns: Sequence of realized radar propagation paths.
"""
... # pragma: no cover
def _sample(self, state: LinkState) -> RadarChannelSample:
return RadarChannelSample(self._generate_paths(state), self.gain, state)
def _reciprocal_sample(
self, sample: RadarChannelSample, state: LinkState
) -> RadarChannelSample:
return RadarChannelSample(sample.paths, sample.gain, state)
RCRT = TypeVar("RCRT", bound=RadarChannelRealization)
"""Type of radar channel realization."""
[docs]
class RadarPath(HDFSerializable):
"""Realization of a radar propagation path between transmitter and receiver"""
__attenuate: bool
__static: bool
def __init__(self, attenuate: bool = True, static: bool = False) -> None:
"""
Args:
attenuate (bool, optional):
Should the propagated signal be attenuated during propagation modeling?
Enabled by default.
static (bool, optional):
Is the path considered static?
Static paths will remain during null hypothesis testing.
Disabled by default.
"""
# Initialize class attributes
self.__attenuate = attenuate
self.__static = static
@property
def attenuate(self) -> bool:
"""Should a propagated signal be attenuated during propagation modeling?"""
return self.__attenuate
@attenuate.setter
def attenuate(self, value: bool) -> None:
self.__attenuate = value
@property
def static(self) -> bool:
"""Is the path considered static?"""
return self.__static
@static.setter
def static(self, value: bool) -> None:
self.__static = value
@property
@abstractmethod
def ground_truth(self) -> Tuple[np.ndarray, np.ndarray] | None:
"""Consolidate the true target information represented by this path.
Either a tuple of the target's position and velocity or `None` if the path
represents only interference / clutter.
"""
... # pragma: no cover
[docs]
@abstractmethod
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
"""Propagation delay of the wave from transmitter over target to receiver.
Denoted by :math:`\\tau_{\\ast}` within the respective equations.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (Device):
Receiving device.
Returns: Propagation delay in seconds.
"""
... # pragma: no cover
[docs]
@abstractmethod
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
"""Relative velocity between transmitter and receiver.
Denoted by :math:`v_{\\ast}` within the respective equations.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
Returns: Relative velocity in m/s.
"""
... # pragma: no cover
[docs]
@abstractmethod
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
"""Multipath sensor array response matrix from transmitter to receiver.
Includes polarization losses.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
carrier_frequency (float):
Carrier frequency of the propagated signal in Hz.
Denoted by :math:`f_{\\mathrm{c}}^{(\\alpha)}` within the respective equations.
Returns: Numpy matrix of antenna response weights.
"""
... # pragma: no cover
[docs]
def add_propagation(
self,
transmitter_state: DeviceState,
receiver_state: DeviceState,
signal: np.ndarray,
bandwidth: float,
carrier_frequency: float,
propagated_samples: np.ndarray,
propagation_delay: float | None = None,
relative_velocity: float | None = None,
) -> None:
"""Add propagation of a signal over this path realization to a given sample buffer.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
signal (numpy.ndarray):
Signal samples to be propagated.
bandwidth (float):
Sampling rate of the the propagated signal model in Hz.
carrier_frequency (float):
Central carrier frequency of the propagated signal in Hz.
propagated_samples (numpy.ndarray):
Sample buffer to be written to.
propagation_delay (float, optional):
Propagation delay of the wave from transmitter over target to receiver.
If not specified, the delay will be queried from :meth:`propagation_delay`.
relative_velocity (float, optional):
Relative velocity between transmitter and receiver.
If not specified, the velocity will be queried from :meth:`relative_velocity`.
"""
# Query the required parameters
propagation_delay = (
self.propagation_delay(transmitter_state, receiver_state)
if propagation_delay is None
else propagation_delay
)
relative_velocity = (
self.relative_velocity(transmitter_state, receiver_state)
if relative_velocity is None
else relative_velocity
)
propagation_response = self.propagation_response(
transmitter_state, receiver_state, carrier_frequency
)
delay_sample_offset = int(propagation_delay * bandwidth)
doppler_shift = relative_velocity * carrier_frequency / speed_of_light
# ToDo: Exact time of flight resampling
# echo_timestamps = propagation_delay + 2 * relative_velocity * signal.timestamps / speed_of_light
# echo_weights = np.exp(2j * pi * (doppler_shift * echo_timestamps))
echo_weights = np.exp(2j * pi * (doppler_shift / bandwidth * np.arange(signal.shape[1])))
propagated_samples[
:, delay_sample_offset : delay_sample_offset + signal.shape[1]
] += np.einsum("ij,jk,k->ik", propagation_response, signal, echo_weights)
[docs]
def add_state(
self,
transmitter: DeviceState,
receiver: DeviceState,
bandwidth: float,
carrier_frequency: float,
delay: float,
state: np.ndarray,
) -> None:
"""Add propagation of a signal over this path realization to a given channel state information sample buffer.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
bandwidth (float):
Sampling rate of the the propagated signal model in Hz.
carrier_frequency (float):
Central carrier frequency of the propagated signal in Hz.
delay (float):
Delay of the channel state information in seconds.
state (numpy.ndarray):
Sample buffer to be written to.
"""
# Query the required parameters
propagation_delay = self.propagation_delay(transmitter, receiver)
relative_velocity = self.relative_velocity(transmitter, receiver)
propagation_response = self.propagation_response(transmitter, receiver, carrier_frequency)
delay_sample_offset = int(propagation_delay * bandwidth - delay)
if delay_sample_offset < 0 or delay_sample_offset >= state.shape[3]:
return
doppler_shift = relative_velocity * carrier_frequency / speed_of_light
# echo_timestamps = delay + 2 * relative_velocity * np.arange(state.shape[2]) / speed_of_light
# echo_weights = np.exp(2j * pi * (doppler_shift * echo_timestamps))
echo_weights = np.exp(2j * pi * (doppler_shift / bandwidth * np.arange(state.shape[2])))
state[:, :, :, delay_sample_offset] += np.einsum(
"ij,k->ijk", propagation_response, echo_weights
)
def to_HDF(self, group: Group) -> None:
# Serialize the class attributes
group.attrs["attenuate"] = self.attenuate
group.attrs["static"] = self.static
@classmethod
def _parameters_from_HDF(cls: Type[RadarPath], group: Group) -> Mapping[str, Any]:
"""Deserialize the object's parameters from HDF5.
Intended to be used as a subroutine of :meth:`From_HDF`.
Returns: The object's parmeters as a keyword argument dictionary.
"""
return {"attenuate": group.attrs["attenuate"], "static": group.attrs["static"]}
[docs]
class RadarTargetPath(RadarPath):
"""Realization of a radar propagation path resulting from a target scattering"""
def __init__(
self,
position: np.ndarray,
velocity: np.ndarray,
cross_section: float,
reflection_phase: float,
attenuate: bool = True,
static: bool = False,
) -> None:
"""
Args:
position (numpy.ndarray):
Global position of the path's target.
velocity (numpy.ndarray):
Global velocity of the path's target.
cross_section (float):
Radar cross section of the path's target in :math:`\\mathrm{m}^2`.
reflection_phase (float):
Reflection phase of the path's target in radians.
attenuate (bool, optional):
Should the propagated signal be attenuated during propagation modeling?
Enabled by default.
static (bool, optional):
Is the path considered static?
Static paths will remain during null hypothesis testing.
Disabled by default.
"""
# Initialize the base class
RadarPath.__init__(self, attenuate, static)
# Initialize class attributes
self.__global_position = position
self.__global_velocity = velocity
self.__cross_section = cross_section
self.__reflection_phase = reflection_phase
@property
def position(self) -> np.ndarray:
"""Global position of the path's target.
Denoted by :math:`\\mathbf{p}^{(\\ell)}` within the respective equations.
"""
return self.__global_position
@property
def velocity(self) -> np.ndarray:
"""Global velocity of the path's target in m/s as a cartesian vector.
Denoted by :math:`\\mathbf{v}^{(\\ell)}` within the respective equations.
"""
return self.__global_velocity
@property
def cross_section(self) -> float:
"""Radar cross section of the path's target in :math:`\\mathrm{m}^2`.
Denoted by :math:`\\sigma_{\\ell}` within the respective equations.
"""
return self.__cross_section
@property
def reflection_phase(self) -> float:
"""Reflection phase of the path's target in radians.
Represented by :math:`\\phi_{\\mathrm{Target}}^{(\\ell)}` within the respective equations.
"""
return self.__reflection_phase
@property
def ground_truth(self) -> Tuple[np.ndarray, np.ndarray]:
return self.position, self.velocity
[docs]
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
emerging_vector = self.position - transmitter.position
impinging_vector = receiver.position - self.position
delay = (
np.linalg.norm(emerging_vector) + np.linalg.norm(impinging_vector)
) / speed_of_light
return delay
[docs]
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
target_position = self.position
emerging_vector = target_position - transmitter.position
impinging_vector = receiver.position - target_position
# Model the doppler-shift from transmitter to receiver
target_velocity = self.velocity
relative_transmitter_velocity = np.dot(
Direction.From_Cartesian(emerging_vector, normalize=True),
target_velocity - transmitter.velocity,
)
relative_receiver_velocity = np.dot(
Direction.From_Cartesian(impinging_vector, normalize=True),
receiver.velocity - target_velocity,
)
return relative_transmitter_velocity + relative_receiver_velocity
[docs]
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
# Query the sensor array responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, self.position, "global", AntennaMode.RX
)
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, self.position, "global", AntennaMode.TX
)
if self.attenuate:
# Compute propagation distances
tx_distance = np.linalg.norm(self.position - transmitter.position)
rx_distance = np.linalg.norm(receiver.position - self.position)
wavelength = speed_of_light / carrier_frequency
amplitude_factor = (
wavelength * self.cross_section**0.5 / ((4 * pi) ** 1.5 * tx_distance * rx_distance)
)
else:
amplitude_factor = 1.0
# Compute the MIMO response
return (
amplitude_factor
* np.exp(1j * self.reflection_phase)
* np.inner(rx_response, tx_response)
)
def to_HDF(self, group: Group) -> None:
# Serialize base class
RadarPath.to_HDF(self, group)
# Serialize class attributes
self._write_dataset(group, "position", self.position)
self._write_dataset(group, "velocity", self.velocity)
group.attrs["cross_section"] = self.cross_section
group.attrs["reflection_phase"] = self.reflection_phase
@classmethod
def from_HDF(cls: Type[RadarTargetPath], group: Group) -> RadarTargetPath:
# Deserialize base class
parameters = RadarPath._parameters_from_HDF(group)
# Deserialize class attributes
position = np.array(group["position"], dtype=np.float64)
velocity = np.array(group["velocity"], dtype=np.float64)
cross_section = group.attrs["cross_section"]
reflection_phase = group.attrs["reflection_phase"]
return RadarTargetPath(position, velocity, cross_section, reflection_phase, **parameters)
[docs]
class RadarInterferencePath(RadarPath):
"""Realization of a line of sight interference propgation path between a radar transmitter and receiver"""
@property
def ground_truth(self) -> None:
# Always None because the LOS interference does not represent a real target
return None # pragma: no cover
[docs]
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
delay = np.linalg.norm(receiver.position - transmitter.position) / speed_of_light
return delay
[docs]
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
connection = Direction.From_Cartesian(
receiver.position - transmitter.position, normalize=True
).view(np.ndarray)
return np.dot(transmitter.velocity - receiver.velocity, connection)
[docs]
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
# Model the sensor arrays' spatial responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, transmitter.position, "global"
)
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, receiver.position, "global"
)
if self.attenuate:
# Compute propagation distance
distance = np.linalg.norm(receiver.position - transmitter.position)
wavelength = speed_of_light / carrier_frequency
amplitude_factor = wavelength / (4 * pi * distance)
else:
amplitude_factor = 1.0
# Compute the MIMO response
return amplitude_factor * np.inner(rx_response, tx_response)
@classmethod
def from_HDF(cls: Type[RadarInterferencePath], group: Group) -> RadarInterferencePath:
# Deserialize base class
parameters = RadarPath._parameters_from_HDF(group)
return RadarInterferencePath(**parameters)
[docs]
class RadarChannelBase(Generic[RCRT], Channel[RCRT, RadarChannelSample]):
"""Base class of all radar channel implementations."""
__attenuate: bool # Should signals be attenuated during propagation modeling?
def __init__(self, attenuate: bool = True, *args, **kwargs) -> None:
"""
Args:
attenuate (bool, optional):
Radar channel attenuation flag, see also :meth:`RadarChannelBase.attenuate`.
Enabled by default.
"""
# Initialize base class
Channel.__init__(self, *args, **kwargs)
# Initialize class attributes
self.__attenuate = attenuate
@property
def attenuate(self) -> bool:
"""Radar channel attenuation flag.
If enabled, losses such as free-space propagation and radar cross sections will be considered.
"""
return self.__attenuate
@attenuate.setter
def attenuate(self, value: bool) -> None:
self.__attenuate = value