Source code for hermespy.modem.modem

# -*- coding: utf-8 -*-

from __future__ import annotations
from functools import cached_property
from typing import Generic, List, Sequence, Tuple, Type, TypeVar
from typing_extensions import override

import numpy as np

from hermespy.fec import Encoder, EncoderManager
from hermespy.core import (
    Device,
    DeserializationProcess,
    RandomNode,
    Serializable,
    SerializationProcess,
    Transmission,
    TransmitSignalCoding,
    TransmitStreamEncoder,
    Reception,
    Signal,
    TransmitState,
    Transmitter,
    Receiver,
    ReceiveState,
    ReceiveSignalCoding,
    ReceiveStreamDecoder,
)
from .bits_source import BitsSource, RandomBitsSource
from .precoding import (
    TransmitSymbolCoding,
    TransmitSymbolEncoder,
    ReceiveSymbolCoding,
    ReceiveSymbolDecoder,
)
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT
from .frame_generator import FrameGenerator, FrameGeneratorStub

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class CommunicationTransmissionFrame(Transmission): """A single synchronized frame of information generated by transmittgin over a modem. Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance. """ signal: Signal """Communication base-band waveform.""" bits: np.ndarray """Communication data bits.""" bit_block_size: int """Block size of the forward error correction input.""" encoded_bits: np.ndarray """Transmitted bits after FEC encoding.""" code_block_size: int """Block size of the forward error correction output.""" symbols: Symbols """Communication data symbols.""" encoded_symbols: Symbols """Communication data symbols after symbol encoding.""" timestamp: float """Time at which the frame was transmitted in seconds.""" def __init__( self, signal: Signal, bits: np.ndarray, bit_block_size: int, encoded_bits: np.ndarray, code_block_size: int, symbols: Symbols, encoded_symbols: Symbols, timestamp: float, ) -> None: """ Args: signal: Transmitted communication base-band waveform. bits: Transmitted communication data bits. bit_block_size: Block size of the forward error correction input. encoded_bits: Transmitted communication bits after FEC encoding. code_block_size: Block size of the forward error correction output symbols: Transmitted communication data symbols. encoded_symbols: Transmitted communication data symbols after symbol encoding. timestamp: Time at which the frame was transmitted in seconds. """ # Initialize base class Transmission.__init__(self, signal=signal) # Initialize class attributes self.bits = bits self.bit_block_size = bit_block_size self.encoded_bits = encoded_bits self.code_block_size = code_block_size self.symbols = symbols self.encoded_symbols = encoded_symbols self.timestamp = timestamp
[docs] def serialize(self, process: SerializationProcess) -> None: Transmission.serialize(self, process) process.serialize_array(self.bits, "bits") process.serialize_array(self.encoded_bits, "encoded_bits") process.serialize_integer(self.bit_block_size, "bit_block_size") process.serialize_integer(self.code_block_size, "code_block_size") process.serialize_object(self.symbols, "symbols") process.serialize_object(self.encoded_symbols, "encoded_symbols") process.serialize_floating(self.timestamp, "timestamp")
[docs] @classmethod def Deserialize(cls, process: DeserializationProcess) -> CommunicationTransmissionFrame: signal = process.deserialize_object("signal", Signal) bits = process.deserialize_array("bits", np.int64) encoded_bits = process.deserialize_array("encoded_bits", np.int64) bit_block_size = process.deserialize_integer("bit_block_size") code_block_size = process.deserialize_integer("code_block_size") symbols = process.deserialize_object("symbols", Symbols) encoded_symbols = process.deserialize_object("encoded_symbols", Symbols) timestamp = process.deserialize_floating("timestamp") return cls( signal=signal, bits=bits, bit_block_size=bit_block_size, encoded_bits=encoded_bits, code_block_size=code_block_size, symbols=symbols, encoded_symbols=encoded_symbols, timestamp=timestamp, )
[docs] class CommunicationTransmission(Transmission): """Collection of information generated by transmitting over a modem. Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance. """ frames: List[CommunicationTransmissionFrame] """Individual transmitted communication frames.""" def __init__( self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None ) -> None: """ Args: signal: Transmitted communication base-band waveform. frames: Individual transmitted communication frames. """ Transmission.__init__(self, signal=signal) self.frames = [] if frames is None else frames @property def num_frames(self) -> int: """Number of transmitted communication frames. Returns: Number of frames. """ return len(self.frames) @cached_property def bits(self) -> np.ndarray: """Transmitted bits before FEC encoding. Returns: Numpy array of transmitted bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.bits) return concatenated_bits @cached_property def symbols(self) -> Symbols: symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.symbols) return symbols
[docs] def serialize(self, process: SerializationProcess) -> None: Transmission.serialize(self, process) process.serialize_object_sequence(self.frames, "frames")
[docs] @classmethod def Deserialize( cls: Type[CommunicationTransmission], process: DeserializationProcess ) -> CommunicationTransmission: return cls( process.deserialize_object("signal", Signal), list(process.deserialize_object_sequence("frames", CommunicationTransmissionFrame)), )
[docs] class CommunicationReceptionFrame(Reception): """A single synchronized frame of information generated by receiving over a modem. Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance. """ decoded_signal: Signal """Communication base-band waveform after MIMO stream decoding.""" symbols: Symbols """Received communication symbols.""" decoded_symbols: Symbols """Received communication symbols after precoding stage.""" timestamp: float """Time at which the frame was transmitted in seconds.""" equalized_symbols: Symbols """Equalized communication symbols.""" encoded_bits: np.ndarray """Received encoded data bits before error correction.""" code_block_size: int """Block size of the forward error correction input.""" decoded_bits: np.ndarray """Received decoded data bits after error correction.""" bit_block_size: int """Block size of the forward error correction output.""" def __init__( self, signal: Signal, decoded_signal: Signal, symbols: Symbols, decoded_symbols: Symbols, timestamp: float, equalized_symbols: Symbols, encoded_bits: np.ndarray, code_block_size: int, decoded_bits: np.ndarray, bit_block_size: int, ) -> None: """ Args: signal: Received communication base-band waveform. decoded_signal: Received communication base-band waveform after MIMO stream decoding. symbols: Received communication symbols. decoded_symbols: Received communication symbols after precoding stage. timestamp: Time at which the frame was transmitted in seconds. equalized_symbols: Equalized communication symbols. encoded_bits: Received encoded data bits before error correction. code_block_size: Block size of the forward error correction input. decoded_bits: Received decoded data bits after error correction. bit_block_size: Block size of the forward error correction output. """ # Initialize base class Reception.__init__(self, signal) # Initialize class attributes self.signal = signal self.decoded_signal = decoded_signal self.symbols = symbols self.decoded_symbols = decoded_symbols self.timestamp = timestamp self.equalized_symbols = equalized_symbols self.encoded_bits = encoded_bits self.code_block_size = code_block_size self.decoded_bits = decoded_bits self.bit_block_size = bit_block_size
[docs] def serialize(self, process: SerializationProcess) -> None: Reception.serialize(self, process) process.serialize_object(self.decoded_signal, "decoded_signal") process.serialize_object(self.symbols, "symbols") process.serialize_object(self.decoded_symbols, "decoded_symbols") process.serialize_object(self.equalized_symbols, "equalized_symbols") process.serialize_array(self.encoded_bits, "encoded_bits") process.serialize_array(self.decoded_bits, "decoded_bits") process.serialize_integer(self.code_block_size, "code_block_size") process.serialize_integer(self.bit_block_size, "bit_block_size") process.serialize_floating(self.timestamp, "timestamp")
[docs] @classmethod def Deserialize( cls: Type[CommunicationReceptionFrame], process: DeserializationProcess ) -> CommunicationReceptionFrame: signal = process.deserialize_object("signal", Signal) decoded_signal = process.deserialize_object("decoded_signal", Signal) symbols = process.deserialize_object("symbols", Symbols) decoded_symbols = process.deserialize_object("decoded_symbols", Symbols) equalized_symbols = process.deserialize_object("equalized_symbols", Symbols) encoded_bits = process.deserialize_array("encoded_bits", np.int64) decoded_bits = process.deserialize_array("decoded_bits", np.int64) code_block_size = process.deserialize_integer("code_block_size") bit_block_size = process.deserialize_integer("bit_block_size") timestamp = process.deserialize_floating("timestamp") return cls( signal=signal, decoded_signal=decoded_signal, symbols=symbols, decoded_symbols=decoded_symbols, equalized_symbols=equalized_symbols, encoded_bits=encoded_bits, code_block_size=code_block_size, decoded_bits=decoded_bits, bit_block_size=bit_block_size, timestamp=timestamp, )
[docs] class CommunicationReception(Reception): """Collection of information generated by receiving over a modem. Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance. """ frames: List[CommunicationReceptionFrame] """Individual received communication frames.""" def __init__( self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None ) -> None: """ Args: signal: Received communication base-band waveform. frames: Individual received communication frames. """ Reception.__init__(self, signal=signal) self.frames = [] if frames is None else frames @property def num_frames(self) -> int: """Number of received communication frames. Returns: Number of frames. """ return len(self.frames) @cached_property def encoded_bits(self) -> np.ndarray: """Received bits before FEC decoding. Returns: Numpy array containing received bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.encoded_bits) return concatenated_bits @cached_property def bits(self) -> np.ndarray: """Received bits after FEC decoding. Returns: Numpy array containing received bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.decoded_bits) return concatenated_bits @cached_property def symbols(self) -> Symbols: """Collect all communication symbols.""" symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.symbols) return symbols @cached_property def equalized_symbols(self) -> Symbols: """Collect all equalized communication symbols.""" symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.equalized_symbols) return symbols
[docs] def serialize(self, process: SerializationProcess) -> None: Reception.serialize(self, process) process.serialize_object_sequence(self.frames, "frames")
[docs] @classmethod def Deserialize( cls: Type[CommunicationReception], process: DeserializationProcess ) -> CommunicationReception: return cls( process.deserialize_object("signal", Signal), list(process.deserialize_object_sequence("frames", CommunicationReceptionFrame)), )
_BMT = TypeVar("_BMT", bound="BaseModem") """Type of base modem."""
[docs] class BaseModem(Generic[CWT], RandomNode, Serializable): """Base class for wireless modems transmitting or receiving information over devices. Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience wrappers :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface. """ __encoder_manager: EncoderManager __waveform: CWT | None __frame_generator: FrameGenerator def __init__( self, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CWT | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: encoding: Bit coding configuration. Encodes communication bit frames during transmission and decodes them during reception. waveform: The waveform to be transmitted by this modem. frame_generator: Frame generator used to pack and unpack communication frames. If not specified, a stub generator will be assumed. seed: Seed used to initialize the pseudo-random number generator. """ # Base class initialization RandomNode.__init__(self, seed=seed) # Initialize the bit encoding configuration if isinstance(encoding, EncoderManager): self.encoder_manager = encoding else: self.encoder_manager = EncoderManager() if encoding is not None: for encoder in encoding.encoders if isinstance(encoding, EncoderManager) else encoding: self.encoder_manager.add_encoder(encoder) # Initialize the remaining attributes self.waveform = waveform self.frame_generator = FrameGeneratorStub() if frame_generator is None else frame_generator @property def encoder_manager(self) -> EncoderManager: """Description of the modem's forward error correction. Refer to :class:`EncoderManager<hermespy.fec.coding.EncoderManager>` for further information. """ return self.__encoder_manager @encoder_manager.setter def encoder_manager(self, new_manager: EncoderManager) -> None: self.__encoder_manager = new_manager self.__encoder_manager.random_mother = self new_manager.modem = self @property def waveform(self) -> CWT | None: """Description of the communication waveform emitted by this modem. The only mandatory attribute required to transmit or receive over a :class:`modem<BaseModem>`. """ return self.__waveform @waveform.setter def waveform(self, value: CWT | None) -> None: self.__waveform = value if value is not None: value.modem = self value.random_mother = self @property def frame_generator(self) -> FrameGenerator: return self.__frame_generator @frame_generator.setter def frame_generator(self, value: FrameGenerator) -> None: self.__frame_generator = value @property def samples_per_frame(self) -> int: """Number of discrete-time samples per processed communication frame. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property. """ return self.waveform.samples_per_frame @property def frame_duration(self) -> float: """Duration of a single communication frame in seconds. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property. """ return self.waveform.frame_duration @property def symbol_duration(self) -> float: """Duration of a single communication symbol in seconds. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property. """ return self.waveform.symbol_duration @property def sampling_rate(self) -> float: """Sampling rate of the processed waveforms in Hz. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property. """ return self.waveform.sampling_rate
[docs] @override def serialize(self, process: SerializationProcess) -> None: process.serialize_object_sequence(self.encoder_manager.encoders, "bit_coding") if self.waveform is not None: process.serialize_object(self.waveform, "waveform") process.serialize_object(self.frame_generator, "frame_generator")
@classmethod def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: return { "encoding": process.deserialize_object_sequence("bit_coding", Encoder), "waveform": process.deserialize_object("waveform", CommunicationWaveform, None), "frame_generator": process.deserialize_object("frame_generator", FrameGenerator, None), }
[docs] @classmethod @override def Deserialize(cls: Type[_BMT], process: DeserializationProcess) -> _BMT: parameters = cls._DeserializeParameters(process) return cls(**parameters) # type: ignore[arg-type]
BMT = TypeVar("BMT", bound=BaseModem) """Type of base modem."""
[docs] class TransmittingModemBase(Generic[CWT], BaseModem[CWT]): """Base class of signal processing algorithms transmitting information.""" __bits_source: BitsSource __transmit_symbol_coding: TransmitSymbolCoding __transmit_signal_coding: TransmitSignalCoding def __init__( self, bits_source: BitsSource | None = None, transmit_symbol_coding: ( TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None ) = None, transmit_signal_coding: ( TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None ) = None, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CWT | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: bits_source: Source configuration of communication bits transmitted by this modem. Bits are randomly generated by default. transmit_symbol_coding: Complex communication symbol coding configuration during transmission. transmit_signal_coding: Stream MIMO coding configuration during signal transmission. encoding: Bit coding configuration. waveform: The waveform to be transmitted by this modem. frame_generator: Frame generator used to pack and unpack communication frames. If not specified, a stub generator will be assumed. seed: Seed used to initialize the pseudo-random number generator. """ # Initialize base class BaseModem.__init__(self, encoding, waveform, frame_generator, seed) # Initialize transmit symbol coding configuration if isinstance(transmit_symbol_coding, TransmitSymbolCoding): self.__transmit_symbol_coding = transmit_symbol_coding else: self.__transmit_symbol_coding = TransmitSymbolCoding() if transmit_symbol_coding is not None: for transmit_symbol_encoder in transmit_symbol_coding: self.__transmit_symbol_coding[-1] = transmit_symbol_encoder # Initialize transmit signal coding configuration if isinstance(transmit_signal_coding, TransmitSignalCoding): self.__transmit_signal_coding = transmit_signal_coding else: self.__transmit_signal_coding = TransmitSignalCoding() if transmit_signal_coding is not None: for transmit_signal_encoder in transmit_signal_coding: self.__transmit_signal_coding[-1] = transmit_signal_encoder # Initialize remaining attributes self.bits_source = RandomBitsSource() if bits_source is None else bits_source @property def bits_source(self) -> BitsSource: """Source configuration of communication transmitted by this modem. Returns: A handle to the source configuration. """ return self.__bits_source @bits_source.setter def bits_source(self, value: BitsSource): value.random_mother = self self.__bits_source = value @property def transmit_signal_coding(self) -> TransmitSignalCoding: """Stream MIMO coding configuration during signal transmission.""" return self.__transmit_signal_coding @transmit_signal_coding.setter def transmit_signal_coding(self, value: TransmitSignalCoding) -> None: self.__transmit_signal_coding = value @property def transmit_symbol_coding(self) -> TransmitSymbolCoding: """Complex communication symbol coding configuration during transmission.""" return self.__transmit_symbol_coding @transmit_symbol_coding.setter def transmit_symbol_coding(self, value: TransmitSymbolCoding) -> None: self.__transmit_symbol_coding = value def __map(self, bits: np.ndarray, num_streams: int) -> Symbols: """Map a block of information bits to commuication symbols. Args: bits: The information bits to be mapped. Returns: A series of communication symbols. """ symbols = Symbols() for frame_bits in np.reshape(bits, (num_streams, -1)): symbols.append_stream(self.waveform.map(frame_bits)) return symbols def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal: """Modulates a sequence of MIMO signals into a base-band communication waveform. Args: symbols: Communication symbols to be modulated. carrier_frequency: Carrier frequency of the communication signal. Returns: The modualted base-band communication frame. """ # For each stream resulting from the initial encoding stage # Place and encode the symbols according to the stream transmit coding configuration frame_samples = np.empty( (symbols.num_streams, self.waveform.samples_per_frame), dtype=np.complex128 ) for s, stream_symbols in enumerate(symbols.raw): placed_symbols = self.waveform.place(Symbols(stream_symbols[np.newaxis, :, :])) # Modulate each placed symbol stream individually to its base-band signal representation frame_samples[s, :] = self.waveform.modulate(placed_symbols) # Apply the stream transmit coding configuration frame_signal = Signal.Create(frame_samples, self.waveform.sampling_rate, carrier_frequency) return frame_signal
[docs] def _transmit(self, device: TransmitState, duration: float) -> CommunicationTransmission: # By default, the generated signal's duration will be exactly one frame if duration <= 0.0: duration = self.frame_duration # Compute the number of frames to be transmitted within the given duration frame_duration = self.frame_duration num_mimo_frames = int(duration / frame_duration) # Compute the number of parallel symbol streams required to transmit over the given coding config num_input_signal_streams = self.transmit_signal_coding.num_transmit_input_streams( device.num_digital_transmit_ports ) num_input_symbol_streams = self.transmit_symbol_coding.num_transmit_input_streams( num_input_signal_streams ) # Compute the number of serial data bits required to generate a single frame over the given precoding config required_num_data_symbols = int( self.waveform.num_data_symbols * self.transmit_symbol_coding.encode_rate ) required_num_code_bits = ( self.waveform.bits_per_frame(required_num_data_symbols) * num_input_symbol_streams ) required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits) signal = Signal.Empty( self.waveform.sampling_rate, device.num_digital_transmit_ports, carrier_frequency=device.carrier_frequency, ) # Abort if no frame is to be transmitted within the current duration if num_mimo_frames < 1: transmission = CommunicationTransmission(signal) return transmission frames: List[CommunicationTransmissionFrame] = [] for n in range(num_mimo_frames): # Generate plain data bits frame_bits = self.frame_generator.pack_frame(self.bits_source, required_num_data_bits) # Apply forward error correction encoded_bits = self.encoder_manager.encode(frame_bits, required_num_code_bits) # Map bits to communication symbols mapped_symbols = self.__map(encoded_bits, num_input_symbol_streams) # Apply the first symbol precoding cofiguration encoded_symbols = self.transmit_symbol_coding.encode_symbols( StatedSymbols( mapped_symbols.raw, np.ones( ( mapped_symbols.num_streams, 1, mapped_symbols.num_blocks, mapped_symbols.num_symbols, ), dtype=np.complex128, ), ), num_input_signal_streams, ) # Modulate symbols to a base-band signal frame_signal = self.__modulate(encoded_symbols, device.carrier_frequency) # Apply stream encoding configuration encoded_frame_signal = self.transmit_signal_coding.encode_streams(frame_signal, device) # Save results signal.append_samples(encoded_frame_signal) frames.append( CommunicationTransmissionFrame( signal=frame_signal, bits=frame_bits, bit_block_size=self.encoder_manager.bit_block_size, encoded_bits=encoded_bits, code_block_size=self.encoder_manager.code_block_size, symbols=mapped_symbols, encoded_symbols=encoded_symbols, timestamp=n * frame_duration, ) ) # Save the transmitted information transmission = CommunicationTransmission(signal, frames) return transmission
[docs] @override def serialize(self, process: SerializationProcess) -> None: BaseModem.serialize(self, process) process.serialize_object(self.bits_source, "bits_source") process.serialize_object_sequence(self.transmit_symbol_coding, "transmit_symbol_coding") process.serialize_object_sequence(self.transmit_signal_coding, "transmit_signal_coding")
@classmethod @override def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: parameters = BaseModem._DeserializeParameters(process) parameters["bits_source"] = process.deserialize_object("bits_source", BitsSource) parameters["transmit_symbol_coding"] = process.deserialize_object_sequence( "transmit_symbol_coding", TransmitSymbolEncoder ) parameters["transmit_signal_coding"] = process.deserialize_object_sequence( "transmit_signal_coding", TransmitStreamEncoder ) return parameters
[docs] class TransmittingModem( TransmittingModemBase[CommunicationWaveform], Transmitter[CommunicationTransmission] ): """Representation of a wireless modem exclusively transmitting.""" def __init__( self, selected_transmit_ports: Sequence[int] | None = None, carrier_frequency: float | None = None, bits_source: BitsSource | None = None, transmit_symbol_coding: ( TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None ) = None, transmit_signal_coding: ( TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None ) = None, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CommunicationWaveform | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: selected_transmit_ports: Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array. If not specified, all available ports will be considered. carrier_frequency: Carrier frequency of the transmitted communication signal. bits_source: Source configuration of communication bits transmitted by this modem. Bits are randomly generated by default. transmit_symbol_coding: Complex communication symbol coding configuration during transmission. transmit_signal_coding: Stream MIMO coding configuration during signal transmission. encoding: Bit coding configuration. waveform: The waveform to be transmitted by this modem. frame_generator: Frame generator used to pack and unpack communication frames. If not specified, a stub generator will be assumed. seed: Seed used to initialize the pseudo-random number generator. """ # Initialize base classes # Note that the initialization order matters here Transmitter.__init__(self, seed, selected_transmit_ports, carrier_frequency) TransmittingModemBase.__init__( self, bits_source, transmit_symbol_coding, transmit_signal_coding, encoding, waveform, frame_generator, seed, ) Serializable.__init__(self) @property def power(self) -> float: return self.waveform.power if self.waveform is not None else 0.0
[docs] @override def serialize(self, process: SerializationProcess) -> None: Transmitter.serialize(self, process) TransmittingModemBase.serialize(self, process)
@classmethod @override def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: parameters = Transmitter._DeserializeParameters(process) parameters.update(TransmittingModemBase._DeserializeParameters(process)) return parameters
[docs] @classmethod @override def Deserialize( cls: Type[TransmittingModem], process: DeserializationProcess ) -> TransmittingModem: parameters = cls._DeserializeParameters(process) return cls(**parameters) # type: ignore[arg-type]
[docs] class ReceivingModemBase(Generic[CWT], BaseModem[CWT]): """Base class of signal processing algorithms receiving information.""" __receive_symbol_coding: ReceiveSymbolCoding __receive_signal_coding: ReceiveSignalCoding def __init__( self, receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None, receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CWT | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: receive_symbol_coding: Complex communication symbol coding configuration during reception. receive_signal_coding: Stream MIMO coding configuration during signal reception. encoding: Bit coding configuration. waveform: The waveform to be transmitted by this modem. frame_generator: Frame generator used to pack and unpack communication frames. If not specified, a stub generator will be assumed. seed: Seed used to initialize the pseudo-random number generator. """ # Initialize base class BaseModem.__init__(self, encoding, waveform, frame_generator, seed) # Initialize receive symbol coding configuration if isinstance(receive_symbol_coding, ReceiveSymbolCoding): self.__receive_symbol_coding = receive_symbol_coding else: self.__receive_symbol_coding = ReceiveSymbolCoding() if receive_symbol_coding is not None: for receive_symbol_decoder in receive_symbol_coding: self.__receive_symbol_coding[-1] = receive_symbol_decoder # Initialize receive signal coding configuration if isinstance(receive_signal_coding, ReceiveSignalCoding): self.__receive_signal_coding = receive_signal_coding else: self.__receive_signal_coding = ReceiveSignalCoding() if receive_signal_coding is not None: for receive_signal_decoder in receive_signal_coding: self.__receive_signal_coding[-1] = receive_signal_decoder @property def receive_signal_coding(self) -> ReceiveSignalCoding: """Stream MIMO coding configuration during signal reception.""" return self.__receive_signal_coding @receive_signal_coding.setter def receive_signal_coding(self, value: ReceiveSignalCoding) -> None: self.__receive_signal_coding = value @property def receive_symbol_coding(self) -> ReceiveSymbolCoding: """Complex communication symbol coding configuration during reception.""" return self.__receive_symbol_coding @receive_symbol_coding.setter def receive_symbol_coding(self, value: ReceiveSymbolCoding) -> None: self.__receive_symbol_coding = value def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]: """Synchronize a received MIMO base-band stream. Converts the stream into sections representing communication frames. Args: received_signal: The MIMO signal received over the operated device's RF chain. Returns: A sequence signals representing communication frames and their respective detection indices. """ # Synchronize raw MIMO data into frames frame_start_indices = self.waveform.synchronization.synchronize(received_signal.getitem()) frame_length = self.waveform.samples_per_frame synchronized_signals = [] for frame_start in frame_start_indices: frame_stop = frame_start + frame_length frame_samples = received_signal.getitem( (slice(None, None), slice(frame_start, frame_stop)) ) # Pad the frame if it is too short # This may happen if the last frame is incomplete, or synhronization is not perfect if frame_samples.shape[1] < frame_length: frame_samples = np.pad( frame_samples, ((0, 0), (0, frame_length - frame_samples.shape[1])), mode="constant", ) frame_signal = Signal.Create(frame_samples, received_signal.sampling_rate) synchronized_signals.append(frame_signal) return frame_start_indices, synchronized_signals def __demodulate(self, frame: Signal) -> Symbols: """Demodulates a sequence of synchronized MIMO signals into data symbols. Args: frame: Synchronized MIMO signal, representing the samples of a full communication frame. Returns: Demodulated frame symbols. """ symbols = Symbols() for stream in frame.getitem(): stream_symbols = self.waveform.demodulate(stream) symbols.append_stream(stream_symbols) return symbols def __unmap(self, symbols: Symbols) -> np.ndarray: """Unmap a set of communication symbols to information bits. Args: symbols: The communication symbols to be unmapped Returns: A numpy array containing hard information bits. """ bits = np.empty(0, dtype=np.uint8) for stream in symbols.raw: bits = np.append(bits, self.waveform.unmap(Symbols(stream[np.newaxis, :, :]))) return bits
[docs] def _receive(self, signal: Signal, device: ReceiveState) -> CommunicationReception: # Resample the signal to match the waveform's requirements signal = signal.resample(self.waveform.sampling_rate) # Decode the signal using the receive signal coding configuration decoded_signal = self.receive_signal_coding.decode_streams(signal, device) # Synchronize incoming signals frame_start_indices, synchronized_signals = self.__synchronize(decoded_signal) # Abort if no frame has been detected if len(synchronized_signals) < 1: reception = CommunicationReception(signal) return reception # Compute the bit generation requirements # Compute the number of serial data bits required to generate a single frame over the given precoding config required_num_data_symbols = int( self.waveform.num_data_symbols * self.receive_symbol_coding.decode_rate ) required_num_code_bits = self.waveform.bits_per_frame( required_num_data_symbols ) * self.receive_symbol_coding.num_receive_output_streams(decoded_signal.num_streams) required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits) # Process each frame independently frames: List[CommunicationReceptionFrame] = [] for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals): # Demodulate raw symbols for each frame independtly symbols = self.__demodulate(frame_signal) # Estimate the channel from each frame demodulation frame_delay = frame_index / self.waveform.sampling_rate stated_symbols = self.waveform.estimate_channel(symbols, frame_delay) # Extract data symbols to be decoded picked_symbols = self.waveform.pick(stated_symbols) # Decode the pre-equalization symbol precoding stage decoded_symbols = self.receive_symbol_coding.decode_symbols(picked_symbols) # Equalize the received symbols for each frame given the estimated channel state equalized_symbols = self.waveform.equalize_symbols(decoded_symbols) # Unmap equalized symbols to information bits encoded_bits = self.__unmap(equalized_symbols) # Apply inverse FEC configuration to correct errors and remove redundancies decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits) # Decode the frame payload_bits = self.frame_generator.unpack_frame(decoded_bits) # Store the received information frames.append( CommunicationReceptionFrame( signal=frame_signal, decoded_signal=frame_signal, symbols=symbols, decoded_symbols=decoded_symbols, timestamp=frame_index * signal.sampling_rate, equalized_symbols=equalized_symbols, encoded_bits=encoded_bits, code_block_size=self.encoder_manager.code_block_size, decoded_bits=payload_bits, bit_block_size=self.encoder_manager.bit_block_size, ) ) # Store the received information of all frames reception = CommunicationReception(signal=signal, frames=frames) return reception
[docs] @override def serialize(self, process: SerializationProcess) -> None: BaseModem.serialize(self, process) process.serialize_object_sequence(self.receive_symbol_coding, "receive_symbol_coding") process.serialize_object_sequence(self.receive_signal_coding, "receive_signal_coding")
@classmethod @override def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: parameters = BaseModem._DeserializeParameters(process) parameters["receive_symbol_coding"] = process.deserialize_object_sequence( "receive_symbol_coding", ReceiveSymbolDecoder ) parameters["receive_signal_coding"] = process.deserialize_object_sequence( "receive_signal_coding", ReceiveStreamDecoder ) return parameters
[docs] class ReceivingModem( ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable ): """Representation of a wireless modem exclusively receiving.""" def __init__( self, selected_receive_ports: Sequence[int] | None = None, carrier_frequency: float | None = None, receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None, receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CommunicationWaveform | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: selected_receive_ports: Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array. If not specified, all available antenna ports will be considered. carrier_frequency: Carrier frequency of the received communication signal. receive_symbol_coding: Complex communication symbol coding configuration during reception. receive_signal_coding: Stream MIMO coding configuration during signal reception. encoding: Bit coding configuration. waveform: The waveform to be received by this modem. frame_generator: Frame generator used to pack and unpack communication frames. If not specified, a stub generator will be assumed. seed: Seed used to initialize the pseudo-random number generator. """ # Initialize base classes # Note that the initialization order matters here Receiver.__init__(self, seed, selected_receive_ports, carrier_frequency) ReceivingModemBase.__init__( self, receive_symbol_coding, receive_signal_coding, encoding, waveform, frame_generator, seed, ) Serializable.__init__(self) @property def power(self) -> float: return self.waveform.power if self.waveform is not None else 0.0
[docs] @override def serialize(self, process: SerializationProcess) -> None: ReceivingModemBase.serialize(self, process) Receiver.serialize(self, process)
@classmethod @override def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: parameters = ReceivingModemBase._DeserializeParameters(process) parameters.update(Receiver._DeserializeParameters(process)) return parameters
[docs] @classmethod @override def Deserialize(cls: Type[ReceivingModem], process: DeserializationProcess) -> ReceivingModem: parameters = cls._DeserializeParameters(process) return cls(**parameters) # type: ignore[arg-type]
[docs] class DuplexModem(TransmittingModem, ReceivingModem): """Representation of a wireless modem simultaneously transmitting and receiving.""" def __init__( self, selected_transmit_ports: Sequence[int] | None = None, selected_receive_ports: Sequence[int] | None = None, carrier_frequency: float | None = None, bits_source: BitsSource | None = None, transmit_symbol_coding: ( TransmitSymbolCoding | Sequence[TransmitSymbolEncoder] | None ) = None, receive_symbol_coding: ReceiveSymbolCoding | Sequence[ReceiveSymbolDecoder] | None = None, transmit_signal_coding: ( TransmitSignalCoding | Sequence[TransmitStreamEncoder] | None ) = None, receive_signal_coding: ReceiveSignalCoding | Sequence[ReceiveStreamDecoder] | None = None, encoding: EncoderManager | Sequence[Encoder] | None = None, waveform: CommunicationWaveform | None = None, frame_generator: FrameGenerator | None = None, seed: int | None = None, ) -> None: """ Args: selected_transmit_ports: Indices of antenna ports selected for transmission from the operated :class:`Device's<hermespy.core.device.Device>` antenna array. If not specified, all available ports will be considered. selected_receive_ports: Indices of antenna ports selected for reception from the operated :class:`Device's<hermespy.core.device.Device>` antenna array. If not specified, all available ports will be considered. carrier_frequency: Carrier frequency of the transmitted and received communication signals. bits_source: Source configuration of communication bits transmitted by this modem. Bits are randomly generated by default. transmit_symbol_coding: Complex communication symbol coding configuration during transmission. receive_symbol_coding: Complex communication symbol coding configuration during reception. transmit_signal_coding: Stream MIMO coding configuration during signal transmission. receive_signal_coding: Stream MIMO coding configuration during signal reception. encoding: Bit coding configuration. waveform: The comminication waveform to be transmitted and received by this modem. frame_generator: Frame generator used to pack and unpack communication frames. seed: Seed used to initialize the pseudo-random number generator. """ # Initialize base classes ReceivingModem.__init__( self, selected_receive_ports, carrier_frequency, receive_symbol_coding, receive_signal_coding, encoding, waveform, frame_generator, seed, ) TransmittingModem.__init__( self, selected_transmit_ports, carrier_frequency, bits_source, transmit_symbol_coding, transmit_signal_coding, encoding, waveform, frame_generator, seed, )
[docs] @override def serialize(self, process: SerializationProcess) -> None: TransmittingModem.serialize(self, process) ReceivingModem.serialize(self, process)
@classmethod @override def _DeserializeParameters(cls, process: DeserializationProcess) -> dict[str, object]: parameters = TransmittingModem._DeserializeParameters(process) parameters.update(ReceivingModem._DeserializeParameters(process)) return parameters
[docs] @classmethod @override def Deserialize(cls: Type[DuplexModem], process: DeserializationProcess) -> DuplexModem: parameters = cls._DeserializeParameters(process) return cls(**parameters) # type: ignore[arg-type]