Source code for hermespy.modem.evaluators

# -*- coding: utf-8 -*-

from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence

import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import uniform

from hermespy.core import (
    ArtifactTemplate,
    Serializable,
    Evaluator,
    EvaluationTemplate,
    GridDimension,
    Hook,
    PlotVisualization,
    ScalarEvaluationResult,
    StemVisualization,
    VAT,
)
from .modem import (
    CommunicationReception,
    CommunicationTransmission,
    TransmittingModem,
    ReceivingModem,
)

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.4.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class CommunicationEvaluator(Evaluator, ABC): """Base class for evaluating communication processes between two modems.""" __transmitting_modem: TransmittingModem __receiving_modem: ReceivingModem __transmit_hook: Hook[CommunicationTransmission] __receive_hook: Hook[CommunicationReception] __transmission: CommunicationTransmission | None __reception: CommunicationReception | None __plot_surface: bool def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Communication modem transmitting information. receiving_modem (ReceivingModem): Communication modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ # Initialize base class Evaluator.__init__(self) # Initialize class attributes self.__transmitting_modem = transmitting_modem self.__receiving_modem = receiving_modem self.__transmission = None self.__reception = None self.__plot_surface = plot_surface # Register callbacks for new transmissions and receptions self.__transmit_hook = transmitting_modem.add_transmit_callback(self.__transmit_callback) self.__receive_hook = receiving_modem.add_receive_callback(self.__receive_callback) @property def transmitting_modem(self) -> TransmittingModem: """Communication modem transmitting information.""" return self.__transmitting_modem @property def receiving_modem(self) -> ReceivingModem: """Communication modem receiving information.""" return self.__receiving_modem def __transmit_callback(self, transmission: CommunicationTransmission) -> None: """Callback function notifying the evaluator of a new transmission.""" self.__transmission = transmission def __receive_callback(self, reception: CommunicationReception) -> None: """Callback function notifying the evaluator of a new reception.""" self.__reception = reception def _fetch_dsp_results(self) -> tuple[CommunicationTransmission, CommunicationReception]: """Fetches the cached communication transmission and reception results. Raises: RuntimeError: If the transmission or reception results are not available. Returns: Tuple of transmission and reception. """ if self.__transmission is None: raise RuntimeError( "Communication evaluator could not fetch transmission. Has the modem transmitted data?" ) if self.__reception is None: raise RuntimeError( "Communication evaluator could not fetch reception. Has the modem received data?" ) return self.__transmission, self.__reception
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> ScalarEvaluationResult: return ScalarEvaluationResult.From_Artifacts(grid, artifacts, self, self.__plot_surface)
def __del__(self) -> None: """Destructor of the communication evaluator. Ensures that the hooks are removed when the evaluator is deleted. """ self.__transmit_hook.remove() self.__receive_hook.remove()
class ErrorEvaluation(EvaluationTemplate[np.ndarray, StemVisualization], ABC): """Base class for error evaluations between two modems exchanging information.""" @property @abstractmethod def _x_axis_label(self) -> str: """Label of the visualization's x-axis.""" ... # pragma: no cover @property @abstractmethod def _y_axis_label(self) -> str: """Label of the visualization's y-axis.""" ... # pragma: no cover def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> StemVisualization: # Configure axes axes[0, 0].set_ylim([-0.1, 1.1]) axes[0, 0].set_xlabel(self._x_axis_label) axes[0, 0].set_ylabel(self._y_axis_label) axes[0, 0].set_yticks([]) # Plot horizone lines for reference indicators axes[0, 0].axhline(0.0, linestyle="--") axes[0, 0].axhline(1.0, linestyle="--") axes[0, 0].text(-1.0, -0.06, "Correct", fontweight="bold") axes[0, 0].text(-1.0, 1.06, "Error", fontweight="bold") stem = axes[0, 0].stem(np.zeros(self.evaluation.size), basefmt=" ") return StemVisualization(figure, axes, stem) def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None: # Update markers visualization.container.markerline.set_ydata(self.evaluation) # ToDo: Update segemts representing the stem lines # for line, bit in zip(visualization.container.stemlines, self.evaluation.flat): # line.set_ydata(bit)
[docs] class BitErrorArtifact(ArtifactTemplate[np.float64]): """Artifact of a bit error evaluation between two modems exchanging information. Generated by :meth:`artifact()<BitErrorEvaluation.artifact>` of :class:`BitErrorEvaluation`. """ ... # pragma: no cover
[docs] class BitErrorEvaluation(ErrorEvaluation): """Bit error evaluation between two modems exchanging information. Generated by :meth:`evaluate()<BitErrorEvaluator.evaluate>` of :class:`BitErrorEvaluator`. """ @property def title(self) -> str: return "Bit Error Evaluation" @property def _x_axis_label(self) -> str: return "Bit Index" @property def _y_axis_label(self) -> str: return "Bit Error Indicator"
[docs] def artifact(self) -> BitErrorArtifact: ber = np.mean(self.evaluation) return BitErrorArtifact(ber)
[docs] class BitErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate bit errors between two modems exchanging information.""" yaml_tag = "BitErrorEvaluator" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> BitErrorEvaluation: # Retrieve transmitted and received bits transmission, reception = self._fetch_dsp_results() transmitted_bits = transmission.bits received_bits = reception.bits # Pad bit sequences (if required) num_bits = max(len(received_bits), len(transmitted_bits)) padded_transmission = np.append( transmitted_bits, np.zeros(num_bits - len(transmitted_bits)) ) padded_reception = np.append(received_bits, np.zeros(num_bits - len(received_bits))) # Compute bit errors as the positions where both sequences differ. # Note that this requires the sequences to be in 0/1 format! bit_errors = np.abs(padded_transmission - padded_reception) return BitErrorEvaluation(bit_errors)
@property def abbreviation(self) -> str: return "BER" @property def title(self) -> str: return "Bit Error Rate Evaluation" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class BlockErrorArtifact(ArtifactTemplate[np.float64]): """Artifact of a block error evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class BlockErrorEvaluation(ErrorEvaluation): """Block error evaluation of a single communication process between modems.""" @property def title(self) -> str: return "Block Error Evaluation" @property def _x_axis_label(self) -> str: return "Block Index" @property def _y_axis_label(self) -> str: return "Block Error Indicator"
[docs] def artifact(self) -> BlockErrorArtifact: bler = np.mean(self.evaluation) return BlockErrorArtifact(bler)
[docs] class BlockErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate block errors between two modems exchanging information.""" yaml_tag = "BlockErrorEvaluator" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> BlockErrorEvaluation: # Retrieve transmittend and received data transmission, reception = self._fetch_dsp_results() # Compare the decoded bit streams of each communication frame partioned into blocks into blocks # Every block with at least one bit error is considered a block error and increases the error counter num_tx_blocks = 0 for tx_frame in transmission.frames: num_tx_blocks += tx_frame.bits.size // tx_frame.bit_block_size num_rx_blocks = 0 for rx_frame in reception.frames: num_rx_blocks += rx_frame.decoded_bits.size // rx_frame.bit_block_size block_errors = np.ones(max(num_tx_blocks, num_rx_blocks), dtype=bool) b = 0 for tx_frame, rx_frame in zip(transmission.frames, reception.frames): tx_blocks = tx_frame.bits.reshape((-1, tx_frame.bit_block_size)) rx_blocks = rx_frame.decoded_bits.reshape((-1, rx_frame.bit_block_size)) num_blocks = max(tx_blocks.shape[0], rx_blocks.shape[0]) block_errors[b : b + num_blocks] = np.any(tx_blocks != rx_blocks, axis=1) b += num_blocks return BlockErrorEvaluation(block_errors)
@property def title(self) -> str: return "Block Error Rate" @property def abbreviation(self) -> str: return "BLER" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class FrameErrorArtifact(ArtifactTemplate[float]): """Artifact of a frame error evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class FrameErrorEvaluation(ErrorEvaluation): """Frame error evaluation of a single communication process between modems.""" @property def title(self) -> str: return "Frame Error Evaluation" @property def _x_axis_label(self) -> str: return "Frame Index" @property def _y_axis_label(self) -> str: return "Frame Error Indicator"
[docs] def artifact(self) -> FrameErrorArtifact: bler = float(np.mean(self.evaluation)) return FrameErrorArtifact(bler)
[docs] class FrameErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate frame errors between two modems exchanging information.""" yaml_tag = "FrameErrorEvaluator" """YAML serialization tag""" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> FrameErrorEvaluation: # Retrieve transmitted and received information transmission, reception = self._fetch_dsp_results() # The initial number of frame errors is the difference in transmitted and received frames, # since every dropped frame is considered an error frame_errors = np.ones(max(transmission.num_frames, reception.num_frames), dtype=bool) # Compare the decoded bit streams of each communication frame # If they differ, increase the frame errror count for f, (tx_frame, rx_frame) in enumerate(zip(transmission.frames, reception.frames)): frame_errors[f] = not np.array_equiv(tx_frame.bits, rx_frame.decoded_bits) return FrameErrorEvaluation(frame_errors)
@property def title(self) -> str: return "Frame Error Rate" @property def abbreviation(self) -> str: return "FER" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class ThroughputArtifact(ArtifactTemplate[float]): """Artifact of a throughput evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class ThroughputEvaluation(EvaluationTemplate[float, PlotVisualization]): """Throughput evaluation between two modems exchanging information.""" def __init__( self, bits_per_frame: int, frame_duration: float, frame_errors: np.ndarray ) -> None: """ Args: bits_per_frame (int): Number of bits per communication frame frame_duration (float): Duration of a single communication frame in seconds frame_errors (numpy.ndarray): Frame error indicators """ num_frames = len(frame_errors) num_correct_frames = np.sum(np.invert(frame_errors)) throughput = num_correct_frames * bits_per_frame / (num_frames * frame_duration) EvaluationTemplate.__init__(self, throughput) @property def title(self) -> str: return "Data Throughput"
[docs] def artifact(self) -> ThroughputArtifact: return ThroughputArtifact(self.evaluation)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: lines = np.empty_like(axes, dtype=np.object_) return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: pass
[docs] class ThroughputEvaluator(CommunicationEvaluator, Serializable): """Evaluate data throughput between two modems exchanging information.""" yaml_tag = "ThroughputEvaluator" """YAML serialization tag""" __framer_error_evaluator: FrameErrorEvaluator def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ # Initialize base class CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) # Initialize class attributes self.__framer_error_evaluator = FrameErrorEvaluator(transmitting_modem, receiving_modem)
[docs] def evaluate(self) -> ThroughputEvaluation: # Get the frame errors frame_errors = self.__framer_error_evaluator.evaluate().evaluation.flatten() _, reception = self._fetch_dsp_results() # Transform frame errors to data throughput bits_per_frame = reception.frames[0].decoded_bits.size frame_duration = reception.frames[0].signal.duration return ThroughputEvaluation(bits_per_frame, frame_duration, frame_errors)
@property def title(self) -> str: return "Data Throughput" @property def abbreviation(self) -> str: return "DRX"
[docs] class EVMArtifact(ArtifactTemplate[float]): """Artifact of a error vector magnitude (EVM) evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class EVMEvaluation(EvaluationTemplate[float, PlotVisualization]): __transmitted_symbols: np.ndarray __received_symbols: np.ndarray __evm: float def __init__(self, transmitted_symbols: np.ndarray, received_symbols: np.ndarray) -> None: """ Args: transmitted_symbols (numpy.ndarray): Originally transmitted communication symbols. received_symbols (numpy.ndarray): Received communication symbols. """ _transmitted_symbols = transmitted_symbols.flatten() _received_symbols = received_symbols.flatten() size = min(transmitted_symbols.size, received_symbols.size) self.__transmitted_symbols = _transmitted_symbols[:size] self.__received_symbols = _received_symbols[:size] self.__evm = np.sqrt( np.mean(np.abs(self.__transmitted_symbols[:size] - self.__received_symbols[:size]) ** 2) ) @property def title(self) -> str: return "Error Vector Magnitude" @property def abbreviation(self) -> str: return "EVM"
[docs] def artifact(self) -> EVMArtifact: return EVMArtifact(self.__evm)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: lines = np.empty_like(axes, dtype=np.object_) return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: pass
[docs] class ConstellationEVM(CommunicationEvaluator): """Evaluate the error vector magnitude (EVM) of a constellation diagram."""
[docs] def evaluate(self) -> EVMEvaluation: # Retrieve transmitted and received symbols transmission, reception = self._fetch_dsp_results() return EVMEvaluation(transmission.symbols.raw, reception.equalized_symbols.raw)
@property def title(self) -> str: return "Error Vector Magnitude" @property def abbreviation(self) -> str: return "EVM"