# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from math import ceil
from typing import Any, Generic, Mapping, Set, Sequence, Tuple, Type, TYPE_CHECKING, TypeVar
import numpy as np
from h5py import Group
from scipy.constants import pi, speed_of_light
from sparse import GCXS # type: ignore
from hermespy.core import (
ChannelStateInformation,
ChannelStateFormat,
Direction,
HDFSerializable,
SignalBlock,
)
from ..channel import (
Channel,
ChannelSample,
LinkState,
ChannelSampleHook,
ChannelRealization,
InterpolationMode,
)
if TYPE_CHECKING:
from hermespy.simulation import DeviceState # pragma: no cover
__author__ = "Andre Noll Barreto"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Andre Noll Barreto", "Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
RCST = TypeVar("RCST", bound="RadarChannelSample")
"""Type of radar channel sample."""
[docs]
class RadarChannelSample(ChannelSample):
"""Realization of a radar channel.
Generated by :meth:`RadarChannelBase.realize` and :meth:`RadarChannelBase.realize_interference`.
"""
__paths: Sequence[RadarPath]
__gain: float
def __init__(self, paths: Sequence[RadarPath], gain: float, state: LinkState) -> None:
"""
Args:
paths (Sequence[RadarPath]):
Sequence of realized radar propagation paths.
gain (float):
Channel gain in linear scale.
state (ChannelState):
State of the channel at the time of sampling.
"""
# Initialize base class
ChannelSample.__init__(self, state)
# Initialize class attributes
self.__paths = paths
self.__gain = gain
@property
def paths(self) -> Sequence[RadarPath]:
"""Sequence of realized radar propagation paths."""
return self.__paths
@property
def gain(self) -> float:
"""Channel gain in linear scale."""
return self.__gain
@property
def expected_energy_scale(self) -> float:
return self.__gain # ToDo: Consider the energy scale of the paths
def _propagate(self, signal: SignalBlock, interpolation: InterpolationMode) -> SignalBlock:
delays = np.array(
[
path.propagation_delay(self.transmitter_state, self.receiver_state)
for path in self.paths
]
)
velocities = np.array(
[
path.relative_velocity(self.transmitter_state, self.receiver_state)
for path in self.paths
]
)
# Compute the expected sample overhead of the propagated sample resultin from propagtion delays
if delays.size > 0:
max_delay_in_samples = ceil(
delays.max() * self.bandwidth
+ 2 * velocities.max() * signal.num_samples / (self.bandwidth * speed_of_light)
)
else:
max_delay_in_samples = 0
propagated_samples = np.zeros(
(self.num_receive_antennas, signal.num_samples + max_delay_in_samples),
dtype=np.complex_,
)
# Compute the signal propagated along each respective path realization
for path, delay, velocity in zip(self.paths, delays, velocities):
path.add_propagation(
self.transmitter_state,
self.receiver_state,
signal,
self.bandwidth,
self.carrier_frequency,
propagated_samples,
delay,
velocity,
)
# Apply the channel gain
propagated_samples *= self.gain**0.5
return SignalBlock(propagated_samples, signal._offset)
[docs]
def state(
self,
num_samples: int,
max_num_taps: int,
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
) -> ChannelStateInformation:
raw_state = np.zeros(
(
self.receiver_state.antennas.num_receive_antennas,
self.transmitter_state.antennas.num_transmit_antennas,
num_samples,
max_num_taps,
),
dtype=np.complex_,
)
for path in self.paths:
path.add_state(
self.transmitter_state,
self.receiver_state,
self.bandwidth,
self.carrier_frequency,
0.0,
raw_state,
)
# Apply the channel gain
raw_state *= self.gain**0.5
return ChannelStateInformation(
ChannelStateFormat.IMPULSE_RESPONSE, GCXS.from_numpy(raw_state)
)
[docs]
def null_hypothesis(self) -> RadarChannelSample:
"""Generate a null hypothesis channel sample rom a given channel sample.
Null hypothesis sample will remove non-static propagation components from the channel model.
This function is, for example, accessed to evaluate a radar link's receiver operating characteristics.
Returns: The null hypothesis radar channel realization.
"""
# Remove non-static paths
static_paths = [path for path in self.paths if path.static]
return RadarChannelSample(
static_paths,
self.gain,
LinkState(
self.transmitter_state,
self.receiver_state,
self.carrier_frequency,
self.bandwidth,
0.0,
),
)
[docs]
class RadarChannelRealization(ChannelRealization[RadarChannelSample]):
"""Realization of a radar channel."""
def __init__(
self, sample_hooks: Set[ChannelSampleHook[RadarChannelSample]], gain: float
) -> None:
"""_summary_
Args:
sample_hooks (Set[ChannelSampleHook[CST]], optional):
Hooks to be called after the channel is sampled.
gain (float):
Linear power gain factor a signal experiences when being propagated over this realization.
"""
# Initialize base class
ChannelRealization.__init__(self, sample_hooks, gain)
@abstractmethod
def _generate_paths(self, state: LinkState) -> Sequence[RadarPath]:
"""Generate the sequence of realized radar propagation paths.
Subroutine of :meth:`RadarChannelRealization._sample`.
Args:
state (ChannelState):
State of the channel at the time of sampling.
Returns: Sequence of realized radar propagation paths.
"""
... # pragma: no cover
def _sample(self, state: LinkState) -> RadarChannelSample:
return RadarChannelSample(self._generate_paths(state), self.gain, state)
def _reciprocal_sample(
self, sample: RadarChannelSample, state: LinkState
) -> RadarChannelSample:
return RadarChannelSample(sample.paths, sample.gain, state)
RCRT = TypeVar("RCRT", bound=RadarChannelRealization)
"""Type of radar channel realization."""
[docs]
class RadarPath(HDFSerializable):
"""Realization of a radar propagation path between transmitter and receiver"""
__attenuate: bool
__static: bool
def __init__(self, attenuate: bool = True, static: bool = False) -> None:
"""
Args:
attenuate (bool, optional):
Should the propagated signal be attenuated during propagation modeling?
Enabled by default.
static (bool, optional):
Is the path considered static?
Static paths will remain during null hypothesis testing.
Disabled by default.
"""
# Initialize class attributes
self.__attenuate = attenuate
self.__static = static
@property
def attenuate(self) -> bool:
"""Should a propagated signal be attenuated during propagation modeling?"""
return self.__attenuate
@attenuate.setter
def attenuate(self, value: bool) -> None:
self.__attenuate = value
@property
def static(self) -> bool:
"""Is the path considered static?"""
return self.__static
@static.setter
def static(self, value: bool) -> None:
self.__static = value
@property
@abstractmethod
def ground_truth(self) -> Tuple[np.ndarray, np.ndarray] | None:
"""Consolidate the true target information represented by this path.
Either a tuple of the target's position and velocity or `None` if the path
represents only interference / clutter.
"""
... # pragma: no cover
[docs]
@abstractmethod
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
"""Propagation delay of the wave from transmitter over target to receiver.
Denoted by :math:`\\tau_{\\ast}` within the respective equations.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (Device):
Receiving device.
Returns: Propagation delay in seconds.
"""
... # pragma: no cover
[docs]
@abstractmethod
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
"""Relative velocity between transmitter and receiver.
Denoted by :math:`v_{\\ast}` within the respective equations.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
Returns: Relative velocity in m/s.
"""
... # pragma: no cover
[docs]
@abstractmethod
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
"""Multipath sensor array response matrix from transmitter to receiver.
Includes polarization losses.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
carrier_frequency (float):
Carrier frequency of the propagated signal in Hz.
Denoted by :math:`f_{\\mathrm{c}}^{(\\alpha)}` within the respective equations.
Returns: Numpy matrix of antenna response weights.
"""
... # pragma: no cover
[docs]
def add_propagation(
self,
transmitter_state: DeviceState,
receiver_state: DeviceState,
signal: np.ndarray,
bandwidth: float,
carrier_frequency: float,
propagated_samples: np.ndarray,
propagation_delay: float | None = None,
relative_velocity: float | None = None,
) -> None:
"""Add propagation of a signal over this path realization to a given sample buffer.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
signal (np.ndarray):
Signal samples to be propagated.
bandwidth (float):
Sampling rate of the the propagated signal model in Hz.
carrier_frequency (float):
Central carrier frequency of the propagated signal in Hz.
propagated_samples (np.ndarray):
Sample buffer to be written to.
propagation_delay (float, optional):
Propagation delay of the wave from transmitter over target to receiver.
If not specified, the delay will be queried from :meth:`propagation_delay`.
relative_velocity (float, optional):
Relative velocity between transmitter and receiver.
If not specified, the velocity will be queried from :meth:`relative_velocity`.
"""
# Query the required parameters
propagation_delay = (
self.propagation_delay(transmitter_state, receiver_state)
if propagation_delay is None
else propagation_delay
)
relative_velocity = (
self.relative_velocity(transmitter_state, receiver_state)
if relative_velocity is None
else relative_velocity
)
propagation_response = self.propagation_response(
transmitter_state, receiver_state, carrier_frequency
)
delay_sample_offset = int(propagation_delay * bandwidth)
doppler_shift = relative_velocity * carrier_frequency / speed_of_light
# ToDo: Exact time of flight resampling
# echo_timestamps = propagation_delay + 2 * relative_velocity * signal.timestamps / speed_of_light
# echo_weights = np.exp(2j * pi * (doppler_shift * echo_timestamps))
echo_weights = np.exp(2j * pi * (doppler_shift / bandwidth * np.arange(signal.shape[1])))
propagated_samples[
:, delay_sample_offset : delay_sample_offset + signal.shape[1]
] += np.einsum("ij,jk,k->ik", propagation_response, signal, echo_weights)
[docs]
def add_state(
self,
transmitter: DeviceState,
receiver: DeviceState,
bandwidth: float,
carrier_frequency: float,
delay: float,
state: np.ndarray,
) -> None:
"""Add propagation of a signal over this path realization to a given channel state information sample buffer.
Args:
transmitter (DeviceState):
Transmitting device.
receiver (DeviceState):
Receiving device.
bandwidth (float):
Sampling rate of the the propagated signal model in Hz.
carrier_frequency (float):
Central carrier frequency of the propagated signal in Hz.
delay (float):
Delay of the channel state information in seconds.
state (np.ndarray):
Sample buffer to be written to.
"""
# Query the required parameters
propagation_delay = self.propagation_delay(transmitter, receiver)
relative_velocity = self.relative_velocity(transmitter, receiver)
propagation_response = self.propagation_response(transmitter, receiver, carrier_frequency)
delay_sample_offset = int(propagation_delay * bandwidth - delay)
if delay_sample_offset < 0 or delay_sample_offset >= state.shape[3]:
return
doppler_shift = relative_velocity * carrier_frequency / speed_of_light
# echo_timestamps = delay + 2 * relative_velocity * np.arange(state.shape[2]) / speed_of_light
# echo_weights = np.exp(2j * pi * (doppler_shift * echo_timestamps))
echo_weights = np.exp(2j * pi * (doppler_shift / bandwidth * np.arange(state.shape[2])))
state[:, :, :, delay_sample_offset] += np.einsum(
"ij,k->ijk", propagation_response, echo_weights
)
def to_HDF(self, group: Group) -> None:
# Serialize the class attributes
group.attrs["attenuate"] = self.attenuate
group.attrs["static"] = self.static
@classmethod
def _parameters_from_HDF(cls: Type[RadarPath], group: Group) -> Mapping[str, Any]:
"""Deserialize the object's parameters from HDF5.
Intended to be used as a subroutine of :meth:`From_HDF`.
Returns: The object's parmeters as a keyword argument dictionary.
"""
return {"attenuate": group.attrs["attenuate"], "static": group.attrs["static"]}
[docs]
class RadarTargetPath(RadarPath):
"""Realization of a radar propagation path resulting from a target scattering"""
def __init__(
self,
position: np.ndarray,
velocity: np.ndarray,
cross_section: float,
reflection_phase: float,
attenuate: bool = True,
static: bool = False,
) -> None:
"""
Args:
position (np.ndarray):
Global position of the path's target.
velocity (np.ndarray):
Global velocity of the path's target.
cross_section (float):
Radar cross section of the path's target in :math:`\\mathrm{m}^2`.
reflection_phase (float):
Reflection phase of the path's target in radians.
attenuate (bool, optional):
Should the propagated signal be attenuated during propagation modeling?
Enabled by default.
static (bool, optional):
Is the path considered static?
Static paths will remain during null hypothesis testing.
Disabled by default.
"""
# Initialize the base class
RadarPath.__init__(self, attenuate, static)
# Initialize class attributes
self.__global_position = position
self.__global_velocity = velocity
self.__cross_section = cross_section
self.__reflection_phase = reflection_phase
@property
def position(self) -> np.ndarray:
"""Global position of the path's target.
Denoted by :math:`\\mathbf{p}^{(\\ell)}` within the respective equations.
"""
return self.__global_position
@property
def velocity(self) -> np.ndarray:
"""Global velocity of the path's target in m/s as a cartesian vector.
Denoted by :math:`\\mathbf{v}^{(\\ell)}` within the respective equations.
"""
return self.__global_velocity
@property
def cross_section(self) -> float:
"""Radar cross section of the path's target in :math:`\\mathrm{m}^2`.
Denoted by :math:`\\sigma_{\\ell}` within the respective equations.
"""
return self.__cross_section
@property
def reflection_phase(self) -> float:
"""Reflection phase of the path's target in radians.
Represented by :math:`\\phi_{\\mathrm{Target}}^{(\\ell)}` within the respective equations.
"""
return self.__reflection_phase
@property
def ground_truth(self) -> Tuple[np.ndarray, np.ndarray]:
return self.position, self.velocity
[docs]
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
emerging_vector = self.position - transmitter.position
impinging_vector = receiver.position - self.position
delay = (
np.linalg.norm(emerging_vector) + np.linalg.norm(impinging_vector)
) / speed_of_light
return delay
[docs]
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
target_position = self.position
emerging_vector = target_position - transmitter.position
impinging_vector = receiver.position - target_position
# Model the doppler-shift from transmitter to receiver
target_velocity = self.velocity
relative_transmitter_velocity = np.dot(
Direction.From_Cartesian(emerging_vector, normalize=True),
target_velocity - transmitter.velocity,
)
relative_receiver_velocity = np.dot(
Direction.From_Cartesian(impinging_vector, normalize=True),
receiver.velocity - target_velocity,
)
return relative_transmitter_velocity + relative_receiver_velocity
[docs]
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
# Query the sensor array responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
)
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
).conj()
if self.attenuate:
# Compute propagation distances
tx_distance = np.linalg.norm(self.position - transmitter.position)
rx_distance = np.linalg.norm(receiver.position - self.position)
wavelength = speed_of_light / carrier_frequency
amplitude_factor = (
wavelength * self.cross_section**0.5 / ((4 * pi) ** 1.5 * tx_distance * rx_distance)
)
else:
amplitude_factor = 1.0
# Compute the MIMO response
return (
amplitude_factor
* np.exp(1j * self.reflection_phase)
* np.inner(rx_response, tx_response)
)
def to_HDF(self, group: Group) -> None:
# Serialize base class
RadarPath.to_HDF(self, group)
# Serialize class attributes
self._write_dataset(group, "position", self.position)
self._write_dataset(group, "velocity", self.velocity)
group.attrs["cross_section"] = self.cross_section
group.attrs["reflection_phase"] = self.reflection_phase
@classmethod
def from_HDF(cls: Type[RadarTargetPath], group: Group) -> RadarTargetPath:
# Deserialize base class
parameters = RadarPath._parameters_from_HDF(group)
# Deserialize class attributes
position = np.array(group["position"], dtype=np.float_)
velocity = np.array(group["velocity"], dtype=np.float_)
cross_section = group.attrs["cross_section"]
reflection_phase = group.attrs["reflection_phase"]
return RadarTargetPath(position, velocity, cross_section, reflection_phase, **parameters)
[docs]
class RadarInterferencePath(RadarPath):
"""Realization of a line of sight interference propgation path between a radar transmitter and receiver"""
@property
def ground_truth(self) -> None:
# Always None because the LOS interference does not represent a real target
return None # pragma: no cover
[docs]
def propagation_delay(self, transmitter: DeviceState, receiver: DeviceState) -> float:
delay = np.linalg.norm(receiver.position - transmitter.position) / speed_of_light
return delay
[docs]
def relative_velocity(self, transmitter: DeviceState, receiver: DeviceState) -> float:
connection = Direction.From_Cartesian(
receiver.position - transmitter.position, normalize=True
).view(np.ndarray)
return np.dot(transmitter.velocity - receiver.velocity, connection)
[docs]
def propagation_response(
self, transmitter: DeviceState, receiver: DeviceState, carrier_frequency: float
) -> np.ndarray:
# Model the sensor arrays' spatial responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, transmitter.position, "global"
).conj()
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, receiver.position, "global"
)
if self.attenuate:
# Compute propagation distance
distance = np.linalg.norm(receiver.position - transmitter.position)
wavelength = speed_of_light / carrier_frequency
amplitude_factor = wavelength / (4 * pi * distance)
else:
amplitude_factor = 1.0
# Compute the MIMO response
return amplitude_factor * np.inner(rx_response, tx_response)
@classmethod
def from_HDF(cls: Type[RadarInterferencePath], group: Group) -> RadarInterferencePath:
# Deserialize base class
parameters = RadarPath._parameters_from_HDF(group)
return RadarInterferencePath(**parameters)
[docs]
class RadarChannelBase(Generic[RCRT], Channel[RCRT, RadarChannelSample]):
"""Base class of all radar channel implementations."""
__attenuate: bool # Should signals be attenuated during propagation modeling?
def __init__(self, attenuate: bool = True, *args, **kwargs) -> None:
"""
Args:
attenuate (bool, optional):
Radar channel attenuation flag, see also :meth:`RadarChannelBase.attenuate`.
Enabled by default.
"""
# Initialize base class
Channel.__init__(self, *args, **kwargs)
# Initialize class attributes
self.__attenuate = attenuate
@property
def attenuate(self) -> bool:
"""Radar channel attenuation flag.
If enabled, losses such as free-space propagation and radar cross sections will be considered.
"""
return self.__attenuate
@attenuate.setter
def attenuate(self, value: bool) -> None:
self.__attenuate = value