# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from typing import Generic, Sequence, Type
from h5py import Group
from hermespy.core import Device, Signal
from hermespy.modem.modem import TransmittingModemBase, ReceivingModemBase
from hermespy.modem import CommunicationTransmission, CommunicationReception, CWT
from hermespy.radar import RadarBase, RadarTransmission, RadarReception
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "Jan Adler"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class JCASTransmission(CommunicationTransmission, RadarTransmission):
"""Information generated by transmitting over a joint communication and sensing operator."""
def __init__(self, transmission: CommunicationTransmission) -> None:
CommunicationTransmission.__init__(
self, signal=transmission.signal, frames=transmission.frames
)
RadarTransmission.__init__(self, signal=transmission.signal)
@classmethod
def from_HDF(cls: Type[JCASTransmission], group: Group) -> JCASTransmission:
return JCASTransmission(CommunicationTransmission.from_HDF(group))
[docs]
class JCASReception(CommunicationReception, RadarReception):
"""Information generated by receiving over a joint communication and sensing operator."""
def __init__(self, communication: CommunicationReception, radar: RadarReception) -> None:
CommunicationReception.__init__(
self, signal=communication.signal, frames=communication.frames
)
RadarReception.__init__(self, radar.signal, radar.cube, radar.cloud)
@classmethod
def from_HDF(cls: Type[JCASReception], group: Group) -> JCASReception:
communication_reception = CommunicationReception.from_HDF(group)
radar_reception = RadarReception.from_HDF(group)
return JCASReception(communication_reception, radar_reception)
def to_HDF(self, group: Group) -> None:
CommunicationReception.to_HDF(self, group)
RadarReception.to_HDF(self, group)
[docs]
class DuplexJCASOperator(
Generic[CWT],
RadarBase[JCASTransmission, JCASReception],
TransmittingModemBase[CWT],
ReceivingModemBase[CWT],
):
"""Base class for duplex joint communication and sensing operators.
Duplex joint communication and sensing operators transmit a modulated waveform while simultaneously deriving a radar
cube from the received backscattered power.
"""
def __init__(
self,
device: Device | None = None,
waveform: CWT | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
device (Device, optional):
Device this operator operating.
Operator is considered floating by default.
waveform (CWT, optional):
Communication waveform emitted by this operator.
selected_transmit_ports (Sequence[int], optional):
Selected transmit ports of the device.
selected_receive_ports (Sequence[int], optional):
Selected receive ports of the device.
"""
# Initialize base classes
TransmittingModemBase.__init__(self)
ReceivingModemBase.__init__(self, **kwargs)
RadarBase.__init__(
self,
device=device,
selected_transmit_ports=selected_transmit_ports,
selected_receive_ports=selected_receive_ports,
)
# Initialize class attributes
self.device = device
self.waveform = waveform
@property
def transmitting_device(self) -> Device | None:
return self.device
@property
def receiving_device(self) -> Device | None:
return self.device
@property
def sampling_rate(self) -> float:
if self.waveform is None:
return 1.0
return self.waveform.sampling_rate
@property
def frame_duration(self) -> float:
if self.waveform is None:
return 0.0
return self.waveform.frame_duration
@abstractmethod
def _transmit(self, duration: float = -1.0) -> JCASTransmission: ... # pragma: no cover
@abstractmethod
def _receive(self, signal: Signal) -> JCASReception: ... # pragma: no cover
def _recall_transmission(self, group: Group) -> JCASTransmission:
return JCASTransmission.from_HDF(group)
def _recall_reception(self, group: Group) -> JCASReception:
return JCASReception.from_HDF(group)