# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from typing import List, Set, Sequence
from typing_extensions import override
import numpy as np
from hermespy.core import DeserializationProcess, Direction, Serializable, SerializationProcess
from hermespy.simulation.animation import Moveable, Trajectory, TrajectorySample
from ..channel import ChannelSampleHook, LinkState
from ..consistent import ConsistentGenerator, ConsistentRealization, ConsistentUniform
from .radar import (
RadarChannelBase,
RadarChannelRealization,
RadarChannelSample,
RadarInterferencePath,
RadarTargetPath,
RadarPath,
)
# Module metadata: authorship, licensing and release information.
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class RadarTarget(Serializable):
    """Abstract base class of radar targets.

    A radar target is a reflector of electromagnetic waves considered by
    :class:`RadarChannelBase<hermespy.channel.radar.radar.RadarChannelBase>` instances.
    Concrete targets provide a radar cross section and a trajectory.
    """

    # Fallback used by the constructor and during deserialization
    _DEFAULT_STATIC = False

    __static: bool

    def __init__(self, static: bool = _DEFAULT_STATIC) -> None:
        """
        Args:
            static:
                Whether the target remains visible during null hypothesis testing.
                Disabled by default.
        """

        self.__static = static

    @property
    def static(self) -> bool:
        """Whether the target is visible in the null hypothesis."""

        return self.__static

    @abstractmethod
    def sample_cross_section(
        self, impinging_direction: Direction, emerging_direction: Direction
    ) -> float:
        """Query the target's radar cross section.

        The target radar cross section is denoted by the vector :math:`\\sigma_{\\ell}`
        within the respective equations.

        Args:
            impinging_direction:
                Direction from which a far-field source impinges onto the target model.
            emerging_direction:
                Direction in which the scattered wave leaves the target model.

        Returns: The assumed radar cross section in :math:`m^2`.
        """
        ...  # pragma: no cover

    @abstractmethod
    def sample_trajectory(self, timestamp: float) -> TrajectorySample:
        """Sample the target's trajectory at a given point in time.

        Args:
            timestamp: Time in seconds at which the trajectory is sampled.

        Returns: The trajectory sample at the requested time.
        """
        ...  # pragma: no cover

    @override
    def serialize(self, process: SerializationProcess) -> None:
        # The boolean flag is stored as an integer field named "static"
        static_flag = self.static
        process.serialize_integer(static_flag, "static")

    @classmethod
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Read the constructor keyword arguments owned by this base class.

        Args:
            process: Deserialization process to read the "static" field from.

        Returns: Dictionary holding the `static` keyword argument.
        """

        # Missing fields fall back to the class default
        stored_flag = process.deserialize_integer("static", cls._DEFAULT_STATIC)
        return {"static": bool(stored_flag)}
[docs]
class RadarCrossSectionModel(Serializable):
    """Base class for spatial radar cross section models.

    Implementations map a pair of impinging and emerging wave directions
    to a scalar cross section.
    """

    @abstractmethod
    def get_cross_section(
        self, impinging_direction: Direction, emerging_direction: Direction
    ) -> float:
        """Query the model's cross section for a pair of wave directions.

        Args:
            impinging_direction:
                Direction from which a far-field source impinges onto the cross section model.
            emerging_direction:
                Direction in which the scattered wave leaves the cross section model.

        Returns: The assumed cross section in :math:`m^2`.
        """
        ...  # pragma: no cover
[docs]
class FixedCrossSection(RadarCrossSectionModel):
    """Model of a fixed, direction-independent cross section.

    Can be interpreted as a spherical target floating in space.
    """

    __cross_section: float

    def __init__(self, cross_section: float) -> None:
        """
        Args:
            cross_section:
                The cross section in :math:`\\mathrm{m}^2`.

        Raises:
            ValueError: For cross sections smaller than zero.
        """

        # Assign through the property so the value is validated
        self.cross_section = cross_section

    @property
    def cross_section(self) -> float:
        """The assumed cross section.

        Returns: The cross section in :math:`\\mathrm{m}^2`.

        Raises:
            ValueError: When assigning a cross section smaller than zero.
        """

        return self.__cross_section

    @cross_section.setter
    def cross_section(self, value: float) -> None:
        if value < 0.0:
            raise ValueError("Radar cross section must be greater or equal to zero")

        self.__cross_section = value

    @override
    def get_cross_section(
        self, impinging_direction: Direction, emerging_direction: Direction
    ) -> float:
        """Return the fixed cross section, independent of both directions.

        The parameter names match the abstract :class:`RadarCrossSectionModel` signature
        so that keyword-argument calls written against the base class work here as well.

        Args:
            impinging_direction: Ignored by this model.
            emerging_direction: Ignored by this model.

        Returns: The configured cross section in :math:`\\mathrm{m}^2`.
        """

        return self.__cross_section

    @override
    def serialize(self, process: SerializationProcess) -> None:
        process.serialize_floating(self.cross_section, "cross_section")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> FixedCrossSection:
        return cls(process.deserialize_floating("cross_section"))
[docs]
class VirtualRadarTarget(Moveable, RadarTarget):
    """Model of a spatial radar target only existing within a channel link.

    The target carries its own trajectory through the :class:`Moveable` base class.
    """

    __cross_section: RadarCrossSectionModel

    def __init__(
        self,
        cross_section: RadarCrossSectionModel,
        trajectory: Trajectory | None = None,
        static: bool = RadarTarget._DEFAULT_STATIC,
    ) -> None:
        """
        Args:
            cross_section:
                The assumed cross section model.
            trajectory:
                The assumed trajectory of the target.
                If omitted, :obj:`None` is forwarded to the :class:`Moveable` constructor.
            static:
                See :meth:`RadarTarget.static`.
                Disabled by default.
        """

        # Both bases are initialized explicitly, each with its own argument
        Moveable.__init__(self, trajectory)
        RadarTarget.__init__(self, static)

        self.cross_section = cross_section

    @property
    def cross_section(self) -> RadarCrossSectionModel:
        """Radar cross section model represented by this target."""

        return self.__cross_section

    @cross_section.setter
    def cross_section(self, value: RadarCrossSectionModel) -> None:
        self.__cross_section = value

    @override
    def sample_cross_section(
        self, impinging_direction: Direction, emerging_direction: Direction
    ) -> float:
        # Delegate to the configured cross section model
        model = self.cross_section
        return model.get_cross_section(impinging_direction, emerging_direction)

    @override
    def sample_trajectory(self, timestamp: float) -> TrajectorySample:
        # The target's own trajectory is the one sampled
        own_trajectory = self.trajectory
        return own_trajectory.sample(timestamp)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        # Base class fields first, followed by the fields owned by this class
        RadarTarget.serialize(self, process)
        process.serialize_object(self.cross_section, "cross_section")
        process.serialize_object(self.trajectory, "trajectory")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> VirtualRadarTarget:
        cross_section = process.deserialize_object("cross_section", RadarCrossSectionModel)
        trajectory = process.deserialize_object("trajectory", Trajectory)
        target_parameters = RadarTarget._DeserializeParameters(process)

        return cls(cross_section, trajectory, **target_parameters)  # type: ignore[arg-type]
[docs]
class PhysicalRadarTarget(RadarTarget, Serializable):
    """Model of a spatial radar target representing a moveable object.

    The target's trajectory is always sampled from the moveable it is attached to.
    """

    __cross_section: RadarCrossSectionModel
    __moveable: Moveable

    def __init__(
        self,
        cross_section: RadarCrossSectionModel,
        moveable: Moveable,
        static: bool = RadarTarget._DEFAULT_STATIC,
    ) -> None:
        """
        Args:
            cross_section:
                The assumed cross section model.
            moveable:
                The moveable object this radar target represents.
            static:
                See :meth:`RadarTarget.static`.
                Disabled by default.
        """

        RadarTarget.__init__(self, static)

        self.__moveable = moveable
        self.cross_section = cross_section

    @property
    def cross_section(self) -> RadarCrossSectionModel:
        """Radar cross section model represented by this target."""

        return self.__cross_section

    @cross_section.setter
    def cross_section(self, value: RadarCrossSectionModel) -> None:
        self.__cross_section = value

    @property
    def moveable(self) -> Moveable:
        """Moveable this radar model is attached to.

        Returns: Handle to the moveable object.
        """

        return self.__moveable

    @override
    def sample_cross_section(
        self, impinging_direction: Direction, emerging_direction: Direction
    ) -> float:
        # Delegate to the configured cross section model
        model = self.cross_section
        return model.get_cross_section(impinging_direction, emerging_direction)

    @override
    def sample_trajectory(self, timestamp: float) -> TrajectorySample:
        # The attached moveable's trajectory is the one sampled
        attached_trajectory = self.moveable.trajectory
        return attached_trajectory.sample(timestamp)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        # Base class fields first, followed by the fields owned by this class
        RadarTarget.serialize(self, process)
        process.serialize_object(self.cross_section, "cross_section")
        process.serialize_object(self.moveable, "moveable")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> PhysicalRadarTarget:
        cross_section = process.deserialize_object("cross_section", RadarCrossSectionModel)
        moveable = process.deserialize_object("moveable", Moveable)
        target_parameters = RadarTarget._DeserializeParameters(process)

        return cls(cross_section, moveable, **target_parameters)  # type: ignore[arg-type]
[docs]
class MultiTargetRadarChannelRealization(RadarChannelRealization):
    """Realization of a spatial multi target radar channel.

    Generated by the :meth:`_realize<MultiTargetRadarChannel._realize>` method of :class:`MultiTargetRadarChannel`.
    """

    __consistent_realization: ConsistentRealization
    __phase_variable: ConsistentUniform
    __targets: Set[RadarTarget]
    __interference: bool
    __attenuate: bool

    def __init__(
        self,
        consistent_realization: ConsistentRealization,
        phase_variable: ConsistentUniform,
        targets: Set[RadarTarget],
        interference: bool,
        attenuate: bool,
        sample_hooks: Set[ChannelSampleHook[RadarChannelSample]],
        gain: float,
    ) -> None:
        """
        Args:
            consistent_realization:
                Realization of the consistent random generator the reflection phases are drawn from.
            phase_variable:
                Uniform random variable sampled to obtain each target's reflection phase.
            targets:
                Radar targets for which propagation paths are generated.
            interference:
                Whether an interference path is added if transmitter and receiver positions differ.
            attenuate:
                Attenuation flag forwarded to every generated path.
            sample_hooks:
                Sample hooks forwarded to the base realization.
            gain:
                Gain forwarded to the base realization.
        """

        # Initialize base classes
        RadarChannelRealization.__init__(self, sample_hooks, gain)

        # Initialize class attributes
        self.__consistent_realization = consistent_realization
        self.__phase_variable = phase_variable
        self.__targets = targets
        self.__interference = interference
        self.__attenuate = attenuate

    def __sample_target(self, target: RadarTarget, state: LinkState) -> RadarTargetPath:
        """Realize a single radar target's channel propagation path.

        Args:
            target:
                The radar target to be realized.
            state:
                The current channel state.

        Returns: The realized propagation path.

        Raises:
            RuntimeError: If `target` and one of the linked devices are located at identical global positions.
        """

        trajectory_sample = target.sample_trajectory(state.time)

        # The translation of the sampled pose is reported as the path's position below,
        # so the very same vector is used as the target's global position throughout.
        # NOTE(review): this assumes the pose maps target-local to global coordinates,
        # making its inverse the global-to-local transformation - confirm against Transformation.
        target_pose = trajectory_sample.pose
        target_position = target_pose.translation
        global_to_target = target_pose.invert()

        transmitter_position = state.transmitter.position
        receiver_position = state.receiver.position

        # Make sure the transmitter / receiver positions don't collide with target locations
        # This implicitly violates the far-field assumption and leads to numeric instabilities
        if np.array_equal(target_position, transmitter_position):
            raise RuntimeError(
                "Radar channel transmitter position colliding with an assumed target location"
            )

        if np.array_equal(target_position, receiver_position):
            raise RuntimeError(
                "Radar channel receiver position colliding with an assumed target location"
            )

        # Impinging (transmitter -> target) and emerging (target -> receiver) far-field
        # wave directions, expressed in the target's local coordinate frame
        target_impinging_direction = global_to_target.transform_direction(
            target_position - transmitter_position, normalize=True
        )
        target_emerging_direction = global_to_target.transform_direction(
            receiver_position - target_position, normalize=True
        )

        # Query the radar cross section from the target's model given impinging and emerging directions
        cross_section = target.sample_cross_section(
            target_impinging_direction, target_emerging_direction
        )

        # Draw the reflection phase from the consistent random variable,
        # scaling the sample by 2 pi
        consistent_sample = self.__consistent_realization.sample(
            target_position, receiver_position
        )
        reflection_phase = float(2 * np.pi * self.__phase_variable.sample(consistent_sample))

        # Return realized information wrapped in a target path
        return RadarTargetPath(
            target_position,
            trajectory_sample.velocity,
            cross_section,
            reflection_phase,
            self.__attenuate,
            target.static,
        )

    @override
    def _generate_paths(self, state: LinkState) -> Sequence[RadarPath]:
        # One propagation path per configured target
        paths: List[RadarPath] = [self.__sample_target(target, state) for target in self.__targets]

        # The interference path is only added if interference is enabled and
        # the transmitter and receiver positions differ in at least one coordinate
        if self.__interference and np.any(state.transmitter.position != state.receiver.position):
            paths.append(RadarInterferencePath(self.__attenuate, True))

        return paths

    @override
    def serialize(self, process: SerializationProcess) -> None:
        RadarChannelRealization.serialize(self, process)
        process.serialize_object(self.__consistent_realization, "consistent_realization")
        process.serialize_object(self.__phase_variable, "phase_variable")
        # The target set is stored as a sequence
        process.serialize_object_sequence(list(self.__targets), "targets")
        process.serialize_integer(self.__interference, "interference")
        process.serialize_integer(self.__attenuate, "attenuate")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> MultiTargetRadarChannelRealization:
        return cls(
            process.deserialize_object("consistent_realization", ConsistentRealization),
            process.deserialize_object("phase_variable", ConsistentUniform),
            set(process.deserialize_object_sequence("targets", RadarTarget)),
            bool(process.deserialize_integer("interference")),
            bool(process.deserialize_integer("attenuate")),
            set(),  # This class does not serialize sample hooks, so none are restored
            **RadarChannelRealization._DeserializeParameters(process),  # type: ignore[arg-type]
        )
[docs]
class MultiTargetRadarChannel(RadarChannelBase[MultiTargetRadarChannelRealization], Serializable):
    """Model of a spatial radar channel featuring multiple reflecting targets."""

    interference: bool
    """Consider interference between linked devices.

    Only applies in the bistatic case, where transmitter and receiver are two dedicated device instances.
    """

    __targets: Set[RadarTarget]

    def __init__(
        self,
        interference: bool = True,
        decorrelation_distance: float = float("inf"),
        attenuate: bool = RadarChannelBase._DEFAULT_ATTENUATE,
        gain: float = RadarChannelBase._DEFAULT_GAIN,
        seed: int | None = None,
    ) -> None:
        """
        Args:
            interference:
                Should the channel model consider interference between the linked devices?
                Enabled by default.
            decorrelation_distance:
                Distance at which the channel's random variable realizations are considered uncorrelated.
                :math:`\\infty` by default, meaning the channel is static in space.
            attenuate:
                Should the propagated signal be attenuated during propagation modeling?
                Defaults to :attr:`RadarChannelBase._DEFAULT_ATTENUATE`.
            gain:
                Linear power gain factor a signal experiences when being propagated over this realization.
                Defaults to :attr:`RadarChannelBase._DEFAULT_GAIN`.
            seed:
                Seed used to initialize the pseudo-random number generator.

        Raises:
            ValueError: For decorrelation distances smaller than zero.
        """

        RadarChannelBase.__init__(self, attenuate, gain, seed)

        # Plain configuration; the decorrelation distance is validated by its setter
        self.__targets = set()
        self.decorrelation_distance = decorrelation_distance
        self.interference = interference

        # Random generator and the uniform variable that is handed to every realization
        self.__consistent_generator = ConsistentGenerator(self)
        self.__phase_variable = self.__consistent_generator.uniform()

    @property
    def decorrelation_distance(self) -> float:
        """Decorrelation distance of the radar channel.

        Raises:
            ValueError: For decorrelation distances smaller than zero.
        """

        return self.__decorrelation_distance

    @decorrelation_distance.setter
    def decorrelation_distance(self, value: float) -> None:
        if value < 0.0:
            raise ValueError("Decorrelation distance must be greater or equal to zero")

        self.__decorrelation_distance = value

    @property
    def targets(self) -> Set[RadarTarget]:
        """Set of targets considered within the radar channel."""

        return self.__targets

    def add_target(self, target: RadarTarget) -> None:
        """Add a new target to the radar channel.

        Targets already contained in the channel are left untouched.

        Args:
            target:
                Target to be added.
        """

        if target in self.targets:
            return

        self.__targets.add(target)

    def make_target(
        self, moveable: Moveable, cross_section: RadarCrossSectionModel, *args, **kwargs
    ) -> PhysicalRadarTarget:
        """Declare a moveable to be a target within the radar channel.

        Args:
            moveable:
                Moveable to be declared as a target.
            cross_section:
                Radar cross section model of the target.
            *args:
                Additional positional arguments passed to the target's constructor.
            **kwargs:
                Additional keyword arguments passed to the target's constructor.

        Returns:
            PhysicalRadarTarget: The newly created target.
        """

        # Note the swapped argument order expected by the target's constructor
        new_target = PhysicalRadarTarget(cross_section, moveable, *args, **kwargs)
        self.add_target(new_target)

        return new_target

    def _realize(self) -> MultiTargetRadarChannelRealization:
        # A fresh consistent realization is drawn for every channel realization,
        # all remaining arguments are forwarded from the channel's configuration
        consistent_realization = self.__consistent_generator.realize(self.decorrelation_distance)

        return MultiTargetRadarChannelRealization(
            consistent_realization,
            self.__phase_variable,
            self.targets,
            self.interference,
            self.attenuate,
            self.sample_hooks,
            self.gain,
        )

    @override
    def serialize(self, process: SerializationProcess) -> None:
        # Base class fields first, followed by the fields owned by this class
        RadarChannelBase.serialize(self, process)
        process.serialize_integer(self.interference, "interference")
        process.serialize_floating(self.decorrelation_distance, "decorrelation_distance")
        process.serialize_object_sequence(list(self.targets), "targets")

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> MultiTargetRadarChannel:
        # Interference falls back to enabled if the field is missing
        interference = bool(process.deserialize_integer("interference", 1))
        decorrelation_distance = process.deserialize_floating("decorrelation_distance")
        base_parameters = RadarChannelBase._DeserializeParameters(process)

        channel = cls(
            interference=interference,
            decorrelation_distance=decorrelation_distance,
            **base_parameters,  # type: ignore[arg-type]
        )

        # Targets are not constructor arguments and are re-attached one by one
        for target in process.deserialize_object_sequence("targets", RadarTarget):
            channel.add_target(target)

        return channel