Source code for hermespy.modem.modem

# -*- coding: utf-8 -*-

from __future__ import annotations
from functools import cached_property
from typing import Generic, List, Set, Sequence, Tuple, Type, TypeVar

import numpy as np
from h5py import Group

from hermespy.fec import EncoderManager
from hermespy.core import (
    Device,
    HDFSerializable,
    RandomNode,
    Transmission,
    TransmitSignalCoding,
    Reception,
    Serializable,
    Signal,
    TransmitState,
    Transmitter,
    Receiver,
    ReceiveState,
    ReceiveSignalCoding,
)
from .bits_source import BitsSource, RandomBitsSource
from .precoding import TransmitSymbolCoding, ReceiveSymbolCoding
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT
from .frame_generator import FrameGenerator, FrameGeneratorStub

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


class CommunicationTransmissionFrame(Transmission):
    """Information describing a single communication frame generated during transmission.

    One instance is created for every generated frame by
    :meth:`TransmittingModemBase._transmit` and collected in the
    :attr:`frames<CommunicationTransmission.frames>` list of the resulting
    :class:`CommunicationTransmission`.
    """

    signal: Signal
    """Communication base-band waveform."""

    bits: np.ndarray
    """Communication data bits."""

    bit_block_size: int
    """Block size of the forward error correction input."""

    encoded_bits: np.ndarray
    """Transmitted bits after FEC encoding."""

    code_block_size: int
    """Block size of the forward error correction output."""

    symbols: Symbols
    """Communication data symbols."""

    encoded_symbols: Symbols
    """Communication data symbols after symbol encoding."""

    timestamp: float
    """Time at which the frame was transmitted in seconds."""

    def __init__(
        self,
        signal: Signal,
        bits: np.ndarray,
        bit_block_size: int,
        encoded_bits: np.ndarray,
        code_block_size: int,
        symbols: Symbols,
        encoded_symbols: Symbols,
        timestamp: float,
    ) -> None:
        """
        Args:

            signal (Signal):
                Transmitted communication base-band waveform.

            bits (numpy.ndarray):
                Transmitted communication data bits.

            bit_block_size (int):
                Block size of the forward error correction input.

            encoded_bits (numpy.ndarray):
                Transmitted communication bits after FEC encoding.

            code_block_size (int):
                Block size of the forward error correction output.

            symbols (Symbols):
                Transmitted communication data symbols.

            encoded_symbols (Symbols):
                Transmitted communication data symbols after symbol encoding.

            timestamp (float):
                Time at which the frame was transmitted in seconds.
        """

        # The base class takes care of the signal model
        Transmission.__init__(self, signal=signal)

        # Bit-level information
        self.bits = bits
        self.bit_block_size = bit_block_size
        self.encoded_bits = encoded_bits
        self.code_block_size = code_block_size

        # Symbol-level information
        self.symbols = symbols
        self.encoded_symbols = encoded_symbols

        self.timestamp = timestamp

    @classmethod
    def from_HDF(
        cls: Type[CommunicationTransmissionFrame],
        group: Group,
        transmitter: Transmitter | None = None,
    ) -> CommunicationTransmissionFrame:
        """Recall a transmitted frame from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the frame was previously serialized to by :meth:`to_HDF`.

            transmitter (Transmitter, optional):
                Accepted but not used by this implementation.

        Returns: The recalled frame.
        """

        attributes = group.attrs

        return cls(
            # Nested objects live in dedicated sub-groups
            signal=Signal.from_HDF(group["signal"]),
            symbols=Symbols.from_HDF(group["symbols"]),
            encoded_symbols=Symbols.from_HDF(group["encoded_symbols"]),
            # Bit sequences are stored as plain datasets
            bits=np.array(group["bits"], dtype=np.uint8),
            encoded_bits=np.array(group["encoded_bits"], dtype=np.uint8),
            # Scalar parameters are group attributes, with fallbacks if missing
            bit_block_size=attributes.get("bit_block_size", 1),
            code_block_size=attributes.get("code_block_size", 1),
            timestamp=attributes.get("timestamp", 0),
        )

    def to_HDF(self, group: Group) -> None:
        """Serialize the frame to an HDF5 group.

        Args:

            group (Group):
                HDF5 group receiving the sub-groups, datasets and attributes.
        """

        # Nested objects are stored within dedicated sub-groups
        for group_name in ("signal", "symbols", "encoded_symbols"):
            getattr(self, group_name).to_HDF(group.create_group(group_name))

        # Bit sequences are stored as plain datasets
        for dataset_name in ("bits", "encoded_bits"):
            group.create_dataset(dataset_name, data=getattr(self, dataset_name))

        # Scalar parameters are stored as group attributes
        for attribute_name in ("bit_block_size", "code_block_size", "timestamp"):
            group.attrs[attribute_name] = getattr(self, attribute_name)
class CommunicationTransmission(Transmission):
    """Collection of information generated by transmitting over a modem.

    Returned by :meth:`TransmittingModemBase._transmit`.
    Bundles the overall transmitted signal with one
    :class:`CommunicationTransmissionFrame` per generated frame.
    """

    frames: List[CommunicationTransmissionFrame]
    """Individual transmitted communication frames."""

    def __init__(
        self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None
    ) -> None:
        """
        Args:

            signal (Signal):
                Transmitted communication base-band waveform.

            frames (List[CommunicationTransmissionFrame], optional):
                Individual transmitted communication frames.
                An empty list is assumed if not specified.
        """

        Transmission.__init__(self, signal=signal)
        self.frames = [] if frames is None else frames

    @property
    def num_frames(self) -> int:
        """Number of transmitted communication frames.

        Returns: Number of frames.
        """

        return len(self.frames)

    @cached_property
    def bits(self) -> np.ndarray:
        """Transmitted bits before FEC encoding.

        The bits of all frames are flattened and joined in frame order.
        The result is computed once and cached.

        Returns: Numpy array of transmitted bits.
        """

        # A single concatenation avoids re-copying the accumulated array for every frame.
        # The leading empty uint8 array keeps the result well-defined without any frames.
        return np.concatenate(
            [np.empty(0, dtype=np.uint8)] + [np.ravel(frame.bits) for frame in self.frames]
        )

    @cached_property
    def symbols(self) -> Symbols:
        """Mapped communication symbols of all transmitted frames.

        The result is computed once and cached.

        Returns: Symbols of all frames, appended in frame order.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.symbols)

        return symbols

    @classmethod
    def from_HDF(cls: Type[CommunicationTransmission], group: Group) -> CommunicationTransmission:
        """Recall a transmission from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the transmission was previously serialized to by :meth:`to_HDF`.

        Returns: The recalled transmission.
        """

        # Recall base signal
        signal = Signal.from_HDF(group["signal"])

        # Recall individual frames, no frames are assumed if the counter attribute is missing
        num_frames = group.attrs.get("num_frames", 0)
        frames = [
            CommunicationTransmissionFrame.from_HDF(group[f"frame_{f:02d}"])
            for f in range(num_frames)
        ]

        return cls(signal=signal, frames=frames)

    def to_HDF(self, group: Group) -> None:
        """Serialize the transmission to an HDF5 group.

        Args:

            group (Group):
                HDF5 group receiving the signal, the frame counter and one sub-group per frame.
        """

        # Serialize attributes
        group.attrs["num_frames"] = self.num_frames

        # Serialize base information
        self.signal.to_HDF(group.create_group("signal"))

        # Serialize frames into enumerated sub-groups
        for f, frame in enumerate(self.frames):
            frame.to_HDF(group.create_group(f"frame_{f:02d}"))
class CommunicationReceptionFrame(HDFSerializable):
    """Information describing a single synchronized communication frame obtained during reception.

    One instance is created for every detected frame by
    :meth:`ReceivingModemBase._receive` and collected in the
    :attr:`frames<CommunicationReception.frames>` list of the resulting
    :class:`CommunicationReception`.
    """

    signal: Signal
    """Communication base-band waveform."""

    decoded_signal: Signal
    """Communication base-band waveform after MIMO stream decoding."""

    symbols: Symbols
    """Received communication symbols."""

    decoded_symbols: Symbols
    """Received communication symbols after precoding stage."""

    timestamp: float
    """Time at which the frame was transmitted in seconds."""

    equalized_symbols: Symbols
    """Equalized communication symbols."""

    encoded_bits: np.ndarray
    """Received encoded data bits before error correction."""

    code_block_size: int
    """Block size of the forward error correction input."""

    decoded_bits: np.ndarray
    """Received decoded data bits after error correction."""

    bit_block_size: int
    """Block size of the forward error correction output."""

    def __init__(
        self,
        signal: Signal,
        decoded_signal: Signal,
        symbols: Symbols,
        decoded_symbols: Symbols,
        timestamp: float,
        equalized_symbols: Symbols,
        encoded_bits: np.ndarray,
        code_block_size: int,
        decoded_bits: np.ndarray,
        bit_block_size: int,
    ) -> None:
        """
        Args:

            signal (Signal):
                Received communication base-band waveform.

            decoded_signal (Signal):
                Received communication base-band waveform after MIMO stream decoding.

            symbols (Symbols):
                Received communication symbols.

            decoded_symbols (Symbols):
                Received communication symbols after precoding stage.

            timestamp (float):
                Time at which the frame was transmitted in seconds.

            equalized_symbols (Symbols):
                Equalized communication symbols.

            encoded_bits (numpy.ndarray):
                Received encoded data bits before error correction.

            code_block_size (int):
                Block size of the forward error correction input.

            decoded_bits (numpy.ndarray):
                Received decoded data bits after error correction.

            bit_block_size (int):
                Block size of the forward error correction output.
        """

        # Signal-level information
        self.signal = signal
        self.decoded_signal = decoded_signal

        # Symbol-level information
        self.symbols = symbols
        self.decoded_symbols = decoded_symbols
        self.equalized_symbols = equalized_symbols

        # Bit-level information
        self.encoded_bits = encoded_bits
        self.code_block_size = code_block_size
        self.decoded_bits = decoded_bits
        self.bit_block_size = bit_block_size

        self.timestamp = timestamp

    @classmethod
    def from_HDF(
        cls: Type[CommunicationReceptionFrame], group: Group
    ) -> CommunicationReceptionFrame:
        """Recall a received frame from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the frame was previously serialized to by :meth:`to_HDF`.

        Returns: The recalled frame.
        """

        attributes = group.attrs

        return cls(
            # Nested objects live in dedicated sub-groups
            signal=Signal.from_HDF(group["signal"]),
            decoded_signal=Signal.from_HDF(group["decoded_signal"]),
            symbols=Symbols.from_HDF(group["symbols"]),
            decoded_symbols=Symbols.from_HDF(group["decoded_symbols"]),
            equalized_symbols=Symbols.from_HDF(group["equalized_symbols"]),
            # Bit sequences are stored as plain datasets
            encoded_bits=np.array(group["encoded_bits"], dtype=np.uint8),
            decoded_bits=np.array(group["decoded_bits"], dtype=np.uint8),
            # Scalar parameters are group attributes, with fallbacks if missing
            code_block_size=attributes.get("code_block_size", 1),
            bit_block_size=attributes.get("bit_block_size", 1),
            timestamp=attributes.get("timestamp", 0),
        )

    def to_HDF(self, group: Group) -> None:
        """Serialize the frame to an HDF5 group.

        Args:

            group (Group):
                HDF5 group receiving the sub-groups, datasets and attributes.
        """

        # Nested objects are stored within dedicated sub-groups
        for group_name in (
            "signal",
            "decoded_signal",
            "symbols",
            "decoded_symbols",
            "equalized_symbols",
        ):
            getattr(self, group_name).to_HDF(group.create_group(group_name))

        # Bit sequences are stored as plain datasets
        for dataset_name in ("encoded_bits", "decoded_bits"):
            group.create_dataset(dataset_name, data=getattr(self, dataset_name))

        # Scalar parameters are stored as group attributes
        for attribute_name in ("code_block_size", "bit_block_size", "timestamp"):
            group.attrs[attribute_name] = getattr(self, attribute_name)
class CommunicationReception(Reception):
    """Collection of information generated by receiving over a modem.

    Returned by :meth:`ReceivingModemBase._receive`.
    Bundles the overall received signal with one
    :class:`CommunicationReceptionFrame` per detected frame.
    """

    frames: List[CommunicationReceptionFrame]
    """Individual received communication frames."""

    def __init__(
        self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None
    ) -> None:
        """
        Args:

            signal (Signal):
                Received communication base-band waveform.

            frames (List[CommunicationReceptionFrame], optional):
                Individual received communication frames.
                An empty list is assumed if not specified.
        """

        Reception.__init__(self, signal=signal)
        self.frames = [] if frames is None else frames

    @staticmethod
    def _concatenate_bits(bit_blocks: List[np.ndarray]) -> np.ndarray:
        """Flatten and join per-frame bit arrays in a single pass.

        Args:

            bit_blocks (List[numpy.ndarray]):
                Bit arrays of the individual frames, in frame order.

        Returns: One-dimensional array of all bits, empty `uint8` array if no blocks are given.
        """

        # A single concatenation avoids re-copying the accumulated array for every frame.
        # The leading empty uint8 array keeps the result well-defined without any frames.
        return np.concatenate(
            [np.empty(0, dtype=np.uint8)] + [np.ravel(block) for block in bit_blocks]
        )

    @property
    def num_frames(self) -> int:
        """Number of received communication frames.

        Returns: Number of frames.
        """

        return len(self.frames)

    @cached_property
    def encoded_bits(self) -> np.ndarray:
        """Received bits before FEC decoding.

        The result is computed once and cached.

        Returns: Numpy array containing received bits.
        """

        return self._concatenate_bits([frame.encoded_bits for frame in self.frames])

    @cached_property
    def bits(self) -> np.ndarray:
        """Received bits after FEC decoding.

        The result is computed once and cached.

        Returns: Numpy array containing received bits.
        """

        return self._concatenate_bits([frame.decoded_bits for frame in self.frames])

    @cached_property
    def symbols(self) -> Symbols:
        """Collect all communication symbols.

        Returns: Symbols of all frames, appended in frame order.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.symbols)

        return symbols

    @cached_property
    def equalized_symbols(self) -> Symbols:
        """Collect all equalized communication symbols.

        Returns: Equalized symbols of all frames, appended in frame order.
        """

        symbols = Symbols()
        for frame in self.frames:
            symbols.append_symbols(frame.equalized_symbols)

        return symbols

    @classmethod
    def from_HDF(cls: Type[CommunicationReception], group: Group) -> CommunicationReception:
        """Recall a reception from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the reception was previously serialized to by :meth:`to_HDF`.

        Returns: The recalled reception.
        """

        # Recall base signal
        signal = Signal.from_HDF(group["signal"])

        # Recall individual detected frames, no frames are assumed if the counter is missing
        num_frames = group.attrs.get("num_frames", 0)
        frames = [
            CommunicationReceptionFrame.from_HDF(group[f"frame_{f:02d}"])
            for f in range(num_frames)
        ]

        return cls(signal=signal, frames=frames)

    def to_HDF(self, group: Group) -> None:
        """Serialize the reception to an HDF5 group.

        Args:

            group (Group):
                HDF5 group receiving the signal, the frame counter and one sub-group per frame.
        """

        # Serialize attributes
        group.attrs["num_frames"] = self.num_frames

        # Serialize base information
        self.signal.to_HDF(group.create_group("signal"))

        # Serialize detected frames into enumerated sub-groups
        for f, frame in enumerate(self.frames):
            frame.to_HDF(group.create_group(f"frame_{f:02d}"))
class BaseModem(Generic[CWT], RandomNode):
    """Base class for wireless modems transmitting or receiving information over devices.

    Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience wrappers
    :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface.
    """

    __encoder_manager: EncoderManager
    __waveform: CWT | None
    __frame_generator: FrameGenerator

    @staticmethod
    def _arg_signature() -> Set[str]:
        # NOTE(review): the consumer of this set is not visible in this file,
        # presumably the serialization framework - confirm before extending it.
        return {"encoding", "waveform", "seed"}

    def __init__(
        self,
        encoding: EncoderManager | None = None,
        waveform: CWT | None = None,
        frame_generator: FrameGenerator | None = None,
        seed: int | None = None,
    ) -> None:
        """
        Args:

            encoding (EncoderManager, optional):
                Bit coding configuration.
                Encodes communication bit frames during transmission and decodes them during reception.
                A new :class:`EncoderManager` is created if not specified.

            waveform (CWT, optional):
                The waveform to be transmitted by this modem.

            frame_generator (FrameGenerator, optional):
                Frame generator packing and unpacking the bits of each frame.
                A :class:`FrameGeneratorStub` is created if not specified.

            seed (int, optional):
                Seed used to initialize the pseudo-random number generator.
        """

        # Base class initialization
        RandomNode.__init__(self, seed=seed)

        # Fall back to a default bit coding configuration
        if encoding is None:
            encoding = EncoderManager()
        self.encoder_manager = encoding

        self.waveform = waveform

        # Fall back to the frame generator stub
        if frame_generator is None:
            frame_generator = FrameGeneratorStub()
        self.frame_generator = frame_generator

    @property
    def encoder_manager(self) -> EncoderManager:
        """Description of the modem's forward error correction.

        Assigning a new manager registers this modem as the manager's
        `random_mother` and `modem`.

        Refer to :doc:`fec` for further information.
        """

        return self.__encoder_manager

    @encoder_manager.setter
    def encoder_manager(self, new_manager: EncoderManager) -> None:
        self.__encoder_manager = new_manager

        # Link the manager back to this modem
        new_manager.random_mother = self
        new_manager.modem = self

    @property
    def waveform(self) -> CWT | None:
        """Description of the communication waveform emitted by this modem.

        The only mandatory attribute required to transmit or receive over a :class:`modem<BaseModem>`.
        Assigning a waveform registers this modem as the waveform's `modem` and `random_mother`.
        """

        return self.__waveform

    @waveform.setter
    def waveform(self, value: CWT | None) -> None:
        self.__waveform = value

        # Nothing to link if the waveform is being cleared
        if value is None:
            return

        value.modem = self
        value.random_mother = self

    @property
    def frame_generator(self) -> FrameGenerator:
        """Frame generator packing and unpacking the bits of each communication frame."""

        return self.__frame_generator

    @frame_generator.setter
    def frame_generator(self, value: FrameGenerator) -> None:
        self.__frame_generator = value

    @property
    def samples_per_frame(self) -> int:
        """Number of discrete-time samples per processed communication frame.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`samples_per_frame<hermespy.modem.waveform.CommunicationWaveform.samples_per_frame>` property.
        """

        configured_waveform = self.waveform
        return configured_waveform.samples_per_frame

    @property
    def frame_duration(self) -> float:
        """Duration of a single communication frame in seconds.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property.
        """

        configured_waveform = self.waveform
        return configured_waveform.frame_duration

    @property
    def symbol_duration(self) -> float:
        """Duration of a single communication symbol in seconds.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property.
        """

        configured_waveform = self.waveform
        return configured_waveform.symbol_duration

    @property
    def sampling_rate(self) -> float:
        """Sampling rate of the processed waveforms in Hz.

        Convenience wrapper for the :attr:`waveform<BaseModem.waveform>`
        :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property.
        """

        configured_waveform = self.waveform
        return configured_waveform.sampling_rate
# Type variable for generics parametrized over a modem type, restricted to BaseModem and its subclasses.
BMT = TypeVar("BMT", bound=BaseModem)
"""Type of base modem."""
class TransmittingModemBase(Generic[CWT], BaseModem[CWT]):
    """Base class of signal processing algorithms transmitting information."""

    __bits_source: BitsSource
    __transmit_symbol_coding: TransmitSymbolCoding
    __transmit_signal_coding: TransmitSignalCoding

    def __init__(self, bits_source: BitsSource | None = None, *args, **kwargs) -> None:
        """
        Args:

            bits_source (BitsSource, optional):
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            *args, **kwargs:
                Forwarded to the :class:`BaseModem` initialization.
        """

        # Initialize base classes
        BaseModem.__init__(self, *args, **kwargs)

        # Fall back to randomly generated bits
        if bits_source is None:
            bits_source = RandomBitsSource()
        self.bits_source = bits_source

        # Both coding stages start out with a default configuration
        self.__transmit_symbol_coding = TransmitSymbolCoding()
        self.__transmit_signal_coding = TransmitSignalCoding()

    @property
    def bits_source(self) -> BitsSource:
        """Source configuration of communication transmitted by this modem.

        Assigning a source registers this modem as the source's `random_mother`.

        Returns: A handle to the source configuration.
        """

        return self.__bits_source

    @bits_source.setter
    def bits_source(self, value: BitsSource):
        value.random_mother = self
        self.__bits_source = value

    @property
    def transmit_signal_coding(self) -> TransmitSignalCoding:
        """Stream MIMO coding configuration during signal transmission."""

        return self.__transmit_signal_coding

    @transmit_signal_coding.setter
    def transmit_signal_coding(self, value: TransmitSignalCoding) -> None:
        self.__transmit_signal_coding = value

    @property
    def transmit_symbol_coding(self) -> TransmitSymbolCoding:
        """Complex communication symbol coding configuration during transmission."""

        return self.__transmit_symbol_coding

    @transmit_symbol_coding.setter
    def transmit_symbol_coding(self, value: TransmitSymbolCoding) -> None:
        self.__transmit_symbol_coding = value

    def __map(self, bits: np.ndarray, num_streams: int) -> Symbols:
        """Map a block of information bits to communication symbols.

        Args:

            bits (numpy.ndarray):
                The information bits to be mapped.

            num_streams (int):
                Number of symbol streams the bits are evenly split into.

        Returns: A series of communication symbols, one stream per bit partition.
        """

        # One row of bits per symbol stream
        bits_per_stream = np.reshape(bits, (num_streams, -1))

        mapped = Symbols()
        for stream_bits in bits_per_stream:
            stream_symbols = self.waveform.map(stream_bits)
            mapped.append_stream(stream_symbols)

        return mapped

    def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal:
        """Modulates a sequence of MIMO signals into a base-band communication waveform.

        Args:

            symbols (Symbols):
                Communication symbols to be modulated.

            carrier_frequency (float):
                Carrier frequency of the communication signal.

        Returns: The modulated base-band communication frame.
        """

        waveform = self.waveform

        # Sample buffer holding one modulated frame per symbol stream
        samples = np.empty(
            (symbols.num_streams, waveform.samples_per_frame), dtype=np.complex128
        )

        for stream_index, stream_symbols in enumerate(symbols.raw):
            # Place the stream's symbols within the frame structure
            single_stream = Symbols(stream_symbols[np.newaxis, :, :])
            placed = waveform.place(single_stream)

            # Modulate each placed symbol stream individually to its base-band representation
            samples[stream_index, :] = waveform.modulate(placed)

        return Signal.Create(samples, waveform.sampling_rate, carrier_frequency)

    def _transmit(self, device: TransmitState, duration: float) -> CommunicationTransmission:
        """Generate the communication frames fitting into the requested duration.

        Args:

            device (TransmitState):
                State of the transmitting device.
                Provides the number of digital transmit ports and the carrier frequency.

            duration (float):
                Duration of the generated signal in seconds.
                Values smaller or equal to zero result in exactly one frame.

        Returns: The transmitted signal and the information of each generated frame.
        """

        frame_duration = self.frame_duration

        # By default, the generated signal's duration will be exactly one frame
        if duration <= 0.0:
            duration = frame_duration

        # Number of complete frames fitting into the duration
        # NOTE(review): the float division is truncated, so a duration that is nominally
        # a multiple of the frame duration may lose a frame to rounding - confirm intent
        num_mimo_frames = int(duration / frame_duration)

        symbol_coding = self.transmit_symbol_coding
        signal_coding = self.transmit_signal_coding
        waveform = self.waveform
        num_ports = device.num_digital_transmit_ports

        # Number of parallel streams entering the signal and symbol coding stages
        num_input_signal_streams = signal_coding.num_transmit_input_streams(num_ports)
        num_input_symbol_streams = symbol_coding.num_transmit_input_streams(
            num_input_signal_streams
        )

        # Number of serial bits required to generate a single frame
        required_num_data_symbols = int(waveform.num_data_symbols * symbol_coding.encode_rate)
        bits_per_stream = waveform.bits_per_frame(required_num_data_symbols)
        required_num_code_bits = bits_per_stream * num_input_symbol_streams
        required_num_data_bits = self.encoder_manager.required_num_data_bits(
            required_num_code_bits
        )

        # Empty signal model the frames are subsequently appended to
        signal = Signal.Empty(
            waveform.sampling_rate, num_ports, carrier_frequency=device.carrier_frequency
        )

        # Abort if no frame is to be transmitted within the current duration
        if num_mimo_frames < 1:
            return CommunicationTransmission(signal)

        frames: List[CommunicationTransmissionFrame] = []
        for frame_index in range(num_mimo_frames):
            # Generate plain data bits
            frame_bits = self.frame_generator.pack_frame(self.bits_source, required_num_data_bits)

            # Apply forward error correction
            encoded_bits = self.encoder_manager.encode(frame_bits, required_num_code_bits)

            # Map bits to communication symbols
            mapped_symbols = self.__map(encoded_bits, num_input_symbol_streams)

            # The symbol precoding operates on stated symbols, so assume a state of all ones
            unit_states = np.ones(
                (
                    mapped_symbols.num_streams,
                    1,
                    mapped_symbols.num_blocks,
                    mapped_symbols.num_symbols,
                ),
                dtype=np.complex128,
            )
            stated_symbols = StatedSymbols(mapped_symbols.raw, unit_states)

            # Apply the symbol precoding configuration
            encoded_symbols = symbol_coding.encode_symbols(
                stated_symbols, num_input_signal_streams
            )

            # Modulate symbols to a base-band signal
            frame_signal = self.__modulate(encoded_symbols, device.carrier_frequency)

            # Apply stream encoding configuration
            encoded_frame_signal = signal_coding.encode_streams(frame_signal, device)

            # Save results
            signal.append_samples(encoded_frame_signal)
            frame = CommunicationTransmissionFrame(
                signal=frame_signal,
                bits=frame_bits,
                bit_block_size=self.encoder_manager.bit_block_size,
                encoded_bits=encoded_bits,
                code_block_size=self.encoder_manager.code_block_size,
                symbols=mapped_symbols,
                encoded_symbols=encoded_symbols,
                timestamp=frame_index * frame_duration,
            )
            frames.append(frame)

        # Save the transmitted information
        return CommunicationTransmission(signal, frames)
class TransmittingModem(
    TransmittingModemBase[CommunicationWaveform],
    Transmitter[CommunicationTransmission],
    Serializable,
):
    """Representation of a wireless modem exclusively transmitting."""

    yaml_tag = "TxModem"

    def __init__(
        self, *args, selected_transmit_ports: Sequence[int] | None = None, **kwargs
    ) -> None:
        # Raw docstring: the backslash in front of **kwargs is not a valid string escape
        r"""
        Args:

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            *args, \**kwargs:
                Modem initialization parameters.
                Refer to :class:`TransmittingModemBase` for further details.
        """

        # Initialize base classes
        # Note that the initialization order matters here
        Transmitter.__init__(self, selected_transmit_ports=selected_transmit_ports)
        TransmittingModemBase.__init__(self, *args, **kwargs)
        Serializable.__init__(self)

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        return self.waveform.power if self.waveform is not None else 0.0

    def _recall_transmission(self, group: Group) -> CommunicationTransmission:
        """Recall a previously serialized transmission from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the transmission was serialized to.

        Returns: The recalled transmission.
        """

        return CommunicationTransmission.from_HDF(group)
class ReceivingModemBase(Generic[CWT], BaseModem[CWT]):
    """Base class of signal processing algorithms receiving information."""

    __receive_symbol_coding: ReceiveSymbolCoding
    __receive_signal_coding: ReceiveSignalCoding

    def __init__(self, *args, **kwargs) -> None:
        """
        Args:

            *args, **kwargs:
                Forwarded to the :class:`BaseModem` initialization.
        """

        # Initialize base class
        BaseModem.__init__(self, *args, **kwargs)

        # Initialize class attributes
        self.__receive_symbol_coding = ReceiveSymbolCoding()
        self.__receive_signal_coding = ReceiveSignalCoding()

    @property
    def receive_signal_coding(self) -> ReceiveSignalCoding:
        """Stream MIMO coding configuration during signal reception."""

        return self.__receive_signal_coding

    @receive_signal_coding.setter
    def receive_signal_coding(self, value: ReceiveSignalCoding) -> None:
        self.__receive_signal_coding = value

    @property
    def receive_symbol_coding(self) -> ReceiveSymbolCoding:
        """Complex communication symbol coding configuration during reception."""

        return self.__receive_symbol_coding

    @receive_symbol_coding.setter
    def receive_symbol_coding(self, value: ReceiveSymbolCoding) -> None:
        self.__receive_symbol_coding = value

    def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]:
        """Synchronize a received MIMO base-band stream.

        Converts the stream into sections representing communication frames.

        Args:

            received_signal (Signal):
                The MIMO signal received over the operated device's RF chain.

        Returns:
            Tuple of the detected frame start sample indices and
            the signal sections representing the respective communication frames.
        """

        # Synchronize raw MIMO data into frames
        frame_start_indices = self.waveform.synchronization.synchronize(received_signal.getitem())
        frame_length = self.waveform.samples_per_frame

        synchronized_signals = []
        for frame_start in frame_start_indices:
            frame_stop = frame_start + frame_length
            frame_samples = received_signal.getitem(
                (slice(None, None), slice(frame_start, frame_stop))
            )

            # Pad the frame with zeros if it is too short
            # This may happen if the last frame is incomplete, or synchronization is not perfect
            if frame_samples.shape[1] < frame_length:
                frame_samples = np.pad(
                    frame_samples,
                    ((0, 0), (0, frame_length - frame_samples.shape[1])),
                    mode="constant",
                )

            frame_signal = Signal.Create(frame_samples, received_signal.sampling_rate)
            synchronized_signals.append(frame_signal)

        return frame_start_indices, synchronized_signals

    def __demodulate(self, frame: Signal) -> Symbols:
        """Demodulates a sequence of synchronized MIMO signals into data symbols.

        Args:

            frame (Signal):
                Synchronized MIMO signal, representing the samples of a full communication frame.

        Returns: Demodulated frame symbols, one symbol stream per signal stream.
        """

        symbols = Symbols()
        for stream in frame.getitem():
            stream_symbols = self.waveform.demodulate(stream)
            symbols.append_stream(stream_symbols)

        return symbols

    def __unmap(self, symbols: Symbols) -> np.ndarray:
        """Unmap a set of communication symbols to information bits.

        Args:

            symbols (Symbols):
                The communication symbols to be unmapped.

        Returns: A numpy array containing hard information bits of all streams in stream order.
        """

        bits = np.empty(0, dtype=np.uint8)
        for stream in symbols.raw:
            bits = np.append(bits, self.waveform.unmap(Symbols(stream[np.newaxis, :, :])))

        return bits

    def _receive(self, signal: Signal, device: ReceiveState) -> CommunicationReception:
        """Detect and decode the communication frames contained within a received signal.

        Args:

            signal (Signal):
                The received base-band signal.

            device (ReceiveState):
                State of the receiving device, forwarded to the receive signal coding.

        Returns:
            The resampled signal and the information of each detected frame.
            No frame information is contained if the synchronization did not detect any frame.
        """

        # Resample the signal to match the waveform's requirements
        signal = signal.resample(self.waveform.sampling_rate)

        # Decode the signal using the receive signal coding configuration
        decoded_signal = self.receive_signal_coding.decode_streams(signal, device)

        # Synchronize incoming signals
        frame_start_indices, synchronized_signals = self.__synchronize(decoded_signal)

        # Abort if no frame has been detected
        if len(synchronized_signals) < 1:
            reception = CommunicationReception(signal)
            return reception

        # Compute the bit generation requirements
        # Compute the number of serial data bits required to generate a single frame over the given precoding config
        required_num_data_symbols = int(
            self.waveform.num_data_symbols * self.receive_symbol_coding.decode_rate
        )
        required_num_code_bits = self.waveform.bits_per_frame(
            required_num_data_symbols
        ) * self.receive_symbol_coding.num_receive_output_streams(decoded_signal.num_streams)
        required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits)

        # Process each frame independently
        frames: List[CommunicationReceptionFrame] = []
        for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals):
            # Demodulate raw symbols for each frame independently
            symbols = self.__demodulate(frame_signal)

            # Estimate the channel from each frame demodulation
            frame_delay = frame_index / self.waveform.sampling_rate
            stated_symbols = self.waveform.estimate_channel(symbols, frame_delay)

            # Extract data symbols to be decoded
            picked_symbols = self.waveform.pick(stated_symbols)

            # Decode the pre-equalization symbol precoding stage
            decoded_symbols = self.receive_symbol_coding.decode_symbols(picked_symbols)

            # Equalize the received symbols for each frame given the estimated channel state
            equalized_symbols = self.waveform.equalize_symbols(decoded_symbols)

            # Unmap equalized symbols to information bits
            encoded_bits = self.__unmap(equalized_symbols)

            # Apply inverse FEC configuration to correct errors and remove redundancies
            decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits)

            # Decode the frame
            payload_bits = self.frame_generator.unpack_frame(decoded_bits)

            # Store the received information
            frames.append(
                CommunicationReceptionFrame(
                    signal=frame_signal,
                    decoded_signal=frame_signal,
                    symbols=symbols,
                    decoded_symbols=decoded_symbols,
                    # A sample index is converted to seconds by dividing by the sampling rate
                    timestamp=frame_index / signal.sampling_rate,
                    equalized_symbols=equalized_symbols,
                    encoded_bits=encoded_bits,
                    code_block_size=self.encoder_manager.code_block_size,
                    decoded_bits=payload_bits,
                    bit_block_size=self.encoder_manager.bit_block_size,
                )
            )

        # Store the received information of all frames
        reception = CommunicationReception(signal=signal, frames=frames)
        return reception
class ReceivingModem(
    ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable
):
    """Representation of a wireless modem exclusively receiving."""

    yaml_tag = "RxModem"
    """YAML serialization tag"""

    def __init__(
        self, *args, selected_receive_ports: Sequence[int] | None = None, **kwargs
    ) -> None:
        # Raw docstring: the backslash in front of **kwargs is not a valid string escape
        r"""
        Args:

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available antenna ports will be considered.

            *args, \**kwargs:
                Modem initialization parameters.
                Refer to :class:`ReceivingModemBase` for further details.
        """

        # Initialize base classes
        # Note that the initialization order matters here
        Receiver.__init__(self, selected_receive_ports=selected_receive_ports)
        ReceivingModemBase.__init__(self, *args, **kwargs)
        Serializable.__init__(self)

    @property
    def power(self) -> float:
        """Power of the configured waveform, zero if no waveform is configured."""

        return self.waveform.power if self.waveform is not None else 0.0

    def _recall_reception(self, group: Group) -> CommunicationReception:
        """Recall a previously serialized reception from an HDF5 group.

        Args:

            group (Group):
                HDF5 group the reception was serialized to.

        Returns: The recalled reception.
        """

        return CommunicationReception.from_HDF(group)
class DuplexModem(TransmittingModem, ReceivingModem):
    """Representation of a wireless modem simultaneously transmitting and receiving."""

    yaml_tag = "Modem"

    def __init__(
        self,
        *args,
        bits_source: BitsSource | None = None,
        selected_transmit_ports: Sequence[int] | None = None,
        selected_receive_ports: Sequence[int] | None = None,
        **kwargs,
    ) -> None:
        # Raw docstring: the backslash in front of **kwargs is not a valid string escape
        r"""
        Args:

            bits_source (BitsSource, optional):
                Source configuration of communication bits transmitted by this modem.
                Bits are randomly generated by default.

            selected_transmit_ports (Sequence[int] | None):
                Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            selected_receive_ports (Sequence[int] | None):
                Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array.
                If not specified, all available ports will be considered.

            *args, \**kwargs:
                Modem initialization parameters.
                Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details.
        """

        # Initialize base classes
        # The receiving branch is initialized first and only receives the keyword arguments,
        # the transmitting branch is initialized second with all remaining arguments
        ReceivingModem.__init__(self, selected_receive_ports=selected_receive_ports, **kwargs)
        TransmittingModem.__init__(
            self,
            *args,
            bits_source=bits_source,
            selected_transmit_ports=selected_transmit_ports,
            **kwargs,
        )