# -*- coding: utf-8 -*-
"""
===================
Matched Filter JCAS
===================
"""
from __future__ import annotations
from typing import Optional, Type
import numpy as np
from h5py import Group
from scipy.constants import speed_of_light
from scipy.signal import correlate, correlation_lags
from hermespy.core import Device, Receiver, SNRType, Signal, Serializable, Transmitter
from hermespy.modem import DuplexModem, CommunicationTransmission, CommunicationReception
from hermespy.radar import Radar, RadarTransmission, RadarReception, RadarCube
__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "Jan Adler"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class JCASTransmission(CommunicationTransmission, RadarTransmission):
"""Information generated by transmitting over a joint communication and sensing operator."""
def __init__(self, transmission: CommunicationTransmission) -> None:
CommunicationTransmission.__init__(
self, signal=transmission.signal, frames=transmission.frames
)
RadarTransmission.__init__(self, signal=transmission.signal)
@classmethod
def from_HDF(cls: Type[JCASTransmission], group: Group) -> JCASTransmission:
return JCASTransmission(CommunicationTransmission.from_HDF(group))
[docs]
class JCASReception(CommunicationReception, RadarReception):
"""Information generated by receiving over a joint communication and sensing operator."""
def __init__(self, communication: CommunicationReception, radar: RadarReception) -> None:
CommunicationReception.__init__(
self, signal=communication.signal, frames=communication.frames
)
RadarReception.__init__(self, radar.signal, radar.cube, radar.cloud)
@classmethod
def from_HDF(cls: Type[JCASReception], group: Group) -> JCASReception:
communication_reception = CommunicationReception.from_HDF(group)
radar_reception = RadarReception.from_HDF(group)
return JCASReception(communication_reception, radar_reception)
def to_HDF(self, group: Group) -> None:
CommunicationReception.to_HDF(self, group)
RadarReception.to_HDF(self, group)
[docs]
class MatchedFilterJcas( # type: ignore[misc]
Radar, DuplexModem, Transmitter[JCASTransmission], Receiver[JCASReception], Serializable
):
"""Joint Communication and Sensing Operator.
A combination of communication and sensing operations.
Senses the enviroment via a correlatiom-based time of flight estimation of transmitted waveforms.
"""
yaml_tag = "MatchedFilterJcas"
property_blacklist = {"slot"}
# The specific required sampling rate
__sampling_rate: Optional[float]
__max_range: float # Maximally detectable range
def __init__(self, max_range: float, device: Device | None = None, **kwargs) -> None:
"""
Args:
max_range (float):
Maximally detectable range in m.
"""
# Initialize base classes
Radar.__init__(self)
DuplexModem.__init__(self, **kwargs)
# Initialize class attributes
self.__sampling_rate = None
self.max_range = max_range
self.device = device
def _transmit(self, duration: float = -1.0) -> JCASTransmission:
# Cache the recently transmitted waveform for correlation during reception
transmission = JCASTransmission(DuplexModem._transmit(self, duration)) # type: ignore
return transmission
def _receive(self, signal: Signal) -> JCASReception:
# There must be a recent transmission being cached in order to correlate
if self.transmission is None:
raise RuntimeError(
"Receiving from a matched filter joint must be preceeded by a transmission"
)
# Receive information
communication_reception = DuplexModem._receive(self, signal)
# Re-sample communication waveform
signal = signal.resample(self.sampling_rate)
resolution = self.range_resolution
num_propagated_samples = int(2 * self.max_range / resolution)
# Append additional samples if the signal is too short
required_num_received_samples = (
self.transmission.signal.num_samples + num_propagated_samples
)
if signal.num_samples < required_num_received_samples:
signal.append_samples(
Signal(
np.zeros(
(1, required_num_received_samples - signal.num_samples), dtype=complex
),
self.sampling_rate,
signal.carrier_frequency,
)
)
# Remove possible overhead samples if signal is too long
# resampled_signal.samples = re
# sampled_signal.samples[:, :num_samples]
correlation = (
abs(
correlate(
signal.samples, self.transmission.signal.samples, mode="valid", method="fft"
).flatten()
)
/ self.transmission.signal.num_samples
)
lags = correlation_lags(
signal.num_samples, self.transmission.signal.num_samples, mode="valid"
)
# Append zeros for correct depth estimation
# num_appended_zeros = max(0, num_samples - resampled_signal.num_samples)
# correlation = np.append(correlation, np.zeros(num_appended_zeros))
# Create the cube object
angle_bins = np.array([[0.0, 0.0]])
velocity_bins = np.array([0.0])
range_bins = 0.5 * lags[:num_propagated_samples] * resolution
cube_data = np.array([[correlation[:num_propagated_samples]]], dtype=float)
cube = RadarCube(cube_data, angle_bins, velocity_bins, range_bins, self.carrier_frequency)
# Infer the point cloud, if a detector has been configured
cloud = None if self.detector is None else self.detector.detect(cube)
radar_reception = RadarReception(signal, cube, cloud)
jcas_reception = JCASReception(communication_reception, radar_reception)
return jcas_reception
@property
def sampling_rate(self) -> float:
modem_sampling_rate = self.waveform.sampling_rate
if self.__sampling_rate is None:
return modem_sampling_rate
return max(modem_sampling_rate, self.__sampling_rate)
@sampling_rate.setter
def sampling_rate(self, value: Optional[float]) -> None:
if value is None:
self.__sampling_rate = None
return
if value <= 0.0:
raise ValueError("Sampling rate must be greater than zero")
self.__sampling_rate = value
@property
def range_resolution(self) -> float:
"""Resolution of the Range Estimation.
Returns:
float:
Resolution in m.
Raises:
ValueError:
If the range resolution is smaller or equal to zero.
"""
return speed_of_light / self.sampling_rate
@range_resolution.setter
def range_resolution(self, value: float) -> None:
if value <= 0.0:
raise ValueError("Range resolution must be greater than zero")
self.sampling_rate = speed_of_light / value
@property
def frame_duration(self) -> float:
return self.waveform.frame_duration
@property
def max_range(self) -> float:
"""Maximally Estimated Range.
Returns:
The maximum range in m.
Raises:
ValueError:
If `max_range` is smaller or equal to zero.
"""
return self.__max_range
@max_range.setter
def max_range(self, value) -> None:
if value <= 0.0:
raise ValueError("Maximum range must be greater than zero")
self.__max_range = value
@property
def device(self) -> Device | None:
return Radar.device.fget(self) # type: ignore
@device.setter
def device(self, value: Device | None) -> None:
DuplexModem.device.fset(self, value) # type: ignore
Radar.device.fset(self, value) # type: ignore
def _recall_transmission(self, group: Group) -> JCASTransmission:
return JCASTransmission.from_HDF(group)
def _recall_reception(self, group: Group) -> JCASReception:
return JCASReception.from_HDF(group)
def _noise_power(self, strength: float, snr_type=SNRType) -> float:
# Defer to the DuplexModem noise power calculation
return DuplexModem._noise_power(self, strength, snr_type)