# -*- coding: utf-8 -*-
from __future__ import annotations
from functools import cached_property
from typing import Generic, List, Set, Sequence, Tuple, Type, TypeVar
import numpy as np
from h5py import Group
from hermespy.fec import EncoderManager
from hermespy.core import (
Device,
HDFSerializable,
RandomNode,
Transmission,
TransmitSignalCoding,
Reception,
Serializable,
Signal,
TransmitState,
Transmitter,
Receiver,
ReceiveState,
ReceiveSignalCoding,
)
from .bits_source import BitsSource, RandomBitsSource
from .precoding import TransmitSymbolCoding, ReceiveSymbolCoding
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT
from .frame_generator import FrameGenerator, FrameGeneratorStub
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class CommunicationTransmissionFrame(Transmission):
"""A single synchronized frame of information generated by transmittgin over a modem.
Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
:class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
"""
signal: Signal
"""Communication base-band waveform."""
bits: np.ndarray
"""Communication data bits."""
bit_block_size: int
"""Block size of the forward error correction input."""
encoded_bits: np.ndarray
"""Transmitted bits after FEC encoding."""
code_block_size: int
"""Block size of the forward error correction output."""
symbols: Symbols
"""Communication data symbols."""
encoded_symbols: Symbols
"""Communication data symbols after symbol encoding."""
timestamp: float
"""Time at which the frame was transmitted in seconds."""
def __init__(
self,
signal: Signal,
bits: np.ndarray,
bit_block_size: int,
encoded_bits: np.ndarray,
code_block_size: int,
symbols: Symbols,
encoded_symbols: Symbols,
timestamp: float,
) -> None:
"""
Args:
signal (Signal):
Transmitted communication base-band waveform.
bits (numpy.ndarray):
Transmitted communication data bits.
bit_block_size (int):
Block size of the forward error correction input.
encoded_bits (numpy.ndarray):
Transmitted communication bits after FEC encoding.
code_block_size (int):
Block size of the forward error correction output
symbols (Symbols):
Transmitted communication data symbols.
encoded_symbols (Symbols):
Transmitted communication data symbols after symbol encoding.
timestamp (float):
Time at which the frame was transmitted in seconds.
"""
# Initialize base class
Transmission.__init__(self, signal=signal)
# Initialize class attributes
self.bits = bits
self.bit_block_size = bit_block_size
self.encoded_bits = encoded_bits
self.code_block_size = code_block_size
self.symbols = symbols
self.encoded_symbols = encoded_symbols
self.timestamp = timestamp
@classmethod
def from_HDF(
cls: Type[CommunicationTransmissionFrame],
group: Group,
transmitter: Transmitter | None = None,
) -> CommunicationTransmissionFrame:
# Recall groups
signal = Signal.from_HDF(group["signal"])
symbols = Symbols.from_HDF(group["symbols"])
encoded_symbols = Symbols.from_HDF(group["encoded_symbols"])
# Recall datasets
bits = np.array(group["bits"], dtype=np.uint8)
encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8)
# Recall attributes
bit_block_size = group.attrs.get("bit_block_size", 1)
code_block_size = group.attrs.get("code_block_size", 1)
timestamp = group.attrs.get("timestamp", 0)
# Initialize object from recalled state
return cls(
signal=signal,
bits=bits,
bit_block_size=bit_block_size,
encoded_bits=encoded_bits,
code_block_size=code_block_size,
symbols=symbols,
encoded_symbols=encoded_symbols,
timestamp=timestamp,
)
def to_HDF(self, group: Group) -> None:
# Serialize base class
self.signal.to_HDF(group.create_group("signal"))
# Serialize groups
self.symbols.to_HDF(group.create_group("symbols"))
self.encoded_symbols.to_HDF(group.create_group("encoded_symbols"))
# Serialize datasets
group.create_dataset("bits", data=self.bits)
group.create_dataset("encoded_bits", data=self.encoded_bits)
# Serialize attributes
group.attrs["bit_block_size"] = self.bit_block_size
group.attrs["code_block_size"] = self.code_block_size
group.attrs["timestamp"] = self.timestamp
[docs]
class CommunicationTransmission(Transmission):
"""Collection of information generated by transmitting over a modem.
Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
:class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
"""
frames: List[CommunicationTransmissionFrame]
"""Individual transmitted communication frames."""
def __init__(
self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None
) -> None:
"""
Args:
signal (Signal):
Transmitted communication base-band waveform.
frames (List[CommunicationTransmissionFrame], optional):
Individual transmitted communication frames.
"""
Transmission.__init__(self, signal=signal)
self.frames = [] if frames is None else frames
@property
def num_frames(self) -> int:
"""Number of transmitted communication frames.
Returns:
Number of frames.
"""
return len(self.frames)
@cached_property
def bits(self) -> np.ndarray:
"""Transmitted bits before FEC encoding.
Returns: Numpy array of transmitted bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.bits)
return concatenated_bits
@cached_property
def symbols(self) -> Symbols:
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.symbols)
return symbols
@classmethod
def from_HDF(cls: Type[CommunicationTransmission], group: Group) -> CommunicationTransmission:
# Recall base signal
signal = Signal.from_HDF(group["signal"])
# Recall individual detected frames
num_frames = group.attrs.get("num_frames", 0)
frames = [
CommunicationTransmissionFrame.from_HDF(group[f"frame_{f:02d}"])
for f in range(num_frames)
]
return cls(signal=signal, frames=frames)
def to_HDF(self, group: Group) -> None:
# Serialize attributes
group.attrs["num_frames"] = self.num_frames
# Serialize base information
self.signal.to_HDF(group.create_group("signal"))
# Serialize detected frames
for f, frame in enumerate(self.frames):
frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs]
class CommunicationReceptionFrame(HDFSerializable):
"""A single synchronized frame of information generated by receiving over a modem.
Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
:class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
"""
signal: Signal
"""Communication base-band waveform."""
decoded_signal: Signal
"""Communication base-band waveform after MIMO stream decoding."""
symbols: Symbols
"""Received communication symbols."""
decoded_symbols: Symbols
"""Received communication symbols after precoding stage."""
timestamp: float
"""Time at which the frame was transmitted in seconds."""
equalized_symbols: Symbols
"""Equalized communication symbols."""
encoded_bits: np.ndarray
"""Received encoded data bits before error correction."""
code_block_size: int
"""Block size of the forward error correction input."""
decoded_bits: np.ndarray
"""Received decoded data bits after error correction."""
bit_block_size: int
"""Block size of the forward error correction output."""
def __init__(
self,
signal: Signal,
decoded_signal: Signal,
symbols: Symbols,
decoded_symbols: Symbols,
timestamp: float,
equalized_symbols: Symbols,
encoded_bits: np.ndarray,
code_block_size: int,
decoded_bits: np.ndarray,
bit_block_size: int,
) -> None:
"""
Args:
signal (Signal):
Received communication base-band waveform.
decoded_signal (Signal):
Received communication base-band waveform after MIMO stream decoding.
symbols (Symbols):
Received communication symbols.
decoded_symbols (Symbols):
Received communication symbols after precoding stage.
timestamp (float):
Time at which the frame was transmitted in seconds.
equalized_symbols (Symbols):
Equalized communication symbols.
encoded_bits (numpy.ndarray):
Received encoded data bits before error correction.
code_block_size (int):
Block size of the forward error correction input.
decoded_bits (numpy.ndarray):
Received decoded data bits after error correction.
bit_block_size (int):
Block size of the forward error correction output.
"""
self.signal = signal
self.decoded_signal = decoded_signal
self.symbols = symbols
self.decoded_symbols = decoded_symbols
self.timestamp = timestamp
self.equalized_symbols = equalized_symbols
self.encoded_bits = encoded_bits
self.code_block_size = code_block_size
self.decoded_bits = decoded_bits
self.bit_block_size = bit_block_size
@classmethod
def from_HDF(
cls: Type[CommunicationReceptionFrame], group: Group
) -> CommunicationReceptionFrame:
# Recall groups
signal = Signal.from_HDF(group["signal"])
decoded_signal = Signal.from_HDF(group["decoded_signal"])
symbols = Symbols.from_HDF(group["symbols"])
decoded_symbols = Symbols.from_HDF(group["decoded_symbols"])
equalized_symbols = Symbols.from_HDF(group["equalized_symbols"])
# Recall datasets
encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8)
decoded_bits = np.array(group["decoded_bits"], dtype=np.uint8)
# Recall attributes
code_block_size = group.attrs.get("code_block_size", 1)
bit_block_size = group.attrs.get("bit_block_size", 1)
timestamp = group.attrs.get("timestamp", 0)
# Initialize object from recalled state
return cls(
signal=signal,
decoded_signal=decoded_signal,
symbols=symbols,
decoded_symbols=decoded_symbols,
equalized_symbols=equalized_symbols,
encoded_bits=encoded_bits,
code_block_size=code_block_size,
decoded_bits=decoded_bits,
bit_block_size=bit_block_size,
timestamp=timestamp,
)
def to_HDF(self, group: Group) -> None:
# Serialize groups
self.signal.to_HDF(group.create_group("signal"))
self.decoded_signal.to_HDF(group.create_group("decoded_signal"))
self.symbols.to_HDF(group.create_group("symbols"))
self.decoded_symbols.to_HDF(group.create_group("decoded_symbols"))
self.equalized_symbols.to_HDF(group.create_group("equalized_symbols"))
# Serialize datasets
group.create_dataset("encoded_bits", data=self.encoded_bits)
group.create_dataset("decoded_bits", data=self.decoded_bits)
# Serialize attributes
group.attrs["code_block_size"] = self.code_block_size
group.attrs["bit_block_size"] = self.bit_block_size
group.attrs["timestamp"] = self.timestamp
[docs]
class CommunicationReception(Reception):
"""Collection of information generated by receiving over a modem.
Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
:class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
"""
frames: List[CommunicationReceptionFrame]
"""Individual received communication frames."""
def __init__(
self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None
) -> None:
"""
Args:
signal (Signal):
Received communication base-band waveform.
frames (List[CommunicationReceptionFrame], optional):
Individual received communication frames.
"""
Reception.__init__(self, signal=signal)
self.frames = [] if frames is None else frames
@property
def num_frames(self) -> int:
"""Number of received communication frames.
Returns:
Number of frames.
"""
return len(self.frames)
@cached_property
def encoded_bits(self) -> np.ndarray:
"""Received bits before FEC decoding.
Returns:
Numpy array containing received bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.encoded_bits)
return concatenated_bits
@cached_property
def bits(self) -> np.ndarray:
"""Received bits after FEC decoding.
Returns:
Numpy array containing received bits.
"""
concatenated_bits = np.empty(0, dtype=np.uint8)
for frame in self.frames:
concatenated_bits = np.append(concatenated_bits, frame.decoded_bits)
return concatenated_bits
@cached_property
def symbols(self) -> Symbols:
"""Collect all communication symbols."""
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.symbols)
return symbols
@cached_property
def equalized_symbols(self) -> Symbols:
"""Collect all equalized communication symbols."""
symbols = Symbols()
for frame in self.frames:
symbols.append_symbols(frame.equalized_symbols)
return symbols
@classmethod
def from_HDF(cls: Type[CommunicationReception], group: Group) -> CommunicationReception:
# Recall base signal
signal = Signal.from_HDF(group["signal"])
# Recall individual detected frames
num_frames = group.attrs.get("num_frames", 0)
frames = [
CommunicationReceptionFrame.from_HDF(group[f"frame_{f:02d}"]) for f in range(num_frames)
]
return cls(signal=signal, frames=frames)
def to_HDF(self, group: Group) -> None:
# Serialize attributes
group.attrs["num_frames"] = self.num_frames
# Serialize base information
self.signal.to_HDF(group.create_group("signal"))
# Serialize detected frames
for f, frame in enumerate(self.frames):
frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs]
class BaseModem(Generic[CWT], RandomNode):
"""Base class for wireless modems transmitting or receiving information over devices.
Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience
wrappers :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface.
"""
__encoder_manager: EncoderManager
__waveform: CWT | None
__frame_generator: FrameGenerator
@staticmethod
def _arg_signature() -> Set[str]:
return {"encoding", "waveform", "seed"}
def __init__(
self,
encoding: EncoderManager | None = None,
waveform: CWT | None = None,
frame_generator: FrameGenerator | None = None,
seed: int | None = None,
) -> None:
"""
Args:
encoding (EncoderManager, optional):
Bit coding configuration.
Encodes communication bit frames during transmission and decodes them during reception.
waveform (CWT, optional):
The waveform to be transmitted by this modem.
seed (int, optional):
Seed used to initialize the pseudo-random number generator.
"""
# Base class initialization
RandomNode.__init__(self, seed=seed)
self.encoder_manager = EncoderManager() if encoding is None else encoding
self.waveform = waveform
self.frame_generator = FrameGeneratorStub() if frame_generator is None else frame_generator
@property
def encoder_manager(self) -> EncoderManager:
"""Description of the modem's forward error correction.
Refer to :doc:`fec` for further information.
"""
return self.__encoder_manager
@encoder_manager.setter
def encoder_manager(self, new_manager: EncoderManager) -> None:
self.__encoder_manager = new_manager
self.__encoder_manager.random_mother = self
new_manager.modem = self
@property
def waveform(self) -> CWT | None:
"""Description of the communication waveform emitted by this modem.
The only mandatory attribute required to transmit or receive over a
:class:`modem<BaseModem>`.
"""
return self.__waveform
@waveform.setter
def waveform(self, value: CWT | None) -> None:
self.__waveform = value
if value is not None:
value.modem = self
value.random_mother = self
@property
def frame_generator(self) -> FrameGenerator:
return self.__frame_generator
@frame_generator.setter
def frame_generator(self, value: FrameGenerator) -> None:
self.__frame_generator = value
@property
def samples_per_frame(self) -> int:
"""Number of discrete-time samples per processed communication frame.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
"""
return self.waveform.samples_per_frame
@property
def frame_duration(self) -> float:
"""Duration of a single communication frame in seconds.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property.
"""
return self.waveform.frame_duration
@property
def symbol_duration(self) -> float:
"""Duration of a single communication symbol in seconds.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property.
"""
return self.waveform.symbol_duration
@property
def sampling_rate(self) -> float:
"""Sampling rate of the processed waveforms in Hz.
Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
:attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
"""
return self.waveform.sampling_rate
BMT = TypeVar("BMT", bound=BaseModem)
"""Type of base modem."""
[docs]
class TransmittingModemBase(Generic[CWT], BaseModem[CWT]):
"""Base class of signal processing algorithms transmitting information."""
__bits_source: BitsSource
__transmit_symbol_coding: TransmitSymbolCoding
__transmit_signal_coding: TransmitSignalCoding
def __init__(self, bits_source: BitsSource | None = None, *args, **kwargs) -> None:
"""
Args:
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
"""
# Initialize base classes
BaseModem.__init__(self, *args, **kwargs)
# Initialize clas attributes
self.bits_source = RandomBitsSource() if bits_source is None else bits_source
self.__transmit_symbol_coding = TransmitSymbolCoding()
self.__transmit_signal_coding = TransmitSignalCoding()
@property
def bits_source(self) -> BitsSource:
"""Source configuration of communication transmitted by this modem.
Returns: A handle to the source configuration.
"""
return self.__bits_source
@bits_source.setter
def bits_source(self, value: BitsSource):
value.random_mother = self
self.__bits_source = value
@property
def transmit_signal_coding(self) -> TransmitSignalCoding:
"""Stream MIMO coding configuration during signal transmission."""
return self.__transmit_signal_coding
@transmit_signal_coding.setter
def transmit_signal_coding(self, value: TransmitSignalCoding) -> None:
self.__transmit_signal_coding = value
@property
def transmit_symbol_coding(self) -> TransmitSymbolCoding:
"""Complex communication symbol coding configuration during transmission."""
return self.__transmit_symbol_coding
@transmit_symbol_coding.setter
def transmit_symbol_coding(self, value: TransmitSymbolCoding) -> None:
self.__transmit_symbol_coding = value
def __map(self, bits: np.ndarray, num_streams: int) -> Symbols:
"""Map a block of information bits to commuication symbols.
Args:
bits (numpy.ndarray):
The information bits to be mapped.
Returns:
A series of communication symbols.
"""
symbols = Symbols()
for frame_bits in np.reshape(bits, (num_streams, -1)):
symbols.append_stream(self.waveform.map(frame_bits))
return symbols
def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal:
"""Modulates a sequence of MIMO signals into a base-band communication waveform.
Args:
symbols (Symbols):
Communication symbols to be modulated.
carrier_frequency (float):
Carrier frequency of the communication signal.
Returns:
The modualted base-band communication frame.
"""
# For each stream resulting from the initial encoding stage
# Place and encode the symbols according to the stream transmit coding configuration
frame_samples = np.empty(
(symbols.num_streams, self.waveform.samples_per_frame), dtype=np.complex128
)
for s, stream_symbols in enumerate(symbols.raw):
placed_symbols = self.waveform.place(Symbols(stream_symbols[np.newaxis, :, :]))
# Modulate each placed symbol stream individually to its base-band signal representation
frame_samples[s, :] = self.waveform.modulate(placed_symbols)
# Apply the stream transmit coding configuration
frame_signal = Signal.Create(frame_samples, self.waveform.sampling_rate, carrier_frequency)
return frame_signal
[docs]
def _transmit(self, device: TransmitState, duration: float) -> CommunicationTransmission:
# By default, the generated signal's duration will be exactly one frame
if duration <= 0.0:
duration = self.frame_duration
# Compute the number of frames to be transmitted within the given duration
frame_duration = self.frame_duration
num_mimo_frames = int(duration / frame_duration)
# Compute the number of parallel symbol streams required to transmit over the given coding config
num_input_signal_streams = self.transmit_signal_coding.num_transmit_input_streams(
device.num_digital_transmit_ports
)
num_input_symbol_streams = self.transmit_symbol_coding.num_transmit_input_streams(
num_input_signal_streams
)
# Compute the number of serial data bits required to generate a single frame over the given precoding config
required_num_data_symbols = int(
self.waveform.num_data_symbols * self.transmit_symbol_coding.encode_rate
)
required_num_code_bits = (
self.waveform.bits_per_frame(required_num_data_symbols) * num_input_symbol_streams
)
required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits)
signal = Signal.Empty(
self.waveform.sampling_rate,
device.num_digital_transmit_ports,
carrier_frequency=device.carrier_frequency,
)
# Abort if no frame is to be transmitted within the current duration
if num_mimo_frames < 1:
transmission = CommunicationTransmission(signal)
return transmission
frames: List[CommunicationTransmissionFrame] = []
for n in range(num_mimo_frames):
# Generate plain data bits
frame_bits = self.frame_generator.pack_frame(self.bits_source, required_num_data_bits)
# Apply forward error correction
encoded_bits = self.encoder_manager.encode(frame_bits, required_num_code_bits)
# Map bits to communication symbols
mapped_symbols = self.__map(encoded_bits, num_input_symbol_streams)
# Apply the first symbol precoding cofiguration
encoded_symbols = self.transmit_symbol_coding.encode_symbols(
StatedSymbols(
mapped_symbols.raw,
np.ones(
(
mapped_symbols.num_streams,
1,
mapped_symbols.num_blocks,
mapped_symbols.num_symbols,
),
dtype=np.complex128,
),
),
num_input_signal_streams,
)
# Modulate symbols to a base-band signal
frame_signal = self.__modulate(encoded_symbols, device.carrier_frequency)
# Apply stream encoding configuration
encoded_frame_signal = self.transmit_signal_coding.encode_streams(frame_signal, device)
# Save results
signal.append_samples(encoded_frame_signal)
frames.append(
CommunicationTransmissionFrame(
signal=frame_signal,
bits=frame_bits,
bit_block_size=self.encoder_manager.bit_block_size,
encoded_bits=encoded_bits,
code_block_size=self.encoder_manager.code_block_size,
symbols=mapped_symbols,
encoded_symbols=encoded_symbols,
timestamp=n * frame_duration,
)
)
# Save the transmitted information
transmission = CommunicationTransmission(signal, frames)
return transmission
[docs]
class TransmittingModem(
TransmittingModemBase[CommunicationWaveform],
Transmitter[CommunicationTransmission],
Serializable,
):
"""Representation of a wireless modem exclusively transmitting."""
yaml_tag = "TxModem"
def __init__(
self, *args, selected_transmit_ports: Sequence[int] | None = None, **kwargs
) -> None:
"""
Args:
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModemBase` for further details.
"""
# Initialize base classes
# Note that the initialization order matters here
Transmitter.__init__(self, selected_transmit_ports=selected_transmit_ports)
TransmittingModemBase.__init__(self, *args, **kwargs)
Serializable.__init__(self)
@property
def power(self) -> float:
return self.waveform.power if self.waveform is not None else 0.0
def _recall_transmission(self, group: Group) -> CommunicationTransmission:
return CommunicationTransmission.from_HDF(group)
[docs]
class ReceivingModemBase(Generic[CWT], BaseModem[CWT]):
"""Base class of signal processing algorithms receiving information."""
__receive_symbol_coding: ReceiveSymbolCoding
__receive_signal_coding: ReceiveSignalCoding
def __init__(self, *args, **kwargs) -> None:
# Initialize base class
BaseModem.__init__(self, *args, **kwargs)
# Initialize class attributes
self.__receive_symbol_coding = ReceiveSymbolCoding()
self.__receive_signal_coding = ReceiveSignalCoding()
@property
def receive_signal_coding(self) -> ReceiveSignalCoding:
"""Stream MIMO coding configuration during signal reception."""
return self.__receive_signal_coding
@receive_signal_coding.setter
def receive_signal_coding(self, value: ReceiveSignalCoding) -> None:
self.__receive_signal_coding = value
@property
def receive_symbol_coding(self) -> ReceiveSymbolCoding:
"""Complex communication symbol coding configuration during reception."""
return self.__receive_symbol_coding
@receive_symbol_coding.setter
def receive_symbol_coding(self, value: ReceiveSymbolCoding) -> None:
self.__receive_symbol_coding = value
def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]:
"""Synchronize a received MIMO base-band stream.
Converts the stream into sections representing communication frames.
Args:
received_signal (Signal):
The MIMO signal received over the operated device's RF chain.
Returns:
A sequence signals representing communication frames and their respective detection indices.
"""
# Synchronize raw MIMO data into frames
frame_start_indices = self.waveform.synchronization.synchronize(received_signal.getitem())
frame_length = self.waveform.samples_per_frame
synchronized_signals = []
for frame_start in frame_start_indices:
frame_stop = frame_start + frame_length
frame_samples = received_signal.getitem(
(slice(None, None), slice(frame_start, frame_stop))
)
# Pad the frame if it is too short
# This may happen if the last frame is incomplete, or synhronization is not perfect
if frame_samples.shape[1] < frame_length:
frame_samples = np.pad(
frame_samples,
((0, 0), (0, frame_length - frame_samples.shape[1])),
mode="constant",
)
frame_signal = Signal.Create(frame_samples, received_signal.sampling_rate)
synchronized_signals.append(frame_signal)
return frame_start_indices, synchronized_signals
def __demodulate(self, frame: Signal) -> Symbols:
"""Demodulates a sequence of synchronized MIMO signals into data symbols.
Args:
frame (Signal):
Synchronized MIMO signal, representing the samples of a full communication frame.
Returns:
Demodulated frame symbols.
"""
symbols = Symbols()
for stream in frame.getitem():
stream_symbols = self.waveform.demodulate(stream)
symbols.append_stream(stream_symbols)
return symbols
def __unmap(self, symbols: Symbols) -> np.ndarray:
"""Unmap a set of communication symbols to information bits.
Args:
symbols (Symbols):
The communication symbols to be unmapped
Returns:
A numpy array containing hard information bits.
"""
bits = np.empty(0, dtype=np.uint8)
for stream in symbols.raw:
bits = np.append(bits, self.waveform.unmap(Symbols(stream[np.newaxis, :, :])))
return bits
[docs]
def _receive(self, signal: Signal, device: ReceiveState) -> CommunicationReception:
# Resample the signal to match the waveform's requirements
signal = signal.resample(self.waveform.sampling_rate)
# Decode the signal using the receive signal coding configuration
decoded_signal = self.receive_signal_coding.decode_streams(signal, device)
# Synchronize incoming signals
frame_start_indices, synchronized_signals = self.__synchronize(decoded_signal)
# Abort if no frame has been detected
if len(synchronized_signals) < 1:
reception = CommunicationReception(signal)
return reception
# Compute the bit generation requirements
# Compute the number of serial data bits required to generate a single frame over the given precoding config
required_num_data_symbols = int(
self.waveform.num_data_symbols * self.receive_symbol_coding.decode_rate
)
required_num_code_bits = self.waveform.bits_per_frame(
required_num_data_symbols
) * self.receive_symbol_coding.num_receive_output_streams(decoded_signal.num_streams)
required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits)
# Process each frame independently
frames: List[CommunicationReceptionFrame] = []
for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals):
# Demodulate raw symbols for each frame independtly
symbols = self.__demodulate(frame_signal)
# Estimate the channel from each frame demodulation
frame_delay = frame_index / self.waveform.sampling_rate
stated_symbols = self.waveform.estimate_channel(symbols, frame_delay)
# Extract data symbols to be decoded
picked_symbols = self.waveform.pick(stated_symbols)
# Decode the pre-equalization symbol precoding stage
decoded_symbols = self.receive_symbol_coding.decode_symbols(picked_symbols)
# Equalize the received symbols for each frame given the estimated channel state
equalized_symbols = self.waveform.equalize_symbols(decoded_symbols)
# Unmap equalized symbols to information bits
encoded_bits = self.__unmap(equalized_symbols)
# Apply inverse FEC configuration to correct errors and remove redundancies
decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits)
# Decode the frame
payload_bits = self.frame_generator.unpack_frame(decoded_bits)
# Store the received information
frames.append(
CommunicationReceptionFrame(
signal=frame_signal,
decoded_signal=frame_signal,
symbols=symbols,
decoded_symbols=decoded_symbols,
timestamp=frame_index * signal.sampling_rate,
equalized_symbols=equalized_symbols,
encoded_bits=encoded_bits,
code_block_size=self.encoder_manager.code_block_size,
decoded_bits=payload_bits,
bit_block_size=self.encoder_manager.bit_block_size,
)
)
# Store the received information of all frames
reception = CommunicationReception(signal=signal, frames=frames)
return reception
[docs]
class ReceivingModem(
ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable
):
"""Representation of a wireless modem exclusively receiving."""
yaml_tag = "RxModem"
"""YAML serialization tag"""
def __init__(
self, *args, selected_receive_ports: Sequence[int] | None = None, **kwargs
) -> None:
"""
Args:
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available antenna ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`ReceivingModemBase` for further details.
"""
# Initialize base classes
# Note that the initialization order matters here
Receiver.__init__(self, selected_receive_ports=selected_receive_ports)
ReceivingModemBase.__init__(self, *args, **kwargs)
Serializable.__init__(self)
@property
def power(self) -> float:
return self.waveform.power if self.waveform is not None else 0.0
def _recall_reception(self, group: Group) -> CommunicationReception:
return CommunicationReception.from_HDF(group)
[docs]
class DuplexModem(TransmittingModem, ReceivingModem):
"""Representation of a wireless modem simultaneously transmitting and receiving."""
yaml_tag = "Modem"
def __init__(
self,
*args,
bits_source: BitsSource | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details.
"""
# Initialize base classes
ReceivingModem.__init__(self, selected_receive_ports=selected_receive_ports, **kwargs)
TransmittingModem.__init__(
self,
*args,
bits_source=bits_source,
selected_transmit_ports=selected_transmit_ports,
**kwargs,
)
[docs]
class SimplexLink(TransmittingModem, ReceivingModem):
"""Convenience class to manage a simplex communication link between two dedicated devices."""
yaml_tag = "SimplexLink"
def __init__(
self,
*args,
bits_source: BitsSource | None = None,
selected_transmit_ports: Sequence[int] | None = None,
selected_receive_ports: Sequence[int] | None = None,
**kwargs,
) -> None:
"""
Args:
bits_source (BitsSource, optional):
Source configuration of communication bits transmitted by this modem.
Bits are randomly generated by default.
selected_transmit_ports (Sequence[int] | None):
Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
selected_receive_ports (Sequence[int] | None):
Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
If not specified, all available ports will be considered.
*args, \**kwargs:
Modem initialization parameters.
Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details.
"""
TransmittingModem.__init__(
self, bits_source=bits_source, selected_transmit_ports=selected_transmit_ports, **kwargs
)
ReceivingModem.__init__(
self, *args, selected_receive_ports=selected_receive_ports, **kwargs
)
[docs]
def connect(self, transmitting_device: Device, receiving_device: Device) -> None:
"""Connect two devices by this simplex link.
Convenience method that assigns this DSP layer as a transmitter to the transmitting device and as a receiver to the receiving device.
Args:
transmitting_device (Device):
Device transmitting over the simplex link.
receiving_device (Device):
Device receiving over the simplex link.
"""
transmitting_device.transmitters.add(self)
receiving_device.receivers.add(self)