# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Generic, List, Set, Sequence, Tuple, Type
import numpy as np
from h5py import Group
from hermespy.fec import EncoderManager
from hermespy.core import (
HDFSerializable,
RandomNode,
Transmission,
Reception,
Serializable,
Signal,
Device,
Transmitter,
Receiver,
)
from hermespy.precoding import ReceiveStreamCoding, TransmitStreamCoding
from .precoding import SymbolPrecoding
from .bits_source import BitsSource, RandomBitsSource
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class CommunicationTransmissionFrame(Transmission):
"""A single synchronized frame of information generated by transmittgin over a modem.
Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
:class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
"""
signal: Signal
"""Communication base-band waveform."""
bits: np.ndarray
"""Communication data bits."""
encoded_bits: np.ndarray
"""Transmitted bits after FEC encoding."""
symbols: Symbols
"""Communication data symbols."""
encoded_symbols: Symbols
"""Communication data symbols after symbol encoding."""
timestamp: float
"""Time at which the frame was transmitted in seconds."""
def __init__(
self,
signal: Signal,
bits: np.ndarray,
encoded_bits: np.ndarray,
symbols: Symbols,
encoded_symbols: Symbols,
timestamp: float,
) -> None:
"""
Args:
signal (Signal):
Transmitted communication base-band waveform.
bits (np.ndarray):
Transmitted communication data bits.
encoded_bits (np.ndarray):
Transmitted communication bits after FEC encoding.
symbols (Symbols):
Transmitted communication data symbols.
encoded_symbols (Symbols):
Transmitted communication data symbols after symbol encoding.
timestamp (float):
Time at which the frame was transmitted in seconds.
"""
Transmission.__init__(self, signal=signal)
self.bits = bits
self.encoded_bits = encoded_bits
self.symbols = symbols
self.encoded_symbols = encoded_symbols
self.timestamp = timestamp
@classmethod
def from_HDF(
cls: Type[CommunicationTransmissionFrame],
group: Group,
transmitter: Transmitter | None = None,
) -> CommunicationTransmissionFrame:
# Recall groups
signal = Signal.from_HDF(group["signal"])
symbols = Symbols.from_HDF(group["symbols"])
encoded_symbols = Symbols.from_HDF(group["encoded_symbols"])
# Recall datasets
bits = np.array(group["bits"], dtype=np.uint8)
encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8)
# Recall attributes
timestamp = group.attrs.get("timestamp", 0)
# Initialize object from recalled state
return cls(
signal=signal,
bits=bits,
encoded_bits=encoded_bits,
symbols=symbols,
encoded_symbols=encoded_symbols,
timestamp=timestamp,
)
def to_HDF(self, group: Group) -> None:
# Serialize base class
self.signal.to_HDF(group.create_group("signal"))
# Serialize groups
self.symbols.to_HDF(group.create_group("symbols"))
self.encoded_symbols.to_HDF(group.create_group("encoded_symbols"))
# Serialize datasets
group.create_dataset("bits", data=self.bits)
group.create_dataset("encoded_bits", data=self.encoded_bits)
# Serialize attributes
group.attrs["timestamp"] = self.timestamp
[docs]
class CommunicationTransmission(Transmission):
"""Collection of information generated by transmitting over a modem.
Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
:class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
"""
frames: List[CommunicationTransmissionFrame]
"""Individual transmitted communication frames."""
def __init__(
self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None
) -> None:
"""
Args:
signal (Signal):
Transmitted communication base-band waveform.
frames (List[CommunicationTransmissionFrame], optional):
Individual transmitted communication frames.
"""
Transmission.__init__(self, signal=signal)
self.frames = [] if frames is None else frames
@property
def num_frames(self) -> int:
"""Number of transmitted communication frames.
Returns:
Number of frames.
"""
return len(self.frames)
@cached_property
def bits(self) -> np.ndarray:
"""Transmitted bits before FEC encoding.
Returns: Numpy array of transmitted bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.bits)
return concatenated_bits
@cached_property
def symbols(self) -> Symbols:
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.symbols)
return symbols
@classmethod
def from_HDF(cls: Type[CommunicationTransmission], group: Group) -> CommunicationTransmission:
# Recall base signal
signal = Signal.from_HDF(group["signal"])
# Recall individual detected frames
num_frames = group.attrs.get("num_frames", 0)
frames = [
CommunicationTransmissionFrame.from_HDF(group[f"frame_{f:02d}"])
for f in range(num_frames)
]
return cls(signal=signal, frames=frames)
def to_HDF(self, group: Group) -> None:
# Serialize attributes
group.attrs["num_frames"] = self.num_frames
# Serialize base information
self.signal.to_HDF(group.create_group("signal"))
# Serialize detected frames
for f, frame in enumerate(self.frames):
frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs]
class CommunicationReceptionFrame(HDFSerializable):
"""A single synchronized frame of information generated by receiving over a modem.
Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
:class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
"""
signal: Signal
"""Communication base-band waveform."""
decoded_signal: Signal
"""Communication base-band waveform after MIMO stream decoding."""
symbols: Symbols
"""Received communication symbols."""
decoded_symbols: Symbols
"""Received communication symbols after precoding stage."""
timestamp: float
"""Time at which the frame was transmitted in seconds."""
equalized_symbols: Symbols
"""Equalized communication symbols."""
encoded_bits: np.ndarray
"""Received encoded data bits before error correction."""
decoded_bits: np.ndarray
"""Received decoded data bits after error correction."""
def __init__(
self,
signal: Signal,
decoded_signal: Signal,
symbols: Symbols,
decoded_symbols: Symbols,
timestamp: float,
equalized_symbols: Symbols,
encoded_bits: np.ndarray,
decoded_bits: np.ndarray,
) -> None:
self.signal = signal
self.decoded_signal = decoded_signal
self.symbols = symbols
self.decoded_symbols = decoded_symbols
self.timestamp = timestamp
self.equalized_symbols = equalized_symbols
self.encoded_bits = encoded_bits
self.decoded_bits = decoded_bits
@classmethod
def from_HDF(
cls: Type[CommunicationReceptionFrame], group: Group
) -> CommunicationReceptionFrame:
# Recall groups
signal = Signal.from_HDF(group["signal"])
decoded_signal = Signal.from_HDF(group["decoded_signal"])
symbols = Symbols.from_HDF(group["symbols"])
decoded_symbols = Symbols.from_HDF(group["decoded_symbols"])
equalized_symbols = Symbols.from_HDF(group["equalized_symbols"])
# Recall datasets
encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8)
decoded_bits = np.array(group["decoded_bits"], dtype=np.uint8)
# Recall attributes
timestamp = group.attrs.get("timestamp", 0)
# Initialize object from recalled state
return cls(
signal=signal,
decoded_signal=decoded_signal,
symbols=symbols,
decoded_symbols=decoded_symbols,
equalized_symbols=equalized_symbols,
encoded_bits=encoded_bits,
decoded_bits=decoded_bits,
timestamp=timestamp,
)
def to_HDF(self, group: Group) -> None:
# Serialize groups
self.signal.to_HDF(group.create_group("signal"))
self.decoded_signal.to_HDF(group.create_group("decoded_signal"))
self.symbols.to_HDF(group.create_group("symbols"))
self.decoded_symbols.to_HDF(group.create_group("decoded_symbols"))
self.equalized_symbols.to_HDF(group.create_group("equalized_symbols"))
# Serialize datasets
group.create_dataset("encoded_bits", data=self.encoded_bits)
group.create_dataset("decoded_bits", data=self.decoded_bits)
# Serialize attributes
group.attrs["timestamp"] = self.timestamp
[docs]
class CommunicationReception(Reception):
"""Collection of information generated by receiving over a modem.
Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
:class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
"""
frames: List[CommunicationReceptionFrame]
"""Individual received communication frames."""
def __init__(
self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None
) -> None:
"""
Args:
signal (Signal):
Received communication base-band waveform.
frames (List[CommunicationReceptionFrame], optional):
Individual received communication frames.
"""
Reception.__init__(self, signal=signal)
self.frames = [] if frames is None else frames
@property
def num_frames(self) -> int:
"""Number of received communication frames.
Returns:
Number of frames.
"""
return len(self.frames)
@cached_property
def encoded_bits(self) -> np.ndarray:
"""Received bits before FEC decoding.
Returns:
Numpy array containing received bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.encoded_bits)
return concatenated_bits
@cached_property
def bits(self) -> np.ndarray:
"""Received bits after FEC decoding.
Returns:
Numpy array containing received bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.decoded_bits)
return concatenated_bits
@cached_property
def symbols(self) -> Symbols:
"""Collect all communication symbols."""
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.symbols)
return symbols
@cached_property
def equalized_symbols(self) -> Symbols:
"""Collect all equalized communication symbols."""
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.equalized_symbols)
return symbols
@classmethod
def from_HDF(cls: Type[CommunicationReception], group: Group) -> CommunicationReception:
# Recall base signal
signal = Signal.from_HDF(group["signal"])
# Recall individual detected frames
num_frames = group.attrs.get("num_frames", 0)
frames = [
CommunicationReceptionFrame.from_HDF(group[f"frame_{f:02d}"]) for f in range(num_frames)
]
return cls(signal=signal, frames=frames)
def to_HDF(self, group: Group) -> None:
# Serialize attributes
group.attrs["num_frames"] = self.num_frames
# Serialize base information
self.signal.to_HDF(group.create_group("signal"))
# Serialize detected frames
for f, frame in enumerate(self.frames):
frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs]
class BaseModem(ABC, Generic[CWT], RandomNode):
"""Base class for wireless modems transmitting or receiving information over devices.
Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience
wrappers :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface.
"""
__encoder_manager: EncoderManager
__precoding: SymbolPrecoding
__waveform: CWT | None
@staticmethod
def _arg_signature() -> Set[str]:
return {"encoding", "precoding", "waveform", "seed"}
def __init__(
self,
encoding: EncoderManager | None = None,
precoding: SymbolPrecoding | None = None,
waveform: CWT | None = None,
seed: int | None = None,
) -> None:
"""
Args:
encoding (EncoderManager, optional):
Bit coding configuration.
Encodes communication bit frames during transmission and decodes them during reception.
precoding (SymbolPrecoding, optional):
Modulation symbol coding configuration.
waveform (CWT, optional):
The waveform to be transmitted by this modem.
seed (int, optional):
Seed used to initialize the pseudo-random number generator.
"""
# Base class initialization
RandomNode.__init__(self, seed=seed)
self.encoder_manager = EncoderManager() if encoding is None else encoding
self.precoding = SymbolPrecoding(modem=self) if precoding is None else precoding
self.waveform = waveform
@property
@abstractmethod
def transmitting_device(self) -> Device | None:
"""Tranmsitting device operated by the modem.
:py:obj:`None` if the device is unspecified.
"""
... # pragma no cover
@property
@abstractmethod
def receiving_device(self) -> Device | None:
"""Receiving device operated by the modem.
:py:obj:`None` if the device is unspecified.
"""
... # pragma no cover
@property
def encoder_manager(self) -> EncoderManager:
"""Description of the modem's forward error correction.
Refer to :doc:`fec` for further information.
"""
return self.__encoder_manager
@encoder_manager.setter
def encoder_manager(self, new_manager: EncoderManager) -> None:
self.__encoder_manager = new_manager
self.__encoder_manager.random_mother = self
new_manager.modem = self
@property
def waveform(self) -> CWT | None:
"""Description of the communication waveform emitted by this modem.
The only mandatory attribute required to transmit or receive over a
:class:`modem<BaseModem>`.
"""
return self.__waveform
@waveform.setter
def waveform(self, value: CWT | None) -> None:
self.__waveform = value
if value is not None:
value.modem = self
value.random_mother = self
@property
def precoding(self) -> SymbolPrecoding:
"""Description of the modem's precoding on a symbol level."""
return self.__precoding
@precoding.setter
def precoding(self, coding: SymbolPrecoding) -> None:
self.__precoding = coding
self.__precoding.modem = self
def _bit_requirements(self) -> Tuple[int, int]:
"""Compute the bit generation requirements of the modem for the given configuration.
Returns: Tuple of required data bits and required code bits.
Raises:
RuntimeError: If the symbol precoding rate does not match the waveform configuration.
"""
if self.waveform.num_data_symbols % self.precoding.rate.denominator != 0:
raise RuntimeError(
f"Symbol precoding rate does not match the waveform configuration ({self.waveform.num_data_symbols} % {self.precoding.rate.denominator} != 0)"
)
required_num_data_symbols = int(self.waveform.num_data_symbols * self.precoding.rate)
required_num_code_bits = (
self.waveform.bits_per_frame(required_num_data_symbols)
* self.precoding.num_input_streams
)
required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits)
return required_num_data_bits, required_num_code_bits
@property
def num_data_bits_per_frame(self) -> int:
"""Compute the number of required data bits to generate a single frame.
The number of bits depends on the configured :attr:`waveform<BaseModem.waveform>`,
as well as the :attr:`encoder_manager<BaseModem.encoder_manager>` and :attr:`precoding<BaseModem.precoding>`.
"""
required_num_data_bits, _ = self._bit_requirements()
return required_num_data_bits
@property
def samples_per_frame(self) -> int:
"""Number of discrete-time samples per processed communication frame.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
"""
return self.waveform.samples_per_frame
@property
def frame_duration(self) -> float:
"""Duration of a single communication frame in seconds.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property.
"""
return self.waveform.frame_duration
@property
def symbol_duration(self) -> float:
"""Duration of a single communication symbol in seconds.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property.
"""
return self.waveform.symbol_duration
@property
def sampling_rate(self) -> float:
"""Sampling rate of the processed waveforms in Hz.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
"""
return self.waveform.sampling_rate
@property
def num_transmit_ports(self) -> int:
"""Number of transmit antenna ports."""
return 0 # pragma: no cover
@property
def num_receive_ports(self) -> int:
"""Number of receive antenna ports."""
return 0 # pragma: no cover
[docs]
class TransmittingModemBase(Generic[CWT], BaseModem[CWT]):
"""Base class of signal processing algorithms transmitting information."""
__bits_source: BitsSource
__transmit_stream_coding: TransmitStreamCoding
def __init__(self, bits_source: BitsSource | None = None, *args, **kwargs) -> None:
"""
Args:
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
"""
# Initialize base classes
BaseModem.__init__(self, *args, **kwargs)
# Initialize clas attributes
self.bits_source = RandomBitsSource() if bits_source is None else bits_source
self.__transmit_stream_coding = TransmitStreamCoding(modem=self)
@property
def bits_source(self) -> BitsSource:
"""Source configuration of communication transmitted by this modem.
Returns: A handle to the source configuration.
"""
return self.__bits_source
@bits_source.setter
def bits_source(self, value: BitsSource):
value.random_mother = self
self.__bits_source = value
@property
def num_transmit_ports(self) -> int:
return (
0 if self.transmitting_device is None else self.transmitting_device.num_transmit_ports
)
@property
def transmit_stream_coding(self) -> TransmitStreamCoding:
"""Stream MIMO coding configuration during signal transmission.
Returns: Handle to the coding configuration.
"""
return self.__transmit_stream_coding
@transmit_stream_coding.setter
def transmit_stream_coding(self, value: TransmitStreamCoding) -> None:
self.__transmit_stream_coding = value
value.modem = self
def __map(self, bits: np.ndarray, num_streams: int) -> Symbols:
"""Map a block of information bits to commuication symbols.
Args:
bits (np.ndarray):
The information bits to be mapped.
Returns:
A series of communication symbols.
"""
symbols = Symbols()
for frame_bits in np.reshape(bits, (num_streams, -1)):
symbols.append_stream(self.waveform.map(frame_bits))
return symbols
def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal:
"""Modulates a sequence of MIMO signals into a base-band communication waveform.
Args:
symbols (Symbols):
Communication symbols to be modulated.
carrier_frequency (float):
Carrier frequency of the communication signal.
Returns:
The modualted base-band communication frame.
"""
# For each stream resulting from the initial encoding stage
# Place and encode the symbols according to the stream transmit coding configuration
frame_samples = np.empty(
(symbols.num_streams, self.waveform.samples_per_frame), dtype=np.complex_
)
for s, stream_symbols in enumerate(symbols.raw):
placed_symbols = self.waveform.place(Symbols(stream_symbols[np.newaxis, :, :]))
# Modulate each placed symbol stream individually to its base-band signal representation
frame_samples[s, :] = self.waveform.modulate(placed_symbols)
# Apply the stream transmit coding configuration
frame_signal = Signal.Create(frame_samples, self.waveform.sampling_rate, carrier_frequency)
return frame_signal
[docs]
def _transmit(self, duration: float = -1.0) -> CommunicationTransmission:
"""Returns an array with the complex base-band samples of a waveform generator.
The signal may be distorted by RF impairments.
Args:
duration (float, optional): Length of signal in seconds.
Returns:
Transmitted information.
"""
# By default, the drop duration will be exactly one frame
if duration <= 0.0:
duration = self.frame_duration
# Infer required parameters
carrier_frequency = (
self.transmitting_device.carrier_frequency
if self.transmitting_device is not None
else 0.0
)
frame_duration = self.frame_duration
num_mimo_frames = int(duration / frame_duration)
(required_num_data_bits, required_num_code_bits) = self._bit_requirements()
# Ultimately, the number of resulting output streams is determined by the stream coding configuration
if len(self.transmit_stream_coding) > 0:
num_output_streams = self.transmit_stream_coding.num_output_streams
# If not stream coding configuration is available, the number of output streams is determined by the precoding configuration
elif len(self.precoding) > 0:
num_output_streams = self.precoding.num_output_streams
# The default number of output streams is one
else:
num_output_streams = 1
# Assert that the number of output streams matches the antenna count
if num_output_streams != self.num_transmit_ports:
raise RuntimeError(
f"Modem MIMO configuration generates invalid number of antenna streams ({num_output_streams} instead of {self.num_transmit_ports})"
)
signal = Signal.Empty(
self.waveform.sampling_rate, num_output_streams, carrier_frequency=carrier_frequency
)
# Abort if no frame is to be transmitted within the current duration
if num_mimo_frames < 1:
transmission = CommunicationTransmission(signal)
return transmission
frames: List[CommunicationTransmissionFrame] = []
for n in range(num_mimo_frames):
# Generate plain data bits
data_bits = self.bits_source.generate_bits(required_num_data_bits)
# Apply forward error correction
encoded_bits = self.encoder_manager.encode(data_bits, required_num_code_bits)
# Map bits to communication symbols
mapped_symbols = self.__map(encoded_bits, self.precoding.num_input_streams)
# Apply the first symbol precoding cofiguration
encoded_symbols = self.precoding.encode(
StatedSymbols(
mapped_symbols.raw,
np.ones(
(
mapped_symbols.num_streams,
1,
mapped_symbols.num_blocks,
mapped_symbols.num_symbols,
),
dtype=np.complex_,
),
)
)
# Modulate symbols to a base-band signal
frame_signal = self.__modulate(encoded_symbols, carrier_frequency)
# Apply stream encoding configuration
encoded_frame_signal = self.__transmit_stream_coding.encode(frame_signal)
# Save results
signal.append_samples(encoded_frame_signal)
frames.append(
CommunicationTransmissionFrame(
signal=frame_signal,
bits=data_bits,
encoded_bits=encoded_bits,
symbols=mapped_symbols,
encoded_symbols=encoded_symbols,
timestamp=n * frame_duration,
)
)
# Save the transmitted information
transmission = CommunicationTransmission(signal, frames)
return transmission
[docs]
class TransmittingModem(
TransmittingModemBase[CommunicationWaveform],
Transmitter[CommunicationTransmission],
Serializable,
):
"""Representation of a wireless modem exclusively transmitting."""
yaml_tag = "TxModem"
def __init__(
self,
*args,
device: Device | None = None,
selected_transmit_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
device (Device, optional):
Device operated by the modem.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModemBase` for further details.
"""
# Initialize base classes
# Note that the initialization order matters here
Transmitter.__init__(self, selected_transmit_ports=selected_transmit_ports)
TransmittingModemBase.__init__(self, *args, **kwargs)
Serializable.__init__(self)
self.device = device
@property
def transmitting_device(self) -> Device | None:
# The transmitting device resolves to the operated device
return self.device
@property
def receiving_device(self) -> Device | None:
return None
@Transmitter.device.setter # type: ignore
def device(self, value: Device) -> None:
if Transmitter.device.fget(self) is not None: # type: ignore
self.device.transmitters.remove(self)
if value is not None and self not in value.transmitters:
value.transmitters.add(self)
@property
def power(self) -> float:
return self.waveform.power if self.waveform is not None else 0.0
def _recall_transmission(self, group: Group) -> CommunicationTransmission:
return CommunicationTransmission.from_HDF(group)
[docs]
class ReceivingModemBase(Generic[CWT], BaseModem[CWT]):
"""Base class of signal processing algorithms receiving information."""
__receive_stream_coding: ReceiveStreamCoding
def __init__(self, *args, **kwargs) -> None:
self.__receive_stream_coding = ReceiveStreamCoding(modem=self)
BaseModem.__init__(self, *args, **kwargs)
@property
def receive_stream_coding(self) -> ReceiveStreamCoding:
"""Stream MIMO coding configuration during signal reception.
Returns: Handle to the coding configuration.
"""
return self.__receive_stream_coding
@receive_stream_coding.setter
def receive_stream_coding(self, value: ReceiveStreamCoding) -> None:
self.__receive_stream_coding = value
value.modem = self
def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]:
"""Synchronize a received MIMO base-band stream.
Converts the stream into sections representing communication frames.
Args:
received_signal (Signal):
The MIMO signal received over the operated device's RF chain.
Returns:
A sequence signals representing communication frames and their respective detection indices.
"""
# Synchronize raw MIMO data into frames
frame_start_indices = self.waveform.synchronization.synchronize(received_signal[:, :])
frame_length = self.waveform.samples_per_frame
synchronized_signals = []
for frame_start in frame_start_indices:
frame_stop = frame_start + frame_length
frame_samples = received_signal[:, frame_start:frame_stop]
# Pad the frame if it is too short
# This may happen if the last frame is incomplete, or synhronization is not perfect
if frame_samples.shape[1] < frame_length:
frame_samples = np.pad(
frame_samples,
((0, 0), (0, frame_length - frame_samples.shape[1])),
mode="constant",
)
frame_signal = Signal.Create(frame_samples, received_signal.sampling_rate)
synchronized_signals.append(frame_signal)
return frame_start_indices, synchronized_signals
def __demodulate(self, frame: Signal) -> Symbols:
"""Demodulates a sequence of synchronized MIMO signals into data symbols.
Args:
frame (Signal):
Synchronized MIMO signal, representing the samples of a full communication frame.
Returns:
Demodulated frame symbols.
"""
symbols = Symbols()
for stream in frame[:]:
stream_symbols = self.waveform.demodulate(stream)
symbols.append_stream(stream_symbols)
return symbols
def __unmap(self, symbols: Symbols) -> np.ndarray:
"""Unmap a set of communication symbols to information bits.
Args:
symbols (Symbols):
The communication symbols to be unmapped
Returns:
A numpy array containing hard information bits.
"""
bits = np.empty(0, dtype=np.uint8)
for stream in symbols.raw:
bits = np.append(bits, self.waveform.unmap(Symbols(stream[np.newaxis, :, :])))
return bits
[docs]
def _receive(self, signal: Signal) -> CommunicationReception:
# Resample the signal to match the waveform's requirements
signal = signal.resample(self.waveform.sampling_rate)
# Synchronize incoming signals
frame_start_indices, synchronized_signals = self.__synchronize(signal)
# Abort if no frame has been detected
if len(synchronized_signals) < 1:
reception = CommunicationReception(signal)
return reception
# Compute the bit generation requirements
required_num_data_bits, _ = self._bit_requirements()
# Process each frame independently
frames: List[CommunicationReceptionFrame] = []
for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals):
# Apply the stream transmit decoding configuration
decoded_frame_signal = self.__receive_stream_coding.decode(frame_signal)
# Demodulate raw symbols for each frame independtly
symbols = self.__demodulate(decoded_frame_signal)
# Estimate the channel from each frame demodulation
frame_delay = frame_index / self.waveform.sampling_rate
stated_symbols = self.waveform.estimate_channel(symbols, frame_delay)
picked_symbols = self.waveform.pick(stated_symbols)
# Decode the pre-equalization symbol precoding stage
decoded_symbols = self.precoding.decode(picked_symbols)
# Equalize the received symbols for each frame given the estimated channel state
equalized_symbols = self.waveform.equalize_symbols(decoded_symbols)
# Unmap equalized symbols to information bits
encoded_bits = self.__unmap(equalized_symbols)
# Apply inverse FEC configuration to correct errors and remove redundancies
decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits)
# Store the received information
frames.append(
CommunicationReceptionFrame(
signal=frame_signal,
decoded_signal=decoded_frame_signal,
symbols=symbols,
decoded_symbols=decoded_symbols,
timestamp=frame_index * signal.sampling_rate,
equalized_symbols=equalized_symbols,
encoded_bits=encoded_bits,
decoded_bits=decoded_bits,
)
)
# Store the received information of all frames
reception = CommunicationReception(signal=signal, frames=frames)
return reception
[docs]
class ReceivingModem(
ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable
):
"""Representation of a wireless modem exclusively receiving."""
yaml_tag = "RxModem"
"""YAML serialization tag"""
def __init__(
self,
*args,
device: Device | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
device (Device, optional):
Device operated by the modem.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available antenna ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`ReceivingModemBase` for further details.
"""
# Initialize base classes
# Note that the initialization order matters here
Receiver.__init__(self, selected_receive_ports=selected_receive_ports)
ReceivingModemBase.__init__(self, *args, **kwargs)
Serializable.__init__(self)
self.device = device
@property
def transmitting_device(self) -> Device | None:
return None
@property
def receiving_device(self) -> Device | None:
# The receiving device resolves to the operated device
return self.device
@Receiver.device.setter # type: ignore
def device(self, value: Device) -> None:
if Receiver.device.fget(self) is not None: # type: ignore
self.device.receivers.remove(self)
if value is not None and self not in value.receivers:
value.receivers.add(self)
@property
def num_receive_ports(self) -> int:
if self.receiving_device is None:
return 0
else:
return self.receiving_device.num_receive_ports
@property
def power(self) -> float:
return self.waveform.power if self.waveform is not None else 0.0
def _recall_reception(self, group: Group) -> CommunicationReception:
return CommunicationReception.from_HDF(group)
[docs]
class DuplexModem(TransmittingModem, ReceivingModem):
"""Representation of a wireless modem simultaneously transmitting and receiving."""
yaml_tag = "Modem"
def __init__(
self,
*args,
bits_source: BitsSource | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details.
"""
# Initialize base classes
ReceivingModem.__init__(self, selected_receive_ports=selected_receive_ports, **kwargs)
TransmittingModem.__init__(
self,
*args,
bits_source=bits_source,
selected_transmit_ports=selected_transmit_ports,
**kwargs,
)
@property
def transmitting_device(self) -> Device | None:
return TransmittingModem.transmitting_device.fget(self) # type: ignore
@property
def receiving_device(self) -> Device | None:
return ReceivingModem.receiving_device.fget(self) # type: ignore
@TransmittingModem.device.setter # type: ignore
def device(self, value: Device) -> None:
TransmittingModem.device.fset(self, value)
ReceivingModem.device.fset(self, value)
[docs]
class SimplexLink(TransmittingModem, ReceivingModem):
"""Convenience class to manage a simplex communication link between two dedicated devices."""
yaml_tag = "SimplexLink"
__transmitting_device: Device # Transmitting device
__receiving_device: Device # Receiving device
def __init__(
self,
transmitting_device: Device,
receiving_device: Device,
*args,
bits_source: BitsSource | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
transmitting_device (Device):
Transmitting device.
receiving_device (Device):
Receiving device.
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details.
"""
self.__transmitting_device = transmitting_device
self.__receiving_device = receiving_device
TransmittingModem.__init__(
self, bits_source=bits_source, selected_transmit_ports=selected_transmit_ports, **kwargs
)
ReceivingModem.__init__(
self, *args, selected_receive_ports=selected_receive_ports, **kwargs
)
transmitting_device.transmitters.add(self)
receiving_device.receivers.add(self)
return
@property
def transmitting_device(self) -> Device:
"""The transmitting device.
Returns: Device handle.
"""
return self.__transmitting_device
@property
def reference(self) -> Device | None:
return self.transmitting_device
@reference.setter
def reference(self, value: Device | None) -> None:
if value is not self.transmitting_device and value is not None:
raise RuntimeError("Specifying the reference device of a simplex link is not supported")
@property
def receiving_device(self) -> Device:
"""The receiving device.
Returns: Device handle.
"""
return self.__receiving_device