Source code for hermespy.core.monte_carlo

# -*- coding: utf-8 -*-
"""
=======
PyMonte
=======

PyMonte is a stand-alone core module of HermesPy,
enabling efficient and flexible Monte Carlo simulations over arbitrary configuration parameter combinations.
By wrapping the core API of the `Ray`_ project,
any object serializable by the `pickle`_ standard module can become the system model of a Monte Carlo-style
simulation campaign.
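
As a minimal sketch of this idea, any picklable Python class exposing settable properties may serve as an
investigated object.
The ``ToySystem`` class below is purely illustrative and not part of HermesPy:

.. code-block:: python

   class ToySystem(object):
       # Toy system model: any picklable object with settable properties qualifies

       def __init__(self) -> None:
           self.__snr = 10.0

       @property
       def snr(self) -> float:
           # Settable property, i.e. a candidate axis of the simulation grid
           return self.__snr

       @snr.setter
       def snr(self, value: float) -> None:
           self.__snr = value

       def run(self) -> None:
           # Generate a single random realization of the model state
           ...

   # Wrap the toy model in a campaign drawing at most 1000 samples per grid section
   montecarlo = MonteCarlo(investigated_object=ToySystem(), num_samples=1000)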

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   subgraph gridsection[Grid Section]

      parameter_a(Parameter)
      parameter_b(Parameter)
   end

   object((Investigated Object))
   evaluator_a{{Evaluator}}
   evaluator_b{{Evaluator}}
   evaluator_c{{Evaluator}}

   subgraph sample[Sample]

       artifact_a[(Artifact)]
       artifact_b[(Artifact)]
       artifact_c[(Artifact)]
   end

   parameter_a --> object
   parameter_b --> object
   object ---> evaluator_a ---> artifact_a
   object ---> evaluator_b ---> artifact_b
   object ---> evaluator_c ---> artifact_c


Monte Carlo simulations usually sweep over combinations of multiple parameter settings,
configuring the underlying system model and generating simulation samples from independent realizations
of the model state.
PyMonte refers to a single parameter combination as :class:`.GridSection`,
with the set of all parameter combinations making up the simulation grid.
Each settable property of the investigated object is treated as a potential simulation parameter within the grid,
i.e. each settable property can be represented by an axis within the multidimensional simulation grid.
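
Continuing the illustrative ``ToySystem`` sketch from above, a single grid axis sweeping the toy system's
``snr`` property can be described by a :class:`.GridDimension` (the sample point values are arbitrary):

.. code-block:: python

   system = ToySystem()

   # One grid axis: sweep the snr property over four sample points
   snr_dimension = GridDimension(system, "snr", [0.0, 2.0, 4.0, 8.0], title="SNR")

   # During runtime, the simulation applies individual sample points to the system
   snr_dimension.configure_point(2)
   assert system.snr == 4.0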

:class:`.Evaluator` instances extract performance indicators of interest, referred to as :class:`.Artifact`,
from each investigated object realization.
A set of artifacts drawn from the same investigated object realization makes up a single :class:`.MonteCarloSample`.
During the execution of a PyMonte simulation, between :math:`M_\\mathrm{min}` and :math:`M_\\mathrm{max}` samples
are generated from independent investigated object realizations for each grid section.
The sample generation for a grid section may be aborted prematurely once all evaluators have reached their
configured confidence threshold.
Refer to :footcite:t:`2014:bayer` for a detailed description of the implemented algorithm.
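
Artifacts representing scalar performance indicators are conveniently expressed as :class:`.ArtifactTemplate` instances,
and the stopping criterion is configured per evaluator via its confidence and tolerance properties.
A rough sketch, where ``some_evaluator`` is assumed to be a concrete :class:`.Evaluator` implementation:

.. code-block:: python

   # Scalar artifacts are typically wrapped in an ArtifactTemplate
   artifact = ArtifactTemplate(0.125)
   assert artifact.to_scalar() == 0.125

   # Request premature stopping once the estimate deviates by at most 1e-2
   # from the true expectation with a confidence of at least 95%
   some_evaluator.confidence = 0.95
   some_evaluator.tolerance = 1e-2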

.. mermaid::

   %%{init: {'theme': 'dark'}}%%
   flowchart LR

   controller{Simulation Controller}

   gridsection_a[Grid Section]
   gridsection_b[Grid Section]

   sample_a[Sample]
   sample_b[Sample]

   subgraph actor_a[Actor #1]

       object_a((Investigated Object))
   end

   subgraph actor_b[Actor #N]

       object_b((Investigated Object))
   end

   controller --> gridsection_a --> actor_a --> sample_a
   controller --> gridsection_b --> actor_b --> sample_b


The distribution of the actual simulation workload is visualized in the flowchart above.
Using `Ray`_, PyMonte spawns a number of :class:`.MonteCarloActor` containers,
with the number of actors depending on the detected available resources (i.e. the number of CPU cores).
A central simulation controller schedules the workload by assigning :class:`.GridSection` indices as tasks
to the actors, which return the resulting simulation samples once their simulation iteration is completed.
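
Putting the pieces together, a campaign over the toy system sketched above could be configured and launched as follows.
Here, ``ToyActor`` stands in for a concrete :class:`.MonteCarloActor` implementation and ``some_evaluator``
for a concrete :class:`.Evaluator`; both, as well as the ``add_dimension`` call, are assumptions of this sketch
and are not defined within this excerpt:

.. code-block:: python

   montecarlo = MonteCarlo(
       investigated_object=ToySystem(),
       num_samples=1000,           # Maximum number of samples per grid section
       min_num_samples=100,        # Minimum number of samples per grid section
       evaluators=[some_evaluator],
       num_actors=4,               # Number of spawned MonteCarloActor workers
       cpus_per_actor=1,           # CPU cores reserved per actor
   )

   # Attach the grid dimension configured earlier (add_dimension is assumed here)
   montecarlo.add_dimension(snr_dimension)

   # Distribute the workload, collect samples and compile the evaluation results
   result = montecarlo.simulate(ToyActor)

   # Inspect and store the collected evaluation results
   result.print()
   result.save_to_matlab("results.mat")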

.. _Ray: https://www.ray.io/
.. _pickle: https://docs.python.org/3/library/pickle.html
"""

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from collections.abc import Sequence
from contextlib import nullcontext
from functools import reduce
from math import exp, sqrt
from os import path
from time import perf_counter
from typing import (
    Any,
    Callable,
    Generic,
    List,
    Optional,
    Type,
    TypeVar,
    Tuple,
    Union,
    SupportsFloat,
)
from warnings import catch_warnings, simplefilter

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import ray
from matplotlib.axis import Axis
from mpl_toolkits.mplot3d import Axes3D  # type: ignore
from ray.util import ActorPool
from rich.console import Console, Group
from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn
from rich.live import Live
from rich.table import Table
from scipy.constants import pi
from scipy.io import savemat
from scipy.stats import norm

from hermespy.tools import lin2db
from .executable import Executable
from .definitions import ConsoleMode
from .logarithmic import LogarithmicSequence, ValueType
from .visualize import VAT, Visualizable

__author__ = "Jan Adler"
__copyright__ = "Copyright 2023, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.2.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


MO = TypeVar("MO")
"""Type of Monte Carlo object under investigation.

:meta private:
"""

AT = TypeVar("AT")
"""Type of Monte Carlo evaluation artifact.

:meta private:
"""

FAT = TypeVar("FAT", bound=SupportsFloat)
"""Type of floating point Monte Carlo evaluation artifact.

:meta private:
"""

ET = TypeVar("ET")
"""Type of Monte Carlo evaluation.

:meta private:
"""

SERT = TypeVar("SERT", bound="ScalarEvaluationResult")
"""Type of scalar Monte Carlo evaluation result.

:meta private:
"""

EV = TypeVar("EV", bound="Evaluator")
"""Type of Monte Carlo evalutor.

:meta private:
"""


class Artifact(object):
    """Result of an investigated object evaluation.

    Generated by :class:`.Evaluator` instances operating on investigated object states.
    In other words, :meth:`.Evaluator.evaluate` is expected to return an instance derived from this base class.

    Artifacts may, in general, represent any sort of object.
    However, it is encouraged to provide a scalar floating-point representation
    for data visualization by implementing the :meth:`.to_scalar` method.
    """

    @abstractmethod
    def __str__(self) -> str:
        """String representation of this artifact.

        Will be used to visualize the artifact's content in console outputs.

        Returns:
            str: String representation.
        """
        ...  # pragma: no cover

    @abstractmethod
    def to_scalar(self) -> float | None:
        """Scalar representation of this artifact's content.

        Used to evaluate premature stopping criteria for the underlying evaluation.

        Returns:
            Scalar floating-point representation.
            `None` if a conversion to scalar is impossible.
        """
        ...  # pragma: no cover
[docs] class ArtifactTemplate(Generic[FAT], Artifact): """Scalar numerical result of an investigated object evaluation. Implements the common case of an :class:`.Artifact` representing a scalar numerical value. """ __artifact: FAT # Artifact def __init__(self, artifact: FAT) -> None: """ Args: artifact (AT): Artifact value. """ self.__artifact = artifact @property def artifact(self) -> FAT: """Evaluation artifact. Provides direct access to the represented artifact. Returns: AT: Copy of the artifact. """ return self.__artifact def __str__(self) -> str: return f"{self.to_scalar():.3f}"
[docs] def to_scalar(self) -> float: return float(self.artifact)
[docs] class Evaluation(Visualizable): """Evaluation of a single simulation sample. Evaluations are generated by :class:`Evaluators <Evaluator>` during :meth:`Evaluator.evaluate`. """
[docs] @abstractmethod def artifact(self) -> Artifact: """Generate an artifact from this evaluation. Returns: The evaluation artifact.""" ... # pragma: no cover
[docs] class EvaluationTemplate(Generic[ET], Evaluation, ABC): evaluation: ET def __init__(self, evaluation: ET) -> None: self.evaluation = evaluation
[docs] class EvaluationResult(Visualizable, ABC): """Result of an evaluation routine iterating over a parameter grid. Evaluation results are generated by :class:`Evaluator Instances <.Evaluator>` as a final step within the evaluation routine. """ __grid: Sequence[GridDimension] def __init__(self, grid: Sequence[GridDimension], evaluator: Evaluator) -> None: """ Args: grid (Sequence[GridDimension]): Parameter grid over which the simulation generating this result iterated. """ # Initialize base class Visualizable.__init__(self) # Initialize class attributes self.__grid = grid self.__evaluator = evaluator @property def grid(self) -> Sequence[GridDimension]: """Paramter grid over which the simulation iterated.""" return self.__grid @property def evaluator(self) -> Evaluator: """Evaluator that generated this result.""" return self.__evaluator def __result_to_str(self, value: Any) -> str: """Convert a single result value to its string representation. Args: value (Any): The value to be converted. Returns: The values string representation. """ if isinstance(value, (int, float, complex)): if self.evaluator.tick_format == ValueType.DB or self.evaluator.plot_scale == "log": return f"{lin2db(value):.2g} dB" return f"{value:.2g}" return str(value)
[docs] def print(self, console: Console | None = None) -> None: """Print a readable version of this evaluation result. Args: console (Console | None): Rich console to print in. If not provided, a new one will be generated. """ _console = Console() if console is None else console # Query the array representation of this evaluation result results = self.to_array() # Render results into a rich table table = Table() for dimension in self.grid: table.add_column(dimension.title, style="cyan") table.add_column(self.evaluator.abbreviation) for grid_section in np.ndindex(*[d.num_sample_points for d in self.grid]): columns = [g.sample_points[s].title for s, g in zip(grid_section, self.grid)] result = results[grid_section] result_str: str if isinstance(result, np.ndarray): if result.size == 1: result_str = self.__result_to_str(result.flat[0]) else: result_str = "" for value in result.flat: result_str += self.__result_to_str(value) + ", " result_str = result_str[:-2] else: result_str = self.__result_to_str(result) table.add_row(*columns, result_str) _console.print(table)
[docs] @abstractmethod def to_array(self) -> np.ndarray: """Convert the evaluation result raw data to an array representation. Used to store the results in arbitrary binary file formats after simulation execution. Returns: The array result representation. """ ... # pragma: no cover
@staticmethod def __configure_axis(axis: Axis, tick_format: ValueType = ValueType.LIN) -> None: """Subroutine to configure an axis. Args: axis (Axis): The matplotlib axis to be configured. tick_format (ValueType): The tick format. """ if tick_format is ValueType.DB: axis.set_major_formatter(ticker.FuncFormatter(lambda x, _: f"{lin2db(x):.2g}dB")) def _plot_linear(self, scalar_data: np.ndarray, axes: VAT) -> None: """Visualize result as a linear plot. Args: sample_points (Sequence[SamplePoint]): Grid coordinates to visualize. scalar_data (np.ndarray): Results at the respective coordinates. axes (plt.Axes): Axes to plot in. Must be initialized as a 2D axes. Raises: RuntimeError: If the result's grid contains more than one dimension. """ # If the grid contains more than one dimension, raise an error if len(self.grid) != 1: raise RuntimeError("Cannot plot multidimensional results in a linear plot") ax: plt.Axes = axes.flat[0] # Configure axes labels ax.set_xlabel(self.grid[0].title) ax.set_ylabel(self.evaluator.abbreviation) # Configure axes scales ax.set_xscale(self.grid[0].plot_scale) ax.set_yscale(self.evaluator.plot_scale) EvaluationResult.__configure_axis(ax.xaxis, self.grid[0].tick_format) EvaluationResult.__configure_axis(ax.yaxis, self.evaluator.tick_format) # Plot final result sample_values = np.asarray([s.value for s in self.grid[0].sample_points]) ax.plot(sample_values, scalar_data) def _plot_surface(self, data: np.ndarray, axes: VAT) -> None: """Visualize result as a surface plot. Args: data (np.ndarray): Results at the respective coordinates. axes (plt.Axes): Axes to plot in. Must be initialized as a 3D axes. Raises: RuntimeError: If the result's grid contains more than two dimensions. """ # If the grid contains more than two dimensions, raise an error if len(self.grid) != 2: raise RuntimeError("Cannot plot multidimensional results in a surface plot") ax: Axes3D = axes.flat[0] # Configure axes labels and scales ax.set( xlabel=self.grid[0].title, ylabel=self.grid[1].title, zlabel=self.evaluator.abbreviation ) EvaluationResult.__configure_axis(ax.xaxis, self.grid[0].tick_format) EvaluationResult.__configure_axis(ax.yaxis, self.grid[1].tick_format) EvaluationResult.__configure_axis(ax.zaxis, self.evaluator.tick_format) x_points = np.asarray([s.value for s in self.grid[0].sample_points]) y_points = np.asarray([s.value for s in self.grid[1].sample_points]) y, x = np.meshgrid(y_points, x_points) ax.plot_surface(x.astype(float), y.astype(float), data) def _plot_multidim( self, scalar_data: np.ndarray, x_dimension: int, y_label: str, x_scale: str, y_scale: str, axes: VAT, ) -> None: """Plot multidimensional simulation results into a two-dimensional axes system. Args: scalar_data (np.ndarray): Scalar data to be plotted. x_dimension (int): The dimension index featured on the plot's x-axis. y_label (str): The plot's y-axis label x_scale (str): The plot's x-axis scale. y_scale (str): The plot's y-axis scale. axes (plt.Axes): Axes to plot in. Must be initialized as a 2D axes. 
""" ax: plt.Axes = axes.flat[0] # Configure axes labels ax.set_xlabel(self.grid[x_dimension].title) ax.set_ylabel(y_label) # Configure axes scales ax.set_yscale(y_scale) ax.set_xscale(x_scale) subgrid = list(self.grid).copy() del subgrid[x_dimension] x_points = np.asarray([s.value for s in self.grid[x_dimension].sample_points]) section_magnitudes = tuple(s.num_sample_points for s in subgrid) for section_indices in np.ndindex(section_magnitudes): # Generate the graph line label line_label = "" for i, v in enumerate(section_indices): line_label += f"{self.grid[i+1].title} = {self.grid[i+1].sample_points[v].title}, " line_label = line_label[:-2] if len(line_label) < 1: line_label = None # Select the graph line scalars line_scalars = scalar_data[(..., *section_indices)] # type: ignore # Plot the graph line ax.plot(x_points, line_scalars, label=line_label) if len(section_magnitudes) > 0 and section_magnitudes[0] > 1: with Executable.style_context(): ax.legend() @staticmethod def _plot_empty(axes: VAT) -> None: """Plot an empty notice into the provided axes. Args: axes (VAT): The axes to plot into. """ ax: plt.Axes for ax in axes.flat: ax.text(0.5, 0.5, "NO DATA AVAILABLE", horizontalalignment="center")
[docs] class ScalarEvaluationResult(EvaluationResult): """Base class for scalar evaluation results.""" plot_surface: bool __scalar_results: np.ndarray __base_dimension_index: int def __init__( self, grid: Sequence[GridDimension], scalar_results: np.ndarray, evaluator: Evaluator, plot_surface: bool = True, ) -> None: """ Args: grid (Sequence[GridDimension]): Simulation grid. scalar_results (np.ndarray): Scalar results generated from collecting samples over the simulation grid. evaluator (Evaluator): The evaluator generating the results. plot_surface (bool, optional): Enable surface plotting for two-dimensional grids. Enabled by default. """ # Initialize base class EvaluationResult.__init__(self, grid, evaluator) # Initialize class attributes self.__scalar_results = scalar_results self.plot_surface = plot_surface self.__base_dimension_index = 0 @property def title(self) -> str: # The plotting title should resolve to the represented evaluator's title return self.evaluator.title def _new_axes(self, **kwargs) -> Tuple[plt.Figure, VAT]: # Generate 3D axes with surface plotting enabled if len(self.grid) == 2 and self.plot_surface: fig, axes = plt.subplots(1, 1, squeeze=False, subplot_kw={"projection": "3d"}) # Generate 2D axes otherwise else: fig, axes = plt.subplots(1, 1, squeeze=False) return fig, axes def _plot(self, axes: VAT) -> None: # If the grid contains no data, dont plot anything if len(self.grid) < 1: self._plot_empty(axes) return # Shuffle grid and respective scalar results so that the selected base dimension is always the first entry grid = list(self.grid).copy() grid.insert(0, grid.pop(self.__base_dimension_index)) scalars = np.moveaxis(self.__scalar_results, self.__base_dimension_index, 0) # A single axis plot, the simple case if len(grid) < 2: self._plot_linear(scalars, axes) # Two dimensions, with surface plotting enabled elif len(grid) == 2 and self.plot_surface: self._plot_surface(scalars, axes) # Multiple dimensions, resort to legend-based multiplots else: self._plot_multidim( scalars, 0, self.evaluator.abbreviation, grid[0].plot_scale, self.evaluator.plot_scale, axes, )
[docs] def to_array(self) -> np.ndarray: return self.__scalar_results
[docs] @classmethod def From_Artifacts( cls: Type[SERT], grid: Sequence[GridDimension], artifacts: np.ndarray, evaluator: Evaluator, plot_surface: bool = True, ) -> SERT: """Generate a scalar evaluation result from a set of artifacts. Args: grid (Sequence[GridDimension]): The simulation grid. artifacts (np.ndarray): Numpy object array whose dimensions represent grid dimensions. evaluator (Evaluator): The evaluator generating the artifacts. plot_surface (bool): Whether to plot the result as a surface plot. Returns: The scalar evaluation result. """ scalar_results = np.empty(artifacts.shape, dtype=float) for section_coords in np.ndindex(artifacts.shape): scalar_results[section_coords] = np.mean( [artifact.to_scalar() for artifact in artifacts[section_coords]] ) return cls(grid, scalar_results, evaluator, plot_surface)
[docs] class Evaluator(ABC): """Evaluation routine for investigated object states, extracting performance indicators of interest. Evaluators represent the process of extracting arbitrary performance indicator samples :math:`X_m` in the form of :class:`.Artifact` instances from investigated object states. Once a :class:`.MonteCarloActor` has set its investigated object to a new random state, it calls the :func:`.evaluate` routines of all configured evaluators, collecting the resulting respective :class:`.Artifact` instances. For a given set of :class:`.Artifact` instances, evaluators are expected to report a :meth:`.confidence_level` which may result in a premature abortion of the sample collection routine for a single :class:`.GridSection`. By default, the routine suggested by :footcite:t:`2014:bayer` is applied: Considering a tolerance :math:`\\mathrm{TOL} \\in \\mathbb{R}_{++}` the confidence in the mean performance indicator .. math:: \\bar{X}_M = \\frac{1}{M} \\sum_{m = 1}^{M} X_m is considered sufficient if a threshold :math:`\\delta \\in \\mathbb{R}_{++}`, defined as .. math:: \\mathrm{P}\\left(\\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\| > \\mathrm{TOL} \\right) \\leq \\delta has been crossed. The number of collected actually collected artifacts per :class:`.GridSection` :math:`M \\in [M_{\\mathrm{min}}, M_{\\mathrm{max}}]` is between a minimum number of required samples :math:`M_{\\mathrm{min}} \\in \\mathbb{R}_{+}` and an upper limit of :math:`M_{\\mathrm{max}} \\in \\mathbb{R}_{++}`. """ tick_format: ValueType __confidence: float __tolerance: float __plot_scale: str # Plot axis scaling def __init__(self) -> None: self.confidence = 1.0 self.tolerance = 0.0 self.plot_scale = "linear" self.tick_format = ValueType.LIN
[docs] @abstractmethod def evaluate(self) -> Evaluation: """Evaluate the state of an investigated object. Implements the process of extracting an arbitrary performance indicator, represented by the returned :class:`.Artifact` :math:`X_m`. Returns: Artifact :math:`X_m` resulting from the evaluation. """ ... # pragma: no cover
@property @abstractmethod def abbreviation(self) -> str: """Short string representation of this evaluator. Used as a label for console output and plot axes annotations. """ ... # pragma: no cover @property @abstractmethod def title(self) -> str: """Long string representation of this evaluator. Used as plot title. """ ... # pragma: no cover @property def confidence(self) -> float: """Confidence threshold required for premature simulation abortion. The confidence threshold :math:`\\delta \\in [0, 1]` is the upper bound to the confidence level .. math:: \\mathrm{P}\\left(\\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\| > \\mathrm{TOL} \\right) at which the sample collection for a single :class:`.GridSection` may be prematurely aborted :footcite:p:`2014:bayer`. Raises: ValueError: If confidence is lower than zero or greater than one. """ return self.__confidence @confidence.setter def confidence(self, value: float) -> None: if value < 0.0 or value > 1.0: raise ValueError("Confidence level must be in the interval between zero and one") self.__confidence = value @property def tolerance(self) -> float: """Tolerance level required for premature simulation abortion. The tolerance :math:`\\mathrm{TOL} \\in \\mathbb{R}_{++}` is the upper bound to the interval .. math:: \\left\\| \\bar{X}_M - \\mathrm{E}\\left[ X \\right] \\right\\| by which the performance indicator estimation :math:`\\bar{X}_M` may diverge from the actual expected value :math:`\\mathrm{E}\\left[ X \\right]`. Returns: float: Non-negative tolerance :math:`\\mathrm{TOL}`. Raises: ValueError: If tolerance is negative. """ return self.__tolerance @tolerance.setter def tolerance(self, value: float) -> None: if value < 0.0: raise ValueError("Tolerance must be greater or equal to zero") self.__tolerance = value def __str__(self) -> str: """String object representation. Returns: str: String representation. """ return self.abbreviation @staticmethod def _scalar_cdf(scalar: float) -> float: """Assumed cumulative probability of the scalar representation. Args: scalar (float): The scalar value. Returns: float: Cumulative probability between zero and one. """ return norm.cdf(scalar) @property def plot_scale(self) -> str: """Scale of the scalar evaluation plot. Refer to the `Matplotlib`_ documentation for a list of a accepted values. Returns: str: The scale identifier string. .. _Matplotlib: https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.set_yscale.html """ return self.__plot_scale @plot_scale.setter def plot_scale(self, value: str) -> None: self.__plot_scale = value
[docs] @abstractmethod def generate_result( self, grid: Sequence[GridDimension], artifacts: np.ndarray ) -> EvaluationResult: """Generates an evaluation result from the artifacts collected over the whole simulation grid. Args: grid (Sequence[GridDimension]): The Simulation grid. artifacts (np.ndarray): Numpy object array whose dimensions represent grid dimensions. Returns: The evaluation result. """ ... # pragma: no cover
[docs] class MonteCarloSample: """Single sample of a Monte Carlo simulation.""" __sample_index: int # Index of the sample # Grid section from which the sample was generated __grid_section: Tuple[int, ...] __artifacts: Sequence[Artifact] # Artifacts of evaluation def __init__( self, grid_section: Tuple[int, ...], sample_index: int, artifacts: Sequence[Artifact] ) -> None: """ Args: grid_section (Tuple[int, ...]): Grid section from which the sample was generated. sample_index (int): Index of the sample. In other words this object represents the `sample_index`th sample of the selected `grid_section`. artifacts (Sequence[Artifact]): Artifacts of evaluation """ self.__grid_section = grid_section self.__sample_index = sample_index self.__artifacts = artifacts @property def grid_section(self) -> Tuple[int, ...]: """Grid section from which this sample was generated. Returns: Tuple[int, ...]: Tuple of grid section indices. """ return self.__grid_section @property def sample_index(self) -> int: """Index of the sample this object represents. Returns: int: Sample index. """ return self.__sample_index @property def artifacts(self) -> Sequence[Artifact]: """Artifacts resulting from the sample's evaluations. Returns: List[Artifact]: List of artifacts. """ return self.__artifacts @property def num_artifacts(self) -> int: """Number of contained artifact objects. Returns: int: Number of artifacts. """ return len(self.__artifacts) @property def artifact_scalars(self) -> np.ndarray: """Collect scalar artifact representations. Returns: np.ndarray: Vector of scalar artifact representations. """ return np.array([artifact.to_scalar() for artifact in self.artifacts], dtype=float)
[docs] class GridSection(object): # Berry-Esseen constants ToDo: Check the proper selection here __C_0: float = 0.4785 __C_1: float = 30.2338 # Section indices within the simulation grid __coordinates: Tuple[int, ...] __samples: List[MonteCarloSample] # Set of generated samples # Confidence level for each evaluator __evaluator_confidences: np.ndarray def __init__(self, coordinates: Tuple[int, ...], num_evaluators: int) -> None: """ Args: coordinates (Tuple[int, ...]): Section indices within the simulation grid. """ self.__coordinates = coordinates self.__samples = list() self.__num_evaluators = num_evaluators self.__evaluator_confidences = np.zeros(num_evaluators, dtype=float) self.__means = np.zeros(num_evaluators, dtype=float) self.__second_moments = np.zeros(num_evaluators, dtype=float) self.__third_moments = np.zeros(num_evaluators, dtype=float) self.__third_moments_abs = np.zeros(num_evaluators, dtype=float) self.__fourth_moments = np.zeros(num_evaluators, dtype=float) @property def coordinates(self) -> Tuple[int, ...]: """Grid section coordinates within the simulation grid. Returns: Tuple[int, ...]: Section coordinates. """ return self.__coordinates @property def num_samples(self) -> int: """Number of already generated samples within this section. Returns: int: Number of samples. """ return len(self.__samples) @property def samples(self) -> List[MonteCarloSample]: """The collected evaluation samples within this grid section. Returns: List of samples. """ return self.__samples
[docs] def add_samples( self, samples: MonteCarloSample | Sequence[MonteCarloSample], evaluators: Sequence[Evaluator], ) -> None: """Add a new sample to this grid section collection. Args: samples (Union[MonteCarloSample, Sequence[MonteCarloSample])): Samples to be added to this section. evaluators (Sequence[Evaluator]): References to the evaluators generating the sample artifacts. Raises: ValueError: If the number of artifacts in `sample` does not match the initialization. """ _samples = samples if isinstance(samples, Sequence) else [samples] for sample in _samples: # Make sure the provided number of artifacts is correct if sample.num_artifacts != self.__num_evaluators: raise ValueError( f"Number of sample artifacts ({sample.num_artifacts}) does not match the " f"configured number of evaluators ({self.__num_evaluators})" ) # Update confidences self.__update_confidences(sample.artifact_scalars, evaluators) # Append sample to the stored list self.__samples.append(sample)
    @property
    def confidences(self) -> np.ndarray:
        """Confidence in the estimated evaluations.

        Returns: Array indicating probabilities for each evaluator.
        """

        return self.__evaluator_confidences

    def confidence_status(self, evaluators: Sequence[Evaluator]) -> bool:
        """Check if each evaluator has reached its required confidence threshold.

        Confidence indicates that the simulation for the parameter combination this grid section represents
        may be aborted, i.e. no more samples are required.

        Args:

            evaluators (Sequence[Evaluator]): Evaluators giving feedback about their confidence status.

        Returns: Confidence indicator.
        """

        for evaluator, confidence in zip(evaluators, self.__evaluator_confidences):
            if confidence < evaluator.confidence:
                return False

        return True
def __update_confidences( self, artifact_scalars: np.ndarray, evaluators: Sequence[Evaluator] ) -> None: """Update the confidence estimates for this grid section. Args: artifact_scalars (np.ndarray): Scalar representations of the artifacts generated by the sample. evaluators (Sequence[Evaluator]): Evaluators generating the artifacts. Raises: ValueError: If the artifact scalars argument is not a vector. ValueError: If the number of artifact scalars does not match the number of configured evaluators. """ # Raise a value error if the scalars argument is not a vector if artifact_scalars.ndim != 1: raise ValueError("Artifact scalars must be a vector (on-dimensional array)") if len(artifact_scalars) != self.__num_evaluators: raise ValueError( "Number of artifact scalars does not match the number of configured evaluators" ) for e, ( scalar, evaluator, mean, second_moment, third_moment, third_moment_abs, fourth_moment, ) in enumerate( zip( artifact_scalars, evaluators, self.__means, self.__second_moments, self.__third_moments, self.__third_moments_abs, self.__fourth_moments, ) ): # Return zero if the evaluator tolerance is set to zero if evaluator.tolerance == 0.0: self.__evaluator_confidences[e] = 0.0 continue n = self.num_samples # Current number of samples nn = 1 + self.num_samples # New number of samples # Update the moment estimates delta = scalar - mean updated_mean = (n * mean) / nn updated_second_moment = second_moment + delta**2 * (n) / nn updated_third_moment = ( third_moment + delta**3 * (n**2 - n) / nn**2 - 3 * second_moment * delta / nn ) updated_third_moment_abs = third_moment_abs + abs( delta**3 * (n**2 - n) / nn**2 - 3 * second_moment * delta / nn ) updated_fourth_moment = ( fourth_moment + delta**4 * (n**3 - n**2 + n) / nn**3 + 6 * delta**2 * second_moment / nn**2 - 4 * delta * third_moment / nn ) # Store the updated moment estimates for the next update self.__means[e] = updated_mean self.__second_moments[e] = updated_second_moment self.__third_moments[e] = updated_third_moment self.__third_moments_abs[e] = updated_third_moment_abs self.__fourth_moments[e] = updated_fourth_moment # Abort if the second moment indicates no variance or not enough samples have been collected if updated_second_moment == 0.0 or nn < 2: self.__evaluator_confidences[e] = 1.0 continue # Compute moments (equation 4.1) deviance = sqrt(updated_second_moment / nn) beta_bar_moment = third_moment / (nn * deviance**3) beta_hat_moment = third_moment_abs / (nn * deviance**3) kappa_moment = fourth_moment / (nn * deviance**4) - 3 # Estimate the confidence sample_sqrt = sqrt(nn) sigma_tolerance = sample_sqrt * evaluator.tolerance / deviance sigma_tolerance_squared = sigma_tolerance**2 kappa_term = 4 * (2 / (nn - 1) + kappa_moment / nn) confidence_bound = 2 * (1 - evaluator._scalar_cdf(sigma_tolerance)) + 2 * min( self.__C_0, self.__C_1 / (1 + abs(sigma_tolerance)) ** 3 ) * beta_bar_moment / sample_sqrt * min(1.0, kappa_term) # This is necessary to preventa math overflow from the exponential denominator if kappa_term < 1.0 and sigma_tolerance_squared < 1418.0: confidence_bound += ( (1 - kappa_term) * abs(sigma_tolerance_squared - 1) * abs(beta_hat_moment) / (exp(0.5 * sigma_tolerance_squared) * 3 * sqrt(2 * pi * n) * deviance**3) ) # Store the current confidence estimate self.__evaluator_confidences[e] = 1.0 - min(1.0, confidence_bound)
class ActorRunResult(object):
    """Result object returned by generating a single sample from a simulation runner."""

    samples: List[MonteCarloSample] | None
    message: str | None

    def __init__(
        self, samples: List[MonteCarloSample] | None = None, message: str | None = None
    ) -> None:
        """
        Args:

            samples (List[MonteCarloSample], optional): Samples generated by the remote actor run.

            message (str, optional): Message returned from the remote actor run.
        """

        self.samples = [] if not samples else samples
        self.message = message
[docs] class SampleGrid(object): """Grid of simulation samples.""" __sections: np.ndarray def __init__( self, grid_configuration: List[GridDimension], evaluators: Sequence[Evaluator] ) -> None: """ Args: grid_configuration (List[GridDimension]): The simulation grid configuration. evaluators (Sequence[Evaluator]): The evaluators generating the artifacts. """ self.__sections = np.empty( [dimension.num_sample_points for dimension in grid_configuration], dtype=np.object_ ) num_evaluators = len(evaluators) for coordinates in np.ndindex( *[dimension.num_sample_points for dimension in grid_configuration] ): coordinate_tuple = tuple(coordinates) self.__sections[coordinate_tuple] = GridSection(coordinate_tuple, num_evaluators) def __getitem__(self, coordinates: Tuple[int, ...]) -> GridSection: return self.__sections[coordinates] def __iter__(self): """Iterating over the sample grid is equivalent to iterating over the sections tree""" return iter(self.__sections.flat)
[docs] class MonteCarloActor(Generic[MO]): """Monte Carlo Simulation Actor. Actors are essentially workers running in a private process executing simulation tasks. The result of each individual simulation task is a simulation sample. """ catch_exceptions: bool # Catch exceptions during run. __investigated_object: MO # Copy of the object to be investigated __grid: Sequence[GridDimension] # Simulation grid dimensions __evaluators: Sequence[ Evaluator ] # Evaluators used to process the investigated object sample state def __init__( self, argument_tuple: Tuple[MO, Sequence[GridDimension], Sequence[Evaluator]], index: int, catch_exceptions: bool = True, ) -> None: """ Args: argument_tuple: Object to be investigated during the simulation runtime. Dimensions over which the simulation will iterate. Evaluators used to process the investigated object sample state. index (int): Global index of the actor. catch_exceptions (bool, optional): Catch exceptions during run. Enabled by default. """ investigated_object = argument_tuple[0] grid = argument_tuple[1] evaluators = argument_tuple[2] self.index = index self.catch_exceptions = catch_exceptions self.__investigated_object = investigated_object self.__grid = grid self.__evaluators = evaluators self.__stage_identifiers = self.stage_identifiers() self.__stage_executors = self.stage_executors() self.__num_stages = len(self.__stage_executors) @property def _investigated_object(self) -> MO: """State of the Investigated Object. Returns: Mo: Investigated object. """ return self.__investigated_object # pragma: no cover
[docs] def run(self, program: List[Tuple[int, ...]]) -> ActorRunResult: """Run the simulation actor. Args: program (List[Tuple[int, ...]]): A list of simulation grid section indices for which to collect samples. Returns: A list of generated :class:`MonteCarloSample`s. Contains the same number of entries as `program`. """ result = ActorRunResult() # Catch any exception during actor running try: # Intially, re-configure the full grid recent_section_indices = np.array(program[0], dtype=int) for d, i in enumerate(recent_section_indices): self.__grid[d].configure_point(i) # Run through the program steps for section_indices in program: # Detect the grid dimensions where sections changed, i.e. which require re-configuration section_index_array = np.asarray(section_indices, dtype=int) reconfigured_dimensions = np.argwhere( section_index_array != recent_section_indices ).flatten() # Reconfigure the dimensions # Not that for the first grid_section this has already been done for d in reconfigured_dimensions: self.__grid[d].configure_point(section_index_array[d]) # Detect the first and last impacted simulation stage depending on the reconfigured dimensions first_impact = self.__num_stages last_impact = 0 for d in reconfigured_dimensions: grid_dimension = self.__grid[d] if grid_dimension.first_impact is None: first_impact = 0 elif grid_dimension.first_impact in self.__stage_identifiers: first_impact = min( first_impact, self.__stage_identifiers.index(grid_dimension.first_impact), ) if grid_dimension.last_impact is None: last_impact = self.__num_stages - 1 elif grid_dimension.last_impact in self.__stage_identifiers: last_impact = max( last_impact, self.__stage_identifiers.index(grid_dimension.last_impact) ) if first_impact >= self.__num_stages: first_impact = 0 if last_impact <= 0: last_impact = self.__num_stages - 1 # Execute impacted simulation stages # Note that for the first grid_section all stages are executed for stage in self.__stage_executors[first_impact : 1 + last_impact]: stage() # Collect evaluation artifacts artifacts = [evaluator.evaluate().artifact() for evaluator in self.__evaluators] # Save the samples result.samples.append(MonteCarloSample(section_indices, 0, artifacts)) # Update the recent section for the next iteration recent_section_indices = section_index_array except Exception as e: if self.catch_exceptions: result.message = str(e) else: raise e # pragma: no cover return result
[docs] @staticmethod @abstractmethod def stage_identifiers() -> List[str]: """List of simulation stage identifiers. Simulations stages will be executed in the order specified here. Returns: List of function identifiers for simulation stage execution routines. """ ... # pragma: no cover
[docs] @abstractmethod def stage_executors(self) -> List[Callable]: """List of simulation stage execution callbacks. Simulations stages will be executed in the order specified here. Returns: List of function callbacks for simulation stage execution routines. """ ... # pragma: no cover
[docs] class MonteCarloResult(object): """Result of a Monte Carlo simulation.""" __grid: Sequence[GridDimension] __evaluators: Sequence[Evaluator] __performance_time: float # Absolute time required to compute the simulation __results: Sequence[EvaluationResult] def __init__( self, grid: Sequence[GridDimension], evaluators: Sequence[Evaluator], sample_grid: SampleGrid, performance_time: float, ) -> None: """ Args: grid (Sequence[GridDimension]): Dimensions over which the simulation has swept. evaluators (Sequence[Evaluator]): Evaluators used to evaluated the simulation artifacts. sample_grid (SampleGrid): Grid containing evaluation artifacts collected over the grid iterations. performance_time (float): Time required to compute the simulation. Raises: ValueError: If the dimensions of `samples` do not match the supplied sweeping dimensions and evaluators. """ self.__grid = grid self.__evaluators = evaluators self.__performance_time = performance_time self.__results = [] for evaluator_idx, evaluator in enumerate(evaluators): # Collect artifacts for the respective evaluator evaluator_artifacts = np.empty(tuple([d.num_sample_points for d in grid]), dtype=object) section: GridSection for section in sample_grid: evaluator_artifacts[section.coordinates] = [ sample.artifacts[evaluator_idx] for sample in section.samples ] self.__results.append(evaluator.generate_result(grid, evaluator_artifacts)) @property def performance_time(self) -> float: """Simulation runtime. Returns: Simulation runtime in seconds. """ return self.__performance_time
[docs] def plot(self) -> List[plt.FigureBase]: """Plot evaluation figures for all contained evaluator artifacts. Returns: List of handles to all created Matplotlib figures. """ return [result.plot() for result in self.__results]
    def save_to_matlab(self, file: str) -> None:
        """Save simulation results to a MATLAB file.

        Args:

            file (str): File location to which the results should be saved.
        """

        mat_dict = {
            "dimensions": [d.title for d in self.__grid],
            "evaluators": [evaluator.abbreviation for evaluator in self.__evaluators],
            "performance_time": self.__performance_time,
        }

        # Append evaluation array representations
        for r, result in enumerate(self.__results):
            mat_dict[f"result_{r}"] = result.to_array()

        for dimension in self.__grid:
            mat_dict[dimension.title] = dimension.sample_points

        # Save results in a MATLAB file
        savemat(file, mat_dict)
[docs] def print(self, console: Console | None = None) -> None: """Print a text representation of the simulation result. Args: console (Console, optional): Rich console to print to. If not provided, a new one will be initialized. """ _console = Console() if console is None else console for result in self.evaluation_results: result.print(_console)
@property def evaluation_results(self) -> Sequence[EvaluationResult]: """Access individual evaluation results. Returns: List of evaluation results. """ return self.__results
[docs] class SamplePoint(object): """Sample point of a single grid dimension. A single :class:`.GridDimension` holds a sequence of sample points accesible by the :attr:`sample_points<.GridDimension.sample_points>` property. During simulation runtime, the simulation will dynamically reconfigure the scenario selecting a single sample point out of each :class:`.GridDimension` per generated simulation sample. """ __value: Any __title: str | None def __init__(self, value: Any, title: str | None = None) -> None: """ Args: value (Any): Sample point value with which to configure the grid dimension. title (str, optional): String representation of this sample point. If not specified, the simulation will attempt to infer an adequate representation. """ self.__value = value self.__title = title @property def value(self) -> Any: """Sample point value with which to configure the grid dimension""" return self.__value @property def title(self) -> str: """String representation of this sample point""" if self.__title is not None: return self.__title if type(self.__value).__str__ is not object.__str__: return str(self.__value) if isinstance(self.__value, float): return f"{self.__value:.2g}" if isinstance(self.__value, int): return f"{self.__value}" # Return the values class name if its not representable otherwise return self.__value.__class__.__name__
[docs] class GridDimension(object): """Single axis within the simulation grid. A grid dimension represents a single simulation parameter that is to be varied during simulation runtime to observe its effects on the evaluated performance indicators. The values the represented parameter is configured to are :class:`SamplePoints<.SamplePoint>`. """ __considered_objects: Tuple[Any, ...] __dimension: str __sample_points: List[SamplePoint] __title: str | None __setter_lambdas: Tuple[Callable, ...] __plot_scale: str __tick_format: ValueType __first_impact: str | None __last_impact: str | None def __init__( self, considered_objects: Any | Tuple[Any, ...], dimension: str, sample_points: Sequence[SamplePoint | Any], title: str | None = None, plot_scale: str | None = None, tick_format: Optional[ValueType] = None, ) -> None: """ Args: considered_objects (Union[Any, Tuple[Any, ...]]): The considered objects of this grid section. dimension (str): Path to the attribute. sample_points (List[Any]): Sections the grid is sampled at. title (str, optional): Title of this dimension. If not specified, the attribute string is assumed. plot_scale (str, optional): Scale of the axis within plots. tick_format (ValueType, optional): Format of the tick labels. Linear by default. Raises: ValueError: If the selected `dimension` does not exist within the `considered_object`. """ _considered_objects = ( considered_objects if isinstance(considered_objects, tuple) else (considered_objects,) ) self.__considered_objects = tuple() property_path = dimension.split(".") object_path = property_path[:-1] property_name = property_path[-1] # Infer plot scale of the x-axis if plot_scale is None: if isinstance(sample_points, LogarithmicSequence): self.plot_scale = "log" else: self.plot_scale = "linear" else: self.plot_scale = plot_scale # Infer tick format of the x-axis if tick_format is None: if isinstance(sample_points, LogarithmicSequence): self.tick_format = ValueType.DB else: self.tick_format = ValueType.LIN else: self.tick_format = tick_format self.__setter_lambdas = tuple() self.__dimension = dimension self.__sample_points = [ s if isinstance(s, SamplePoint) else SamplePoint(s) for s in sample_points ] self.__title = title self.__first_impact = None self.__last_impact = None for considered_object in _considered_objects: # Make sure the dimension exists try: dimension_object = reduce( lambda obj, attr: getattr(obj, attr), object_path, considered_object ) dimension_class = type(dimension_object) dimension_property: RegisteredDimension = getattr(dimension_class, property_name) except AttributeError: raise ValueError( "Dimension '" + dimension + "' does not exist within the investigated object" ) if len(self.__sample_points) < 1: raise ValueError("A simulation grid dimension must have at least one sample point") # Update impacts if the dimension is registered as a PyMonte simulation dimension if RegisteredDimension.is_registered(dimension_property): first_impact = dimension_property.first_impact last_impact = dimension_property.last_impact if self.__first_impact and first_impact != self.__first_impact: raise ValueError( "Diverging impacts on multi-object grid dimension initialization" ) if self.__last_impact and last_impact != self.__last_impact: raise ValueError( "Diverging impacts on multi-object grid dimension initialization" ) self.__first_impact = first_impact self.__last_impact = last_impact # Updated the depicted title if the dimension offers an option and it wasn't exactly specified if title is None and 
dimension_property.title is not None: self.__title = dimension_property.title self.__considered_objects += (considered_object,) self.__setter_lambdas += (self.__create_setter_lambda(considered_object, dimension),) @property def considered_objects(self) -> Tuple[Any, ...]: """Considered objects of this grid section.""" return self.__considered_objects @property def dimension(self) -> str: """Dimension property name.""" return self.__dimension @property def sample_points(self) -> List[SamplePoint]: """Points at which this grid dimension is sampled.""" return self.__sample_points @property def num_sample_points(self) -> int: """Number of dimension sample points. Returns: int: Number of sample points. """ return len(self.__sample_points)
[docs] def configure_point(self, point_index: int) -> None: """Configure a specific sample point. Args: point_index (int): Index of the sample point to configure. Raises: ValueError: For invalid indexes. """ if point_index < 0 or point_index >= len(self.__sample_points): raise ValueError( f"Index {point_index} is out of the range for grid dimension '{self.title}'" ) for setter_lambda in self.__setter_lambdas: setter_lambda(self.__sample_points[point_index].value)
@property def first_impact(self) -> str | None: """Index of the first impacted simulation pipeline stage. Returns: Pipeline stage index. `None`, if the stage is unknown. """ return self.__first_impact @property def last_impact(self) -> str | None: """Index of the last impacted simulation pipeline stage. Returns: Pipeline stage index. `None`, if the stage is unknown. """ return self.__last_impact @property def title(self) -> str: """Title of the dimension. Returns: The title string. """ return self.__dimension if self.__title is None else self.__title @title.setter def title(self, value: str) -> None: if value is None or len(value) == 0: self.__title = None else: self.__title = value @property def plot_scale(self) -> str: """Scale of the scalar evaluation plot. Refer to the `Matplotlib`_ documentation for a list of a accepted values. Returns: str: The scale identifier string. .. _Matplotlib: https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.set_yscale.html """ return self.__plot_scale @plot_scale.setter def plot_scale(self, value: str) -> None: self.__plot_scale = value @property def tick_format(self) -> ValueType: """Axis tick format of the scalar evaluation plot. Returns: The tick format. """ return self.__tick_format @tick_format.setter def tick_format(self, value: ValueType) -> None: self.__tick_format = value @staticmethod def __create_setter_lambda(considered_object: Any, dimension: str) -> Callable: """Generate a setter lambda callback for a selected grid dimension. Args: considered_object (Any): The considered object root. dimension (str): String representation of dimension location relative to the investigated object. Returns: Callable: The setter lambda. """ stages = dimension.split(".") object_reference = reduce( lambda obj, attr: getattr(obj, attr), stages[:-1], considered_object ) # Return a lambda to the function if the reference is callable function_reference = getattr(object_reference, stages[-1]) if callable(function_reference): return lambda args: function_reference(args) # Return a setting lambda if the reference is not callable # Implicitly we assume that every non-callable reference is an attribute return lambda args: setattr(object_reference, stages[-1], args)
[docs] class RegisteredDimension(property): """Register a class property getter as a PyMonte simulation dimension. Registered properties may specify their simulation stage impacts and therefore significantly increase simulation runtime in cases where computationally demanding section re-calculations can be reduced. """ __first_impact: str | None __last_impact: str | None __title: str | None def __init__( self, _property: property, first_impact: str | None = None, last_impact: str | None = None, title: str | None = None, ) -> None: """ Args: first_impact (str, optional): Name of the first simulation stage within the simulation pipeline which is impacted by manipulating this property. If not specified, the initial stage is assumed. last_impact (str, optional): Name of the last simulation stage within the simulation pipeline which is impacted by manipulating this property. If not specified, the final stage is assumed. title (str, optional): Displayed title of the dimension. If not specified, the dimension's name will be assumed. """ self.__first_impact = first_impact self.__last_impact = last_impact self.__title = title property.__init__( self, getattr(_property, "fget", None), getattr(_property, "fset", None), getattr(_property, "fdel", None), getattr(_property, "doc", None), )
[docs] @classmethod def is_registered(cls, object: Any) -> bool: """Check if any object is a registered PyMonte simulation dimension. Args: object (Any): The object in question. Returns: A boolean indicator. """ return isinstance(object, cls)
@property def first_impact(self) -> str | None: """First impact of the dimension within the simulation pipeline.""" return self.__first_impact @property def last_impact(self) -> str | None: """Last impact of the dimension within the simulation pipeline.""" return self.__last_impact @property def title(self) -> str | None: """Displayed title of the dimension.""" return self.__title
[docs] def getter(self, fget: Callable[[Any], Any]) -> RegisteredDimension: updated_property = property.getter(self, fget) return RegisteredDimension( updated_property, self.first_impact, self.last_impact, self.title )
[docs] def setter(self, fset: Callable[[Any, Any], None]) -> RegisteredDimension: updated_property = property(self.fget, fset, self.fdel, self.__doc__) return RegisteredDimension( updated_property, self.first_impact, self.last_impact, self.title )
[docs] def deleter(self, fdel: Callable[[Any], None]) -> RegisteredDimension: updated_property = property.deleter(self, fdel) return RegisteredDimension( updated_property, self.first_impact, self.last_impact, self.title )
[docs] def register(*args, **kwargs) -> Callable[[Any], RegisteredDimension]: """Shorthand to register a property as a MonteCarlo dimension. Args: _property (property): The property to be registered. """ return lambda _property: RegisteredDimension(_property, *args, **kwargs)
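
# Illustrative usage sketch, not part of this module: the `register` shorthand marks a
# class property as a PyMonte dimension, optionally annotating the first and last
# simulation stages impacted by the parameter. The `ToyModem` class and the stage
# names "transmit" / "receive" below are purely hypothetical.
class ToyModem(object):

    def __init__(self) -> None:
        self.__carrier_frequency = 1e9

    @register(first_impact="transmit", last_impact="receive", title="Carrier Frequency")
    @property
    def carrier_frequency(self) -> float:
        # Getter registered as a candidate simulation grid dimension
        return self.__carrier_frequency

    @carrier_frequency.setter
    def carrier_frequency(self, value: float) -> None:
        self.__carrier_frequency = value
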
[docs] class MonteCarlo(Generic[MO]): """Grid of parameters over which to iterate the simulation.""" # Interval between result logs in seconds __progress_log_interval: float # Maximum number of samples per grid element __num_samples: int # Minimum number of samples per grid element __min_num_samples: int # Number of dedicated actors spawned during simulation __num_actors: int __investigated_object: MO # The object to be investigated # Simulation grid dimensions which make up the grid __dimensions: List[GridDimension] # Evaluators used to process the investigated object sample state __evaluators: List[Evaluator] __console: Console # Console the simulation writes to # Printing behaviour of the simulation during runtime __console_mode: ConsoleMode # Number of samples per section block __section_block_size: int | None # Cache simulation results in a database during runtime __database_caching: bool # Number of CPUs reserved for a single actor __cpus_per_actor: int runtime_env: bool # Catch exceptions occuring during simulation runtime catch_exceptions: bool def __init__( self, investigated_object: MO, num_samples: int, evaluators: Sequence[Evaluator] | None = None, min_num_samples: int = -1, num_actors: int | None = None, console: Console | None = None, console_mode: ConsoleMode = ConsoleMode.INTERACTIVE, section_block_size: Optional[int] = None, ray_address: str | None = None, cpus_per_actor: int = 1, runtime_env: bool = False, catch_exceptions: bool = True, progress_log_interval: float = 1.0, ) -> None: """ Args: investigated_object (MO): Object to be investigated during the simulation runtime. num_samples (int): Number of generated samples per grid element. evaluators (Set[MonteCarloEvaluators[MO]]): Evaluators used to process the investigated object sample state. min_num_samples (int, optional): Minimum number of generated samples per grid element. num_actors (int, optional): Number of dedicated actors spawned during simulation. By default, the number of actors will be the number of available CPU cores. console (Console, optional): Console the simulation writes to. console_mode (ConsoleMode, optional): The printing behaviour of the simulation during runtime. section_block_size (int, optional): Number of samples per section block. By default, the size of the simulation grid is selected. ray_address (str, optional): The address of the ray head node. If None is provided, the head node will be launched in this machine. cpus_per_actor (int, optional): Number of CPU cores reserved per actor. One by default. runtime_env (bool, optional): Create a virtual environment on each host. Disabled by default. catch_exceptions (bool, optional): Catch exceptions occuring during simulation runtime. Enabled by default. progress_log_interval (float, optional): Interval between result logs in seconds. 1 second by default. """ self.runtime_env = runtime_env # Initialize ray if it hasn't been initialized yet. 
Required to query ideal number of actors if not ray.is_initialized(): runtime_env_info = {"py_modules": self._py_modules(), "pip": self._pip_packages()} with catch_warnings(): simplefilter("ignore") ray.init( address=ray_address, runtime_env=runtime_env_info if self.runtime_env else None, logging_level=logging.ERROR, ) self.__dimensions = [] self.__investigated_object = investigated_object self.__evaluators = [] if evaluators is None else list(evaluators) self.num_samples = num_samples self.min_num_samples = min_num_samples if min_num_samples >= 0 else int(0.5 * num_samples) self.__console = Console() if console is None else console self.__console_mode = console_mode self.section_block_size = section_block_size self.cpus_per_actor = cpus_per_actor self.num_actors = num_actors self.catch_exceptions = catch_exceptions self.__progress_log_interval = progress_log_interval
[docs] def simulate(self, actor: Type[MonteCarloActor]) -> MonteCarloResult: """Launch the Monte Carlo simulation. Args: actor (Type[MonteCarloActor]): The actor from which to generate the simulation samples. Returns: np.ndarray: Generated samples. """ # Generate start timestamp start_time = perf_counter() # Print meta-information and greeting if self.__console_mode != ConsoleMode.SILENT: self.console.log( f"Launched simulation campaign with {self.num_actors} dedicated actors" ) # Sort dimensions after impact in descending order def sort(dimension: GridDimension) -> int: if dimension.first_impact not in actor.stage_identifiers(): return 0 return actor.stage_identifiers().index(dimension.first_impact) self.__dimensions.sort(key=sort) max_num_samples = self.num_samples dimension_str = f"{max_num_samples}" for dimension in self.__dimensions: max_num_samples *= dimension.num_sample_points dimension_str += f" x {dimension.num_sample_points}" if self.__console_mode != ConsoleMode.SILENT: self.console.log( f"Generating a maximum of {max_num_samples} = " + dimension_str + f" samples inspected by {len(self.__evaluators)} evaluators\n" ) # Render simulation grid table if self.__console_mode != ConsoleMode.SILENT and len(self.__dimensions) > 0: dimension_table = Table(title="Simulation Grid", title_justify="left") dimension_table.add_column("Dimension", style="cyan") dimension_table.add_column("Sections", style="green") for dimension in self.__dimensions: section_str = "" for sample_point in dimension.sample_points: section_str += sample_point.title + " " dimension_table.add_row(dimension.title, section_str) self.console.print(dimension_table) self.console.print() # Initialize the sample grid to store the results sample_grid = SampleGrid(self.__dimensions, self.__evaluators) # Launch actors and queue the first tasks with self.console.status("Launching Actor Pool...", spinner="dots") if self.__console_mode == ConsoleMode.INTERACTIVE else nullcontext(): # type: ignore # Generate the actor pool actor_pool = ActorPool([actor.options(num_cpus=self.cpus_per_actor).remote((self.__investigated_object, self.__dimensions, self.__evaluators), a, self.catch_exceptions) for a in range(self.num_actors)]) # type: ignore # Generate section sample containers and meta-information grid_task_count = np.zeros( [dimension.num_sample_points for dimension in self.__dimensions], dtype=int ) grid_active_mask = np.ones( [dimension.num_sample_points for dimension in self.__dimensions], dtype=bool ) # Submit initial actor tasks # 2 # A little overhead in task submission might speed things up? Not clear atm. 
            task_overhead = 0
            for _ in range(self.num_actors + task_overhead):
                _ = self.__queue_next(actor_pool, sample_grid, grid_active_mask, grid_task_count)

        # Initialize progress bar
        progress = Progress(
            SpinnerColumn(), *Progress.get_default_columns(), TimeElapsedColumn(), transient=True
        )
        task1 = progress.add_task("Computing", total=max_num_samples)
        last_progress_plot_time = 0.0
        num_result_rows = min(10, self.section_block_size)

        # Display results in a live table
        status_group = Group("", progress)
        with Live(status_group, console=self.console) if self.__console_mode == ConsoleMode.INTERACTIVE else nullcontext():  # type: ignore

            # Keep executing until all samples are computed
            while actor_pool.has_next():

                # Receive samples from the queue
                samples = self.__receive_next(
                    actor_pool, sample_grid, grid_active_mask, grid_task_count
                )

                # Queue the next task and compute the retrieved progress
                self.__queue_next(actor_pool, sample_grid, grid_active_mask, grid_task_count)

                # Update result log if enough time has passed
                progress_plot_time = perf_counter()
                if progress_plot_time - last_progress_plot_time > self.__progress_log_interval:
                    last_progress_plot_time = progress_plot_time

                    # Compute absolute progress
                    absolute_progress = 0
                    for section in sample_grid:
                        absolute_progress += section.num_samples

                    # Update progress bar visualization
                    if self.__console_mode == ConsoleMode.INTERACTIVE:
                        progress.update(task1, completed=absolute_progress)

                        result_rows: List[List[str]] = []
                        for sample in samples[:num_result_rows]:
                            results_row: List[str] = []

                            for dimension, section_idx in zip(
                                self.__dimensions, sample.grid_section
                            ):
                                results_row.append(dimension.sample_points[section_idx].title)

                            results_row.append(str(sample_grid[sample.grid_section].num_samples))

                            for artifact in sample.artifacts:
                                results_row.append(str(artifact))

                            result_rows.append(results_row)

                        # Render results table
                        results_table = Table(min_width=self.console.measure(progress).minimum)

                        for dimension in self.__dimensions:
                            results_table.add_column(dimension.title, style="cyan")
                        results_table.add_column("#", style="blue")
                        for evaluator in self.__evaluators:
                            results_table.add_column(evaluator.abbreviation, style="green")

                        for result_row in result_rows:
                            results_table.add_row(*result_row)

                        status_group.renderables[0] = results_table

                    elif self.__console_mode == ConsoleMode.LINEAR:
                        self.console.log(f"Progress: {100*absolute_progress/max_num_samples:.3f}")

                # Abort execution loop prematurely if all sections are flagged inactive
                # Some results might be lost, but who cares? Speed! Speed! Speed!
                # if absolute_progress >= self.max_num_samples:
                #     break

        # Measure elapsed time
        stop_time = perf_counter()
        performance_time = stop_time - start_time

        # Print finish notifier
        if self.__console_mode != ConsoleMode.SILENT:
            self.console.print()
            self.console.log(f"Simulation finished after {performance_time:.2f} seconds")

        # Compute the result
        result = MonteCarloResult(
            self.__dimensions, self.__evaluators, sample_grid, performance_time
        )

        # Print results
        if self.__console_mode != ConsoleMode.SILENT:
            self.console.print("")
            result.print(self.console)

        return result
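    # Launch sketch (illustrative only; `SimulationActor` stands for a user-provided
    # MonteCarloActor subclass and is not defined in this module):
    #
    #     result = monte_carlo.simulate(SimulationActor)
    #     result.print(monte_carlo.console)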
    def __receive_next(
        self,
        pool: ActorPool,
        grid: SampleGrid,
        grid_active_mask: np.ndarray,
        grid_task_count: np.ndarray,
    ) -> List[MonteCarloSample]:

        # Retrieve result from pool
        runResult: ActorRunResult = pool.get_next_unordered(timeout=None)

        # Display the run message if the result contains one
        if runResult.message:
            self.console.log(runResult.message)

        # Save result
        for sample in runResult.samples:

            # Retrieve the respective grid section and add sample
            grid_section: GridSection = grid[sample.grid_section]
            grid_section.add_samples(sample, self.__evaluators)

            # Update task counter
            task_count = max(0, grid_task_count[grid_section.coordinates] - 1)
            grid_task_count[grid_section.coordinates] = task_count

            # Check for stopping criteria
            if grid_active_mask[grid_section.coordinates]:

                # Abort section if the number of samples is expected to be met
                if grid_section.num_samples + task_count >= self.num_samples:
                    grid_active_mask[grid_section.coordinates] = False  # pragma no cover

                # Abort section if the confidence threshold has been reached
                elif grid_section.num_samples >= self.min_num_samples:
                    grid_active_mask[grid_section.coordinates] = not grid_section.confidence_status(
                        self.__evaluators
                    )

        return runResult.samples

    def __queue_next(
        self,
        pool: ActorPool,
        grid: SampleGrid,
        grid_active_mask: np.ndarray,
        grid_task_count: np.ndarray,
    ):

        # Query active sections and respective task counts
        active_sections = np.argwhere(grid_active_mask)
        active_sections_task_count = grid_task_count[grid_active_mask]

        program: List[Tuple[int, ...]] = []
        for section_coordinates, task_count in zip(
            active_sections[: self.section_block_size, :],
            active_sections_task_count.flat[: self.section_block_size],
        ):
            section_coordinates = tuple(section_coordinates)
            program.append(section_coordinates)

            task_count += 1
            grid_task_count[section_coordinates] = task_count

            if task_count + grid[section_coordinates].num_samples >= self.num_samples:
                grid_active_mask[section_coordinates] = False
                break

        # ToDo: Enhance routine to always submit a section_block_size amount of indices per program
        if len(program) > 0:
            pool.submit(lambda a, p: a.run.remote(p), program)

    @property
    def investigated_object(self) -> Any:
        """The object to be investigated during the simulation runtime."""

        return self.__investigated_object
[docs]    def new_dimension(
        self, dimension: str, sample_points: List[Any], *args: Any, **kwargs
    ) -> GridDimension:
        """Add a dimension to the simulation grid.

        Must be a property of the investigated object.

        Args:

            dimension (str):
                String representation of dimension location relative to the investigated object.

            sample_points (List[Any]):
                List of points at which the dimension will be sampled into a grid.
                The type of points must be identical to the grid arguments / type.

            *args (Tuple[Any], optional):
                References to the object the dimension belongs to.
                Resolved to the investigated object by default,
                but may be an attribute or sub-attribute of the investigated object.

            **kwargs:
                Additional initialization arguments passed to :class:`GridDimension`.

        Returns:
            The newly created dimension object.
        """

        considered_objects = (self.__investigated_object,) if len(args) < 1 else args
        grid_dimension = GridDimension(considered_objects, dimension, sample_points, **kwargs)
        self.add_dimension(grid_dimension)

        return grid_dimension
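    # Usage sketch (illustrative only; assumes the investigated object exposes a settable
    # `snr` property, which is not guaranteed by this module):
    #
    #     snr_dimension = monte_carlo.new_dimension("snr", [0.0, 2.0, 4.0, 8.0])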
[docs]    def add_dimension(self, dimension: GridDimension) -> None:
        """Add a new dimension to the simulation grid.

        Args:

            dimension:
                Dimension to be added.

        Raises:
            ValueError: If the `dimension` already exists within the grid.
        """

        if dimension in self.__dimensions:
            raise ValueError("Dimension instance already registered within the grid")

        self.__dimensions.append(dimension)
[docs]    def remove_dimension(self, dimension: GridDimension) -> None:
        """Remove an existing dimension from the simulation grid.

        Args:

            dimension (GridDimension):
                The dimension to be removed.

        Raises:
            ValueError: If the `dimension` does not exist.
        """

        if dimension not in self.__dimensions:
            raise ValueError("Dimension not registered within the current simulation grid")

        self.__dimensions.remove(dimension)
    @property
    def dimensions(self) -> List[GridDimension]:
        """Simulation grid dimensions which make up the grid."""

        return self.__dimensions.copy()
[docs]    def add_evaluator(self, evaluator: Evaluator) -> None:
        """Add a new evaluator to the Monte Carlo simulation.

        Args:

            evaluator (Evaluator):
                The evaluator to be added.
        """

        self.__evaluators.append(evaluator)
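    # Usage sketch (illustrative only; `BitErrorEvaluator` is a hypothetical Evaluator
    # implementation, not defined in this module):
    #
    #     monte_carlo.add_evaluator(BitErrorEvaluator(model))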
    @property
    def evaluators(self) -> List[Evaluator]:
        """Evaluators used to process the investigated object sample state."""

        return self.__evaluators.copy()

    @property
    def num_samples(self) -> int:
        """Number of samples per simulation grid element.

        Returns:
            int: Number of samples.

        Raises:
            ValueError: If the number of samples is smaller than one.
        """

        return self.__num_samples

    @num_samples.setter
    def num_samples(self, value: int) -> None:
        """Set number of samples per simulation grid element."""

        if value < 1:
            raise ValueError("Number of samples must be greater than zero")

        self.__num_samples = value

    @property
    def min_num_samples(self) -> int:
        """Minimum number of samples per simulation grid element.

        Returns:
            int: Number of samples.

        Raises:
            ValueError: If the number of samples is smaller than zero.
        """

        return self.__min_num_samples

    @min_num_samples.setter
    def min_num_samples(self, value: int) -> None:
        """Set minimum number of samples per simulation grid element."""

        if value < 0:
            raise ValueError("Number of samples must be greater or equal to zero")

        self.__min_num_samples = value

    @property
    def max_num_samples(self) -> int:
        """Maximum number of samples over the whole simulation.

        Returns:
            int: Number of samples.
        """

        num_samples = self.num_samples
        for dimension in self.__dimensions:
            num_samples *= dimension.num_sample_points

        return num_samples

    @property
    def num_actors(self) -> int:
        """Number of dedicated actors spawned during simulation runs.

        Returns:
            int: Number of actors.

        Raises:
            ValueError: If the number of actors is smaller than zero.
        """

        # Return the configured number of actors if one was specified
        if self.__num_actors is not None:
            return self.__num_actors

        # Otherwise, the number of actors depends on the number of available CPUs and
        # the CPU requirements per actor
        return max(1, int(ray.available_resources().get("CPU", 1) / self.cpus_per_actor))

    @num_actors.setter
    def num_actors(self, value: Optional[int]) -> None:
        """Set number of dedicated actors spawned during simulation runs."""

        if value is None:
            self.__num_actors = None

        elif value < 1:
            raise ValueError("Number of actors must be greater or equal to one")

        else:
            self.__num_actors = value

    @property
    def console(self) -> Console:
        """Console the simulation writes to.

        Returns:
            Console: Handle to the console.
        """

        return self.__console

    @console.setter
    def console(self, value: Console) -> None:
        self.__console = value

    @property
    def section_block_size(self) -> int:
        """Number of generated samples per section block.

        Returns:
            int: Number of samples per block.

        Raises:
            ValueError: If the block size is smaller than one.
        """

        if self.__section_block_size is not None:
            return self.__section_block_size

        size = 1
        for dimension in self.__dimensions:
            size *= dimension.num_sample_points

        return size

    @section_block_size.setter
    def section_block_size(self, value: Optional[int]) -> None:
        if value is not None and value < 1:
            raise ValueError("Section block size must be greater or equal to one")

        self.__section_block_size = value

    @property
    def cpus_per_actor(self) -> int:
        """Number of CPU cores reserved for each actor.

        Returns:
            Number of cores.

        Raises:
            ValueError: If the number of cores is smaller than one.
        """

        return self.__cpus_per_actor

    @cpus_per_actor.setter
    def cpus_per_actor(self, num: int) -> None:
        if num < 1:
            raise ValueError("Number of CPU cores per actor must be greater or equal to one")

        self.__cpus_per_actor = num

    @property
    def console_mode(self) -> ConsoleMode:
        """Console mode during simulation runtime.

        Returns:
            The current console mode.
""" return self.__console_mode @console_mode.setter def console_mode(self, value: Union[ConsoleMode, str]) -> None: # Convert string arguments to iterable if isinstance(value, str): value = ConsoleMode[value] self.__console_mode = value @staticmethod def _py_modules() -> List[str]: """List of python modules required by remote workers. In order to deploy Ray to computing clusters, dependencies listed here will be installed on remote nodes. Returns: List of module names. """ return [path.join(path.dirname(path.realpath(__file__)), "..")] @staticmethod def _pip_packages() -> List[str]: """List of python packages required by remote workers. In order to deploy Ray to computing clusters, dependencies listed here will be installed on remote nodes. Returns: List of package names. """ return ["ray", "numpy", "scipy", "matplotlib", "rich"]