Source code for hermespy.channel.consistent

# -*- coding: utf-8 -*-

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict
from typing_extensions import override

import numpy as np
from scipy.optimize import bisect
from scipy.stats import norm

from hermespy.core import DeserializationProcess, Serializable, SerializationProcess, RandomNode

__author__ = "Jan Adler"
__copyright__ = "Copyright 2024, Barkhausen Institut gGmbH"
__credits__ = ["Jan Adler"]
__license__ = "AGPLv3"
__version__ = "1.5.0"
__maintainer__ = "Jan Adler"
__email__ = "jan.adler@barkhauseninstitut.org"
__status__ = "Prototype"


[docs] class ConsistentVariable(ABC, Serializable): """Base class for spatially consistent random variables.""" _DEFAULT_OFFSET = 0 __shape: Tuple[int, ...] __offset: int __size: int def __init__(self, shape: Tuple[int, ...], offset: int = _DEFAULT_OFFSET) -> None: """ Args: shape: Shape of the output array offset: Offset of the variable within its generator. Defaults to zero. Adding a variable to a generator will change the offset. """ # Initialize attributes self.__shape = (1,) if shape is None else shape self.__size = int(np.prod(self.shape)) self.__offset = offset @property def shape(self) -> Tuple[int, ...]: """Shape of the output array.""" return self.__shape @property def size(self) -> int: """Number of scalar samples to be generated.""" return self.__size @property def offset(self) -> int: """Offset of the variable in the generated samples. Adding a variable to a generator will change the offset. """ return self.__offset @offset.setter def offset(self, value: int) -> None: self.__offset = value
[docs] def sample(self, sample: ConsistentSample) -> np.ndarray: """Sample the variable given a sample of the realization. Args: sample: Sample of the realization. Returns: Numpy array of samples of dimensions matching the variable size. """ # Fetch the scalar samples scalar_samples = sample.fetch_scalars(self.offset, self.size) # Reshape to match the variable size reshaped_samples = scalar_samples.reshape(self.shape) return reshaped_samples
[docs] @override def serialize(self, process: SerializationProcess) -> None: process.serialize_array(np.array(self.__shape), "shape") process.serialize_integer(self.__offset, "offset")
[docs] @classmethod @override def Deserialize(cls, process: DeserializationProcess) -> ConsistentVariable: return cls( tuple(process.deserialize_array("shape", np.int64)), process.deserialize_integer("offset", cls._DEFAULT_OFFSET), )
[docs] class ConsistentSample(object): """Sample of a consistent realization. Generated by calling :meth:`ConsistentRealization.sample`. """
[docs] @abstractmethod def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray: """Fetch the scalar samples. Args: offset: Offset of the scalar samples within this realization. num_scalars: Number of scalar samples to fetch. Returns: Numpy vector of scalar samples. """ ... # pragma: no cover
[docs] class ConsistentRealization(ABC, Serializable): """Base class for a realization of a consistent random generator."""
[docs] @abstractmethod def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> ConsistentSample: """Sample the realization given two locations in euclidean space. Args: position_a: First position in euclidean space. position_b: Second position in euclidean space. Returns: Sample of the realization. """ ... # pragma: no cover
[docs] class DualConsistentSample(ConsistentSample): """Sample of a dual consistent realization. Generated by calling :meth:`DualConsistentRealization.sample`. """ def __init__(self, scalar_samples: np.ndarray) -> None: """Initialize a dual consistent sample.""" self.__scalar_samples = scalar_samples
[docs] @override def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray: return self.__scalar_samples[offset : offset + num_scalars]
[docs] class DualConsistentRealization(ConsistentRealization): """Realization of a set of dual consistent random variables.""" __frequencies: np.ndarray __phases: np.ndarray def __init__(self, frequencies: np.ndarray, phases: np.ndarray) -> None: """ Args: frequencies: Frequencies of the spatially consistent process. phases: Phases of the spatially consistent process. """ self.__frequencies = frequencies self.__phases = phases @property def frequencies(self) -> np.ndarray: """Frequencies of the spatially consistent process.""" return self.__frequencies @property def phases(self) -> np.ndarray: """Phases of the spatially consistent process.""" return self.__phases
[docs] @override def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> DualConsistentSample: # Eq. 3 scalar_samples: np.ndarray = (2 / self.__frequencies.shape[2]) ** 0.5 * np.sum( np.cos( np.tensordot(position_a, self.__frequencies[..., 0], (0, 0)) + np.tensordot(position_b, self.__frequencies[..., 1], (0, 0)) + self.__phases ), axis=-1, keepdims=False, ) return DualConsistentSample(scalar_samples)
[docs] @override def serialize(self, process: SerializationProcess) -> None: process.serialize_array(self.__frequencies, "frequencies") process.serialize_array(self.__phases, "phases")
[docs] @classmethod @override def Deserialize(cls, process: DeserializationProcess) -> DualConsistentRealization: return DualConsistentRealization( process.deserialize_array("frequencies", np.float64), process.deserialize_array("phases", np.float64), )
[docs] class StaticConsistentSample(ConsistentSample): """Consistent sample that is invariant in space.""" __scalar_samples: np.ndarray def __init__(self, scalar_samples: np.ndarray) -> None: """ Args: scalar_samples: Scalar samples of the realization. """ self.__scalar_samples = scalar_samples
[docs] @override def fetch_scalars(self, offset: int, num_scalars: int) -> np.ndarray: return self.__scalar_samples[offset : offset + num_scalars]
[docs] class StaticConsistentRealization(ConsistentRealization): """Consistent realization that is immutable in space.""" __scalar_samples: np.ndarray def __init__(self, scalar_samples: np.ndarray) -> None: """ Args: scalar_samples: Scalar samples of the realization. """ self.__scalar_samples = scalar_samples.flatten()
[docs] @override def sample(self, position_a: np.ndarray, position_b: np.ndarray) -> DualConsistentSample: return DualConsistentSample(self.__scalar_samples)
[docs] @override def serialize(self, process: SerializationProcess) -> None: process.serialize_array(self.__scalar_samples, "scalar_samples")
[docs] @classmethod @override def Deserialize(cls, process: DeserializationProcess) -> StaticConsistentRealization: return StaticConsistentRealization(process.deserialize_array("scalar_samples", np.float64))
[docs] class ConsistentGenerator(object): """Generator of consistent random variables.""" __rng: np.random.Generator | RandomNode __offset: int __variables: List[ConsistentVariable] __cdf_cache: Dict[Tuple[float, int], np.ndarray] = dict() @staticmethod def __radial_velocity_cdf(fr: float, a: float, U: float) -> float: """Implementation of the probability density function in Eq. 10 Args: fr: Radial velocity. a: Correlation distance. U: Expected CDF value. """ return ( 2 / np.pi * np.arctan(2 * np.pi * fr / a) - 4 * a * fr / (4 * np.pi**2 * fr**2 + a**2) - U ) def __init__(self, rng: np.random.Generator | RandomNode) -> None: """ Args: rng: Random number generator used to initialize this random variable. """ # Initialize attributes self.__rng = rng self.__offset = 0 self.__variables = []
[docs] def gaussian(self, shape: Tuple[int, ...] | None = None) -> ConsistentGaussian: """Create a dual consistent Gaussian random variable. Args: shape: Shape of the output array. If not specified, the variable will be scalar. Returns: Dual consistent Gaussian random variable. """ _shape = (1,) if shape is None else shape variable = ConsistentGaussian(_shape) self.add_variable(variable) return variable
[docs] def uniform(self, shape: Tuple[int, ...] | None = None) -> ConsistentUniform: """Create a dual consistent uniform random variable. Args: shape: Shape of the output array. If not specified, the variable will be scalar. Returns: Dual consistent uniform random variable. """ _shape = (1,) if shape is None else shape variable = ConsistentUniform(_shape) self.add_variable(variable) return variable
[docs] def boolean(self, shape: Tuple[int, ...] | None = None) -> ConsistentBoolean: """Create a dual consistent boolean random variable. Args: shape: Shape of the output array. If not specified, the variable will be scalar. Returns: Dual consistent boolean random variable. """ _shape = (1,) if shape is None else shape variable = ConsistentBoolean(_shape) self.add_variable(variable) return variable
[docs] def add_variable(self, variable: ConsistentVariable) -> int: """Add a dual consistent random variable to the generator. Returns: The variable's offset in the generated samples. """ if variable in self.__variables: return variable.offset variable_offset = self.__offset self.__offset += variable.size variable.offset = variable_offset self.__variables.append(variable) return variable_offset
def __sample_cdf(self, decorrelation_distance: float, num_samples: int = 1000) -> np.ndarray: """Sample the CDF of the radial velocity distribution. This operation is rather computationally expensive, therefore results are cached in both memory and on disk during runtime. Args: decorrelation_distance: Euclidean distance at which a sample of this Gaussian process is considered to be uncorrelated with another sample. num_samples: Number of samples to generate. 1000 by default. Returns: Numpy array of radial velocities. """ # Check the in-memory cache cached_cdf = ConsistentGenerator.__cdf_cache.get((decorrelation_distance, num_samples)) if cached_cdf is not None: return cached_cdf # Eq. 12 u_candidates = np.linspace(0, 1, 1 + num_samples, endpoint=True, dtype=np.float64)[:-1] radial_velocities = np.empty(num_samples, dtype=np.float64) a = 1 / decorrelation_distance fr_max = 1 for indices, u in np.ndenumerate(u_candidates): # Find an appropriate upper bound to start the bisect function # Not sure if this is the best way to do it # Note that the CDF is monotonically increasing and uper bounded by 1 while self.__radial_velocity_cdf(fr_max, a, u) < 0: # type: ignore[arg-type] fr_max *= 2 # Solve the equation radial_velocities[indices] = bisect(self.__radial_velocity_cdf, 0, fr_max, args=(a, u)) # Cache the result ConsistentGenerator.__cdf_cache[(decorrelation_distance, num_samples)] = radial_velocities return radial_velocities
[docs] def realize( self, decorrelation_distance: float, num_sinusoids: int = 30 ) -> ConsistentRealization: """ Args: decorrelation_distance: Euclidean distance at which a sample of this Gaussian process is considered to be uncorrelated with another sample. num_sinusoids: Number of sinusoids used to approximate the Gaussian process. 30 by default. Returns: Realization of the consistent generator. """ # Collect the required number of scalar random variables num_scalars = 0 for variable in self.__variables: num_scalars += variable.size dimensions = (num_scalars, num_sinusoids, 2) rng = self.__rng if isinstance(self.__rng, np.random.Generator) else self.__rng._rng # If the decorrelation distance is infinite, the process is static if decorrelation_distance == float("inf"): return StaticConsistentRealization(rng.standard_normal(num_scalars)) # Sample the radial velocities radial_velocity_candidates = self.__sample_cdf(decorrelation_distance) radial_velocities = rng.choice(radial_velocity_candidates, size=dimensions) # Eq. 11 azimuth_angles = rng.uniform( 0, 2 * np.pi, size=dimensions ) # Phi in the respective equations zenith_angles = np.arccos( 1 - rng.uniform(0, 2, size=dimensions) ) # Theta in the respective equations # Eq. 5 sin_zenith_angles = np.sin(zenith_angles) radial_directions = np.array( [ np.cos(azimuth_angles) * sin_zenith_angles, np.sin(azimuth_angles) * sin_zenith_angles, np.cos(zenith_angles), ] ) # Consolidate frequencies and phases to a realization frequencies = 2 * np.pi * radial_velocities * radial_directions phases = rng.uniform(0, 2 * np.pi, size=(num_scalars, num_sinusoids)) return DualConsistentRealization(frequencies, phases)
[docs] class ConsistentGaussian(ConsistentVariable): """Spatially consistent normally distributed Gaussian variable."""
[docs] @override def sample(self, sample: ConsistentSample, mean: float = 0.0, std: float = 1.0) -> np.ndarray: # Fetch the scalar samples samples = ConsistentVariable.sample(self, sample) # Transform to the desired distribution return mean + std * samples
[docs] class ConsistentUniform(ConsistentVariable): """Spatially consistent uniformly distributed random variable."""
[docs] @override def sample(self, sample: ConsistentSample) -> np.ndarray: # Fetch the scalar samples samples = ConsistentVariable.sample(self, sample) # Transform to the uniform distribution over the gaussian CDF return norm.cdf(samples)
[docs] class ConsistentBoolean(ConsistentVariable): """Spatially consistent boolean random variable."""
[docs] @override def sample(self, sample: ConsistentSample) -> np.ndarray: # Fetch the scalar samples samples = ConsistentVariable.sample(self, sample) # Transform to the boolean distribution return samples > 0.0