Source code for hermespy.modem.evaluators

# -*- coding: utf-8 -*-

from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Sequence

import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import uniform

from hermespy.core import (
    ArtifactTemplate,
    Serializable,
    Evaluator,
    EvaluationTemplate,
    GridDimension,
    PlotVisualization,
    ScalarEvaluationResult,
    StemVisualization,
    VAT,
)
from .modem import TransmittingModem, ReceivingModem

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class CommunicationEvaluator(Evaluator, ABC): """Base class for evaluating communication processes between two modems.""" __transmitting_modem: TransmittingModem # Handle to the transmitting modem __receiving_modem: ReceivingModem # Handle to the receiving modem __plot_surface: bool def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ self.__transmitting_modem = transmitting_modem self.__receiving_modem = receiving_modem self.__plot_surface = plot_surface # Initialize base class Evaluator.__init__(self) @property def transmitting_modem(self) -> TransmittingModem: """Modem transmitting information. Denoted by :math:`(\\alpha)` within the respective equations. """ return self.__transmitting_modem @property def receiving_modem(self) -> ReceivingModem: """Modem receiving information. Denoted by :math:`(\\beta)` within the respective equations. """ return self.__receiving_modem
[docs] def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> ScalarEvaluationResult: return ScalarEvaluationResult.From_Artifacts(grid, artifacts, self, self.__plot_surface)
class ErrorEvaluation(EvaluationTemplate[np.ndarray, StemVisualization], ABC): """Base class for error evaluations between two modems exchanging information.""" @property @abstractmethod def _x_axis_label(self) -> str: """Label of the visualization's x-axis.""" ... # pragma: no cover @property @abstractmethod def _y_axis_label(self) -> str: """Label of the visualization's y-axis.""" ... # pragma: no cover def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> StemVisualization: # Configure axes axes[0, 0].set_ylim([-0.1, 1.1]) axes[0, 0].set_xlabel(self._x_axis_label) axes[0, 0].set_ylabel(self._y_axis_label) axes[0, 0].set_yticks([]) # Plot horizone lines for reference indicators axes[0, 0].axhline(0.0, linestyle="--") axes[0, 0].axhline(1.0, linestyle="--") axes[0, 0].text(-1.0, -0.06, "Correct", fontweight="bold") axes[0, 0].text(-1.0, 1.06, "Error", fontweight="bold") stem = axes[0, 0].stem(np.zeros(self.evaluation.size), basefmt=" ") return StemVisualization(figure, axes, stem) def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None: # Update markers visualization.container.markerline.set_ydata(self.evaluation) # ToDo: Update segemts representing the stem lines # for line, bit in zip(visualization.container.stemlines, self.evaluation.flat): # line.set_ydata(bit)
[docs] class BitErrorArtifact(ArtifactTemplate[np.float_]): """Artifact of a bit error evaluation between two modems exchanging information. Generated by :meth:`artifact()<BitErrorEvaluation.artifact>` of :class:`BitErrorEvaluation`. """ ... # pragma: no cover
[docs] class BitErrorEvaluation(ErrorEvaluation): """Bit error evaluation between two modems exchanging information. Generated by :meth:`evaluate()<BitErrorEvaluator.evaluate>` of :class:`BitErrorEvaluator`. """ @property def title(self) -> str: return "Bit Error Evaluation" @property def _x_axis_label(self) -> str: return "Bit Index" @property def _y_axis_label(self) -> str: return "Bit Error Indicator"
[docs] def artifact(self) -> BitErrorArtifact: ber = np.mean(self.evaluation) return BitErrorArtifact(ber)
[docs] class BitErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate bit errors between two modems exchanging information.""" yaml_tag = "BitErrorEvaluator" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> BitErrorEvaluation: # Retrieve transmitted and received bits transmitted_bits = self.transmitting_modem.transmission.bits received_bits = self.receiving_modem.reception.bits # Pad bit sequences (if required) num_bits = max(len(received_bits), len(transmitted_bits)) padded_transmission = np.append( transmitted_bits, np.zeros(num_bits - len(transmitted_bits)) ) padded_reception = np.append(received_bits, np.zeros(num_bits - len(received_bits))) # Compute bit errors as the positions where both sequences differ. # Note that this requires the sequences to be in 0/1 format! bit_errors = np.abs(padded_transmission - padded_reception) return BitErrorEvaluation(bit_errors)
@property def abbreviation(self) -> str: return "BER" @property def title(self) -> str: return "Bit Error Rate Evaluation" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class BlockErrorArtifact(ArtifactTemplate[np.float_]): """Artifact of a block error evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class BlockErrorEvaluation(ErrorEvaluation): """Block error evaluation of a single communication process between modems.""" @property def title(self) -> str: return "Block Error Evaluation" @property def _x_axis_label(self) -> str: return "Block Index" @property def _y_axis_label(self) -> str: return "Block Error Indicator"
[docs] def artifact(self) -> BlockErrorArtifact: bler = np.mean(self.evaluation) return BlockErrorArtifact(bler)
[docs] class BlockErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate block errors between two modems exchanging information.""" yaml_tag = "BlockErrorEvaluator" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> BlockErrorEvaluation: # Retrieve transmitted and received bits transmitted_bits = self.transmitting_modem.transmission.bits received_bits = self.receiving_modem.reception.bits block_size = self.receiving_modem.encoder_manager.bit_block_size # Pad bit sequences (if required) received_bits = np.append(received_bits, np.zeros(received_bits.shape[0] % block_size)) if transmitted_bits.shape[0] >= received_bits.shape[0]: transmitted_bits = transmitted_bits[: received_bits.shape[0]] else: transmitted_bits = np.append( transmitted_bits, -np.ones(received_bits.shape[0] - transmitted_bits.shape[0]) ) # Compute bit errors as the positions where both sequences differ. # Note that this requires the sequences to be in 0/1 format! bit_errors = np.abs(transmitted_bits - received_bits) block_errors = bit_errors.reshape((-1, block_size)).sum(axis=1) > 0 return BlockErrorEvaluation(block_errors)
@property def title(self) -> str: return "Block Error Rate" @property def abbreviation(self) -> str: return "BLER" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class FrameErrorArtifact(ArtifactTemplate[float]): """Artifact of a frame error evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class FrameErrorEvaluation(ErrorEvaluation): """Frame error evaluation of a single communication process between modems.""" @property def title(self) -> str: return "Frame Error Evaluation" @property def _x_axis_label(self) -> str: return "Frame Index" @property def _y_axis_label(self) -> str: return "Frame Error Indicator"
[docs] def artifact(self) -> FrameErrorArtifact: bler = float(np.mean(self.evaluation)) return FrameErrorArtifact(bler)
[docs] class FrameErrorEvaluator(CommunicationEvaluator, Serializable): """Evaluate frame errors between two modems exchanging information.""" yaml_tag = "FrameErrorEvaluator" """YAML serialization tag""" def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) self.plot_scale = "log" # Plot logarithmically by default
[docs] def evaluate(self) -> FrameErrorEvaluation: # Retrieve transmitted and received bits transmitted_bits = self.transmitting_modem.transmission.bits received_bits = self.receiving_modem.reception.bits frame_size = self.receiving_modem.num_data_bits_per_frame if frame_size < 1: return FrameErrorEvaluation(np.empty(0, dtype=np.int_)) # Pad bit sequences (if required) received_bits = np.append(received_bits, np.zeros(received_bits.shape[0] % frame_size)) if transmitted_bits.shape[0] >= received_bits.shape[0]: transmitted_bits = transmitted_bits[: received_bits.shape[0]] else: transmitted_bits = np.append( transmitted_bits, -np.ones(received_bits.shape[0] - transmitted_bits.shape[0]) ) # Compute bit errors as the positions where both sequences differ. # Note that this requires the sequences to be in 0/1 format! bit_errors = np.abs(transmitted_bits - received_bits) frame_errors = bit_errors.reshape((-1, frame_size)).sum(axis=1) > 0 return FrameErrorEvaluation(frame_errors)
@property def title(self) -> str: return "Frame Error Rate" @property def abbreviation(self) -> str: return "FER" @staticmethod def _scalar_cdf(scalar: float) -> float: return uniform.cdf(scalar)
[docs] class ThroughputArtifact(ArtifactTemplate[float]): """Artifact of a throughput evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class ThroughputEvaluation(EvaluationTemplate[float, PlotVisualization]): """Throughput evaluation between two modems exchanging information.""" def __init__( self, bits_per_frame: int, frame_duration: float, frame_errors: np.ndarray ) -> None: """ Args: bits_per_frame (int): Number of bits per communication frame frame_duration (float): Duration of a single communication frame in seconds frame_errors (np.ndarray): Frame error indicators """ num_frames = len(frame_errors) num_correct_frames = np.sum(np.invert(frame_errors)) throughput = num_correct_frames * bits_per_frame / (num_frames * frame_duration) EvaluationTemplate.__init__(self, throughput) @property def title(self) -> str: return "Data Throughput"
[docs] def artifact(self) -> ThroughputArtifact: return ThroughputArtifact(self.evaluation)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: lines = np.empty_like(axes, dtype=np.object_) return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: pass
[docs] class ThroughputEvaluator(CommunicationEvaluator, Serializable): """Evaluate data throughput between two modems exchanging information.""" yaml_tag = "ThroughputEvaluator" """YAML serialization tag""" __framer_error_evaluator: FrameErrorEvaluator def __init__( self, transmitting_modem: TransmittingModem, receiving_modem: ReceivingModem, plot_surface: bool = True, ) -> None: """ Args: transmitting_modem (TransmittingModem): Modem transmitting information. receiving_modem (ReceivingModem): Modem receiving information. plot_surface (bool, optional): Plot the surface of the evaluation result in two-dimensional grids. Defaults to True. """ # Initialize base class CommunicationEvaluator.__init__(self, transmitting_modem, receiving_modem, plot_surface) # Initialize class attributes self.__framer_error_evaluator = FrameErrorEvaluator(transmitting_modem, receiving_modem)
[docs] def evaluate(self) -> ThroughputEvaluation: # Get the frame errors frame_errors = self.__framer_error_evaluator.evaluate().evaluation.flatten() # Transform frame errors to data throughput bits_per_frame = self.receiving_modem.num_data_bits_per_frame frame_duration = self.receiving_modem.frame_duration return ThroughputEvaluation(bits_per_frame, frame_duration, frame_errors)
@property def title(self) -> str: return "Data Throughput" @property def abbreviation(self) -> str: return "DRX"
[docs] class EVMArtifact(ArtifactTemplate[float]): """Artifact of a error vector magnitude (EVM) evaluation between two modems exchanging information.""" ... # pragma: no cover
[docs] class EVMEvaluation(EvaluationTemplate[float, PlotVisualization]): __transmitted_symbols: np.ndarray __received_symbols: np.ndarray __evm: float def __init__(self, transmitted_symbols: np.ndarray, received_symbols: np.ndarray) -> None: """ Args: transmitted_symbols (np.ndarray): Originally transmitted communication symbols. received_symbols (np.ndarray): Received communication symbols. """ _transmitted_symbols = transmitted_symbols.flatten() _received_symbols = received_symbols.flatten() size = min(transmitted_symbols.size, received_symbols.size) self.__transmitted_symbols = _transmitted_symbols[:size] self.__received_symbols = _received_symbols[:size] self.__evm = np.sqrt( np.mean(np.abs(self.__transmitted_symbols[:size] - self.__received_symbols[:size]) ** 2) ) @property def title(self) -> str: return "Error Vector Magnitude" @property def abbreviation(self) -> str: return "EVM"
[docs] def artifact(self) -> EVMArtifact: return EVMArtifact(self.__evm)
def _prepare_visualization( self, figure: plt.Figure | None, axes: VAT, **kwargs ) -> PlotVisualization: lines = np.empty_like(axes, dtype=np.object_) return PlotVisualization(figure, axes, lines) def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None: pass
[docs] class ConstellationEVM(CommunicationEvaluator): """Evaluate the error vector magnitude (EVM) of a constellation diagram."""
[docs] def evaluate(self) -> EVMEvaluation: return EVMEvaluation( self.transmitting_modem.transmission.symbols.raw, self.receiving_modem.reception.equalized_symbols.raw, )
@property def title(self) -> str: return "Error Vector Magnitude" @property def abbreviation(self) -> str: return "EVM"