# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import abstractmethod
from typing import Generic, Sequence, Type
from h5py import Group
from hermespy.core import ReceiveState, Signal, TransmitState
from hermespy.modem.modem import TransmittingModemBase, ReceivingModemBase
from hermespy.modem import CommunicationTransmission, CommunicationReception, CWT
from hermespy.radar import RadarBase, RadarTransmission, RadarReception
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "Jan Adler"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class JCASTransmission(CommunicationTransmission, RadarTransmission):
"""Information generated by transmitting over a joint communication and sensing operator."""
def __init__(self, transmission: CommunicationTransmission) -> None:
CommunicationTransmission.__init__(
self, signal=transmission.signal, frames=transmission.frames
)
RadarTransmission.__init__(self, signal=transmission.signal)
@classmethod
def from_HDF(cls: Type[JCASTransmission], group: Group) -> JCASTransmission:
return JCASTransmission(CommunicationTransmission.from_HDF(group))
[docs]
class JCASReception(CommunicationReception, RadarReception):
"""Information generated by receiving over a joint communication and sensing operator."""
def __init__(self, communication: CommunicationReception, radar: RadarReception) -> None:
CommunicationReception.__init__(
self, signal=communication.signal, frames=communication.frames
)
RadarReception.__init__(self, radar.signal, radar.cube, radar.cloud)
@classmethod
def from_HDF(cls: Type[JCASReception], group: Group) -> JCASReception:
communication_reception = CommunicationReception.from_HDF(group)
radar_reception = RadarReception.from_HDF(group)
return JCASReception(communication_reception, radar_reception)
def to_HDF(self, group: Group) -> None:
CommunicationReception.to_HDF(self, group)
RadarReception.to_HDF(self, group)
[docs]
class DuplexJCASOperator(
Generic[CWT],
RadarBase[JCASTransmission, JCASReception],
TransmittingModemBase[CWT],
ReceivingModemBase[CWT],
):
"""Base class for duplex joint communication and sensing operators.
Duplex joint communication and sensing operators transmit a modulated waveform while simultaneously deriving a radar
cube from the received backscattered power.
"""
def __init__(
self,
waveform: CWT | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
carrier_frequency: float | None = None,
seed: int | None = None,
) -> None:
"""
Args:
waveform (CWT, optional):
Communication waveform emitted by this operator.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available antenna ports will be considered.
carrier_frequency (float, optional):
Central frequency of the mixed signal in radio-frequency transmission band.
If not specified, the operated device's default carrier frequency will be assumed during signal processing.
seed (int, optional):
Random seed used to initialize the pseudo-random number generator.
"""
# Initialize base classes
TransmittingModemBase.__init__(self)
ReceivingModemBase.__init__(self)
RadarBase.__init__(
self, selected_transmit_ports, selected_receive_ports, carrier_frequency, seed
)
# Initialize class attributes
self.waveform = waveform
@property
def sampling_rate(self) -> float:
if self.waveform is None:
return 1.0
return self.waveform.sampling_rate
@property
def frame_duration(self) -> float:
if self.waveform is None:
return 0.0
return self.waveform.frame_duration
@abstractmethod
def _transmit(
self, device: TransmitState, duration: float
) -> JCASTransmission: ... # pragma: no cover
@abstractmethod
def _receive(
self, signal: Signal, device: ReceiveState
) -> JCASReception: ... # pragma: no cover
def _recall_transmission(self, group: Group) -> JCASTransmission:
return JCASTransmission.from_HDF(group)
def _recall_reception(self, group: Group) -> JCASReception:
return JCASReception.from_HDF(group)