# -*- coding: utf-8 -*-
from __future__ import annotations
from functools import cached_property
from typing import Generic, List, Sequence, Tuple, Type, TypeVar
from typing_extensions import override
import numpy as np
from hermespy.fec import Encoder, EncoderManager
from hermespy.core import (
Device,
DeserializationProcess,
RandomNode,
Serializable,
SerializationProcess,
Transmission,
TransmitSignalCoding,
TransmitStreamEncoder,
Reception,
Signal,
TransmitState,
Transmitter,
Receiver,
ReceiveState,
ReceiveSignalCoding,
ReceiveStreamDecoder,
)
from .bits_source import BitsSource, RandomBitsSource
from .precoding import (
TransmitSymbolCoding,
TransmitSymbolEncoder,
ReceiveSymbolCoding,
ReceiveSymbolDecoder,
)
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT
from .frame_generator import FrameGenerator, FrameGeneratorStub
# Module metadata: authorship, licensing, version and maintenance status.
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class CommunicationTransmissionFrame(Transmission):
    """A single synchronized frame of information generated by transmitting over a modem.

    Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
    :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
    """

    signal: Signal
    """Communication base-band waveform."""

    bits: np.ndarray
    """Communication data bits."""

    bit_block_size: int
    """Block size of the forward error correction input."""

    encoded_bits: np.ndarray
    """Transmitted bits after FEC encoding."""

    code_block_size: int
    """Block size of the forward error correction output."""

    symbols: Symbols
    """Communication data symbols."""

    encoded_symbols: Symbols
    """Communication data symbols after symbol encoding."""

    timestamp: float
    """Time at which the frame was transmitted in seconds."""

    def __init__(
        self,
        signal: Signal,
        bits: np.ndarray,
        bit_block_size: int,
        encoded_bits: np.ndarray,
        code_block_size: int,
        symbols: Symbols,
        encoded_symbols: Symbols,
        timestamp: float,
    ) -> None:
        """
        Args:

            signal:
                Transmitted communication base-band waveform.

            bits:
                Transmitted communication data bits.

            bit_block_size:
                Block size of the forward error correction input.

            encoded_bits:
                Transmitted communication bits after FEC encoding.

            code_block_size:
                Block size of the forward error correction output.

            symbols:
                Transmitted communication data symbols.

            encoded_symbols:
                Transmitted communication data symbols after symbol encoding.

            timestamp:
                Time at which the frame was transmitted in seconds.
        """

        # The base class takes care of storing the waveform
        Transmission.__init__(self, signal=signal)

        # Bit-level information
        self.bits, self.bit_block_size = bits, bit_block_size
        self.encoded_bits, self.code_block_size = encoded_bits, code_block_size

        # Symbol-level information
        self.symbols, self.encoded_symbols = symbols, encoded_symbols

        self.timestamp = timestamp

    def serialize(self, process: SerializationProcess) -> None:
        """Write the frame to a serialization process.

        The base class state is written first, followed by the bit arrays,
        the block sizes, the symbol containers and finally the timestamp.

        Args:

            process:
                The serialization process to write to.
        """

        Transmission.serialize(self, process)

        for array_field in ("bits", "encoded_bits"):
            process.serialize_array(getattr(self, array_field), array_field)

        for integer_field in ("bit_block_size", "code_block_size"):
            process.serialize_integer(getattr(self, integer_field), integer_field)

        for object_field in ("symbols", "encoded_symbols"):
            process.serialize_object(getattr(self, object_field), object_field)

        process.serialize_floating(self.timestamp, "timestamp")

    @classmethod
    def Deserialize(cls, process: DeserializationProcess) -> CommunicationTransmissionFrame:
        """Recover a frame from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: The deserialized transmission frame.
        """

        # The dictionary literal is evaluated top to bottom,
        # so fields are read in the same order as before
        fields = {
            "signal": process.deserialize_object("signal", Signal),
            "bits": process.deserialize_array("bits", np.int64),
            "encoded_bits": process.deserialize_array("encoded_bits", np.int64),
            "bit_block_size": process.deserialize_integer("bit_block_size"),
            "code_block_size": process.deserialize_integer("code_block_size"),
            "symbols": process.deserialize_object("symbols", Symbols),
            "encoded_symbols": process.deserialize_object("encoded_symbols", Symbols),
            "timestamp": process.deserialize_floating("timestamp"),
        }
        return cls(**fields)
[docs]
class CommunicationTransmission(Transmission):
    """Collection of information generated by transmitting over a modem.

    Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a
    :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance.
    """

    frames: List[CommunicationTransmissionFrame]
    """Individual transmitted communication frames."""

    def __init__(
        self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None
    ) -> None:
        """
        Args:

            signal:
                Transmitted communication base-band waveform.

            frames:
                Individual transmitted communication frames.
                If not specified, the transmission contains no frames.
        """

        Transmission.__init__(self, signal=signal)
        self.frames = [] if frames is None else frames

    @property
    def num_frames(self) -> int:
        """Number of transmitted communication frames.

        Returns:
            Number of frames.
        """

        return len(self.frames)

    @cached_property
    def bits(self) -> np.ndarray:
        """Transmitted bits before FEC encoding.

        The bits of all frames are flattened and concatenated in frame order.
        The result is cached on first access, so frames added afterwards are not reflected.

        Returns: Numpy array of transmitted bits.
        """

        # The leading empty uint8 array preserves the result (and dtype promotion)
        # of the previous np.append-based accumulation, including the no-frames case.
        # A single concatenation avoids re-copying the accumulated array for every frame.
        chunks = [np.empty(0, dtype=np.uint8)]
        chunks.extend(np.ravel(frame.bits) for frame in self.frames)
        return np.concatenate(chunks)

    @cached_property
    def symbols(self) -> Symbols:
        """Transmitted communication symbols of all frames.

        The symbols of each frame are appended in frame order.
        The result is cached on first access.

        Returns: Symbols collected from all frames.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.symbols)

        return symbols

    def serialize(self, process: SerializationProcess) -> None:
        """Write the transmission to a serialization process.

        Args:

            process:
                The serialization process to write to.
        """

        Transmission.serialize(self, process)
        process.serialize_object_sequence(self.frames, "frames")

    @classmethod
    def Deserialize(
        cls: Type[CommunicationTransmission], process: DeserializationProcess
    ) -> CommunicationTransmission:
        """Recover a transmission from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: The deserialized transmission.
        """

        return cls(
            process.deserialize_object("signal", Signal),
            list(process.deserialize_object_sequence("frames", CommunicationTransmissionFrame)),
        )
[docs]
class CommunicationReceptionFrame(Reception):
    """A single synchronized frame of information generated by receiving over a modem.

    Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
    :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
    """

    decoded_signal: Signal
    """Communication base-band waveform after MIMO stream decoding."""

    symbols: Symbols
    """Received communication symbols."""

    decoded_symbols: Symbols
    """Received communication symbols after precoding stage."""

    timestamp: float
    """Time at which the frame was transmitted in seconds."""

    equalized_symbols: Symbols
    """Equalized communication symbols."""

    encoded_bits: np.ndarray
    """Received encoded data bits before error correction."""

    code_block_size: int
    """Block size of the forward error correction input."""

    decoded_bits: np.ndarray
    """Received decoded data bits after error correction."""

    bit_block_size: int
    """Block size of the forward error correction output."""

    def __init__(
        self,
        signal: Signal,
        decoded_signal: Signal,
        symbols: Symbols,
        decoded_symbols: Symbols,
        timestamp: float,
        equalized_symbols: Symbols,
        encoded_bits: np.ndarray,
        code_block_size: int,
        decoded_bits: np.ndarray,
        bit_block_size: int,
    ) -> None:
        """
        Args:

            signal:
                Received communication base-band waveform.

            decoded_signal:
                Received communication base-band waveform after MIMO stream decoding.

            symbols:
                Received communication symbols.

            decoded_symbols:
                Received communication symbols after precoding stage.

            timestamp:
                Time at which the frame was transmitted in seconds.

            equalized_symbols:
                Equalized communication symbols.

            encoded_bits:
                Received encoded data bits before error correction.

            code_block_size:
                Block size of the forward error correction input.

            decoded_bits:
                Received decoded data bits after error correction.

            bit_block_size:
                Block size of the forward error correction output.
        """

        Reception.__init__(self, signal)

        # Waveform-level information
        self.signal, self.decoded_signal = signal, decoded_signal

        # Symbol-level information
        self.symbols, self.decoded_symbols = symbols, decoded_symbols
        self.timestamp = timestamp
        self.equalized_symbols = equalized_symbols

        # Bit-level information
        self.encoded_bits, self.code_block_size = encoded_bits, code_block_size
        self.decoded_bits, self.bit_block_size = decoded_bits, bit_block_size

    def serialize(self, process: SerializationProcess) -> None:
        """Write the frame to a serialization process.

        The base class state is written first, followed by the signal and symbol
        containers, the bit arrays, the block sizes and finally the timestamp.

        Args:

            process:
                The serialization process to write to.
        """

        Reception.serialize(self, process)

        for object_field in (
            "decoded_signal",
            "symbols",
            "decoded_symbols",
            "equalized_symbols",
        ):
            process.serialize_object(getattr(self, object_field), object_field)

        for array_field in ("encoded_bits", "decoded_bits"):
            process.serialize_array(getattr(self, array_field), array_field)

        for integer_field in ("code_block_size", "bit_block_size"):
            process.serialize_integer(getattr(self, integer_field), integer_field)

        process.serialize_floating(self.timestamp, "timestamp")

    @classmethod
    def Deserialize(
        cls: Type[CommunicationReceptionFrame], process: DeserializationProcess
    ) -> CommunicationReceptionFrame:
        """Recover a frame from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: The deserialized reception frame.
        """

        # The dictionary literal is evaluated top to bottom,
        # so fields are read in the same order as before
        fields = {
            "signal": process.deserialize_object("signal", Signal),
            "decoded_signal": process.deserialize_object("decoded_signal", Signal),
            "symbols": process.deserialize_object("symbols", Symbols),
            "decoded_symbols": process.deserialize_object("decoded_symbols", Symbols),
            "equalized_symbols": process.deserialize_object("equalized_symbols", Symbols),
            "encoded_bits": process.deserialize_array("encoded_bits", np.int64),
            "decoded_bits": process.deserialize_array("decoded_bits", np.int64),
            "code_block_size": process.deserialize_integer("code_block_size"),
            "bit_block_size": process.deserialize_integer("bit_block_size"),
            "timestamp": process.deserialize_floating("timestamp"),
        }
        return cls(**fields)
[docs]
class CommunicationReception(Reception):
    """Collection of information generated by receiving over a modem.

    Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a
    :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance.
    """

    frames: List[CommunicationReceptionFrame]
    """Individual received communication frames."""

    def __init__(
        self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None
    ) -> None:
        """
        Args:

            signal:
                Received communication base-band waveform.

            frames:
                Individual received communication frames.
                If not specified, the reception contains no frames.
        """

        Reception.__init__(self, signal=signal)
        self.frames = [] if frames is None else frames

    @property
    def num_frames(self) -> int:
        """Number of received communication frames.

        Returns:
            Number of frames.
        """

        return len(self.frames)

    @cached_property
    def encoded_bits(self) -> np.ndarray:
        """Received bits before FEC decoding.

        The encoded bits of all frames are flattened and concatenated in frame order.
        The result is cached on first access, so frames added afterwards are not reflected.

        Returns:
            Numpy array containing received bits.
        """

        # The leading empty uint8 array preserves the result (and dtype promotion)
        # of the previous np.append-based accumulation, including the no-frames case.
        # A single concatenation avoids re-copying the accumulated array for every frame.
        chunks = [np.empty(0, dtype=np.uint8)]
        chunks.extend(np.ravel(frame.encoded_bits) for frame in self.frames)
        return np.concatenate(chunks)

    @cached_property
    def bits(self) -> np.ndarray:
        """Received bits after FEC decoding.

        The decoded bits of all frames are flattened and concatenated in frame order.
        The result is cached on first access, so frames added afterwards are not reflected.

        Returns:
            Numpy array containing received bits.
        """

        # Same accumulation scheme as for the encoded bits
        chunks = [np.empty(0, dtype=np.uint8)]
        chunks.extend(np.ravel(frame.decoded_bits) for frame in self.frames)
        return np.concatenate(chunks)

    @cached_property
    def symbols(self) -> Symbols:
        """Collect all communication symbols.

        The symbols of each frame are appended in frame order.
        The result is cached on first access.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.symbols)

        return symbols

    @cached_property
    def equalized_symbols(self) -> Symbols:
        """Collect all equalized communication symbols.

        The equalized symbols of each frame are appended in frame order.
        The result is cached on first access.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.equalized_symbols)

        return symbols

    def serialize(self, process: SerializationProcess) -> None:
        """Write the reception to a serialization process.

        Args:

            process:
                The serialization process to write to.
        """

        Reception.serialize(self, process)
        process.serialize_object_sequence(self.frames, "frames")

    @classmethod
    def Deserialize(
        cls: Type[CommunicationReception], process: DeserializationProcess
    ) -> CommunicationReception:
        """Recover a reception from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: The deserialized reception.
        """

        return cls(
            process.deserialize_object("signal", Signal),
            list(process.deserialize_object_sequence("frames", CommunicationReceptionFrame)),
        )
# Type variable bound to BaseModem via a forward reference.
# Used to annotate BaseModem.Deserialize so that it returns the type of the calling class.
_BMT = TypeVar("_BMT", bound="BaseModem")
"""Type of base modem."""
[docs]
class BaseModem(Generic[CWT], RandomNode, Serializable):
    """Base class for wireless modems transmitting or receiving information over devices.

    Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience
    wrappers :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface.
    """

    __encoder_manager: EncoderManager
    __waveform: CWT | None
    __frame_generator: FrameGenerator

    def __init__(
        self,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CWT | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            encoding:
                Bit coding configuration.
                Encodes communication bit frames during transmission and decodes them during reception.
                Either a fully configured encoder manager, which is adopted as-is,
                or a sequence of encoders that are added to a new manager in the given order.

            waveform:
                The waveform to be transmitted by this modem.

            frame_generator:
                Frame generator used to pack and unpack communication frames.
                If not specified, a stub generator will be assumed.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        # Base class initialization
        RandomNode.__init__(self, seed=seed)

        # Initialize the bit encoding configuration
        if isinstance(encoding, EncoderManager):
            # A ready-made manager already holds its encoders, they must not be added again
            self.encoder_manager = encoding

        else:
            self.encoder_manager = EncoderManager()
            if encoding is not None:
                for encoder in encoding:
                    self.encoder_manager.add_encoder(encoder)

        # Initialize the remaining attributes
        self.waveform = waveform
        self.frame_generator = FrameGeneratorStub() if frame_generator is None else frame_generator

    @property
    def encoder_manager(self) -> EncoderManager:
        """Description of the modem's forward error correction.

        Refer to :class:`EncoderManager<hermespy.fec.coding.EncoderManager>` for further information.
        Assigning a new manager registers this modem as the manager's
        random mother and as its modem.
        """

        return self.__encoder_manager

    @encoder_manager.setter
    def encoder_manager(self, new_manager: EncoderManager) -> None:
        self.__encoder_manager = new_manager
        self.__encoder_manager.random_mother = self
        new_manager.modem = self

    @property
    def waveform(self) -> CWT | None:
        """Description of the communication waveform emitted by this modem.

        The only mandatory attribute required to transmit or receive over a
        :class:`modem<BaseModem>`.
        Assigning a waveform other than `None` registers this modem as the waveform's
        modem and random mother.
        """

        return self.__waveform

    @waveform.setter
    def waveform(self, value: CWT | None) -> None:
        self.__waveform = value

        if value is not None:
            value.modem = self
            value.random_mother = self

    @property
    def frame_generator(self) -> FrameGenerator:
        """Frame generator used to pack and unpack communication frames."""

        return self.__frame_generator

    @frame_generator.setter
    def frame_generator(self, value: FrameGenerator) -> None:
        self.__frame_generator = value

    @property
    def samples_per_frame(self) -> int:
        """Number of discrete-time samples per processed communication frame.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`samples_per_frame<hermespy.modem.waveform.CommunicationWaveform.samples_per_frame>` property.
        """

        return self.waveform.samples_per_frame

    @property
    def frame_duration(self) -> float:
        """Duration of a single communication frame in seconds.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property.
        """

        return self.waveform.frame_duration

    @property
    def symbol_duration(self) -> float:
        """Duration of a single communication symbol in seconds.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property.
        """

        return self.waveform.symbol_duration

    @property
    def sampling_rate(self) -> float:
        """Sampling rate of the processed waveforms in Hz.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
        """

        return self.waveform.sampling_rate

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Write the modem configuration to a serialization process.

        The waveform is only written if one is configured.

        Args:

            process:
                The serialization process to write to.
        """

        process.serialize_object_sequence(self.encoder_manager.encoders, "bit_coding")
        if self.waveform is not None:
            process.serialize_object(self.waveform, "waveform")
        process.serialize_object(self.frame_generator, "frame_generator")

    @classmethod
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Read the initialization parameters from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns:
            Keyword arguments to be forwarded to the class initializer.
            The waveform and the frame generator fall back to `None` if absent.
        """

        return {
            "encoding": process.deserialize_object_sequence("bit_coding", Encoder),
            "waveform": process.deserialize_object("waveform", CommunicationWaveform, None),
            "frame_generator": process.deserialize_object("frame_generator", FrameGenerator, None),
        }

    @classmethod
    @override
    def Deserialize(cls: Type[_BMT], process: DeserializationProcess) -> _BMT:
        """Recover a modem from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: A modem initialized from the deserialized parameters.
        """

        parameters = cls._DeserializeParameters(process)
        return cls(**parameters)  # type: ignore[arg-type]
# Type variable bound to the BaseModem class defined above.
BMT = TypeVar("BMT", bound=BaseModem)
"""Type of base modem."""
[docs]
class TransmittingModemBase(Generic[CWT], BaseModem[CWT]):
    """Base class of signal processing algorithms transmitting information."""

    __bits_source: BitsSource
    __transmit_symbol_coding: TransmitSymbolCoding
    __transmit_signal_coding: TransmitSignalCoding

    def __init__(
        self,
        bits_source: BitsSource | None = None,
        transmit_symbol_coding: (
            TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None
        ) = None,
        transmit_signal_coding: (
            TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None
        ) = None,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CWT | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            bits_source:
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            transmit_symbol_coding:
                Complex communication symbol coding configuration during transmission.

            transmit_signal_coding:
                Stream MIMO coding configuration during signal transmission.

            encoding:
                Bit coding configuration.

            waveform:
                The waveform to be transmitted by this modem.

            frame_generator:
                Frame generator used to pack and unpack communication frames.
                If not specified, a stub generator will be assumed.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        BaseModem.__init__(self, encoding, waveform, frame_generator, seed)

        # Symbol coding: adopt a complete configuration or assemble one from single encoders
        if isinstance(transmit_symbol_coding, TransmitSymbolCoding):
            symbol_coding = transmit_symbol_coding
        else:
            symbol_coding = TransmitSymbolCoding()
            if transmit_symbol_coding is not None:
                for symbol_encoder in transmit_symbol_coding:
                    symbol_coding[-1] = symbol_encoder
        self.__transmit_symbol_coding = symbol_coding

        # Signal coding: adopt a complete configuration or assemble one from single encoders
        if isinstance(transmit_signal_coding, TransmitSignalCoding):
            signal_coding = transmit_signal_coding
        else:
            signal_coding = TransmitSignalCoding()
            if transmit_signal_coding is not None:
                for stream_encoder in transmit_signal_coding:
                    signal_coding[-1] = stream_encoder
        self.__transmit_signal_coding = signal_coding

        # Fall back to randomly generated bits
        if bits_source is None:
            bits_source = RandomBitsSource()
        self.bits_source = bits_source

    @property
    def bits_source(self) -> BitsSource:
        """Source configuration of communication bits transmitted by this modem.

        Assigning a source registers this modem as the source's random mother.

        Returns: A handle to the source configuration.
        """

        return self.__bits_source

    @bits_source.setter
    def bits_source(self, value: BitsSource):
        value.random_mother = self
        self.__bits_source = value

    @property
    def transmit_signal_coding(self) -> TransmitSignalCoding:
        """Stream MIMO coding configuration during signal transmission."""

        return self.__transmit_signal_coding

    @transmit_signal_coding.setter
    def transmit_signal_coding(self, value: TransmitSignalCoding) -> None:
        self.__transmit_signal_coding = value

    @property
    def transmit_symbol_coding(self) -> TransmitSymbolCoding:
        """Complex communication symbol coding configuration during transmission."""

        return self.__transmit_symbol_coding

    @transmit_symbol_coding.setter
    def transmit_symbol_coding(self, value: TransmitSymbolCoding) -> None:
        self.__transmit_symbol_coding = value

    def __map(self, bits: np.ndarray, num_streams: int) -> Symbols:
        """Map a block of information bits to communication symbols.

        Args:

            bits:
                The information bits to be mapped.

            num_streams:
                Number of symbol streams the bits are evenly split into.

        Returns:
            A series of communication symbols, one stream per bit section.
        """

        mapped = Symbols()
        bits_per_stream = np.reshape(bits, (num_streams, -1))

        for stream_bits in bits_per_stream:
            mapped.append_stream(self.waveform.map(stream_bits))

        return mapped

    def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal:
        """Modulate a sequence of MIMO symbol streams into a base-band communication waveform.

        Args:

            symbols:
                Communication symbols to be modulated.

            carrier_frequency:
                Carrier frequency of the communication signal.

        Returns:
            The modulated base-band communication frame.
        """

        sample_shape = (symbols.num_streams, self.waveform.samples_per_frame)
        samples = np.empty(sample_shape, dtype=np.complex128)

        # Each symbol stream is placed and modulated on its own
        for stream_index, stream_symbols in enumerate(symbols.raw):
            single_stream = Symbols(stream_symbols[np.newaxis, :, :])
            placed = self.waveform.place(single_stream)
            samples[stream_index, :] = self.waveform.modulate(placed)

        return Signal.Create(samples, self.waveform.sampling_rate, carrier_frequency)

    def _transmit(self, device: TransmitState, duration: float) -> CommunicationTransmission:
        """Generate all communication frames fitting into the requested duration.

        Args:

            device:
                State of the transmitting device.

            duration:
                Duration of the transmission in seconds.
                Non-positive values result in exactly one frame being generated.

        Returns:
            The transmission, containing the stream-encoded signal and one entry per generated frame.
            No frames are contained if the duration is shorter than a single frame.
        """

        frame_duration = self.frame_duration

        # By default, the generated signal's duration will be exactly one frame
        if duration <= 0.0:
            duration = frame_duration

        # Only complete frames are generated
        num_mimo_frames = int(duration / frame_duration)

        # Parallel streams expected by the signal coding and the symbol coding stages
        num_signal_streams = self.transmit_signal_coding.num_transmit_input_streams(
            device.num_digital_transmit_ports
        )
        num_symbol_streams = self.transmit_symbol_coding.num_transmit_input_streams(
            num_signal_streams
        )

        # Bit budget of a single frame under the configured precoding
        num_data_symbols = int(
            self.waveform.num_data_symbols * self.transmit_symbol_coding.encode_rate
        )
        num_code_bits = self.waveform.bits_per_frame(num_data_symbols) * num_symbol_streams
        num_data_bits = self.encoder_manager.required_num_data_bits(num_code_bits)

        signal = Signal.Empty(
            self.waveform.sampling_rate,
            device.num_digital_transmit_ports,
            carrier_frequency=device.carrier_frequency,
        )

        # Nothing to transmit within the requested duration
        if num_mimo_frames < 1:
            return CommunicationTransmission(signal)

        frames: List[CommunicationTransmissionFrame] = []
        for frame_number in range(num_mimo_frames):
            # Data bits, then forward error correction
            data_bits = self.frame_generator.pack_frame(self.bits_source, num_data_bits)
            code_bits = self.encoder_manager.encode(data_bits, num_code_bits)

            # Bits to symbols
            mapped_symbols = self.__map(code_bits, num_symbol_streams)

            # Symbol precoding, starting from an all-ones state
            state_shape = (
                mapped_symbols.num_streams,
                1,
                mapped_symbols.num_blocks,
                mapped_symbols.num_symbols,
            )
            stated_symbols = StatedSymbols(
                mapped_symbols.raw, np.ones(state_shape, dtype=np.complex128)
            )
            encoded_symbols = self.transmit_symbol_coding.encode_symbols(
                stated_symbols, num_signal_streams
            )

            # Symbols to base-band samples, then stream encoding
            frame_signal = self.__modulate(encoded_symbols, device.carrier_frequency)
            encoded_frame_signal = self.transmit_signal_coding.encode_streams(frame_signal, device)

            # The overall signal collects the stream-encoded frames,
            # the frame itself keeps the signal before stream encoding
            signal.append_samples(encoded_frame_signal)
            frame = CommunicationTransmissionFrame(
                signal=frame_signal,
                bits=data_bits,
                bit_block_size=self.encoder_manager.bit_block_size,
                encoded_bits=code_bits,
                code_block_size=self.encoder_manager.code_block_size,
                symbols=mapped_symbols,
                encoded_symbols=encoded_symbols,
                timestamp=frame_number * frame_duration,
            )
            frames.append(frame)

        return CommunicationTransmission(signal, frames)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Write the modem configuration to a serialization process.

        Args:

            process:
                The serialization process to write to.
        """

        BaseModem.serialize(self, process)
        process.serialize_object(self.bits_source, "bits_source")
        process.serialize_object_sequence(self.transmit_symbol_coding, "transmit_symbol_coding")
        process.serialize_object_sequence(self.transmit_signal_coding, "transmit_signal_coding")

    @classmethod
    @override
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Read the initialization parameters from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns:
            Keyword arguments to be forwarded to the class initializer.
        """

        parameters = BaseModem._DeserializeParameters(process)
        parameters["bits_source"] = process.deserialize_object("bits_source", BitsSource)
        parameters["transmit_symbol_coding"] = process.deserialize_object_sequence(
            "transmit_symbol_coding", TransmitSymbolEncoder
        )
        parameters["transmit_signal_coding"] = process.deserialize_object_sequence(
            "transmit_signal_coding", TransmitStreamEncoder
        )
        return parameters
[docs]
class TransmittingModem(
    TransmittingModemBase[CommunicationWaveform], Transmitter[CommunicationTransmission]
):
    """Representation of a wireless modem exclusively transmitting."""

    def __init__(
        self,
        selected_transmit_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        bits_source: BitsSource | None = None,
        transmit_symbol_coding: (
            TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None
        ) = None,
        transmit_signal_coding: (
            TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None
        ) = None,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CommunicationWaveform | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            selected_transmit_ports:
                Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            carrier_frequency:
                Carrier frequency of the transmitted communication signal.

            bits_source:
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            transmit_symbol_coding:
                Complex communication symbol coding configuration during transmission.

            transmit_signal_coding:
                Stream MIMO coding configuration during signal transmission.

            encoding:
                Bit coding configuration.

            waveform:
                The waveform to be transmitted by this modem.

            frame_generator:
                Frame generator used to pack and unpack communication frames.
                If not specified, a stub generator will be assumed.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        # The order of the base class initializations matters: transmitter first, modem second
        Transmitter.__init__(self, seed, selected_transmit_ports, carrier_frequency)
        TransmittingModemBase.__init__(
            self,
            bits_source,
            transmit_symbol_coding,
            transmit_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )
        Serializable.__init__(self)

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        if self.waveform is None:
            return 0.0

        return self.waveform.power

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Write the transmitter and modem configuration to a serialization process.

        Args:

            process:
                The serialization process to write to.
        """

        Transmitter.serialize(self, process)
        TransmittingModemBase.serialize(self, process)

    @classmethod
    @override
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Read the initialization parameters from a deserialization process.

        Transmitter parameters are read first and then merged with the modem parameters.

        Args:

            process:
                The deserialization process to read from.

        Returns:
            Keyword arguments to be forwarded to the class initializer.
        """

        merged = Transmitter._DeserializeParameters(process)
        modem_parameters = TransmittingModemBase._DeserializeParameters(process)
        merged.update(modem_parameters)
        return merged

    @classmethod
    @override
    def Deserialize(
        cls: Type[TransmittingModem], process: DeserializationProcess
    ) -> TransmittingModem:
        """Recover a transmitting modem from a deserialization process.

        Args:

            process:
                The deserialization process to read from.

        Returns: A modem initialized from the deserialized parameters.
        """

        return cls(**cls._DeserializeParameters(process))  # type: ignore[arg-type]
[docs]
class ReceivingModemBase(Generic[CWT], BaseModem[CWT]):
"""Base class of signal processing algorithms receiving information."""
__receive_symbol_coding: ReceiveSymbolCoding
__receive_signal_coding: ReceiveSignalCoding
def __init__(
    self,
    receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None,
    receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None,
    encoding: EncoderManager | Sequence[Encoder] | None = None,
    waveform: CWT | None = None,
    frame_generator: FrameGenerator | None = None,
    seed: int | None = None,
) -> None:
    """
    Args:

        receive_symbol_coding:
            Complex communication symbol coding configuration during reception.

        receive_signal_coding:
            Stream MIMO coding configuration during signal reception.

        encoding:
            Bit coding configuration.

        waveform:
            The waveform to be received by this modem.

        frame_generator:
            Frame generator used to pack and unpack communication frames.
            If not specified, a stub generator will be assumed.

        seed:
            Seed used to initialize the pseudo-random number generator.
    """

    BaseModem.__init__(self, encoding, waveform, frame_generator, seed)

    # Symbol coding: adopt a complete configuration or assemble one from single decoders
    if isinstance(receive_symbol_coding, ReceiveSymbolCoding):
        symbol_coding = receive_symbol_coding
    else:
        symbol_coding = ReceiveSymbolCoding()
        if receive_symbol_coding is not None:
            for symbol_decoder in receive_symbol_coding:
                symbol_coding[-1] = symbol_decoder
    self.__receive_symbol_coding = symbol_coding

    # Signal coding: adopt a complete configuration or assemble one from single decoders
    if isinstance(receive_signal_coding, ReceiveSignalCoding):
        signal_coding = receive_signal_coding
    else:
        signal_coding = ReceiveSignalCoding()
        if receive_signal_coding is not None:
            for stream_decoder in receive_signal_coding:
                signal_coding[-1] = stream_decoder
    self.__receive_signal_coding = signal_coding
@property
def receive_signal_coding(self) -> ReceiveSignalCoding:
    """Stream MIMO coding configuration during signal reception."""
    return self.__receive_signal_coding

@receive_signal_coding.setter
def receive_signal_coding(self, value: ReceiveSignalCoding) -> None:
    # Plain replacement of the stored configuration, no further registration is performed here
    self.__receive_signal_coding = value
@property
def receive_symbol_coding(self) -> ReceiveSymbolCoding:
    """Complex communication symbol coding configuration during reception."""
    return self.__receive_symbol_coding

@receive_symbol_coding.setter
def receive_symbol_coding(self, value: ReceiveSymbolCoding) -> None:
    # Plain replacement of the stored configuration, no further registration is performed here
    self.__receive_symbol_coding = value
def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]:
    """Synchronize a received MIMO base-band stream.

    Converts the stream into sections representing communication frames.

    Args:

        received_signal:
            The MIMO signal received over the operated device's RF chain.

    Returns:
        Tuple of the detected frame start sample indices and
        the signal sections representing the respective communication frames.
    """

    # Detect the frame starts within the raw MIMO samples
    start_indices = self.waveform.synchronization.synchronize(received_signal.getitem())
    samples_per_frame = self.waveform.samples_per_frame

    frame_signals = []
    for start_index in start_indices:
        sample_window = slice(start_index, start_index + samples_per_frame)
        samples = received_signal.getitem((slice(None, None), sample_window))

        # Zero-pad sections that came out too short,
        # e.g. an incomplete last frame or an imperfect synchronization
        num_missing_samples = samples_per_frame - samples.shape[1]
        if num_missing_samples > 0:
            samples = np.pad(samples, ((0, 0), (0, num_missing_samples)), mode="constant")

        frame_signals.append(Signal.Create(samples, received_signal.sampling_rate))

    return start_indices, frame_signals
def __demodulate(self, frame: Signal) -> Symbols:
    """Demodulate every stream of a synchronized frame into communication symbols.

    Args:
        frame:
            Synchronized MIMO signal, representing the samples of a full communication frame.

    Returns:
        Symbols collecting the waveform's demodulation result of each signal stream,
        appended in stream order.
    """

    demodulated = Symbols()

    # Each stream is demodulated independently by the configured waveform
    for stream_samples in frame.getitem():
        demodulated.append_stream(self.waveform.demodulate(stream_samples))

    return demodulated
def __unmap(self, symbols: Symbols) -> np.ndarray:
    """Convert communication symbols into a flat sequence of hard bits.

    Args:
        symbols:
            The communication symbols to be unmapped.

    Returns:
        One-dimensional numpy array holding the unmapped bits of all symbol streams,
        concatenated in stream order.
    """

    unmapped_bits = np.empty(0, dtype=np.uint8)

    for stream_symbols in symbols.raw:
        # The waveform unmaps a single stream at a time,
        # so each stream is wrapped with a leading singleton stream dimension
        stream_bits = self.waveform.unmap(Symbols(stream_symbols[np.newaxis, :, :]))
        unmapped_bits = np.append(unmapped_bits, stream_bits)

    return unmapped_bits
[docs]
def _receive(self, signal: Signal, device: ReceiveState) -> CommunicationReception:
    """Receive and decode the communication frames contained in a base-band signal.

    The signal is resampled to the waveform's sampling rate, decoded by the receive
    signal coding and synchronized into frames. Each frame is then demodulated,
    channel-estimated, symbol-decoded, equalized, unmapped to bits, FEC-decoded
    and unpacked by the frame generator.

    Args:
        signal:
            The received base-band signal.
        device:
            State of the receiving device, forwarded to the receive signal coding.

    Returns:
        Reception holding the resampled signal and one reception frame per detected
        communication frame. No frames are attached if synchronization detected none.
    """

    # Resample the signal to match the waveform's requirements
    signal = signal.resample(self.waveform.sampling_rate)

    # Decode the signal using the receive signal coding configuration
    decoded_signal = self.receive_signal_coding.decode_streams(signal, device)

    # Synchronize incoming signals
    frame_start_indices, synchronized_signals = self.__synchronize(decoded_signal)

    # Abort if no frame has been detected
    if len(synchronized_signals) < 1:
        return CommunicationReception(signal)

    # Compute the number of data bits expected from a single frame
    # given the configured symbol precoding and forward error correction
    required_num_data_symbols = int(
        self.waveform.num_data_symbols * self.receive_symbol_coding.decode_rate
    )
    required_num_code_bits = self.waveform.bits_per_frame(
        required_num_data_symbols
    ) * self.receive_symbol_coding.num_receive_output_streams(decoded_signal.num_streams)
    required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits)

    # Process each frame independently
    frames: List[CommunicationReceptionFrame] = []
    for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals):
        # Demodulate raw symbols for each frame independently
        symbols = self.__demodulate(frame_signal)

        # Start time of the frame in seconds, relative to the start of the received signal
        frame_delay = frame_index / self.waveform.sampling_rate

        # Estimate the channel from each frame demodulation
        stated_symbols = self.waveform.estimate_channel(symbols, frame_delay)

        # Extract data symbols to be decoded
        picked_symbols = self.waveform.pick(stated_symbols)

        # Decode the pre-equalization symbol precoding stage
        decoded_symbols = self.receive_symbol_coding.decode_symbols(picked_symbols)

        # Equalize the received symbols for each frame given the estimated channel state
        equalized_symbols = self.waveform.equalize_symbols(decoded_symbols)

        # Unmap equalized symbols to information bits
        encoded_bits = self.__unmap(equalized_symbols)

        # Apply inverse FEC configuration to correct errors and remove redundancies
        decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits)

        # Unpack the payload from the decoded frame bits
        payload_bits = self.frame_generator.unpack_frame(decoded_bits)

        # Store the received information
        frames.append(
            CommunicationReceptionFrame(
                signal=frame_signal,
                decoded_signal=frame_signal,
                symbols=symbols,
                decoded_symbols=decoded_symbols,
                # The timestamp is a time in seconds, i.e. sample index divided by
                # the sampling rate. The signal was resampled to the waveform's
                # sampling rate above, so the frame delay is exactly that time.
                timestamp=frame_delay,
                equalized_symbols=equalized_symbols,
                encoded_bits=encoded_bits,
                code_block_size=self.encoder_manager.code_block_size,
                decoded_bits=payload_bits,
                bit_block_size=self.encoder_manager.bit_block_size,
            )
        )

    # Store the received information of all frames
    return CommunicationReception(signal=signal, frames=frames)
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
    """Serialize the base modem state and both receive coding configurations.

    Args:
        process:
            The serialization process to write to.
    """

    BaseModem.serialize(self, process)

    # Both coding configurations are stored as object sequences under their attribute name
    codings = (
        (self.receive_symbol_coding, "receive_symbol_coding"),
        (self.receive_signal_coding, "receive_signal_coding"),
    )
    for coding, name in codings:
        process.serialize_object_sequence(coding, name)
@classmethod
@override
def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
    """Collect the initialization parameters stored in a serialization.

    Args:
        process:
            The deserialization process to read from.

    Returns:
        The base modem's parameters, extended by the deserialized
        `receive_symbol_coding` and `receive_signal_coding` sequences.
    """

    parameters = BaseModem._DeserializeParameters(process)

    # Read both sequences in the order they were serialized
    symbol_decoders = process.deserialize_object_sequence(
        "receive_symbol_coding", ReceiveSymbolDecoder
    )
    signal_decoders = process.deserialize_object_sequence(
        "receive_signal_coding", ReceiveStreamDecoder
    )

    parameters.update(
        receive_symbol_coding=symbol_decoders, receive_signal_coding=signal_decoders
    )
    return parameters
[docs]
class ReceivingModem(
    ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable
):
    """Wireless modem that exclusively receives communication signals."""

    def __init__(
        self,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None,
        receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CommunicationWaveform | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            selected_receive_ports:
                Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            carrier_frequency:
                Carrier frequency of the received communication signal.

            receive_symbol_coding:
                Complex communication symbol coding configuration during reception.

            receive_signal_coding:
                Stream MIMO coding configuration during signal reception.

            encoding:
                Bit coding configuration.

            waveform:
                The waveform to be received by this modem.

            frame_generator:
                Frame generator used to pack and unpack communication frames.
                If not specified, a stub generator will be assumed.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        # Initialize base classes
        # Note that the initialization order matters here
        Receiver.__init__(self, seed, selected_receive_ports, carrier_frequency)
        ReceivingModemBase.__init__(
            self,
            receive_symbol_coding,
            receive_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )
        Serializable.__init__(self)

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        if self.waveform is not None:
            return self.waveform.power

        return 0.0

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the receiving modem base state followed by the receiver state."""

        ReceivingModemBase.serialize(self, process)
        Receiver.serialize(self, process)

    @classmethod
    @override
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Collect the initialization parameters of both serialized base classes.

        Parameters deserialized by the receiver are merged into the receiving modem
        base's parameters and take precedence for identical keys.
        """

        parameters = ReceivingModemBase._DeserializeParameters(process)
        receiver_parameters = Receiver._DeserializeParameters(process)
        parameters.update(receiver_parameters)
        return parameters

    @classmethod
    @override
    def Deserialize(cls: Type[ReceivingModem], process: DeserializationProcess) -> ReceivingModem:
        """Create a new modem from the parameters recovered from a serialization."""

        return cls(**cls._DeserializeParameters(process))  # type: ignore[arg-type]
[docs]
class DuplexModem(TransmittingModem, ReceivingModem):
    """Wireless modem that transmits and receives simultaneously."""

    def __init__(
        self,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        bits_source: BitsSource | None = None,
        transmit_symbol_coding: (
            TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None
        ) = None,
        receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None,
        transmit_signal_coding: (
            TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None
        ) = None,
        receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CommunicationWaveform | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            selected_transmit_ports:
                Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports:
                Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            carrier_frequency:
                Carrier frequency of the transmitted and received communication signals.

            bits_source:
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            transmit_symbol_coding:
                Complex communication symbol coding configuration during transmission.

            receive_symbol_coding:
                Complex communication symbol coding configuration during reception.

            transmit_signal_coding:
                Stream MIMO coding configuration during signal transmission.

            receive_signal_coding:
                Stream MIMO coding configuration during signal reception.

            encoding:
                Bit coding configuration.

            waveform:
                The communication waveform to be transmitted and received by this modem.

            frame_generator:
                Frame generator used to pack and unpack communication frames.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        # Initialize the receiving side first, followed by the transmitting side
        # Both sides are handed the identical encoding, waveform, frame generator and seed
        ReceivingModem.__init__(
            self,
            selected_receive_ports,
            carrier_frequency,
            receive_symbol_coding,
            receive_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )
        TransmittingModem.__init__(
            self,
            selected_transmit_ports,
            carrier_frequency,
            bits_source,
            transmit_symbol_coding,
            transmit_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the transmitting modem state followed by the receiving modem state."""

        TransmittingModem.serialize(self, process)
        ReceivingModem.serialize(self, process)

    @classmethod
    @override
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Collect the initialization parameters of both serialized modem sides.

        Parameters deserialized by the receiving modem are merged into the transmitting
        modem's parameters and take precedence for identical keys.
        """

        parameters = TransmittingModem._DeserializeParameters(process)
        receive_parameters = ReceivingModem._DeserializeParameters(process)
        parameters.update(receive_parameters)
        return parameters

    @classmethod
    @override
    def Deserialize(cls: Type[DuplexModem], process: DeserializationProcess) -> DuplexModem:
        """Create a new modem from the parameters recovered from a serialization."""

        return cls(**cls._DeserializeParameters(process))  # type: ignore[arg-type]
[docs]
class SimplexLink(TransmittingModem, ReceivingModem):
    """Convenience class managing a simplex communication link between two dedicated devices."""

    def __init__(
        self,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        carrier_frequency: float | None = None,
        bits_source: BitsSource | None = None,
        transmit_symbol_coding: (
            TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None
        ) = None,
        receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None,
        transmit_signal_coding: (
            TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None
        ) = None,
        receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None,
        encoding: EncoderManager | Sequence[Encoder] | None = None,
        waveform: CommunicationWaveform | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            selected_transmit_ports:
                Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports:
                Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array.
                If not specified, all available ports will be considered.

            carrier_frequency:
                Carrier frequency of the transmitted and received communication signals.

            bits_source:
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            transmit_symbol_coding:
                Complex communication symbol coding configuration during transmission.

            receive_symbol_coding:
                Complex communication symbol coding configuration during reception.

            transmit_signal_coding:
                Stream MIMO coding configuration during signal transmission.

            receive_signal_coding:
                Stream MIMO coding configuration during signal reception.

            encoding:
                Bit coding configuration.

            waveform:
                The communication waveform shared by the transmitting and receiving side of this link.

            frame_generator:
                Frame generator used to pack and unpack communication frames.
                If not specified, a stub generator will be assumed.

            seed:
                Seed used to initialize the pseudo-random number generator.
        """

        # Initialize the receiving side first, followed by the transmitting side
        # Both sides are handed the identical encoding, waveform, frame generator and seed
        ReceivingModem.__init__(
            self,
            selected_receive_ports,
            carrier_frequency,
            receive_symbol_coding,
            receive_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )
        TransmittingModem.__init__(
            self,
            selected_transmit_ports,
            carrier_frequency,
            bits_source,
            transmit_symbol_coding,
            transmit_signal_coding,
            encoding,
            waveform,
            frame_generator,
            seed,
        )

    def connect(self, transmitting_device: Device, receiving_device: Device) -> None:
        """Connect two devices by this simplex link.

        Convenience method registering this DSP layer as a transmitter at the
        transmitting device and as a receiver at the receiving device.

        Args:

            transmitting_device:
                Device transmitting over the simplex link.

            receiving_device:
                Device receiving over the simplex link.
        """

        transmitting_device.transmitters.add(self)
        receiving_device.receivers.add(self)

    @override
    def serialize(self, process: SerializationProcess) -> None:
        """Serialize the transmitting modem state followed by the receiving modem state."""

        TransmittingModem.serialize(self, process)
        ReceivingModem.serialize(self, process)

    @classmethod
    @override
    def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]:
        """Collect the initialization parameters of both serialized link sides.

        Parameters deserialized by the receiving modem are merged into the transmitting
        modem's parameters and take precedence for identical keys.
        """

        parameters = TransmittingModem._DeserializeParameters(process)
        receive_parameters = ReceivingModem._DeserializeParameters(process)
        parameters.update(receive_parameters)
        return parameters

    @classmethod
    @override
    def Deserialize(cls: Type[SimplexLink], process: DeserializationProcess) -> SimplexLink:
        """Create a new link from the parameters recovered from a serialization."""

        return cls(**cls._DeserializeParameters(process))  # type: ignore[arg-type]