# -*- coding: utf-8 -*-
from __future__ import annotations
from __future__ import annotations
from collections.abc import Sequence
import matplotlib.pyplot as plt
import numpy as np
from .device import Receiver, Reception, Transmitter, Transmission
from .hooks import Hook
from .monte_carlo import Artifact, Evaluation, ScalarEvaluationResult, Evaluator, GridDimension
from .visualize import StemVisualization, VAT
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class PowerResult(ScalarEvaluationResult):
    """Final result of a power evaluation campaign.

    Stores the per-stream average powers collected over the evaluation grid
    and hands their sum over the last axis to the scalar result base class.
    """

    __average_powers: np.ndarray

    def __init__(
        self,
        average_powers: np.ndarray,
        grid: Sequence[GridDimension],
        evaluator: Evaluator,
        plot_surface: bool = False,
    ) -> None:
        """
        Args:
            average_powers (numpy.ndarray):
                Average power in Watts of each antenna stream.
                The last axis is summed to obtain the scalar result.
            grid (Sequence[GridDimension]):
                Grid dimensions the evaluation was carried out over.
            evaluator (Evaluator):
                Evaluator this result originates from.
            plot_surface (bool):
                Whether two-dimensional evaluations should be plotted as surface plots.
        """

        # The base class works on a single scalar per grid section,
        # so collapse the trailing (stream) axis into a total power
        total_powers = np.sum(average_powers, axis=-1, keepdims=False)
        ScalarEvaluationResult.__init__(self, grid, total_powers, evaluator, plot_surface)

        # Keep the unreduced per-stream powers for later access
        self.__average_powers = average_powers

    @property
    def average_powers(self) -> np.ndarray:
        """Average power in Watts of each antenna stream."""

        return self.__average_powers

    def to_array(self) -> np.ndarray:
        """Return the stored per-stream average powers without reduction."""

        return self.__average_powers
[docs]
class PowerArtifact(Artifact):
    """The artifact of a received power evaluation.

    Generated by the :meth:`artifact<hermespy.core.evaluators.PowerEvaluation.artifact>` of :class:`PowerEvaluation<hermespy.core.evaluators.PowerEvaluation>`.
    """

    __power: np.ndarray

    def __init__(self, power: np.ndarray) -> None:
        """
        Args:
            power: The received power in Watts for each antenna stream.
        """

        # Initialize the base class
        Artifact.__init__(self)

        # Initialize class members
        self.__power = power

    @property
    def power(self) -> np.ndarray:
        """The received power in Watts for each antenna stream."""

        return self.__power

    def __str__(self) -> str:
        """Return the power summed over all streams, in decibels.

        A total power of zero is rendered as ``-inf dB``.
        """

        power_db = 10 * np.log10(np.sum(self.__power))

        # Fixed-point with two decimals. The former ':.2' spec selected the
        # general format with two significant digits, which switches to
        # scientific notation for magnitudes of ten and above (e.g. '1.2e+01').
        return f"{power_db:.2f} dB"

    def to_scalar(self) -> float:
        """Return the power summed over all streams as a plain float."""

        return float(np.sum(self.__power))
[docs]
class PowerEvaluation(Evaluation[StemVisualization]):
    """Single evaluation of a signal's power.

    Produced by the power evaluators and visualized as a stem plot with
    one stem per antenna stream.
    """

    __power: np.ndarray

    def __init__(self, power: np.ndarray) -> None:
        """
        Args:
            power: The power in Watts of each antenna stream.
        """

        # Set up the evaluation base
        super().__init__()

        # Remember the evaluated powers
        self.__power = power

    @property
    def power(self) -> np.ndarray:
        """The power of each antenna stream, as passed to the constructor."""

        return self.__power

    def artifact(self) -> PowerArtifact:
        """Wrap the evaluated powers into a :class:`PowerArtifact`."""

        return PowerArtifact(self.__power)

    @property
    def title(self) -> str:
        """Title of the evaluation."""

        return "Received Power"

    def _prepare_visualization(
        self, figure: plt.Figure | None, axes: VAT, **kwargs
    ) -> StemVisualization:
        """Label the first axes and create a stem plot of zeros as placeholder."""

        # Only the first axes of the provided array is drawn on
        stem_axes: plt.Axes = axes.flat[0]
        stem_axes.set_xlabel("Antenna Stream")
        stem_axes.set_ylabel("Power [W]")

        # Start from all-zero stems; the actual values are filled in during the update
        placeholder = np.zeros_like(self.__power)
        stems = stem_axes.stem(placeholder)

        return StemVisualization(figure, axes, stems)

    def _update_visualization(self, visualization: StemVisualization, **kwargs) -> None:
        """Move the stem markers to the evaluated powers."""

        # NOTE(review): only the marker line is updated here, the stem lines are not touched
        markers = visualization.container.markerline
        markers.set_ydata(self.__power)
[docs]
class ReceivePowerEvaluator(Evaluator):
    """Estimates the signal power received by receivers."""

    __receive_hook: Hook[Reception]
    __reception: Reception | None

    def __init__(self, target: Receiver, plot_scale: str = "log") -> None:
        """
        Args:
            target (Receiver):
                The device or receiver to measure the received power of.
            plot_scale (str):
                The scale of the plot. Can be ``'linear'`` or ``'log'``.
        """

        # Initialize the base class
        Evaluator.__init__(self)

        # Register a callback at the target so that the most recent reception is cached
        self.__receive_hook = target.add_receive_callback(self.__receive_callback)
        self.__reception = None
        self.plot_scale = plot_scale

    def __receive_callback(self, reception: Reception) -> None:
        """Callback function notifying the evaluator of a new reception."""

        self.__reception = reception

    def evaluate(self) -> PowerEvaluation:
        """Evaluate the power of the most recently cached reception.

        Raises:
            RuntimeError: If no reception has been reported to the evaluator yet.
        """

        if self.__reception is None:
            raise RuntimeError(
                "Power evaluator could not fetch reception. Has the receiver received data?"
            )

        power = self.__reception.signal.power
        return PowerEvaluation(power)

    @property
    def abbreviation(self) -> str:
        """Short identifier of the evaluator."""

        return "RxPwr"

    @property
    def title(self) -> str:
        """Title of the evaluator."""

        return "Receive Power"

    def generate_result(self, grid: Sequence[GridDimension], artifacts: np.ndarray) -> PowerResult:
        """Average the collected power artifacts of each grid section.

        Args:
            grid (Sequence[GridDimension]):
                The grid dimensions of the evaluation.
            artifacts (numpy.ndarray):
                Array holding one collection of :class:`PowerArtifact` per grid section.

        Returns:
            Result holding the per-stream average power of each grid section.
            Streams not present in an artifact are treated as zero power.
        """

        # Find the maximum number of receive ports over all artifacts.
        # Grid sections without any artifact must not break the search.
        max_ports = max(
            (artifact.power.size for section in artifacts.flat for artifact in section),
            default=0,
        )

        average_powers = np.zeros((*artifacts.shape, max_ports), dtype=np.float64)
        for grid_index, section in np.ndenumerate(artifacts):
            num_artifacts = len(section)

            # Sections without artifacts keep their zero initialization
            if num_artifacts == 0:
                continue

            for artifact in section:
                # Artifacts may report fewer ports than the maximum,
                # so only accumulate into the ports that actually exist
                power = np.ravel(artifact.power)
                average_powers[(*grid_index, slice(power.size))] += power

            average_powers[grid_index] /= num_artifacts

        return PowerResult(average_powers, grid, self)

    def __del__(self) -> None:
        # Remove the hook registered during initialization
        self.__receive_hook.remove()
[docs]
class TransmitPowerEvaluator(Evaluator):
    """Estimates the signal power transmitted by transmitters."""

    __transmit_hook: Hook[Transmission]
    __transmission: Transmission | None

    def __init__(self, target: Transmitter, plot_scale: str = "log") -> None:
        """
        Args:
            target (Transmitter):
                The device or transmitter to measure the transmitted power of.
            plot_scale (str):
                The scale of the plot. Can be ``'linear'`` or ``'log'``.
        """

        # Initialize the base class
        Evaluator.__init__(self)

        # Register a callback at the target so that the most recent transmission is cached
        self.__transmit_hook = target.add_transmit_callback(self.__transmit_callback)
        self.__transmission = None
        self.plot_scale = plot_scale

    def __transmit_callback(self, transmission: Transmission) -> None:
        """Callback function notifying the evaluator of a new transmission."""

        self.__transmission = transmission

    def evaluate(self) -> PowerEvaluation:
        """Evaluate the power of the most recently cached transmission.

        Raises:
            RuntimeError: If no transmission has been reported to the evaluator yet.
        """

        if self.__transmission is None:
            raise RuntimeError(
                "Power evaluator could not fetch transmission. Has the transmitter transmitted data?"
            )

        return PowerEvaluation(self.__transmission.signal.power)

    @property
    def abbreviation(self) -> str:
        """Short identifier of the evaluator."""

        return "TxPwr"

    @property
    def title(self) -> str:
        """Title of the evaluator."""

        return "Transmit Power"

    def generate_result(self, grid: Sequence[GridDimension], artifacts: np.ndarray) -> PowerResult:
        """Average the collected power artifacts of each grid section.

        Args:
            grid (Sequence[GridDimension]):
                The grid dimensions of the evaluation.
            artifacts (numpy.ndarray):
                Array holding one collection of :class:`PowerArtifact` per grid section.

        Returns:
            Result holding the per-stream average power of each grid section.
            Streams not present in an artifact are treated as zero power.
        """

        # Find the maximum number of transmit ports over all artifacts.
        # Grid sections without any artifact must not break the search.
        max_ports = max(
            (artifact.power.size for section in artifacts.flat for artifact in section),
            default=0,
        )

        average_powers = np.zeros((*artifacts.shape, max_ports), dtype=np.float64)
        for grid_index, section in np.ndenumerate(artifacts):
            num_artifacts = len(section)

            # Sections without artifacts keep their zero initialization
            if num_artifacts == 0:
                continue

            for artifact in section:
                # Artifacts may report fewer ports than the maximum,
                # so only accumulate into the ports that actually exist
                power = np.ravel(artifact.power)
                average_powers[(*grid_index, slice(power.size))] += power

            average_powers[grid_index] /= num_artifacts

        return PowerResult(average_powers, grid, self)

    def __del__(self) -> None:
        # Remove the hook registered during initialization
        self.__transmit_hook.remove()