# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from typing import Generic, Sequence, Type, TypeVar
from typing_extensions import override
from hermespy.beamforming import ReceiveBeamformer
from hermespy.core import (
ReceiveState,
Signal,
TransmitState,
SerializationProcess,
DeserializationProcess,
)
from hermespy.modem.modem import TransmittingModemBase, ReceivingModemBase
from hermespy.modem import CommunicationTransmission, CommunicationReception, CWT
from hermespy.radar import RadarBase, RadarDetector, RadarTransmission, RadarReception
# Module-level authorship and release metadata.
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
# NOTE(review): this holds a person's name rather than a license identifier,
# which looks like a copy-paste slip - confirm against the project's LICENSE file.
__license__ = "Jan Adler"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
class JCASTransmission(CommunicationTransmission, RadarTransmission):
    """Information generated by transmitting over a joint communication and sensing operator.

    Inherits from both the communication and the radar transmission types.
    Both base classes are initialized from the same wrapped communication transmission.
    """

    def __init__(self, transmission: CommunicationTransmission) -> None:
        """
        Args:

            transmission:
                Communication transmission whose signal and frames are adopted.
                Its signal is additionally used to initialize the radar transmission.
        """

        # Both bases are initialized explicitly instead of via super(),
        # since each of them expects a different set of arguments
        CommunicationTransmission.__init__(
            self, signal=transmission.signal, frames=transmission.frames
        )
        RadarTransmission.__init__(self, signal=transmission.signal)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize this transmission.

        Args:

            process:
                Serialization process forwarded to the communication transmission's serialization.
        """

        # The radar transmission is serialized as a communication transmission and can be skipped
        CommunicationTransmission.serialize(self, process)

    @classmethod
    @override
    def Deserialize(
        cls: Type[JCASTransmission], process: DeserializationProcess
    ) -> JCASTransmission:
        """Recover a transmission from a deserialization process.

        Args:

            process:
                Deserialization process the communication transmission is read from.

        Returns:
            A new :class:`JCASTransmission` wrapping the deserialized communication transmission.
            The result is always constructed as :class:`JCASTransmission`, independently of `cls`.
        """

        # Mirror of serialize(): only the communication part was stored,
        # the radar part is rebuilt from it by the constructor
        communication_transmission = CommunicationTransmission.Deserialize(process)
        return JCASTransmission(communication_transmission)
class JCASReception(CommunicationReception, RadarReception):
    """Information generated by receiving over a joint communication and sensing operator.

    Inherits from both the communication and the radar reception types and is
    assembled from one instance of each.
    """

    def __init__(self, communication: CommunicationReception, radar: RadarReception) -> None:
        """
        Args:

            communication:
                Communication reception whose signal and frames are adopted.

            radar:
                Radar reception whose signal, cube and cloud are adopted.
        """

        # Both bases are initialized explicitly instead of via super(),
        # since each of them expects a different set of arguments
        CommunicationReception.__init__(
            self, signal=communication.signal, frames=communication.frames
        )
        RadarReception.__init__(self, radar.signal, radar.cube, radar.cloud)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize this reception.

        The communication part is written first, followed by the radar part.

        Args:

            process:
                Serialization process both parts are written to.
        """

        CommunicationReception.serialize(self, process)
        RadarReception.serialize(self, process)

    @classmethod
    @override
    def Deserialize(cls, process: DeserializationProcess) -> JCASReception:
        """Recover a reception from a deserialization process.

        Args:

            process:
                Deserialization process both parts are read from.

        Returns:
            A new :class:`JCASReception` combining the deserialized communication and radar receptions.
            The result is always constructed as :class:`JCASReception`, independently of `cls`.
        """

        # Arguments are evaluated left to right, so the communication part is
        # read before the radar part - the same order serialize() uses
        return JCASReception(
            CommunicationReception.Deserialize(process), RadarReception.Deserialize(process)
        )
# Type variable bound to DuplexJCASOperator.
# Used to annotate DuplexJCASOperator.Deserialize so that its return type
# is the class the method was called on.
_DJOT = TypeVar("_DJOT", bound="DuplexJCASOperator")
class DuplexJCASOperator(
    Generic[CWT],
    RadarBase[JCASTransmission, JCASReception],
    TransmittingModemBase[CWT],
    ReceivingModemBase[CWT],
):
    """Base class for duplex joint communication and sensing operators.

    Duplex joint communication and sensing operators transmit a modulated waveform while simultaneously deriving a radar
    cube from the received backscattered power.

    Concrete operators must implement :meth:`_transmit` and :meth:`_receive`.
    """

    def __init__(
        self,
        waveform: CWT | None = None,
        receive_beamformer: ReceiveBeamformer | None = None,
        detector: RadarDetector | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            waveform:
                Communication waveform emitted by this operator.

            receive_beamformer:
                Beamforming applied during signal reception.
                If not specified, no beamforming will be applied during reception.

            detector:
                Detector routine configured to generate point clouds from radar cubes.
                If not specified, no point cloud will be generated during reception.

            selected_transmit_ports:
                Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports:
                Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            carrier_frequency:
                Central frequency of the mixed signal in radio-frequency transmission band.
                If not specified, the operated device's default carrier frequency will be assumed during signal processing.

            seed:
                Random seed used to initialize the pseudo-random number generator.
        """

        # Initialize base classes
        # The modem bases are initialized without arguments, all shared
        # configuration is forwarded positionally to the radar base
        TransmittingModemBase.__init__(self)
        ReceivingModemBase.__init__(self)
        RadarBase.__init__(
            self,
            receive_beamformer,
            detector,
            selected_transmit_ports,
            selected_receive_ports,
            carrier_frequency,
            seed,
        )

        # Initialize class attributes
        # Assigned only after all base classes have been initialized
        self.waveform = waveform

    @property
    def sampling_rate(self) -> float:
        """Sampling rate reported by the configured waveform.

        Falls back to `1.0` if no waveform is configured.
        """

        if self.waveform is None:
            return 1.0

        return self.waveform.sampling_rate

    @property
    def frame_duration(self) -> float:
        """Frame duration reported by the configured waveform.

        Falls back to `0.0` if no waveform is configured.
        """

        if self.waveform is None:
            return 0.0

        return self.waveform.frame_duration

    @abstractmethod
    def _transmit(self, device: TransmitState, duration: float) -> JCASTransmission:
        """Abstract transmit routine to be implemented by concrete operators.

        Returns:
            The generated joint communication and sensing transmission.
        """
        ...  # pragma: no cover

    @abstractmethod
    def _receive(self, signal: Signal, device: ReceiveState) -> JCASReception:
        """Abstract receive routine to be implemented by concrete operators.

        Returns:
            The generated joint communication and sensing reception.
        """
        ...  # pragma: no cover

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize this operator by delegating to the radar base class.

        Args:

            process:
                Serialization process forwarded to :class:`RadarBase`.
        """

        # NOTE(review): only RadarBase.serialize is invoked here; whether the
        # configured waveform ends up in the serialization depends on that
        # implementation - confirm a serialize/Deserialize round trip keeps it
        RadarBase.serialize(self, process)

    @classmethod
    @override
    def Deserialize(cls: Type[_DJOT], process: DeserializationProcess) -> _DJOT:
        """Recover an operator from a deserialization process.

        Args:

            process:
                Deserialization process the constructor parameters are read from.

        Returns:
            A new instance of the class this method was called on.
        """

        # The recovered parameters are passed to the constructor as keyword arguments,
        # so subclasses are instantiated as their own type
        return cls(**cls._DeserializeParameters(process))  # type: ignore[arg-type]