Source code for hermespy.modem.modem

# -*- coding: utf-8 -*-

from __future__ import annotations
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Generic, List, Set, Sequence, Tuple, Type

import numpy as np
from h5py import Group

from hermespy.fec import EncoderManager
from hermespy.core import (
    HDFSerializable,
    RandomNode,
    Transmission,
    Reception,
    Serializable,
    Signal,
    Device,
    Transmitter,
    Receiver,
)
from hermespy.precoding import ReceiveStreamCoding, TransmitStreamCoding
from .precoding import SymbolPrecoding
from .bits_source import BitsSource, RandomBitsSource
from .symbols import StatedSymbols, Symbols
from .waveform import CommunicationWaveform, CWT

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler", "Tobias Kronauer"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class CommunicationTransmissionFrame(Transmission): """A single synchronized frame of information generated by transmittgin over a modem. Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance. """ signal: Signal """Communication base-band waveform.""" bits: np.ndarray """Communication data bits.""" encoded_bits: np.ndarray """Transmitted bits after FEC encoding.""" symbols: Symbols """Communication data symbols.""" encoded_symbols: Symbols """Communication data symbols after symbol encoding.""" timestamp: float """Time at which the frame was transmitted in seconds.""" def __init__( self, signal: Signal, bits: np.ndarray, encoded_bits: np.ndarray, symbols: Symbols, encoded_symbols: Symbols, timestamp: float, ) -> None: """ Args: signal (Signal): Transmitted communication base-band waveform. bits (np.ndarray): Transmitted communication data bits. encoded_bits (np.ndarray): Transmitted communication bits after FEC encoding. symbols (Symbols): Transmitted communication data symbols. encoded_symbols (Symbols): Transmitted communication data symbols after symbol encoding. timestamp (float): Time at which the frame was transmitted in seconds. """ Transmission.__init__(self, signal=signal) self.bits = bits self.encoded_bits = encoded_bits self.symbols = symbols self.encoded_symbols = encoded_symbols self.timestamp = timestamp @classmethod def from_HDF( cls: Type[CommunicationTransmissionFrame], group: Group, transmitter: Transmitter | None = None, ) -> CommunicationTransmissionFrame: # Recall groups signal = Signal.from_HDF(group["signal"]) symbols = Symbols.from_HDF(group["symbols"]) encoded_symbols = Symbols.from_HDF(group["encoded_symbols"]) # Recall datasets bits = np.array(group["bits"], dtype=np.uint8) encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8) # Recall attributes timestamp = group.attrs.get("timestamp", 0) # Initialize object from recalled state return cls( signal=signal, bits=bits, encoded_bits=encoded_bits, symbols=symbols, encoded_symbols=encoded_symbols, timestamp=timestamp, ) def to_HDF(self, group: Group) -> None: # Serialize base class self.signal.to_HDF(group.create_group("signal")) # Serialize groups self.symbols.to_HDF(group.create_group("symbols")) self.encoded_symbols.to_HDF(group.create_group("encoded_symbols")) # Serialize datasets group.create_dataset("bits", data=self.bits) group.create_dataset("encoded_bits", data=self.encoded_bits) # Serialize attributes group.attrs["timestamp"] = self.timestamp
[docs] class CommunicationTransmission(Transmission): """Collection of information generated by transmitting over a modem. Returned when calling the :meth:`transmit<hermespy.core.device.Transmitter.transmit>` method of a :class:`TransmittingModem<hermespy.modem.modem.TransmittingModem>` instance. """ frames: List[CommunicationTransmissionFrame] """Individual transmitted communication frames.""" def __init__( self, signal: Signal, frames: List[CommunicationTransmissionFrame] | None = None ) -> None: """ Args: signal (Signal): Transmitted communication base-band waveform. frames (List[CommunicationTransmissionFrame], optional): Individual transmitted communication frames. """ Transmission.__init__(self, signal=signal) self.frames = [] if frames is None else frames @property def num_frames(self) -> int: """Number of transmitted communication frames. Returns: Number of frames. """ return len(self.frames) @cached_property def bits(self) -> np.ndarray: """Transmitted bits before FEC encoding. Returns: Numpy array of transmitted bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.bits) return concatenated_bits @cached_property def symbols(self) -> Symbols: symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.symbols) return symbols @classmethod def from_HDF(cls: Type[CommunicationTransmission], group: Group) -> CommunicationTransmission: # Recall base signal signal = Signal.from_HDF(group["signal"]) # Recall individual detected frames num_frames = group.attrs.get("num_frames", 0) frames = [ CommunicationTransmissionFrame.from_HDF(group[f"frame_{f:02d}"]) for f in range(num_frames) ] return cls(signal=signal, frames=frames) def to_HDF(self, group: Group) -> None: # Serialize attributes group.attrs["num_frames"] = self.num_frames # Serialize base information self.signal.to_HDF(group.create_group("signal")) # Serialize detected frames for f, frame in enumerate(self.frames): frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs] class CommunicationReceptionFrame(HDFSerializable): """A single synchronized frame of information generated by receiving over a modem. Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance. """ signal: Signal """Communication base-band waveform.""" decoded_signal: Signal """Communication base-band waveform after MIMO stream decoding.""" symbols: Symbols """Received communication symbols.""" decoded_symbols: Symbols """Received communication symbols after precoding stage.""" timestamp: float """Time at which the frame was transmitted in seconds.""" equalized_symbols: Symbols """Equalized communication symbols.""" encoded_bits: np.ndarray """Received encoded data bits before error correction.""" decoded_bits: np.ndarray """Received decoded data bits after error correction.""" def __init__( self, signal: Signal, decoded_signal: Signal, symbols: Symbols, decoded_symbols: Symbols, timestamp: float, equalized_symbols: Symbols, encoded_bits: np.ndarray, decoded_bits: np.ndarray, ) -> None: self.signal = signal self.decoded_signal = decoded_signal self.symbols = symbols self.decoded_symbols = decoded_symbols self.timestamp = timestamp self.equalized_symbols = equalized_symbols self.encoded_bits = encoded_bits self.decoded_bits = decoded_bits @classmethod def from_HDF( cls: Type[CommunicationReceptionFrame], group: Group ) -> CommunicationReceptionFrame: # Recall groups signal = Signal.from_HDF(group["signal"]) decoded_signal = Signal.from_HDF(group["decoded_signal"]) symbols = Symbols.from_HDF(group["symbols"]) decoded_symbols = Symbols.from_HDF(group["decoded_symbols"]) equalized_symbols = Symbols.from_HDF(group["equalized_symbols"]) # Recall datasets encoded_bits = np.array(group["encoded_bits"], dtype=np.uint8) decoded_bits = np.array(group["decoded_bits"], dtype=np.uint8) # Recall attributes timestamp = group.attrs.get("timestamp", 0) # Initialize object from recalled state return cls( signal=signal, decoded_signal=decoded_signal, symbols=symbols, decoded_symbols=decoded_symbols, equalized_symbols=equalized_symbols, encoded_bits=encoded_bits, decoded_bits=decoded_bits, timestamp=timestamp, ) def to_HDF(self, group: Group) -> None: # Serialize groups self.signal.to_HDF(group.create_group("signal")) self.decoded_signal.to_HDF(group.create_group("decoded_signal")) self.symbols.to_HDF(group.create_group("symbols")) self.decoded_symbols.to_HDF(group.create_group("decoded_symbols")) self.equalized_symbols.to_HDF(group.create_group("equalized_symbols")) # Serialize datasets group.create_dataset("encoded_bits", data=self.encoded_bits) group.create_dataset("decoded_bits", data=self.decoded_bits) # Serialize attributes group.attrs["timestamp"] = self.timestamp
[docs] class CommunicationReception(Reception): """Collection of information generated by receiving over a modem. Returned when calling the :meth:`receive<hermespy.core.device.Receiver.receive>` method of a :class:`ReceivingModem<hermespy.modem.modem.ReceivingModem>` instance. """ frames: List[CommunicationReceptionFrame] """Individual received communication frames.""" def __init__( self, signal: Signal, frames: List[CommunicationReceptionFrame] | None = None ) -> None: """ Args: signal (Signal): Received communication base-band waveform. frames (List[CommunicationReceptionFrame], optional): Individual received communication frames. """ Reception.__init__(self, signal=signal) self.frames = [] if frames is None else frames @property def num_frames(self) -> int: """Number of received communication frames. Returns: Number of frames. """ return len(self.frames) @cached_property def encoded_bits(self) -> np.ndarray: """Received bits before FEC decoding. Returns: Numpy array containing received bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.encoded_bits) return concatenated_bits @cached_property def bits(self) -> np.ndarray: """Received bits after FEC decoding. Returns: Numpy array containing received bits. """ concatenated_bits = np.empty(0, dtype=np.uint8) for frame in self.frames: concatenated_bits = np.append(concatenated_bits, frame.decoded_bits) return concatenated_bits @cached_property def symbols(self) -> Symbols: """Collect all communication symbols.""" symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.symbols) return symbols @cached_property def equalized_symbols(self) -> Symbols: """Collect all equalized communication symbols.""" symbols = Symbols() for frame in self.frames: symbols.append_symbols(frame.equalized_symbols) return symbols @classmethod def from_HDF(cls: Type[CommunicationReception], group: Group) -> CommunicationReception: # Recall base signal signal = Signal.from_HDF(group["signal"]) # Recall individual detected frames num_frames = group.attrs.get("num_frames", 0) frames = [ CommunicationReceptionFrame.from_HDF(group[f"frame_{f:02d}"]) for f in range(num_frames) ] return cls(signal=signal, frames=frames) def to_HDF(self, group: Group) -> None: # Serialize attributes group.attrs["num_frames"] = self.num_frames # Serialize base information self.signal.to_HDF(group.create_group("signal")) # Serialize detected frames for f, frame in enumerate(self.frames): frame.to_HDF(group.create_group(f"frame_{f:02d}"))
[docs] class BaseModem(ABC, Generic[CWT], RandomNode): """Base class for wireless modems transmitting or receiving information over devices. Configure a :class:`TransmittingModem`, :class:`ReceivingModem` or the convenience wrappers :class:`SimplexLink` and :class:`DuplexModem` implementing this abstract interface. """ __encoder_manager: EncoderManager __precoding: SymbolPrecoding __waveform: CWT | None @staticmethod def _arg_signature() -> Set[str]: return {"encoding", "precoding", "waveform", "seed"} def __init__( self, encoding: EncoderManager | None = None, precoding: SymbolPrecoding | None = None, waveform: CWT | None = None, seed: int | None = None, ) -> None: """ Args: encoding (EncoderManager, optional): Bit coding configuration. Encodes communication bit frames during transmission and decodes them during reception. precoding (SymbolPrecoding, optional): Modulation symbol coding configuration. waveform (CWT, optional): The waveform to be transmitted by this modem. seed (int, optional): Seed used to initialize the pseudo-random number generator. """ # Base class initialization RandomNode.__init__(self, seed=seed) self.encoder_manager = EncoderManager() if encoding is None else encoding self.precoding = SymbolPrecoding(modem=self) if precoding is None else precoding self.waveform = waveform @property @abstractmethod def transmitting_device(self) -> Device | None: """Tranmsitting device operated by the modem. :py:obj:`None` if the device is unspecified. """ ... # pragma no cover @property @abstractmethod def receiving_device(self) -> Device | None: """Receiving device operated by the modem. :py:obj:`None` if the device is unspecified. """ ... # pragma no cover @property def encoder_manager(self) -> EncoderManager: """Description of the modem's forward error correction. Refer to :doc:`fec` for further information. """ return self.__encoder_manager @encoder_manager.setter def encoder_manager(self, new_manager: EncoderManager) -> None: self.__encoder_manager = new_manager self.__encoder_manager.random_mother = self new_manager.modem = self @property def waveform(self) -> CWT | None: """Description of the communication waveform emitted by this modem. The only mandatory attribute required to transmit or receive over a :class:`modem<BaseModem>`. """ return self.__waveform @waveform.setter def waveform(self, value: CWT | None) -> None: self.__waveform = value if value is not None: value.modem = self value.random_mother = self @property def precoding(self) -> SymbolPrecoding: """Description of the modem's precoding on a symbol level.""" return self.__precoding @precoding.setter def precoding(self, coding: SymbolPrecoding) -> None: self.__precoding = coding self.__precoding.modem = self def _bit_requirements(self) -> Tuple[int, int]: """Compute the bit generation requirements of the modem for the given configuration. Returns: Tuple of required data bits and required code bits. Raises: RuntimeError: If the symbol precoding rate does not match the waveform configuration. """ if self.waveform.num_data_symbols % self.precoding.rate.denominator != 0: raise RuntimeError( f"Symbol precoding rate does not match the waveform configuration ({self.waveform.num_data_symbols} % {self.precoding.rate.denominator} != 0)" ) required_num_data_symbols = int(self.waveform.num_data_symbols * self.precoding.rate) required_num_code_bits = ( self.waveform.bits_per_frame(required_num_data_symbols) * self.precoding.num_input_streams ) required_num_data_bits = self.encoder_manager.required_num_data_bits(required_num_code_bits) return required_num_data_bits, required_num_code_bits @property def num_data_bits_per_frame(self) -> int: """Compute the number of required data bits to generate a single frame. The number of bits depends on the configured :attr:`waveform<BaseModem.waveform>`, as well as the :attr:`encoder_manager<BaseModem.encoder_manager>` and :attr:`precoding<BaseModem.precoding>`. """ required_num_data_bits, _ = self._bit_requirements() return required_num_data_bits @property def samples_per_frame(self) -> int: """Number of discrete-time samples per processed communication frame. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property. """ return self.waveform.samples_per_frame @property def frame_duration(self) -> float: """Duration of a single communication frame in seconds. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`frame_duration<hermespy.modem.waveform.CommunicationWaveform.frame_duration>` property. """ return self.waveform.frame_duration @property def symbol_duration(self) -> float: """Duration of a single communication symbol in seconds. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`symbol_duration<hermespy.modem.waveform.CommunicationWaveform.symbol_duration>` property. """ return self.waveform.symbol_duration @property def sampling_rate(self) -> float: """Sampling rate of the processed waveforms in Hz. Convenience wrapper for the :attr:`waveform<BaseModem.waveform>` :attr:`sampling_rate<hermespy.modem.waveform.CommunicationWaveform.sampling_rate>` property. """ return self.waveform.sampling_rate @property def num_transmit_ports(self) -> int: """Number of transmit antenna ports.""" return 0 # pragma: no cover @property def num_receive_ports(self) -> int: """Number of receive antenna ports.""" return 0 # pragma: no cover
[docs] class TransmittingModemBase(Generic[CWT], BaseModem[CWT]): """Base class of signal processing algorithms transmitting information.""" __bits_source: BitsSource __transmit_stream_coding: TransmitStreamCoding def __init__(self, bits_source: BitsSource | None = None, *args, **kwargs) -> None: """ Args: bits_source (BitsSource, optional): Source configuration of communication bits transmitted by this modem. Bits are randomly generated by default. """ # Initialize base classes BaseModem.__init__(self, *args, **kwargs) # Initialize clas attributes self.bits_source = RandomBitsSource() if bits_source is None else bits_source self.__transmit_stream_coding = TransmitStreamCoding(modem=self) @property def bits_source(self) -> BitsSource: """Source configuration of communication transmitted by this modem. Returns: A handle to the source configuration. """ return self.__bits_source @bits_source.setter def bits_source(self, value: BitsSource): value.random_mother = self self.__bits_source = value @property def num_transmit_ports(self) -> int: return ( 0 if self.transmitting_device is None else self.transmitting_device.num_transmit_ports ) @property def transmit_stream_coding(self) -> TransmitStreamCoding: """Stream MIMO coding configuration during signal transmission. Returns: Handle to the coding configuration. """ return self.__transmit_stream_coding @transmit_stream_coding.setter def transmit_stream_coding(self, value: TransmitStreamCoding) -> None: self.__transmit_stream_coding = value value.modem = self def __map(self, bits: np.ndarray, num_streams: int) -> Symbols: """Map a block of information bits to commuication symbols. Args: bits (np.ndarray): The information bits to be mapped. Returns: A series of communication symbols. """ symbols = Symbols() for frame_bits in np.reshape(bits, (num_streams, -1)): symbols.append_stream(self.waveform.map(frame_bits)) return symbols def __modulate(self, symbols: Symbols, carrier_frequency: float) -> Signal: """Modulates a sequence of MIMO signals into a base-band communication waveform. Args: symbols (Symbols): Communication symbols to be modulated. carrier_frequency (float): Carrier frequency of the communication signal. Returns: The modualted base-band communication frame. """ # For each stream resulting from the initial encoding stage # Place and encode the symbols according to the stream transmit coding configuration frame_samples = np.empty( (symbols.num_streams, self.waveform.samples_per_frame), dtype=np.complex_ ) for s, stream_symbols in enumerate(symbols.raw): placed_symbols = self.waveform.place(Symbols(stream_symbols[np.newaxis, :, :])) # Modulate each placed symbol stream individually to its base-band signal representation frame_samples[s, :] = self.waveform.modulate(placed_symbols) # Apply the stream transmit coding configuration frame_signal = Signal.Create(frame_samples, self.waveform.sampling_rate, carrier_frequency) return frame_signal
[docs] def _transmit(self, duration: float = -1.0) -> CommunicationTransmission: """Returns an array with the complex base-band samples of a waveform generator. The signal may be distorted by RF impairments. Args: duration (float, optional): Length of signal in seconds. Returns: Transmitted information. """ # By default, the drop duration will be exactly one frame if duration <= 0.0: duration = self.frame_duration # Infer required parameters carrier_frequency = ( self.transmitting_device.carrier_frequency if self.transmitting_device is not None else 0.0 ) frame_duration = self.frame_duration num_mimo_frames = int(duration / frame_duration) (required_num_data_bits, required_num_code_bits) = self._bit_requirements() # Ultimately, the number of resulting output streams is determined by the stream coding configuration if len(self.transmit_stream_coding) > 0: num_output_streams = self.transmit_stream_coding.num_output_streams # If not stream coding configuration is available, the number of output streams is determined by the precoding configuration elif len(self.precoding) > 0: num_output_streams = self.precoding.num_output_streams # The default number of output streams is one else: num_output_streams = 1 # Assert that the number of output streams matches the antenna count if num_output_streams != self.num_transmit_ports: raise RuntimeError( f"Modem MIMO configuration generates invalid number of antenna streams ({num_output_streams} instead of {self.num_transmit_ports})" ) signal = Signal.Empty( self.waveform.sampling_rate, num_output_streams, carrier_frequency=carrier_frequency ) # Abort if no frame is to be transmitted within the current duration if num_mimo_frames < 1: transmission = CommunicationTransmission(signal) return transmission frames: List[CommunicationTransmissionFrame] = [] for n in range(num_mimo_frames): # Generate plain data bits data_bits = self.bits_source.generate_bits(required_num_data_bits) # Apply forward error correction encoded_bits = self.encoder_manager.encode(data_bits, required_num_code_bits) # Map bits to communication symbols mapped_symbols = self.__map(encoded_bits, self.precoding.num_input_streams) # Apply the first symbol precoding cofiguration encoded_symbols = self.precoding.encode( StatedSymbols( mapped_symbols.raw, np.ones( ( mapped_symbols.num_streams, 1, mapped_symbols.num_blocks, mapped_symbols.num_symbols, ), dtype=np.complex_, ), ) ) # Modulate symbols to a base-band signal frame_signal = self.__modulate(encoded_symbols, carrier_frequency) # Apply stream encoding configuration encoded_frame_signal = self.__transmit_stream_coding.encode(frame_signal) # Save results signal.append_samples(encoded_frame_signal) frames.append( CommunicationTransmissionFrame( signal=frame_signal, bits=data_bits, encoded_bits=encoded_bits, symbols=mapped_symbols, encoded_symbols=encoded_symbols, timestamp=n * frame_duration, ) ) # Save the transmitted information transmission = CommunicationTransmission(signal, frames) return transmission
[docs] class TransmittingModem( TransmittingModemBase[CommunicationWaveform], Transmitter[CommunicationTransmission], Serializable, ): """Representation of a wireless modem exclusively transmitting.""" yaml_tag = "TxModem" def __init__( self, *args, device: Device | None = None, selected_transmit_ports: Sequence[int] | None = None, **kwargs, ) -> None: """ Args: device (Device, optional): Device operated by the modem. selected_transmit_ports (Sequence[int] | None): Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array. If not specified, all available ports will be considered. *args, \**kwargs: Modem initialization parameters. Refer to :class:`TransmittingModemBase` for further details. """ # Initialize base classes # Note that the initialization order matters here Transmitter.__init__(self, selected_transmit_ports=selected_transmit_ports) TransmittingModemBase.__init__(self, *args, **kwargs) Serializable.__init__(self) self.device = device @property def transmitting_device(self) -> Device | None: # The transmitting device resolves to the operated device return self.device @property def receiving_device(self) -> Device | None: return None @Transmitter.device.setter # type: ignore def device(self, value: Device) -> None: if Transmitter.device.fget(self) is not None: # type: ignore self.device.transmitters.remove(self) if value is not None and self not in value.transmitters: value.transmitters.add(self) @property def power(self) -> float: return self.waveform.power if self.waveform is not None else 0.0 def _recall_transmission(self, group: Group) -> CommunicationTransmission: return CommunicationTransmission.from_HDF(group)
[docs] class ReceivingModemBase(Generic[CWT], BaseModem[CWT]): """Base class of signal processing algorithms receiving information.""" __receive_stream_coding: ReceiveStreamCoding def __init__(self, *args, **kwargs) -> None: self.__receive_stream_coding = ReceiveStreamCoding(modem=self) BaseModem.__init__(self, *args, **kwargs) @property def receive_stream_coding(self) -> ReceiveStreamCoding: """Stream MIMO coding configuration during signal reception. Returns: Handle to the coding configuration. """ return self.__receive_stream_coding @receive_stream_coding.setter def receive_stream_coding(self, value: ReceiveStreamCoding) -> None: self.__receive_stream_coding = value value.modem = self def __synchronize(self, received_signal: Signal) -> Tuple[List[int], List[Signal]]: """Synchronize a received MIMO base-band stream. Converts the stream into sections representing communication frames. Args: received_signal (Signal): The MIMO signal received over the operated device's RF chain. Returns: A sequence signals representing communication frames and their respective detection indices. """ # Synchronize raw MIMO data into frames frame_start_indices = self.waveform.synchronization.synchronize(received_signal[:, :]) frame_length = self.waveform.samples_per_frame synchronized_signals = [] for frame_start in frame_start_indices: frame_stop = frame_start + frame_length frame_samples = received_signal[:, frame_start:frame_stop] # Pad the frame if it is too short # This may happen if the last frame is incomplete, or synhronization is not perfect if frame_samples.shape[1] < frame_length: frame_samples = np.pad( frame_samples, ((0, 0), (0, frame_length - frame_samples.shape[1])), mode="constant", ) frame_signal = Signal.Create(frame_samples, received_signal.sampling_rate) synchronized_signals.append(frame_signal) return frame_start_indices, synchronized_signals def __demodulate(self, frame: Signal) -> Symbols: """Demodulates a sequence of synchronized MIMO signals into data symbols. Args: frame (Signal): Synchronized MIMO signal, representing the samples of a full communication frame. Returns: Demodulated frame symbols. """ symbols = Symbols() for stream in frame[:]: stream_symbols = self.waveform.demodulate(stream) symbols.append_stream(stream_symbols) return symbols def __unmap(self, symbols: Symbols) -> np.ndarray: """Unmap a set of communication symbols to information bits. Args: symbols (Symbols): The communication symbols to be unmapped Returns: A numpy array containing hard information bits. """ bits = np.empty(0, dtype=np.uint8) for stream in symbols.raw: bits = np.append(bits, self.waveform.unmap(Symbols(stream[np.newaxis, :, :]))) return bits
[docs] def _receive(self, signal: Signal) -> CommunicationReception: # Resample the signal to match the waveform's requirements signal = signal.resample(self.waveform.sampling_rate) # Synchronize incoming signals frame_start_indices, synchronized_signals = self.__synchronize(signal) # Abort if no frame has been detected if len(synchronized_signals) < 1: reception = CommunicationReception(signal) return reception # Compute the bit generation requirements required_num_data_bits, _ = self._bit_requirements() # Process each frame independently frames: List[CommunicationReceptionFrame] = [] for frame_index, frame_signal in zip(frame_start_indices, synchronized_signals): # Apply the stream transmit decoding configuration decoded_frame_signal = self.__receive_stream_coding.decode(frame_signal) # Demodulate raw symbols for each frame independtly symbols = self.__demodulate(decoded_frame_signal) # Estimate the channel from each frame demodulation frame_delay = frame_index / self.waveform.sampling_rate stated_symbols = self.waveform.estimate_channel(symbols, frame_delay) picked_symbols = self.waveform.pick(stated_symbols) # Decode the pre-equalization symbol precoding stage decoded_symbols = self.precoding.decode(picked_symbols) # Equalize the received symbols for each frame given the estimated channel state equalized_symbols = self.waveform.equalize_symbols(decoded_symbols) # Unmap equalized symbols to information bits encoded_bits = self.__unmap(equalized_symbols) # Apply inverse FEC configuration to correct errors and remove redundancies decoded_bits = self.encoder_manager.decode(encoded_bits, required_num_data_bits) # Store the received information frames.append( CommunicationReceptionFrame( signal=frame_signal, decoded_signal=decoded_frame_signal, symbols=symbols, decoded_symbols=decoded_symbols, timestamp=frame_index * signal.sampling_rate, equalized_symbols=equalized_symbols, encoded_bits=encoded_bits, decoded_bits=decoded_bits, ) ) # Store the received information of all frames reception = CommunicationReception(signal=signal, frames=frames) return reception
[docs] class ReceivingModem( ReceivingModemBase[CommunicationWaveform], Receiver[CommunicationReception], Serializable ): """Representation of a wireless modem exclusively receiving.""" yaml_tag = "RxModem" """YAML serialization tag""" def __init__( self, *args, device: Device | None = None, selected_receive_ports: Sequence[int] | None = None, **kwargs, ) -> None: """ Args: device (Device, optional): Device operated by the modem. selected_receive_ports (Sequence[int] | None): Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array. If not specified, all available antenna ports will be considered. *args, \**kwargs: Modem initialization parameters. Refer to :class:`ReceivingModemBase` for further details. """ # Initialize base classes # Note that the initialization order matters here Receiver.__init__(self, selected_receive_ports=selected_receive_ports) ReceivingModemBase.__init__(self, *args, **kwargs) Serializable.__init__(self) self.device = device @property def transmitting_device(self) -> Device | None: return None @property def receiving_device(self) -> Device | None: # The receiving device resolves to the operated device return self.device @Receiver.device.setter # type: ignore def device(self, value: Device) -> None: if Receiver.device.fget(self) is not None: # type: ignore self.device.receivers.remove(self) if value is not None and self not in value.receivers: value.receivers.add(self) @property def num_receive_ports(self) -> int: if self.receiving_device is None: return 0 else: return self.receiving_device.num_receive_ports @property def power(self) -> float: return self.waveform.power if self.waveform is not None else 0.0 def _recall_reception(self, group: Group) -> CommunicationReception: return CommunicationReception.from_HDF(group)
[docs] class DuplexModem(TransmittingModem, ReceivingModem): """Representation of a wireless modem simultaneously transmitting and receiving.""" yaml_tag = "Modem" def __init__( self, *args, bits_source: BitsSource | None = None, selected_transmit_ports: Sequence[int] | None = None, selected_receive_ports: Sequence[int] | None = None, **kwargs, ) -> None: """ Args: bits_source (BitsSource, optional): Source configuration of communication bits transmitted by this modem. Bits are randomly generated by default. selected_transmit_ports (Sequence[int] | None): Indices of antenna ports selected for transmission from the operated :class:`Device's<Device>` antenna array. If not specified, all available ports will be considered. selected_receive_ports (Sequence[int] | None): Indices of antenna ports selected for reception from the operated :class:`Device's<Device>` antenna array. If not specified, all available ports will be considered. *args, \**kwargs: Modem initialization parameters. Refer to :class:`TransmittingModem` and :class:`ReceivingModem` for further details. """ # Initialize base classes ReceivingModem.__init__(self, selected_receive_ports=selected_receive_ports, **kwargs) TransmittingModem.__init__( self, *args, bits_source=bits_source, selected_transmit_ports=selected_transmit_ports, **kwargs, ) @property def transmitting_device(self) -> Device | None: return TransmittingModem.transmitting_device.fget(self) # type: ignore @property def receiving_device(self) -> Device | None: return ReceivingModem.receiving_device.fget(self) # type: ignore @TransmittingModem.device.setter # type: ignore def device(self, value: Device) -> None: TransmittingModem.device.fset(self, value) ReceivingModem.device.fset(self, value)