# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from collections.abc import Sequence
from contextlib import nullcontext
from functools import reduce
from math import exp, sqrt
from os import path
from time import perf_counter
from typing import (
Any,
Callable,
Generic,
List,
Mapping,
Optional,
Type,
TypeVar,
Tuple,
Union,
SupportsFloat,
)
from warnings import catch_warnings, simplefilter
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import ray
from matplotlib.axis import Axis
from matplotlib.lines import Line2D
from mpl_toolkits.mplot3d import Axes3D # type: ignore
from ray.util import ActorPool
from rich.console import Console, Group
from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn
from rich.live import Live
from rich.table import Table
from scipy.constants import pi
from scipy.io import savemat
from scipy.stats import norm
from hermespy.tools import lin2db
from .definitions import ConsoleMode
from .logarithmic import LogarithmicSequence, ValueType
from .visualize import PlotVisualization, VAT, Visualizable, VT
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.3.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
MO = TypeVar("MO")
"""Type of Monte Carlo object under investigation.
:meta private:
"""
AT = TypeVar("AT")
"""Type of Monte Carlo evaluation artifact.
:meta private:
"""
FAT = TypeVar("FAT", bound=SupportsFloat)
"""Type of floating point Monte Carlo evaluation artifact.
:meta private:
"""
ET = TypeVar("ET")
"""Type of Monte Carlo evaluation.
:meta private:
"""
SERT = TypeVar("SERT", bound="ScalarEvaluationResult")
"""Type of scalar Monte Carlo evaluation result.
:meta private:
"""
EV = TypeVar("EV", bound="Evaluator")
"""Type of Monte Carlo evaluator.
:meta private:
"""
class Artifact(object):
"""Result of an investigated object evaluation.
Generated by :class:`.Evaluator` instances operating on investigated object states.
In other words, :meth:`.Evaluator.evaluate` is expected to return an instance derived from this base class.
Artifacts may, in general, represent any sort of object.
However, it is encouraged to provide a scalar floating-point representation for data visualization by implementing
the :meth:`.to_scalar` method.
"""
@abstractmethod
def __str__(self) -> str:
"""String representation of this artifact.
Will be used to visualize the artifact's content in console outputs.
Returns:
str: String representation.
"""
... # pragma: no cover
@abstractmethod
def to_scalar(self) -> float | None:
"""Scalar representation of this artifact's content.
Used to evaluate premature stopping criteria for the underlying evaluation.
Returns:
Scalar floating-point representation.
`None` if a conversion to scalar is impossible.
"""
... # pragma: no cover
class ArtifactTemplate(Generic[FAT], Artifact):
"""Scalar numerical result of an investigated object evaluation.
Implements the common case of an :class:`.Artifact` representing a scalar numerical value.
"""
__artifact: FAT # Artifact
def __init__(self, artifact: FAT) -> None:
"""
Args:
artifact (FAT):
Artifact value.
"""
self.__artifact = artifact
@property
def artifact(self) -> FAT:
"""Evaluation artifact.
Provides direct access to the represented artifact.
Returns:
FAT: The represented artifact value.
"""
return self.__artifact
def __str__(self) -> str:
return f"{self.to_scalar():.3f}"
def to_scalar(self) -> float:
return float(self.artifact)
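# Editorial example (not part of the library API, names hypothetical): since ArtifactTemplate
# already implements the scalar conversion and string rendering, a concrete artifact type only
# needs to fix the wrapped value type.
class _ExampleSNRArtifact(ArtifactTemplate[float]):
    """Hypothetical artifact carrying a single SNR measurement."""


def _example_snr_artifact_usage() -> None:
    # Wrap a raw measurement and query its scalar and console representations
    artifact = _ExampleSNRArtifact(3.14)
    assert artifact.to_scalar() == 3.14
    assert str(artifact) == "3.140"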
class Evaluation(Generic[VT], Visualizable[VT]):
"""Evaluation of a single simulation sample.
Evaluations are generated by :class:`Evaluators <Evaluator>`
during :meth:`Evaluator.evaluate`.
"""
@abstractmethod
def artifact(self) -> Artifact:
"""Generate an artifact from this evaluation.
Returns: The evaluation artifact."""
... # pragma: no cover
class EvaluationTemplate(Generic[ET, VT], Evaluation[VT], ABC):
"""Template class for simple evaluations containing a single object."""
__evaluation: ET # Evaluation
def __init__(self, evaluation: ET) -> None:
"""
Args:
evaluation (ET): The represented evaluation.
"""
# Initialize base class
super().__init__()
# Initialize class attributes
self.__evaluation = evaluation
@property
def evaluation(self) -> ET:
"""The represented evaluation."""
return self.__evaluation
class EvaluationResult(Visualizable[PlotVisualization], ABC):
"""Result of an evaluation routine iterating over a parameter grid.
Evaluation results are generated by :class:`Evaluator Instances <.Evaluator>` as a final
step within the evaluation routine.
"""
__grid: Sequence[GridDimension]
def __init__(self, grid: Sequence[GridDimension], evaluator: Evaluator) -> None:
"""
Args:
grid (Sequence[GridDimension]):
Parameter grid over which the simulation generating this result iterated.
evaluator (Evaluator):
Evaluator that generated this result.
"""
# Initialize base class
Visualizable.__init__(self)
# Initialize class attributes
self.__grid = grid
self.__evaluator = evaluator
@property
def grid(self) -> Sequence[GridDimension]:
"""Parameter grid over which the simulation iterated."""
return self.__grid
@property
def evaluator(self) -> Evaluator:
"""Evaluator that generated this result."""
return self.__evaluator
def __result_to_str(self, value: Any) -> str:
"""Convert a single result value to its string representation.
Args:
value (Any): The value to be converted.
Returns: The value's string representation.
"""
if isinstance(value, (int, float, complex)):
if self.evaluator.tick_format == ValueType.DB or self.evaluator.plot_scale == "log":
return f"{lin2db(value):.2g} dB"
return f"{value:.2g}"
return str(value)
def print(self, console: Console | None = None) -> None:
"""Print a readable version of this evaluation result.
Args:
console (Console | None):
Rich console to print in.
If not provided, a new one will be generated.
"""
_console = Console() if console is None else console
# Query the array representation of this evaluation result
results = self.to_array()
# Render results into a rich table
table = Table()
for dimension in self.grid:
table.add_column(dimension.title, style="cyan")
table.add_column(self.evaluator.abbreviation)
for grid_section in np.ndindex(*[d.num_sample_points for d in self.grid]):
columns = [g.sample_points[s].title for s, g in zip(grid_section, self.grid)]
result = results[grid_section]
result_str: str
if isinstance(result, np.ndarray):
if result.size == 1:
result_str = self.__result_to_str(result.flat[0])
else:
result_str = ""
for value in result.flat:
result_str += self.__result_to_str(value) + ", "
result_str = result_str[:-2]
else:
result_str = self.__result_to_str(result)
table.add_row(*columns, result_str)
_console.print(table)
@abstractmethod
def to_array(self) -> np.ndarray:
"""Convert the evaluation result raw data to an array representation.
Used to store the results in arbitrary binary file formats after simulation execution.
Returns:
The array result representation.
"""
... # pragma: no cover
@staticmethod
def _configure_axis(axis: Axis, tick_format: ValueType = ValueType.LIN) -> None:
"""Subroutine to configure an axis.
Args:
axis (Axis):
The matplotlib axis to be configured.
tick_format (ValueType):
The tick format.
"""
if tick_format is ValueType.DB:
axis.set_major_formatter(ticker.FuncFormatter(lambda x, _: f"{lin2db(x):.2g}dB"))
def _prepare_surface_visualization(self, ax: Axes3D) -> List[Line2D]:
# Configure axes labels and scales
ax.set(
xlabel=self.grid[0].title, ylabel=self.grid[1].title, zlabel=self.evaluator.abbreviation
)
EvaluationResult._configure_axis(ax.xaxis, self.grid[0].tick_format)
EvaluationResult._configure_axis(ax.yaxis, self.grid[1].tick_format)
EvaluationResult._configure_axis(ax.zaxis, self.evaluator.tick_format)
# x_points = np.asarray([s.value for s in self.grid[0].sample_points])
# y_points = np.asarray([s.value for s in self.grid[1].sample_points])
# y, x = np.meshgrid(y_points, x_points)
# ax.plot_surface(x.astype(float), y.astype(float), np.zeros_like(y, dtype=np.float_))
return [] # 3D plotting returns a poly3d collection that is not supported
def _update_surface_visualization(
self, data: np.ndarray, visualization: PlotVisualization
) -> None:
"""Plot two-dimensional simulation results into a three-dimensional axes system."""
x_points = np.asarray([s.value for s in self.grid[0].sample_points])
y_points = np.asarray([s.value for s in self.grid[1].sample_points])
y, x = np.meshgrid(y_points, x_points)
visualization.axes[0, 0].plot_surface(x.astype(float), y.astype(float), data)
def _prepare_multidim_visualization(self, ax: plt.Axes) -> List[Line2D]:
# Abort if the grid contains no data
if len(self.grid) < 1:
return []
# For now, the x-dimension will always be the first simulation iteration dimension
x_dimension = 0
x_points = np.asarray([s.value for s in self.grid[x_dimension].sample_points])
# Configure axes labels
ax.set_xlabel(self.grid[x_dimension].title)
ax.set_ylabel(self.evaluator.abbreviation)
# Configure axes scales
ax.set_yscale(self.evaluator.plot_scale)
ax.set_xscale(self.grid[x_dimension].plot_scale)
# Configure axes ticks
EvaluationResult._configure_axis(ax.xaxis, self.grid[x_dimension].tick_format)
EvaluationResult._configure_axis(ax.yaxis, self.evaluator.tick_format)
subgrid = list(self.grid).copy()
del subgrid[x_dimension]
lines: List[Line2D] = []
section_magnitudes = tuple(s.num_sample_points for s in subgrid)
for section_indices in np.ndindex(section_magnitudes):
# Generate the graph line label
line_label = ""
for i, v in enumerate(section_indices):
line_label += f"{self.grid[i+1].title} = {self.grid[i+1].sample_points[v].title}, "
line_label = line_label[:-2]
if len(line_label) < 1:
line_label = None
# Plot the graph line
lines.extend(
ax.plot(x_points, np.zeros_like(x_points, dtype=np.float_), label=line_label)
)
return lines
def _update_multidim_visualization(
self, scalar_data: np.ndarray, visualization: PlotVisualization
) -> None:
"""Plot multidimensional simulation results into a two-dimensional axes system.
Args:
scalar_data (np.ndarray):
Scalar data to be plotted.
visualization (PlotVisualization):
Visualization to plot into.
"""
x_dimension = 0
subgrid = list(self.grid).copy()
del subgrid[x_dimension]
section_magnitudes = tuple(s.num_sample_points for s in subgrid)
line: Line2D
for section_indices, line in zip(np.ndindex(section_magnitudes), visualization.lines[0, 0]):
# Generate the graph line label
line_label = ""
for i, v in enumerate(section_indices):
line_label += f"{self.grid[i+1].title} = {self.grid[i+1].sample_points[v].title}, "
line_label = line_label[:-2]
if len(line_label) < 1:
line_label = None
# Select the graph line scalars
line_scalars = scalar_data[(..., *section_indices)] # type: ignore
# Update the respective graph line
line.set_ydata(line_scalars)
visualization.axes[0, 0].relim()
visualization.axes[0, 0].autoscale_view()
@staticmethod
def _plot_empty(visualization: PlotVisualization) -> None:
"""Plot an empty notice into the provided axes.
Args:
visualization (PlotVisualization): Visualization to plot the notice into.
"""
ax: plt.Axes
for ax in visualization.axes.flat:
ax.text(0.5, 0.5, "NO DATA AVAILABLE", horizontalalignment="center")
class ScalarEvaluationResult(EvaluationResult):
"""Base class for scalar evaluation results."""
plot_surface: bool
__scalar_results: np.ndarray
__base_dimension_index: int
def __init__(
self,
grid: Sequence[GridDimension],
scalar_results: np.ndarray,
evaluator: Evaluator,
plot_surface: bool = True,
) -> None:
"""
Args:
grid (Sequence[GridDimension]):
Simulation grid.
scalar_results (np.ndarray):
Scalar results generated from collecting samples over the simulation grid.
evaluator (Evaluator):
The evaluator generating the results.
plot_surface (bool, optional):
Enable surface plotting for two-dimensional grids.
Enabled by default.
"""
# Initialize base class
EvaluationResult.__init__(self, grid, evaluator)
# Initialize class attributes
self.__scalar_results = scalar_results
self.plot_surface = plot_surface
self.__base_dimension_index = 0
@property
def title(self) -> str:
# The plotting title should resolve to the represented evaluator's title
return self.evaluator.title
def _prepare_visualization(
self, figure: plt.Figure | None, axes: VAT, **kwargs
) -> PlotVisualization:
if len(self.grid) == 2 and self.plot_surface:
lines = self._prepare_surface_visualization(axes[0, 0])
else:
lines = self._prepare_multidim_visualization(axes[0, 0])
line_array = np.empty_like(axes, dtype=np.object_)
line_array[0, 0] = lines
return PlotVisualization(figure, axes, line_array)
def _update_visualization(self, visualization: PlotVisualization, **kwargs) -> None:
# If the grid contains no data, don't plot anything
if len(self.grid) < 1:
return self._plot_empty(visualization)
# Shuffle grid and respective scalar results so that the selected base dimension is always the first entry
grid = list(self.grid).copy()
grid.insert(0, grid.pop(self.__base_dimension_index))
scalars = np.moveaxis(self.__scalar_results, self.__base_dimension_index, 0)
if len(grid) == 2 and self.plot_surface:
return self._update_surface_visualization(scalars, visualization)
# Multiple dimensions, resort to legend-based multiplots
else:
self._update_multidim_visualization(scalars, visualization)
def to_array(self) -> np.ndarray:
return self.__scalar_results
@classmethod
def From_Artifacts(
cls: Type[SERT],
grid: Sequence[GridDimension],
artifacts: np.ndarray,
evaluator: Evaluator,
plot_surface: bool = True,
) -> SERT:
"""Generate a scalar evaluation result from a set of artifacts.
Args:
grid (Sequence[GridDimension]):
The simulation grid.
artifacts (np.ndarray):
Numpy object array whose dimensions represent grid dimensions.
evaluator (Evaluator):
The evaluator generating the artifacts.
plot_surface (bool):
Whether to plot the result as a surface plot.
Returns:
The scalar evaluation result.
"""
scalar_results = np.empty(artifacts.shape, dtype=float)
for section_coords in np.ndindex(artifacts.shape):
scalar_results[section_coords] = np.mean(
[artifact.to_scalar() for artifact in artifacts[section_coords]]
)
return cls(grid, scalar_results, evaluator, plot_surface)
class Evaluator(ABC):
"""Evaluation routine for investigated object states, extracting performance indicators of interest.
Evaluators represent the process of extracting arbitrary performance indicator samples :math:`X_m` in the form of
:class:`.Artifact` instances from investigated object states.
Once a :class:`.MonteCarloActor` has set its investigated object to a new random state,
it calls the :func:`.evaluate` routines of all configured evaluators,
collecting the resulting respective :class:`.Artifact` instances.
For a given set of :class:`.Artifact` instances,
evaluators are expected to report a :meth:`.confidence_level` which may result in a premature abortion of the
sample collection routine for a single :class:`.GridSection`.
By default, the routine suggested by :footcite:t:`2014:bayer` is applied:
Considering a tolerance :math:`\\mathrm{TOL} \\in \\mathbb{R}_{++}` the confidence in the mean performance indicator
.. math::
\\bar{X}_M = \\frac{1}{M} \\sum_{m = 1}^{M} X_m
is considered sufficient once, for a threshold :math:`\\delta \\in \\mathbb{R}_{++}`, the bound
.. math::
\\mathrm{P}\\left(\\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\| > \\mathrm{TOL} \\right) \\leq \\delta
holds, i.e. the probability of the estimate deviating from the true expectation by more than the tolerance has dropped below :math:`\\delta`.
The number of actually collected artifacts per :class:`.GridSection`, :math:`M \\in [M_{\\mathrm{min}}, M_{\\mathrm{max}}]`,
is between a minimum number of required samples :math:`M_{\\mathrm{min}} \\in \\mathbb{R}_{+}` and an upper limit of
:math:`M_{\\mathrm{max}} \\in \\mathbb{R}_{++}`.
"""
tick_format: ValueType
__confidence: float
__tolerance: float
__plot_scale: str # Plot axis scaling
def __init__(self) -> None:
self.confidence = 1.0
self.tolerance = 0.0
self.plot_scale = "linear"
self.tick_format = ValueType.LIN
@abstractmethod
def evaluate(self) -> Evaluation:
"""Evaluate the state of an investigated object.
Implements the process of extracting an arbitrary performance indicator, from which
an :class:`.Artifact` :math:`X_m` may subsequently be generated.
Returns: :class:`.Evaluation` of the investigated object's current state.
"""
... # pragma: no cover
@property
@abstractmethod
def abbreviation(self) -> str:
"""Short string representation of this evaluator.
Used as a label for console output and plot axes annotations.
"""
... # pragma: no cover
@property
@abstractmethod
def title(self) -> str:
"""Long string representation of this evaluator.
Used as plot title.
"""
... # pragma: no cover
@property
def confidence(self) -> float:
"""Confidence threshold required for premature simulation abortion.
The confidence threshold :math:`\\delta \\in [0, 1]` is the upper bound to the
confidence level
.. math::
\\mathrm{P}\\left(\\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\| > \\mathrm{TOL} \\right)
at which the sample collection for a single :class:`.GridSection` may be prematurely aborted :footcite:p:`2014:bayer`.
Raises:
ValueError: If confidence is lower than zero or greater than one.
"""
return self.__confidence
@confidence.setter
def confidence(self, value: float) -> None:
if value < 0.0 or value > 1.0:
raise ValueError("Confidence level must be in the interval between zero and one")
self.__confidence = value
@property
def tolerance(self) -> float:
"""Tolerance level required for premature simulation abortion.
The tolerance :math:`\\mathrm{TOL} \\in \\mathbb{R}_{++}` is the upper bound to the interval
.. math::
\\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\|
by which the performance indicator estimation :math:`\\bar{X}_M` may diverge from the actual expected
value :math:`\\mathrm{E}\\left[ X \\right]`.
Returns:
float: Non-negative tolerance :math:`\\mathrm{TOL}`.
Raises:
ValueError: If tolerance is negative.
"""
return self.__tolerance
@tolerance.setter
def tolerance(self, value: float) -> None:
if value < 0.0:
raise ValueError("Tolerance must be greater than or equal to zero")
self.__tolerance = value
def __str__(self) -> str:
"""String object representation.
Returns:
str: String representation.
"""
return self.abbreviation
@staticmethod
def _scalar_cdf(scalar: float) -> float:
"""Assumed cumulative probability of the scalar representation.
Args:
scalar (float):
The scalar value.
Returns:
float: Cumulative probability between zero and one.
"""
return norm.cdf(scalar)
@property
def plot_scale(self) -> str:
"""Scale of the scalar evaluation plot.
Refer to the `Matplotlib`_ documentation for a list of accepted values.
Returns:
str: The scale identifier string.
.. _Matplotlib: https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.set_yscale.html
"""
return self.__plot_scale
@plot_scale.setter
def plot_scale(self, value: str) -> None:
self.__plot_scale = value
@abstractmethod
def generate_result(
self, grid: Sequence[GridDimension], artifacts: np.ndarray
) -> EvaluationResult:
"""Generates an evaluation result from the artifacts collected over the whole simulation grid.
Args:
grid (Sequence[GridDimension]):
The Simulation grid.
artifacts (np.ndarray):
Numpy object array whose dimensions represent grid dimensions.
Returns:
The evaluation result.
"""
... # pragma: no cover
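# Editorial example (hypothetical values): configuring the premature-stopping criterion of a
# concrete Evaluator instance. With the settings below, sample collection for a grid section may
# stop once the estimated probability of the sample mean deviating from the true expectation by
# more than 0.1 is small enough for the per-section confidence bookkeeping to reach 0.95.
def _example_evaluator_configuration(evaluator: Evaluator) -> None:
    evaluator.confidence = 0.95  # Required confidence level before a section may be aborted
    evaluator.tolerance = 0.1  # Tolerated deviation of the estimate from the true expectation
    evaluator.plot_scale = "log"  # Logarithmic y-axis in result plots
    evaluator.tick_format = ValueType.DB  # Render axis ticks in dB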
class MonteCarloSample:
"""Single sample of a Monte Carlo simulation."""
__sample_index: int # Index of the sample
# Grid section from which the sample was generated
__grid_section: Tuple[int, ...]
__artifacts: Sequence[Artifact] # Artifacts of evaluation
def __init__(
self, grid_section: Tuple[int, ...], sample_index: int, artifacts: Sequence[Artifact]
) -> None:
"""
Args:
grid_section (Tuple[int, ...]):
Grid section from which the sample was generated.
sample_index (int):
Index of the sample.
In other words, this object represents the `sample_index`th sample of the selected `grid_section`.
artifacts (Sequence[Artifact]):
Artifacts of evaluation
"""
self.__grid_section = grid_section
self.__sample_index = sample_index
self.__artifacts = artifacts
@property
def grid_section(self) -> Tuple[int, ...]:
"""Grid section from which this sample was generated.
Returns:
Tuple[int, ...]: Tuple of grid section indices.
"""
return self.__grid_section
@property
def sample_index(self) -> int:
"""Index of the sample this object represents.
Returns:
int: Sample index.
"""
return self.__sample_index
@property
def artifacts(self) -> Sequence[Artifact]:
"""Artifacts resulting from the sample's evaluations.
Returns:
List[Artifact]: List of artifacts.
"""
return self.__artifacts
@property
def num_artifacts(self) -> int:
"""Number of contained artifact objects.
Returns:
int: Number of artifacts.
"""
return len(self.__artifacts)
@property
def artifact_scalars(self) -> np.ndarray:
"""Collect scalar artifact representations.
Returns:
np.ndarray: Vector of scalar artifact representations.
"""
return np.array([artifact.to_scalar() for artifact in self.artifacts], dtype=float)
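# Editorial example (illustrative values): assembling a MonteCarloSample from two artifacts and
# reading back the scalar view consumed by the per-section confidence bookkeeping.
def _example_sample_scalars() -> np.ndarray:
    artifacts = [ArtifactTemplate(0.5), ArtifactTemplate(1.5)]
    sample = MonteCarloSample(grid_section=(0, 1), sample_index=0, artifacts=artifacts)
    return sample.artifact_scalars  # array([0.5, 1.5])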
class GridSection(object):
# Berry-Esseen constants ToDo: Check the proper selection here
__C_0: float = 0.4785
__C_1: float = 30.2338
# Section indices within the simulation grid
__coordinates: Tuple[int, ...]
__samples: List[MonteCarloSample] # Set of generated samples
# Confidence level for each evaluator
__evaluator_confidences: np.ndarray
def __init__(self, coordinates: Tuple[int, ...], num_evaluators: int) -> None:
"""
Args:
coordinates (Tuple[int, ...]):
Section indices within the simulation grid.
num_evaluators (int):
Number of evaluators contributing artifacts to each collected sample.
"""
self.__coordinates = coordinates
self.__samples = list()
self.__num_evaluators = num_evaluators
self.__evaluator_confidences = np.zeros(num_evaluators, dtype=float)
self.__means = np.zeros(num_evaluators, dtype=float)
self.__second_moments = np.zeros(num_evaluators, dtype=float)
self.__third_moments = np.zeros(num_evaluators, dtype=float)
self.__third_moments_abs = np.zeros(num_evaluators, dtype=float)
self.__fourth_moments = np.zeros(num_evaluators, dtype=float)
@property
def coordinates(self) -> Tuple[int, ...]:
"""Grid section coordinates within the simulation grid.
Returns:
Tuple[int, ...]: Section coordinates.
"""
return self.__coordinates
@property
def num_samples(self) -> int:
"""Number of already generated samples within this section.
Returns:
int: Number of samples.
"""
return len(self.__samples)
@property
def samples(self) -> List[MonteCarloSample]:
"""The collected evaluation samples within this grid section.
Returns: List of samples.
"""
return self.__samples
def add_samples(
self,
samples: MonteCarloSample | Sequence[MonteCarloSample],
evaluators: Sequence[Evaluator],
) -> None:
"""Add a new sample to this grid section collection.
Args:
samples (MonteCarloSample | Sequence[MonteCarloSample]):
Samples to be added to this section.
evaluators (Sequence[Evaluator]):
References to the evaluators generating the sample artifacts.
Raises:
ValueError: If the number of artifacts in `sample` does not match the initialization.
"""
_samples = samples if isinstance(samples, Sequence) else [samples]
for sample in _samples:
# Make sure the provided number of artifacts is correct
if sample.num_artifacts != self.__num_evaluators:
raise ValueError(
f"Number of sample artifacts ({sample.num_artifacts}) does not match the "
f"configured number of evaluators ({self.__num_evaluators})"
)
# Update confidences
self.__update_confidences(sample.artifact_scalars, evaluators)
# Append sample to the stored list
self.__samples.append(sample)
@property
def confidences(self) -> np.ndarray:
"""Confidence in the estimated evaluations.
Returns: Array indicating probabilities for each evaluator
"""
return self.__evaluator_confidences
def confidence_status(self, evaluators: Sequence[Evaluator]) -> bool:
"""Check if each evaluator has reached its required confidence threshold.
Confidence indicates that the simulation for the parameter combination this grid section represents
may be aborted, i.e. no more samples are required.
Args:
evaluators (Sequence[Evaluator]):
Evaluators giving feedback about their confidence status.
Returns: Confidence indicator.
"""
for evaluator, confidence in zip(evaluators, self.__evaluator_confidences):
if confidence < evaluator.confidence:
return False
return True
def __update_confidences(
self, artifact_scalars: np.ndarray, evaluators: Sequence[Evaluator]
) -> None:
"""Update the confidence estimates for this grid section.
Args:
artifact_scalars (np.ndarray):
Scalar representations of the artifacts generated by the sample.
evaluators (Sequence[Evaluator]):
Evaluators generating the artifacts.
Raises:
ValueError: If the artifact scalars argument is not a vector.
ValueError: If the number of artifact scalars does not match the number of configured evaluators.
"""
# Raise a value error if the scalars argument is not a vector
if artifact_scalars.ndim != 1:
raise ValueError("Artifact scalars must be a vector (one-dimensional array)")
if len(artifact_scalars) != self.__num_evaluators:
raise ValueError(
"Number of artifact scalars does not match the number of configured evaluators"
)
for e, (
scalar,
evaluator,
mean,
second_moment,
third_moment,
third_moment_abs,
fourth_moment,
) in enumerate(
zip(
artifact_scalars,
evaluators,
self.__means,
self.__second_moments,
self.__third_moments,
self.__third_moments_abs,
self.__fourth_moments,
)
):
# Keep the confidence at zero if the evaluator tolerance is set to zero
if evaluator.tolerance == 0.0:
self.__evaluator_confidences[e] = 0.0
continue
n = self.num_samples # Current number of samples
nn = 1 + self.num_samples # New number of samples
# Update the moment estimates
delta = scalar - mean
updated_mean = (n * mean + scalar) / nn  # Fold the new scalar into the running mean
updated_second_moment = second_moment + delta**2 * (n) / nn
updated_third_moment = (
third_moment + delta**3 * (n**2 - n) / nn**2 - 3 * second_moment * delta / nn
)
updated_third_moment_abs = third_moment_abs + abs(
delta**3 * (n**2 - n) / nn**2 - 3 * second_moment * delta / nn
)
updated_fourth_moment = (
fourth_moment
+ delta**4 * (n**3 - n**2 + n) / nn**3
+ 6 * delta**2 * second_moment / nn**2
- 4 * delta * third_moment / nn
)
# Store the updated moment estimates for the next update
self.__means[e] = updated_mean
self.__second_moments[e] = updated_second_moment
self.__third_moments[e] = updated_third_moment
self.__third_moments_abs[e] = updated_third_moment_abs
self.__fourth_moments[e] = updated_fourth_moment
# Abort if the second moment indicates no variance or not enough samples have been collected
if updated_second_moment == 0.0 or nn < 2:
self.__evaluator_confidences[e] = 1.0
continue
# Compute moments (equation 4.1)
deviance = sqrt(updated_second_moment / nn)
beta_bar_moment = third_moment / (nn * deviance**3)
beta_hat_moment = third_moment_abs / (nn * deviance**3)
kappa_moment = fourth_moment / (nn * deviance**4) - 3
# Estimate the confidence
sample_sqrt = sqrt(nn)
sigma_tolerance = sample_sqrt * evaluator.tolerance / deviance
sigma_tolerance_squared = sigma_tolerance**2
kappa_term = 4 * (2 / (nn - 1) + kappa_moment / nn)
confidence_bound = 2 * (1 - evaluator._scalar_cdf(sigma_tolerance)) + 2 * min(
self.__C_0, self.__C_1 / (1 + abs(sigma_tolerance)) ** 3
) * beta_bar_moment / sample_sqrt * min(1.0, kappa_term)
# This is necessary to prevent a math overflow from the exponential denominator
if kappa_term < 1.0 and sigma_tolerance_squared < 1418.0:
confidence_bound += (
(1 - kappa_term)
* abs(sigma_tolerance_squared - 1)
* abs(beta_hat_moment)
/ (exp(0.5 * sigma_tolerance_squared) * 3 * sqrt(2 * pi * n) * deviance**3)
)
# Store the current confidence estimate
self.__evaluator_confidences[e] = 1.0 - min(1.0, confidence_bound)
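# Editorial sketch of the running-moment bookkeeping performed above: given the mean and the
# centered second moment of n previously collected samples, a new scalar is folded in without
# storing the sample history (Welford-style update). The higher moments follow the same pattern.
def _example_moment_update(
    mean: float, second_moment: float, num_samples: int, scalar: float
) -> Tuple[float, float]:
    updated_num_samples = num_samples + 1
    delta = scalar - mean
    # The mean moves towards the new scalar by delta / n_new
    updated_mean = mean + delta / updated_num_samples
    # The centered second moment grows by delta^2 * n_old / n_new
    updated_second_moment = second_moment + delta**2 * num_samples / updated_num_samples
    return updated_mean, updated_second_moment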
class ActorRunResult(object):
"""Result object returned by generating a single sample from a simulation runner."""
samples: List[MonteCarloSample] | None
message: str | None
def __init__(
self, samples: List[MonteCarloSample] | None = None, message: str | None = None
) -> None:
"""
Args:
samples (List[MonteCarloSample], optional):
Samples generated by the remote actor run.
message (str, optional):
Message return from the remote actor run.
"""
self.samples = [] if not samples else samples
self.message = message
class SampleGrid(object):
"""Grid of simulation samples."""
__sections: np.ndarray
def __init__(
self, grid_configuration: List[GridDimension], evaluators: Sequence[Evaluator]
) -> None:
"""
Args:
grid_configuration (List[GridDimension]):
The simulation grid configuration.
evaluators (Sequence[Evaluator]):
The evaluators generating the artifacts.
"""
self.__sections = np.empty(
[dimension.num_sample_points for dimension in grid_configuration], dtype=np.object_
)
num_evaluators = len(evaluators)
for coordinates in np.ndindex(
*[dimension.num_sample_points for dimension in grid_configuration]
):
coordinate_tuple = tuple(coordinates)
self.__sections[coordinate_tuple] = GridSection(coordinate_tuple, num_evaluators)
def __getitem__(self, coordinates: Tuple[int, ...]) -> GridSection:
return self.__sections[coordinates]
def __iter__(self):
"""Iterating over the sample grid is equivalent to iterating over the sections tree"""
return iter(self.__sections.flat)
class UnmatchableException(Exception):
"""An exception that can never get caught."""
...
class MonteCarloActor(Generic[MO]):
"""Monte Carlo Simulation Actor.
Actors are essentially workers running in a private process executing simulation tasks.
The result of each individual simulation task is a simulation sample.
"""
catch_exceptions: bool # Catch exceptions during run.
__investigated_object: MO # Copy of the object to be investigated
__grid: Sequence[GridDimension] # Simulation grid dimensions
__evaluators: Sequence[
Evaluator
] # Evaluators used to process the investigated object sample state
__stage_arguments: Mapping[str, Sequence[tuple]]
def __init__(
self,
argument_tuple: Tuple[MO, Sequence[GridDimension], Sequence[Evaluator]],
index: int,
stage_arguments: Mapping[str, Sequence[tuple]] | None = None,
catch_exceptions: bool = True,
) -> None:
"""
Args:
argument_tuple:
Object to be investigated during the simulation runtime.
Dimensions over which the simulation will iterate.
Evaluators used to process the investigated object sample state.
index (int):
Global index of the actor.
stage_arguments (Mapping[str, Sequence[Tuple]], optional):
Arguments for the simulation stages.
catch_exceptions (bool, optional):
Catch exceptions during run.
Enabled by default.
"""
# Assert that stage arguments are valid
_stage_arguments = dict() if stage_arguments is None else stage_arguments
for stage_key in _stage_arguments:
if stage_key not in self.stage_identifiers():
raise ValueError(f"Invalid stage identifier in stage arguments {stage_key}")
investigated_object = argument_tuple[0]
grid = argument_tuple[1]
evaluators = argument_tuple[2]
self.index = index
self.catch_exceptions = catch_exceptions
self.__investigated_object = investigated_object
self.__grid = grid
self.__evaluators = evaluators
self.__stage_identifiers = self.stage_identifiers()
self.__stage_executors = self.stage_executors()
self.__num_stages = len(self.__stage_executors)
self.__stage_arguments = _stage_arguments
@property
def _investigated_object(self) -> MO:
"""State of the Investigated Object.
Returns:
MO: Investigated object.
"""
return self.__investigated_object # pragma: no cover
def __execute_stages(self, start: int, stop: int, artifacts: list[list[Artifact]]) -> None:
"""Recursive subroutine of run, collecting artifacts from the simulation stage parametrizations.
Args:
start (int): Index of the first stage to be executed
stop (int): Index of the last stage to be executed
artifacts (list[list[Artifact]]): Container to which the artifacts generated per stage parametrization are appended
"""
# Abort and collect artifacts if the end of the stage list is reached
if start > stop:
artifacts.append([evaluator.evaluate().artifact() for evaluator in self.__evaluators])
return
# Execute the next stage
stage_identifier = self.__stage_identifiers[start]
stage_executor = self.__stage_executors[start]
stage_arguments = self.__stage_arguments.get(stage_identifier, [tuple()])
for arguments in stage_arguments:
# Execute the stage with the provided arguments
stage_executor(*arguments)
# Proceed to the next stage
self.__execute_stages(start + 1, stop, artifacts)
def run(self, program: List[Tuple[int, ...]]) -> ActorRunResult:
"""Run the simulation actor.
Args:
program (List[Tuple[int, ...]]):
A list of simulation grid section indices for which to collect samples.
Returns:
An :class:`ActorRunResult` containing the samples generated for the grid sections listed in `program`,
or an error message if an exception occurred while exception catching is enabled.
"""
result = ActorRunResult()
# Catch any exception during actor running
try:
# Initially, re-configure the full grid
recent_section_indices = np.array(program[0], dtype=int)
for d, i in enumerate(recent_section_indices):
self.__grid[d].configure_point(i)
# Run through the program steps
for section_indices in program:
# Detect the grid dimensions where sections changed, i.e. which require re-configuration
section_index_array = np.asarray(section_indices, dtype=int)
reconfigured_dimensions = np.argwhere(
section_index_array != recent_section_indices
).flatten()
# Reconfigure the dimensions
# Note that for the first grid section this has already been done
for d in reconfigured_dimensions:
self.__grid[d].configure_point(section_index_array[d])
# Detect the first and last impacted simulation stage depending on the reconfigured dimensions
first_impact = self.__num_stages
last_impact = 0
for d in reconfigured_dimensions:
grid_dimension = self.__grid[d]
if grid_dimension.first_impact is None:
first_impact = 0
elif grid_dimension.first_impact in self.__stage_identifiers:
first_impact = min(
first_impact,
self.__stage_identifiers.index(grid_dimension.first_impact),
)
if grid_dimension.last_impact is None:
last_impact = self.__num_stages - 1
elif grid_dimension.last_impact in self.__stage_identifiers:
last_impact = max(
last_impact, self.__stage_identifiers.index(grid_dimension.last_impact)
)
if first_impact >= self.__num_stages:
first_impact = 0
if last_impact <= 0:
last_impact = self.__num_stages - 1
artifacts: list[list[Artifact]] = []
self.__execute_stages(first_impact, last_impact, artifacts)
result.samples.extend(
MonteCarloSample(section_indices, a, artifact)
for a, artifact in enumerate(artifacts)
)
# Update the recent section for the next iteration
recent_section_indices = section_index_array
except Exception if self.catch_exceptions else UnmatchableException as e:
result.message = str(e)
return result
@staticmethod
@abstractmethod
def stage_identifiers() -> List[str]:
"""List of simulation stage identifiers.
Simulation stages will be executed in the order specified here.
Returns:
List of function identifiers for simulation stage execution routines.
"""
... # pragma: no cover
@abstractmethod
def stage_executors(self) -> List[Callable]:
"""List of simulation stage execution callbacks.
Simulation stages will be executed in the order specified here.
Returns:
List of function callbacks for simulation stage execution routines.
"""
... # pragma: no cover
class MonteCarloResult(object):
"""Result of a Monte Carlo simulation."""
__grid: Sequence[GridDimension]
__evaluators: Sequence[Evaluator]
__performance_time: float # Absolute time required to compute the simulation
__results: Sequence[EvaluationResult]
def __init__(
self,
grid: Sequence[GridDimension],
evaluators: Sequence[Evaluator],
sample_grid: SampleGrid,
performance_time: float,
) -> None:
"""
Args:
grid (Sequence[GridDimension]):
Dimensions over which the simulation has swept.
evaluators (Sequence[Evaluator]):
Evaluators used to evaluate the simulation artifacts.
sample_grid (SampleGrid):
Grid containing evaluation artifacts collected over the grid iterations.
performance_time (float):
Time required to compute the simulation.
Raises:
ValueError:
If the dimensions of `samples` do not match the supplied sweeping dimensions and evaluators.
"""
self.__grid = grid
self.__evaluators = evaluators
self.__performance_time = performance_time
self.__results = []
for evaluator_idx, evaluator in enumerate(evaluators):
# Collect artifacts for the respective evaluator
evaluator_artifacts = np.empty(tuple([d.num_sample_points for d in grid]), dtype=object)
section: GridSection
for section in sample_grid:
evaluator_artifacts[section.coordinates] = [
sample.artifacts[evaluator_idx] for sample in section.samples
]
self.__results.append(evaluator.generate_result(grid, evaluator_artifacts))
@property
def performance_time(self) -> float:
"""Simulation runtime.
Returns:
Simulation runtime in seconds.
"""
return self.__performance_time
def plot(self) -> List[PlotVisualization]:
"""Plot evaluation figures for all contained evaluator artifacts.
Returns: Container of all generated plots.
"""
return [result.visualize() for result in self.__results]
def save_to_matlab(self, file: str) -> None:
"""Save simulation results to a matlab file.
Args:
file (str):
File location to which the results should be saved.
"""
mat_dict = {
"dimensions": [d.title for d in self.__grid],
"evaluators": [evaluator.abbreviation for evaluator in self.__evaluators],
"performance_time": self.__performance_time,
}
# Append evaluation array representations
for r, result in enumerate(self.__results):
mat_dict[f"result_{r}"] = result.to_array()
for dimension in self.__grid:
mat_dict[dimension.title] = dimension.sample_points
# Save results in matlab file
savemat(file, mat_dict)
def print(self, console: Console | None = None) -> None:
"""Print a text representation of the simulation result.
Args:
console (Console, optional):
Rich console to print to.
If not provided, a new one will be initialized.
"""
_console = Console() if console is None else console
for result in self.evaluation_results:
result.print(_console)
@property
def evaluation_results(self) -> Sequence[EvaluationResult]:
"""Access individual evaluation results.
Returns: List of evaluation results.
"""
return self.__results
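# Editorial example: results written by MonteCarloResult.save_to_matlab above can be inspected
# with scipy.io.loadmat. The key names mirror the dictionary assembled in save_to_matlab;
# "results.mat" is a placeholder path.
def _example_load_matlab_results(file: str = "results.mat") -> dict:
    from scipy.io import loadmat

    mat = loadmat(file)
    print(mat["dimensions"])  # Titles of the swept grid dimensions
    print(mat["evaluators"])  # Evaluator abbreviations
    print(mat["result_0"])  # Array representation of the first evaluation result
    return mat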
class SamplePoint(object):
"""Sample point of a single grid dimension.
A single :class:`.GridDimension` holds a sequence of sample points
accessible by the :attr:`sample_points<.GridDimension.sample_points>` property.
During simulation runtime, the simulation will dynamically reconfigure
the scenario selecting a single sample point out of each :class:`.GridDimension`
per generated simulation sample.
"""
__value: Any
__title: str | None
def __init__(self, value: Any, title: str | None = None) -> None:
"""
Args:
value (Any):
Sample point value with which to configure the grid dimension.
title (str, optional):
String representation of this sample point.
If not specified, the simulation will attempt to infer an adequate representation.
"""
self.__value = value
self.__title = title
@property
def value(self) -> Any:
"""Sample point value with which to configure the grid dimension"""
return self.__value
@property
def title(self) -> str:
"""String representation of this sample point"""
if self.__title is not None:
return self.__title
if type(self.__value).__str__ is not object.__str__:
return str(self.__value)
if isinstance(self.__value, float):
return f"{self.__value:.2g}"
if isinstance(self.__value, int):
return f"{self.__value}"
# Return the value's class name if it is not otherwise representable
return self.__value.__class__.__name__
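# Editorial example (illustrative values): sample points may carry an explicit title that is used
# in result tables and plot legends instead of the inferred string representation.
def _example_sample_points() -> List[SamplePoint]:
    return [SamplePoint(10 ** (snr_db / 10), title=f"{snr_db} dB") for snr_db in (0, 5, 10)]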
class ScalarDimension(ABC):
"""Base class for objects that can be configured by scalar values.
When a property of type :class:`ScalarDimension` is defined as a simulation parameter :class:`GridDimension`,
the simulation will automatically configure the object with the scalar value of the sample point
during simulation runtime.
The configuration operation is represented by the lshift operator `<<`.
"""
@abstractmethod
def __lshift__(self, scalar: float) -> None:
"""Configure the object with a scalar value.
Args:
scalar (float): Scalar value to configure the object with.
"""
... # pragma: no cover
@property
@abstractmethod
def title(self) -> str:
"""Title of the scalar dimension.
Displayed in plots and tables during simulation runtime.
"""
... # pragma: no cover
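# Editorial example (hypothetical class): a minimal ScalarDimension implementation. When such an
# object is selected as a GridDimension, the simulation configures it through the `<<` operator
# with the scalar value of the active sample point.
class _ExampleGainDimension(ScalarDimension):
    """Hypothetical scalar gain that can be swept by the simulation."""

    def __init__(self) -> None:
        self.gain = 0.0

    def __lshift__(self, scalar: float) -> None:
        # Configure the represented gain with the provided sample point value
        self.gain = scalar

    @property
    def title(self) -> str:
        return "Gain"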
class GridDimension(object):
"""Single axis within the simulation grid.
A grid dimension represents a single simulation parameter that is to
be varied during simulation runtime to observe its effects on the evaluated
performance indicators.
The values the represented parameter is configured to are
:class:`SamplePoints<.SamplePoint>`.
"""
__considered_objects: Tuple[Any, ...]
__dimension: str
__sample_points: List[SamplePoint]
__title: str | None
__setter_lambdas: Tuple[Callable, ...]
__plot_scale: str
__tick_format: ValueType
__first_impact: str | None
__last_impact: str | None
def __init__(
self,
considered_objects: Any | Tuple[Any, ...],
dimension: str,
sample_points: Sequence[SamplePoint | Any],
title: str | None = None,
plot_scale: str | None = None,
tick_format: Optional[ValueType] = None,
) -> None:
"""
Args:
considered_objects (Union[Any, Tuple[Any, ...]]):
The considered objects of this grid section.
dimension (str):
Path to the attribute.
sample_points (List[Any]):
Sections the grid is sampled at.
title (str, optional):
Title of this dimension.
If not specified, the attribute string is assumed.
plot_scale (str, optional):
Scale of the axis within plots.
tick_format (ValueType, optional):
Format of the tick labels.
Linear by default.
Raises:
ValueError:
If the selected `dimension` does not exist within the `considered_object`.
"""
_considered_objects = (
considered_objects if isinstance(considered_objects, tuple) else (considered_objects,)
)
self.__considered_objects = tuple()
property_path = dimension.split(".")
object_path = property_path[:-1]
property_name = property_path[-1]
# Infer plot scale of the x-axis
if plot_scale is None:
if isinstance(sample_points, LogarithmicSequence):
self.plot_scale = "log"
else:
self.plot_scale = "linear"
else:
self.plot_scale = plot_scale
# Infer tick format of the x-axis
if tick_format is None:
if isinstance(sample_points, LogarithmicSequence):
self.tick_format = ValueType.DB
else:
self.tick_format = ValueType.LIN
else:
self.tick_format = tick_format
self.__setter_lambdas = tuple()
self.__dimension = dimension
self.__sample_points = [
s if isinstance(s, SamplePoint) else SamplePoint(s) for s in sample_points
]
self.__title = title
self.__first_impact = None
self.__last_impact = None
for considered_object in _considered_objects:
# Make sure the dimension exists
try:
dimension_mother_object = reduce(
lambda obj, attr: getattr(obj, attr), object_path, considered_object
)
dimension_registration: RegisteredDimension = getattr(
type(dimension_mother_object), property_name
)
dimension_value = getattr(dimension_mother_object, property_name)
except AttributeError:
raise ValueError(
"Dimension '" + dimension + "' does not exist within the investigated object"
)
if len(self.__sample_points) < 1:
raise ValueError("A simulation grid dimension must have at least one sample point")
# Update impacts if the dimension is registered as a PyMonte simulation dimension
if RegisteredDimension.is_registered(dimension_registration):
first_impact = dimension_registration.first_impact
last_impact = dimension_registration.last_impact
if self.__first_impact and first_impact != self.__first_impact:
raise ValueError(
"Diverging impacts on multi-object grid dimension initialization"
)
if self.__last_impact and last_impact != self.__last_impact:
raise ValueError(
"Diverging impacts on multi-object grid dimension initialization"
)
self.__first_impact = first_impact
self.__last_impact = last_impact
# Update the depicted title if the dimension provides one and it wasn't explicitly specified
if title is None and dimension_registration.title is not None:
self.__title = dimension_registration.title
self.__considered_objects += (considered_object,)
# If the dimension value is a scalar dimension, we can directly use the lshift operator to configure
# the object with the sample point values, given that the sample points are scalars as well
if isinstance(dimension_value, ScalarDimension) and np.all(
np.vectorize(np.isscalar)(sample_points)
):
self.__setter_lambdas += (dimension_value.__lshift__,)
self.__title = dimension_value.title
# Otherwise, the dimension value is a regular attribute and we need to create a setter lambda
else:
self.__setter_lambdas += (
self.__create_setter_lambda(considered_object, dimension),
)
@property
def considered_objects(self) -> Tuple[Any, ...]:
"""Considered objects of this grid section."""
return self.__considered_objects
@property
def dimension(self) -> str:
"""Dimension property name."""
return self.__dimension
@property
def sample_points(self) -> List[SamplePoint]:
"""Points at which this grid dimension is sampled."""
return self.__sample_points
@property
def num_sample_points(self) -> int:
"""Number of dimension sample points.
Returns:
int:
Number of sample points.
"""
return len(self.__sample_points)
@property
def first_impact(self) -> str | None:
"""Identifier of the first impacted simulation pipeline stage.
Returns:
Stage identifier.
`None`, if the stage is unknown.
"""
return self.__first_impact
@property
def last_impact(self) -> str | None:
"""Identifier of the last impacted simulation pipeline stage.
Returns:
Stage identifier.
`None`, if the stage is unknown.
"""
return self.__last_impact
@property
def title(self) -> str:
"""Title of the dimension.
Returns:
The title string.
"""
return self.__dimension if self.__title is None else self.__title
@title.setter
def title(self, value: str) -> None:
if value is None or len(value) == 0:
self.__title = None
else:
self.__title = value
@property
def plot_scale(self) -> str:
"""Scale of the scalar evaluation plot.
Refer to the `Matplotlib`_ documentation for a list of accepted values.
Returns:
str: The scale identifier string.
.. _Matplotlib: https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.set_yscale.html
"""
return self.__plot_scale
@plot_scale.setter
def plot_scale(self, value: str) -> None:
self.__plot_scale = value
@property
def tick_format(self) -> ValueType:
"""Axis tick format of the scalar evaluation plot.
Returns: The tick format.
"""
return self.__tick_format
@tick_format.setter
def tick_format(self, value: ValueType) -> None:
self.__tick_format = value
@staticmethod
def __create_setter_lambda(considered_object: Any, dimension: str) -> Callable:
"""Generate a setter lambda callback for a selected grid dimension.
Args:
considered_object (Any):
The considered object root.
dimension (str):
String representation of dimension location relative to the investigated object.
Returns:
Callable: The setter lambda.
"""
stages = dimension.split(".")
object_reference = reduce(
lambda obj, attr: getattr(obj, attr), stages[:-1], considered_object
)
# Return a lambda to the function if the reference is callable
function_reference = getattr(object_reference, stages[-1])
if callable(function_reference):
return lambda args: function_reference(args)
# Return a setting lambda if the reference is not callable
# Implicitly we assume that every non-callable reference is an attribute
return lambda args: setattr(object_reference, stages[-1], args)
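# Editorial example (hypothetical objects and attribute path): constructing a GridDimension that
# sweeps a nested attribute of an investigated object over three sample points.
def _example_grid_dimension() -> GridDimension:
    class _Transmitter:
        gain = 0.0  # Class-level attribute so the dimension lookup on the type succeeds

    class _Scenario:
        transmitter = _Transmitter()

    scenario = _Scenario()
    return GridDimension(scenario, "transmitter.gain", [0.0, 5.0, 10.0], title="Tx Gain")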
class RegisteredDimension(property):
"""Register a class property getter as a PyMonte simulation dimension.
Registered properties may specify their simulation stage impacts, which can significantly
reduce simulation runtime in cases where computationally demanding section re-calculations
can be avoided.
"""
__first_impact: str | None
__last_impact: str | None
__title: str | None
def __init__(
self,
_property: property,
first_impact: str | None = None,
last_impact: str | None = None,
title: str | None = None,
) -> None:
"""
Args:
first_impact (str, optional):
Name of the first simulation stage within the simulation pipeline
which is impacted by manipulating this property.
If not specified, the initial stage is assumed.
last_impact (str, optional):
Name of the last simulation stage within the simulation pipeline
which is impacted by manipulating this property.
If not specified, the final stage is assumed.
title (str, optional):
Displayed title of the dimension.
If not specified, the dimension's name will be assumed.
"""
self.__first_impact = first_impact
self.__last_impact = last_impact
self.__title = title
property.__init__(
self,
getattr(_property, "fget", None),
getattr(_property, "fset", None),
getattr(_property, "fdel", None),
getattr(_property, "__doc__", None),
)
@classmethod
def is_registered(cls, object: Any) -> bool:
"""Check if any object is a registered PyMonte simulation dimension.
Args:
object (Any):
The object in question.
Returns:
A boolean indicator.
"""
return isinstance(object, cls)
@property
def first_impact(self) -> str | None:
"""First impact of the dimension within the simulation pipeline."""
return self.__first_impact
@property
def last_impact(self) -> str | None:
"""Last impact of the dimension within the simulation pipeline."""
return self.__last_impact
@property
def title(self) -> str | None:
"""Displayed title of the dimension."""
return self.__title
def getter(self, fget: Callable[[Any], Any]) -> RegisteredDimension:
updated_property = property.getter(self, fget)
return RegisteredDimension(
updated_property, self.first_impact, self.last_impact, self.title
)
def setter(self, fset: Callable[[Any, Any], None]) -> RegisteredDimension:
updated_property = property(self.fget, fset, self.fdel, self.__doc__)
return RegisteredDimension(
updated_property, self.first_impact, self.last_impact, self.title
)
def deleter(self, fdel: Callable[[Any], None]) -> RegisteredDimension:
updated_property = property.deleter(self, fdel)
return RegisteredDimension(
updated_property, self.first_impact, self.last_impact, self.title
)
def register(*args, **kwargs) -> Callable[[Any], RegisteredDimension]:
"""Shorthand to register a property as a MonteCarlo dimension.
Args:
_property (property): The property to be registered.
"""
return lambda _property: RegisteredDimension(_property, *args, **kwargs)
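# Editorial example (hypothetical property and stage names): registering a class property as a
# PyMonte simulation dimension with explicit stage impacts. The stage identifiers must match
# values returned by the simulation actor's stage_identifiers().
class _ExampleDevice:
    def __init__(self) -> None:
        self.__carrier_frequency = 1e9

    @register(first_impact="waveform_generation", last_impact="evaluation", title="Carrier Frequency")  # type: ignore
    @property
    def carrier_frequency(self) -> float:
        """Carrier frequency in Hz."""
        return self.__carrier_frequency

    @carrier_frequency.setter
    def carrier_frequency(self, value: float) -> None:
        self.__carrier_frequency = value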
class MonteCarlo(Generic[MO]):
"""Grid of parameters over which to iterate the simulation."""
# Interval between result logs in seconds
__progress_log_interval: float
# Maximum number of samples per grid element
__num_samples: int
# Minimum number of samples per grid element
__min_num_samples: int
# Number of dedicated actors spawned during simulation
__num_actors: int
__investigated_object: MO # The object to be investigated
# Simulation grid dimensions which make up the grid
__dimensions: List[GridDimension]
# Evaluators used to process the investigated object sample state
__evaluators: List[Evaluator]
__console: Console # Console the simulation writes to
# Printing behaviour of the simulation during runtime
__console_mode: ConsoleMode
# Number of samples per section block
__section_block_size: int | None
# Number of CPUs reserved for a single actor
__cpus_per_actor: int
runtime_env: bool
# Catch exceptions occurring during simulation runtime
catch_exceptions: bool
def __init__(
self,
investigated_object: MO,
num_samples: int,
evaluators: Sequence[Evaluator] | None = None,
min_num_samples: int = -1,
num_actors: int | None = None,
console: Console | None = None,
console_mode: ConsoleMode = ConsoleMode.INTERACTIVE,
section_block_size: Optional[int] = None,
ray_address: str | None = None,
cpus_per_actor: int = 1,
runtime_env: bool = False,
catch_exceptions: bool = True,
progress_log_interval: float = 1.0,
) -> None:
"""
Args:
investigated_object (MO):
Object to be investigated during the simulation runtime.
num_samples (int):
Number of generated samples per grid element.
evaluators (Sequence[Evaluator], optional):
Evaluators used to process the investigated object sample state.
min_num_samples (int, optional):
Minimum number of generated samples per grid element.
num_actors (int, optional):
Number of dedicated actors spawned during simulation.
By default, the number of actors will be the number of available CPU cores.
console (Console, optional):
Console the simulation writes to.
console_mode (ConsoleMode, optional):
The printing behaviour of the simulation during runtime.
section_block_size (int, optional):
Number of samples per section block.
By default, the size of the simulation grid is selected.
ray_address (str, optional):
The address of the ray head node.
If None is provided, the head node will be launched on this machine.
cpus_per_actor (int, optional):
Number of CPU cores reserved per actor.
One by default.
runtime_env (bool, optional):
Create a virtual environment on each host.
Disabled by default.
catch_exceptions (bool, optional):
Catch exceptions occurring during simulation runtime.
Enabled by default.
progress_log_interval (float, optional):
Interval between result logs in seconds.
1 second by default.
"""
self.runtime_env = runtime_env
# Initialize ray if it hasn't been initialized yet. Required to query ideal number of actors
if not ray.is_initialized():
runtime_env_info = {"py_modules": self._py_modules(), "pip": self._pip_packages()}
with catch_warnings():
simplefilter("ignore")
ray.init(
address=ray_address,
runtime_env=runtime_env_info if self.runtime_env else None,
logging_level=logging.ERROR,
)
self.__dimensions = []
self.__investigated_object = investigated_object
self.__evaluators = [] if evaluators is None else list(evaluators)
self.num_samples = num_samples
self.min_num_samples = min_num_samples if min_num_samples >= 0 else int(0.5 * num_samples)
self.__console = Console() if console is None else console
self.__console_mode = console_mode
self.section_block_size = section_block_size
self.cpus_per_actor = cpus_per_actor
self.num_actors = num_actors
self.catch_exceptions = catch_exceptions
self.__progress_log_interval = progress_log_interval
def simulate(
self,
actor: Type[MonteCarloActor],
additional_dimensions: set[GridDimension] | None = None,
stage_arguments: Mapping[str, Any] | None = None,
) -> MonteCarloResult:
"""Launch the Monte Carlo simulation.
Args:
actor (Type[MonteCarloActor]):
The actor from which to generate the simulation samples.
additional_dimensions (Set[GridDimension], optional):
Additional dimensions to be added to the simulation grid.
stage_arguments (Mapping[str, Any], optional):
Arguments to be passed to the simulation stages.
If the argument is a sequence, the respective stage will iterate over the sequence.
Returns: A `MonteCarloResult` dataclass containing the simulation results.
"""
# Generate start timestamp
start_time = perf_counter()
# Print meta-information and greeting
if self.__console_mode != ConsoleMode.SILENT:
self.console.log(
f"Launched simulation campaign with {self.num_actors} dedicated actors"
)
# Add additional dimensions to configured simulation grid
_dimensions = self.__dimensions.copy()
if additional_dimensions is not None:
_dimensions.extend(additional_dimensions)
# Sort dimensions by the stage of their first impact within the simulation pipeline
def sort(dimension: GridDimension) -> int:
if dimension.first_impact not in actor.stage_identifiers():
return 0
return actor.stage_identifiers().index(dimension.first_impact)
self.__dimensions.sort(key=sort)
max_num_samples = self.num_samples
dimension_str = f"{max_num_samples}"
for dimension in self.__dimensions:
max_num_samples *= dimension.num_sample_points
dimension_str += f" x {dimension.num_sample_points}"
if self.__console_mode != ConsoleMode.SILENT:
self.console.log(
f"Generating a maximum of {max_num_samples} = "
+ dimension_str
+ f" samples inspected by {len(self.__evaluators)} evaluators\n"
)
# Render simulation grid table
if self.__console_mode != ConsoleMode.SILENT and len(self.__dimensions) > 0:
dimension_table = Table(title="Simulation Grid", title_justify="left")
dimension_table.add_column("Dimension", style="cyan")
dimension_table.add_column("Sections", style="green")
for dimension in self.__dimensions:
section_str = ""
for sample_point in dimension.sample_points:
section_str += sample_point.title + " "
dimension_table.add_row(dimension.title, section_str)
self.console.print(dimension_table)
self.console.print()
# Initialize the sample grid to store the results
sample_grid = SampleGrid(self.__dimensions, self.__evaluators)
# Launch actors and queue the first tasks
with self.console.status("Launching Actor Pool...", spinner="dots") if self.__console_mode == ConsoleMode.INTERACTIVE else nullcontext(): # type: ignore
# Generate the actor pool
actors = [actor.options(num_cpus=self.cpus_per_actor).remote((self.__investigated_object, self.__dimensions, self.__evaluators), a, stage_arguments, self.catch_exceptions) for a in range(self.num_actors)] # type: ignore[attr-defined]
actor_pool = ActorPool(actors) # type: ignore
# Generate section sample containers and meta-information
grid_task_count = np.zeros(
[dimension.num_sample_points for dimension in self.__dimensions], dtype=int
)
grid_active_mask = np.ones(
[dimension.num_sample_points for dimension in self.__dimensions], dtype=bool
)
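# grid_task_count tracks how many tasks are currently in flight for each grid section,
# while grid_active_mask flags the sections that still require additional samples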
# Submit initial actor tasks
# A small overhead in task submission might speed things up, but this is not clear at the moment
task_overhead = 0
for _ in range(self.num_actors + task_overhead):
_ = self.__queue_next(actor_pool, sample_grid, grid_active_mask, grid_task_count)
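# Every actor now has at least one task in flight; __receive_next and __queue_next
# below keep the pool saturated until all sections have gathered enough samples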
# Initialize progress bar
progress = Progress(
SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn(), transient=True
)
task1 = progress.add_task("Computing", total=max_num_samples)
last_progress_plot_time = 0.0
num_result_rows = min(10, self.section_block_size)
# Display results in a live table
status_group = Group("", progress)
with Live(status_group, console=self.console) if self.__console_mode == ConsoleMode.INTERACTIVE else nullcontext(): # type: ignore
# Keep executing until all samples are computed
while actor_pool.has_next():
# Receive samples from the queue
samples = self.__receive_next(
actor_pool, sample_grid, grid_active_mask, grid_task_count
)
# Queue the next task and retrieve the current progress
self.__queue_next(actor_pool, sample_grid, grid_active_mask, grid_task_count)
# Update result log if enough time has passed
progress_plot_time = perf_counter()
if progress_plot_time - last_progress_plot_time > self.__progress_log_interval:
last_progress_plot_time = progress_plot_time
# Compute absolute progress
absolute_progress = 0
for section in sample_grid:
absolute_progress += section.num_samples
# Update progress bar visualization
if self.__console_mode == ConsoleMode.INTERACTIVE:
progress.update(task1, completed=absolute_progress)
result_rows: List[List[str]] = []
for sample in samples[:num_result_rows]:
results_row: List[str] = []
for dimension, section_idx in zip(
self.__dimensions, sample.grid_section
):
results_row.append(dimension.sample_points[section_idx].title)
results_row.append(str(sample_grid[sample.grid_section].num_samples))
for artifact in sample.artifacts:
results_row.append(str(artifact))
result_rows.append(results_row)
# Render results table
results_table = Table(min_width=self.console.measure(progress).minimum)
for dimension in self.__dimensions:
results_table.add_column(dimension.title, style="cyan")
results_table.add_column("#", style="blue")
for evaluator in self.__evaluators:
results_table.add_column(evaluator.abbreviation, style="green")
for result_row in result_rows:
results_table.add_row(*result_row)
status_group.renderables[0] = results_table
elif self.__console_mode == ConsoleMode.LINEAR:
self.console.log(f"Progress: {100*absolute_progress/max_num_samples:.3f}")
# Aborting the execution loop prematurely once all sections are flagged inactive
# may discard a few in-flight results, which is an acceptable trade-off for speed:
# if absolute_progress >= self.max_num_samples:
# break
# Measure elapsed time
stop_time = perf_counter()
performance_time = stop_time - start_time
# Print finish notifier
if self.__console_mode != ConsoleMode.SILENT:
self.console.print()
self.console.log(f"Simulation finished after {performance_time:.2f} seconds")
# Compute the result
result = MonteCarloResult(
self.__dimensions, self.__evaluators, sample_grid, performance_time
)
# Print results
if self.__console_mode != ConsoleMode.SILENT:
self.console.print("")
result.print(self.console)
return result
def __receive_next(
self,
pool: ActorPool,
grid: SampleGrid,
grid_active_mask: np.ndarray,
grid_task_count: np.ndarray,
) -> List[MonteCarloSample]:
# Retrieve result from pool
runResult: ActorRunResult = pool.get_next_unordered(timeout=None)
# Display the run message if the result carries one
if runResult.message:
self.console.log(runResult.message)
# Save result
for sample in runResult.samples:
# Retrieve the respective grid section and add sample
grid_section: GridSection = grid[sample.grid_section]
grid_section.add_samples(sample, self.__evaluators)
# Update task counter
task_count = max(0, grid_task_count[grid_section.coordinates] - 1)
grid_task_count[grid_section.coordinates] = task_count
# Check for stopping criteria
if grid_active_mask[grid_section.coordinates]:
# Abort section if the number of samples is expected to be met
if grid_section.num_samples + task_count >= self.num_samples:
grid_active_mask[grid_section.coordinates] = False  # pragma: no cover
# Abort section if the confidence threshold has been reached
elif grid_section.num_samples >= self.min_num_samples:
grid_active_mask[grid_section.coordinates] = not grid_section.confidence_status(
self.__evaluators
)
return runResult.samples
def __queue_next(
self,
pool: ActorPool,
grid: SampleGrid,
grid_active_mask: np.ndarray,
grid_task_count: np.ndarray,
):
# Query active sections and respective task counts
active_sections = np.argwhere(grid_active_mask)
active_sections_task_count = grid_task_count[grid_active_mask]
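# A program is the ordered list of grid-section coordinates a single actor run will visit,
# drawing one new sample per listed section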
program: List[Tuple[int, ...]] = []
for section_coordinates, task_count in zip(
active_sections[: self.section_block_size, :],
active_sections_task_count.flat[: self.section_block_size],
):
section_coordinates = tuple(section_coordinates)
program.append(section_coordinates)
task_count += 1
grid_task_count[section_coordinates] = task_count
if task_count + grid[section_coordinates].num_samples >= self.num_samples:
grid_active_mask[section_coordinates] = False
break
# ToDo: Enhance this routine to always submit section_block_size section indices per program
if len(program) > 0:
pool.submit(lambda a, p: a.run.remote(p), program)
@property
def investigated_object(self) -> Any:
"""The object to be investigated during the simulation runtime."""
return self.__investigated_object
def new_dimension(
self, dimension: str, sample_points: List[Any], *args: Any, **kwargs: Any
) -> GridDimension:
"""Add a dimension to the simulation grid.
Must be a property of the investigated object.
Args:
dimension (str):
String representation of dimension location relative to the investigated object.
sample_points (List[Any]):
List of points at which the dimension will be sampled into a grid.
The points must be of a type compatible with the dimension's underlying property.
*args (Tuple[Any], optional):
References to the object the dimension belongs to.
Resolved to the investigated object by default,
but may be an attribute or sub-attribute of the investigated object.
**kwargs:
Additional initialization arguments passed to :class:`GridDimension`.
Returns:
The newly created dimension object.
"""
considered_objects = (self.__investigated_object,) if len(args) < 1 else args
grid_dimension = GridDimension(considered_objects, dimension, sample_points, **kwargs)
self.add_dimension(grid_dimension)
return grid_dimension
def add_dimension(self, dimension: GridDimension) -> None:
"""Add a new dimension to the simulation grid.
Args:
dimension:
Dimension to be added.
Raises:
ValueError:
If the `dimension` already exists within the grid.
"""
if dimension in self.__dimensions:
raise ValueError("Dimension instance already registered within the grid")
self.__dimensions.append(dimension)
def remove_dimension(self, dimension: GridDimension) -> None:
"""Remove an existing dimension from the simulation grid.
Args:
dimension (GridDimension):
The dimension to be removed.
Raises:
ValueError: If the `dimension` does not exist.
"""
if dimension not in self.__dimensions:
raise ValueError("Dimension not registered within the current simulation grid")
self.__dimensions.remove(dimension)
@property
def dimensions(self) -> List[GridDimension]:
"""Simulation grid dimensions which make up the grid."""
return self.__dimensions.copy()
def add_evaluator(self, evaluator: Evaluator) -> None:
"""Add new evaluator to the Monte Carlo simulation.
Args:
evaluator (Evaluator[MO]):
The evaluator to be added.
"""
self.__evaluators.append(evaluator)
@property
def evaluators(self) -> List[Evaluator]:
"""Evaluators used to process the investigated object sample state."""
return self.__evaluators.copy()
@property
def num_samples(self) -> int:
"""Number of samples per simulation grid element.
Returns:
int: Number of samples
Raises:
ValueError: If number of samples is smaller than one.
"""
return self.__num_samples
@num_samples.setter
def num_samples(self, value: int) -> None:
"""Set number of samples per simulation grid element."""
if value < 1:
raise ValueError("Number of samples must be greater than zero")
self.__num_samples = value
@property
def min_num_samples(self) -> int:
"""Minimum number of samples per simulation grid element.
Returns:
int: Number of samples
Raises:
ValueError: If number of samples is smaller than zero.
"""
return self.__min_num_samples
@min_num_samples.setter
def min_num_samples(self, value: int) -> None:
"""Set minimum number of samples per simulation grid element."""
if value < 0:
raise ValueError("Number of samples must be greater or equal to zero")
self.__min_num_samples = value
@property
def max_num_samples(self) -> int:
"""Maximum number of samples over the whole simulation.
Returns:
int: Number of samples.
"""
num_samples = self.num_samples
for dimension in self.__dimensions:
num_samples *= dimension.num_sample_points
return num_samples
@property
def num_actors(self) -> int:
"""Number of dedicated actors spawned during simulation runs.
Returns:
int: Number of actors.
Raises:
ValueError: If the number of actors is smaller than one.
"""
# Return the number of available CPU cores as default value
if self.__num_actors is not None:
return self.__num_actors
# Otherwise, the number of actors depends on the number of available CPUs and
# the cpu requirements per actor
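# For example, 16 available CPUs with cpus_per_actor=2 yield 8 actors (illustrative figures)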
return max(1, int(ray.available_resources().get("CPU", 1) / self.cpus_per_actor))
@num_actors.setter
def num_actors(self, value: Optional[int]) -> None:
"""Set number of dedicated actors spawned during simulation runs."""
if value is None:
self.__num_actors = None
elif value < 1:
raise ValueError("Number of actors must be greater or equal to one")
else:
self.__num_actors = value
@property
def console(self) -> Console:
"""Console the Simulation writes to.
Returns:
Console: Handle to the console.
"""
return self.__console
@console.setter
def console(self, value: Console) -> None:
self.__console = value
@property
def section_block_size(self) -> int:
"""Number of generated samples per section block.
Returns:
int: Number of samples per block.
Raises:
ValueError:
If the block size is smaller than one.
"""
if self.__section_block_size is not None:
return self.__section_block_size
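# By default the block size equals the total number of grid sections,
# e.g. a grid of 4 x 3 sample points defaults to a block size of 12 (illustrative figures)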
size = 1
for dimension in self.__dimensions:
size *= dimension.num_sample_points
return size
@section_block_size.setter
def section_block_size(self, value: Optional[int]) -> None:
if value is not None and value < 1:
raise ValueError("Section block size must be greater or equal to one")
self.__section_block_size = value
@property
def cpus_per_actor(self) -> int:
"""Number of CPU cores reserved for each actor.
Returns:
Number of cores.
Raises:
ValueError: If the number of cores is smaller than one.
"""
return self.__cpus_per_actor
@cpus_per_actor.setter
def cpus_per_actor(self, num: int) -> None:
if num < 1:
raise ValueError("Number if CPU cores per actor must be greater or equal to one")
self.__cpus_per_actor = num
@property
def console_mode(self) -> ConsoleMode:
"""Console mode during simulation runtime.
Returns: The current console mode.
"""
return self.__console_mode
@console_mode.setter
def console_mode(self, value: Union[ConsoleMode, str]) -> None:
# Convert string arguments to the respective enum member
if isinstance(value, str):
value = ConsoleMode[value]
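# e.g. the string "SILENT" resolves to ConsoleMode.SILENT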
self.__console_mode = value
@staticmethod
def _py_modules() -> List[str]:
"""List of python modules required by remote workers.
In order to deploy Ray to computing clusters, dependencies listed here
will be installed on remote nodes.
Returns:
List of module names.
"""
return [path.join(path.dirname(path.realpath(__file__)), "..")]
@staticmethod
def _pip_packages() -> List[str]:
"""List of python packages required by remote workers.
In order to deploy Ray to computing clusters, dependencies listed here
will be installed on remote nodes.
Returns:
List of package names.
"""
return ["ray", "numpy", "scipy", "matplotlib", "rich"]