# -*- coding: utf-8 -*-
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict
from typing_extensions import override
import numpy as np
from scipy.optimize import bisect
from scipy.stats import norm
from hermespy.core import DeserializationProcess, Serializable, SerializationProcess, RandomNode
__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"
[docs]
class ConsistentVariable(ABC, Serializable):
"""Base class for spatially consistent random variables."""
_DEFAULT_OFFSET = 0
__shape: Tuple[int, ...]
__offset: int
__size: int
def __init__(self, shape: Tuple[int, ...], offset: int = _DEFAULT_OFFSET) -> None:
"""
Args:
shape:
Shape of the output array
offset:
Offset of the variable within its generator.
Defaults to zero.
Adding a variable to a generator will change the offset.
"""
# Initialize attributes
self.__shape = (1,) if shape is None else shape
self.__size = int(np.prod(self.shape))
self.__offset = offset
@property
def shape(self) -> Tuple[int, ...]:
"""Shape of the output array."""
return self.__shape
@property
def size(self) -> int:
"""Number of scalar samples to be generated."""
return self.__size
@property
def offset(self) -> int:
"""Offset of the variable in the generated samples.
Adding a variable to a generator will change the offset.
"""
return self.__offset
@offset.setter
def offset(self, value: int) -> None:
self.__offset = value
[docs]
def sample(self, sample: ConsistentSample) -> np.ndarray:
"""Sample the variable given a sample of the realization.
Args:
sample: Sample of the realization.
Returns: Numpy array of samples of dimensions matching the variable size.
"""
# Fetch the scalar samples
scalar_samples = sample.fetch_scalars(self.offset, self.size)
# Reshape to match the variable size
reshaped_samples = scalar_samples.reshape(self.shape)
return reshaped_samples
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
process.serialize_array(np.array(self.__shape), "shape")
process.serialize_integer(self.__offset, "offset")
[docs]
@classmethod
@override
def Deserialize(cls, process: DeserializationProcess) -> ConsistentVariable:
return cls(
tuple(process.deserialize_array("shape", np.int64)),
process.deserialize_integer("offset", cls._DEFAULT_OFFSET),
)
[docs]
class ConsistentSample(object):
"""Sample of a consistent realization.
Generated by calling :meth:`ConsistentRealization.sample`.
"""
[docs]
@abstractmethod
def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray:
"""Fetch the scalar samples.
Args:
offset: Offset of the scalar samples within this realization.
num_scalars: Number of scalar samples to fetch.
Returns: Numpy vector of scalar samples.
"""
... # pragma: no cover
[docs]
class ConsistentRealization(ABC, Serializable):
"""Base class for a realization of a consistent random generator."""
[docs]
@abstractmethod
def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> ConsistentSample:
"""Sample the realization given two locations in euclidean space.
Args:
position_a: First position in euclidean space.
position_b: Second position in euclidean space.
Returns: Sample of the realization.
"""
... # pragma: no cover
[docs]
class DualConsistentSample(ConsistentSample):
"""Sample of a dual consistent realization.
Generated by calling :meth:`DualConsistentRealization.sample`.
"""
def __init__(self, scalar_samples: np.ndarray) -> None:
"""Initialize a dual consistent sample."""
self.__scalar_samples = scalar_samples
[docs]
@override
def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray:
return self.__scalar_samples[offset : offset + num_scalars]
[docs]
class DualConsistentRealization(ConsistentRealization):
"""Realization of a set of dual consistent random variables."""
__frequencies: np.ndarray
__phases: np.ndarray
def __init__(self, frequencies: np.ndarray, phases: np.ndarray) -> None:
"""
Args:
frequencies: Frequencies of the spatially consistent process.
phases: Phases of the spatially consistent process.
"""
self.__frequencies = frequencies
self.__phases = phases
@property
def frequencies(self) -> np.ndarray:
"""Frequencies of the spatially consistent process."""
return self.__frequencies
@property
def phases(self) -> np.ndarray:
"""Phases of the spatially consistent process."""
return self.__phases
[docs]
@override
def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> DualConsistentSample:
# Eq. 3
scalar_samples: np.ndarray = (2 / self.__frequencies.shape[2]) ** 0.5 * np.sum(
np.cos(
np.tensordot(position_a, self.__frequencies[..., 0], (0, 0))
+ np.tensordot(position_b, self.__frequencies[..., 1], (0, 0))
+ self.__phases
),
axis=-1,
keepdims=False,
)
return DualConsistentSample(scalar_samples)
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
process.serialize_array(self.__frequencies, "frequencies")
process.serialize_array(self.__phases, "phases")
[docs]
@classmethod
@override
def Deserialize(cls, process: DeserializationProcess) -> DualConsistentRealization:
return DualConsistentRealization(
process.deserialize_array("frequencies", np.float64),
process.deserialize_array("phases", np.float64),
)
[docs]
class StaticConsistentSample(ConsistentSample):
"""Consistent sample that is invariant in space."""
__scalar_samples: np.ndarray
def __init__(self, scalar_samples: np.ndarray) -> None:
"""
Args:
scalar_samples:
Scalar samples of the realization.
"""
self.__scalar_samples = scalar_samples
[docs]
@override
def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray:
return self.__scalar_samples[offset : offset + num_scalars]
[docs]
class StaticConsistentRealization(ConsistentRealization):
"""Consistent realization that is immutable in space."""
__scalar_samples: np.ndarray
def __init__(self, scalar_samples: np.ndarray) -> None:
"""
Args:
scalar_samples: Scalar samples of the realization.
"""
self.__scalar_samples = scalar_samples.flatten()
[docs]
@override
def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> DualConsistentSample:
return DualConsistentSample(self.__scalar_samples)
[docs]
@override
def serialize(self, process: SerializationProcess) -> None:
process.serialize_array(self.__scalar_samples, "scalar_samples")
[docs]
@classmethod
@override
def Deserialize(cls, process: DeserializationProcess) -> StaticConsistentRealization:
return StaticConsistentRealization(process.deserialize_array("scalar_samples", np.float64))
[docs]
class ConsistentGenerator(object):
"""Generator of consistent random variables."""
__rng: np.random.Generator | RandomNode
__offset: int
__variables: List[ConsistentVariable]
__cdf_cache: Dict[Tuple[float, int], np.ndarray] = dict()
@staticmethod
def __radial_velocity_cdf(fr: float, a: float, U: float) -> float:
"""Implementation of the probability density function in Eq. 10
Args:
fr: Radial velocity.
a: Correlation distance.
U: Expected CDF value.
"""
return (
2 / np.pi * np.arctan(2 * np.pi * fr / a)
- 4 * a * fr / (4 * np.pi**2 * fr**2 + a**2)
- U
)
def __init__(self, rng: np.random.Generator | RandomNode) -> None:
"""
Args:
rng: Random number generator used to initialize this random variable.
"""
# Initialize attributes
self.__rng = rng
self.__offset = 0
self.__variables = []
[docs]
def gaussian(self, shape: Tuple[int, ...] | None = None) -> ConsistentGaussian:
"""Create a dual consistent Gaussian random variable.
Args:
shape:
Shape of the output array.
If not specified, the variable will be scalar.
Returns: Dual consistent Gaussian random variable.
"""
_shape = (1,) if shape is None else shape
variable = ConsistentGaussian(_shape)
self.add_variable(variable)
return variable
[docs]
def boolean(self, shape: Tuple[int, ...] | None = None) -> ConsistentBoolean:
"""Create a dual consistent boolean random variable.
Args:
shape:
Shape of the output array.
If not specified, the variable will be scalar.
Returns: Dual consistent boolean random variable.
"""
_shape = (1,) if shape is None else shape
variable = ConsistentBoolean(_shape)
self.add_variable(variable)
return variable
[docs]
def add_variable(self, variable: ConsistentVariable) -> int:
"""Add a dual consistent random variable to the generator.
Returns: The variable's offset in the generated samples.
"""
if variable in self.__variables:
return variable.offset
variable_offset = self.__offset
self.__offset += variable.size
variable.offset = variable_offset
self.__variables.append(variable)
return variable_offset
def __sample_cdf(self, decorrelation_distance: float, num_samples: int = 1000) -> np.ndarray:
"""Sample the CDF of the radial velocity distribution.
This operation is rather computationally expensive,
therefore results are cached in both memory and on disk during runtime.
Args:
decorrelation_distance: Euclidean distance at which a sample of this Gaussian process is considered to be uncorrelated with another sample.
num_samples: Number of samples to generate. 1000 by default.
Returns: Numpy array of radial velocities.
"""
# Check the in-memory cache
cached_cdf = ConsistentGenerator.__cdf_cache.get((decorrelation_distance, num_samples))
if cached_cdf is not None:
return cached_cdf
# Eq. 12
u_candidates = np.linspace(0, 1, 1 + num_samples, endpoint=True, dtype=np.float64)[:-1]
radial_velocities = np.empty(num_samples, dtype=np.float64)
a = 1 / decorrelation_distance
fr_max = 1
for indices, u in np.ndenumerate(u_candidates):
# Find an appropriate upper bound to start the bisect function
# Not sure if this is the best way to do it
# Note that the CDF is monotonically increasing and uper bounded by 1
while self.__radial_velocity_cdf(fr_max, a, u) < 0: # type: ignore[arg-type]
fr_max *= 2
# Solve the equation
radial_velocities[indices] = bisect(self.__radial_velocity_cdf, 0, fr_max, args=(a, u))
# Cache the result
ConsistentGenerator.__cdf_cache[(decorrelation_distance, num_samples)] = radial_velocities
return radial_velocities
[docs]
def realize(
self, decorrelation_distance: float, num_sinusoids: int = 30
) -> ConsistentRealization:
"""
Args:
decorrelation_distance: Euclidean distance at which a sample of this Gaussian process is considered to be uncorrelated with another sample.
num_sinusoids: Number of sinusoids used to approximate the Gaussian process. 30 by default.
Returns: Realization of the consistent generator.
"""
# Collect the required number of scalar random variables
num_scalars = 0
for variable in self.__variables:
num_scalars += variable.size
dimensions = (num_scalars, num_sinusoids, 2)
rng = self.__rng if isinstance(self.__rng, np.random.Generator) else self.__rng._rng
# If the decorrelation distance is infinite, the process is static
if decorrelation_distance == float("inf"):
return StaticConsistentRealization(rng.standard_normal(num_scalars))
# Sample the radial velocities
radial_velocity_candidates = self.__sample_cdf(decorrelation_distance)
radial_velocities = rng.choice(radial_velocity_candidates, size=dimensions)
# Eq. 11
azimuth_angles = rng.uniform(
0, 2 * np.pi, size=dimensions
) # Phi in the respective equations
zenith_angles = np.arccos(
1 - rng.uniform(0, 2, size=dimensions)
) # Theta in the respective equations
# Eq. 5
sin_zenith_angles = np.sin(zenith_angles)
radial_directions = np.array(
[
np.cos(azimuth_angles) * sin_zenith_angles,
np.sin(azimuth_angles) * sin_zenith_angles,
np.cos(zenith_angles),
]
)
# Consolidate frequencies and phases to a realization
frequencies = 2 * np.pi * radial_velocities * radial_directions
phases = rng.uniform(0, 2 * np.pi, size=(num_scalars, num_sinusoids))
return DualConsistentRealization(frequencies, phases)
[docs]
class ConsistentGaussian(ConsistentVariable):
"""Spatially consistent normally distributed Gaussian variable."""
[docs]
@override
def sample(self, sample: ConsistentSample, mean: float = 0.0, std: float = 1.0) -> np.ndarray:
# Fetch the scalar samples
samples = ConsistentVariable.sample(self, sample)
# Transform to the desired distribution
return mean + std * samples
[docs]
class ConsistentBoolean(ConsistentVariable):
"""Spatially consistent boolean random variable."""
[docs]
@override
def sample(self, sample: ConsistentSample) -> np.ndarray:
# Fetch the scalar samples
samples = ConsistentVariable.sample(self, sample)
# Transform to the boolean distribution
return samples > 0.0