Source code for hermespy.jcas.ofdm_radar

# -*- coding: utf-8 -*-

from __future__ import annotations
from typing import Sequence

import numpy as np
from scipy.constants import speed_of_light
from scipy.fft import ifft, fft, ifftshift

from hermespy.core import Serializable, ReceiveState, Signal, TransmitState
from hermespy.jcas.jcas import JCASReception, JCASTransmission
from hermespy.modem import OFDMWaveform, ReceivingModemBase, TransmittingModemBase, Symbols
from hermespy.radar import RadarCube, RadarReception
from .jcas import DuplexJCASOperator

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class OFDMRadar(DuplexJCASOperator[OFDMWaveform], Serializable):
    """A joint communication and sensing approach estimating a range-power profile from OFDM symbols.

    Refer to :footcite:p:`2009:sturm` for the original publication.
    """

    yaml_tag = "OFDMRadar"

    # Most recent transmission generated by _transmit.
    # Its symbols serve as the known reference when processing a reception.
    __last_transmission: JCASTransmission | None = None

    def __init__(
        self,
        waveform: OFDMWaveform | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            waveform (OFDMWaveform, optional):
                Communication waveform emitted by this operator.

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated
                :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated
                :class:`Device's<Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            carrier_frequency (float, optional):
                Central frequency of the mixed signal in radio-frequency transmission band.
                If not specified, the operated device's default carrier frequency
                will be assumed during signal processing.

            seed (int, optional):
                Random seed used to initialize the pseudo-random number generator.
        """

        # Initialize base class
        DuplexJCASOperator.__init__(
            self, waveform, selected_transmit_ports, selected_receive_ports, carrier_frequency, seed
        )

        # Initialize class attributes
        self.__last_transmission = None

    @property
    def max_range(self) -> float:
        """Maximum range detectable by OFDM radar.

        Defined by equation (12) in :footcite:p:`2009:sturm` as

        .. math::

           d_\\mathrm{max} = \\frac{c_0}{2 \\Delta f} \\quad \\text{.}

        Returns:
            Maximum range in m.
            Zero if no waveform is configured.
        """

        if self.waveform is None:
            return 0.0

        return speed_of_light / (2 * self.waveform.subcarrier_spacing)

    @property
    def range_resolution(self) -> float:
        """Range resolution achievable by OFDM radar.

        Defined by equation (13) in :footcite:p:`2009:sturm` as

        .. math::

           \\Delta r = \\frac{c_0}{2 B} = \\frac{c_0}{2 N \\Delta f} = \\frac{d_{\\mathrm{max}}}{N} \\quad \\text{.}

        Returns:
            Range resolution in m.
            Zero if no waveform is configured.
        """

        if self.waveform is None:
            return 0.0

        return self.max_range / self.waveform.num_subcarriers

    @property
    def max_relative_doppler(self) -> float:
        """The maximum relative doppler shift detectable by the OFDM radar in Hz.

        Computed as the inverse of four times the frame duration.
        Zero if no waveform is configured, consistent with :attr:`max_range`.
        """

        # Without a waveform there is no frame to derive a duration from
        if self.waveform is None:
            return 0.0

        return 1 / (4 * self.frame_duration)

    @property
    def relative_doppler_resolution(self) -> float:
        """The relative doppler resolution achievable by the OFDM radar in Hz.

        Computed as the inverse of twice the frame duration.
        Zero if no waveform is configured, consistent with :attr:`range_resolution`.
        """

        # Without a waveform there is no frame to derive a duration from
        if self.waveform is None:
            return 0.0

        return 1 / (2 * self.frame_duration)

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        return 0.0 if self.waveform is None else self.waveform.power

    def _transmit(self, device: TransmitState, duration: float) -> JCASTransmission:
        """Generate a communication transmission and cache it as the radar reference.

        Args:
            device (TransmitState): State of the transmitting device.
            duration (float): Forwarded unchanged to the modem's transmit routine.

        Returns:
            JCASTransmission: The transmission wrapping the modem's output.
        """

        communication_transmission = TransmittingModemBase._transmit(self, device, duration)

        # Remember the transmission, _receive requires its symbols
        self.__last_transmission = JCASTransmission(communication_transmission)
        return self.__last_transmission

    def __estimate_range(self, transmitted_symbols: Symbols, received_signal: Signal) -> np.ndarray:
        """Estimate the range-power profile of the received signal.

        Args:
            transmitted_symbols (Symbols): The originally transmitted OFDM symbols.
            received_signal (Signal): The received OFDM base-band signal samples.

        Returns:
            np.ndarray: The range-power profile of the received signal.
        """

        # Demodulate the signal received from an angle of interest
        received_symbols = self.waveform.demodulate(received_signal.getitem(0).flatten())

        # Normalize received demodulated symbols equation (8)
        # Entries where the transmitted symbol is zero are skipped by the division
        # and keep the zero they were initialized with in the output buffer
        normalized_symbols = np.divide(
            received_symbols.raw,
            transmitted_symbols.raw,
            out=np.zeros_like(received_symbols.raw),
            where=np.abs(transmitted_symbols.raw) != 0.0,
        )

        # Estimate range-power profile by equation (10)
        # Only the first entry along the leading axis is evaluated
        # NOTE(review): presumably the remaining axes are (OFDM symbol, subcarrier),
        # so the inverse transform runs over subcarriers and the forward transform
        # over symbols - confirm against the Symbols layout
        power_profile = ifftshift(fft(ifft(normalized_symbols[0, ::], axis=1), axis=0), axes=0)
        return np.abs(power_profile)

    def _receive(self, signal: Signal, device: ReceiveState) -> JCASReception:
        """Process a received signal into a joint communication and radar reception.

        Args:
            signal (Signal): The signal received by the operated device.
            device (ReceiveState): State of the receiving device.

        Returns:
            JCASReception: Combination of the communication reception and the radar reception.

        Raises:
            RuntimeError: If nothing has been transmitted yet, so no reference symbols exist.
        """

        if self.__last_transmission is None:
            raise RuntimeError(
                "Unable to receive without a prior transmission, transmit before receiving"
            )

        # Retrieve the transmitted symbols
        transmitted_symbols = self.waveform.place(self.__last_transmission.symbols)

        # Run the normal communication reception processing
        communication_reception = ReceivingModemBase._receive(self, signal, device)

        # Build a radar cube
        angles_of_interest, beamformed_samples = self._receive_beamform(signal, device)
        range_bins = np.arange(self.waveform.num_subcarriers) * self.range_resolution
        doppler_bins = (
            np.arange(self.waveform.words_per_frame) * self.relative_doppler_resolution
            - self.max_relative_doppler
        )
        cube_data = np.empty(
            (len(angles_of_interest), len(doppler_bins), len(range_bins)), dtype=float
        )

        for angle_idx, line in enumerate(beamformed_samples):
            # Process the single angular line by the waveform generator
            line_signal = signal.from_ndarray(line)
            line_estimate = self.__estimate_range(transmitted_symbols, line_signal)

            # Each angle of interest contributes one doppler-range slice
            cube_data[angle_idx, ::] = line_estimate

        # Create radar cube object
        cube = RadarCube(
            cube_data, angles_of_interest, doppler_bins, range_bins, device.carrier_frequency
        )

        # Infer the point cloud, if a detector has been configured
        cloud = None if self.detector is None else self.detector.detect(cube)

        # Generate reception object
        radar_reception = RadarReception(signal, cube, cloud)
        return JCASReception(communication_reception, radar_reception)