# leaspy.models.obs_models._gaussian

from __future__ import annotations

from typing import Callable

import torch

from leaspy.io.data.dataset import Dataset
from leaspy.models.utilities import compute_std_from_variance
from leaspy.utils.functional import Prod, Sqr
from leaspy.utils.weighted_tensor import (
    WeightedTensor,
    sum_dim,
    wsum_dim,
    wsum_dim_return_sum_of_weights_only,
    wsum_dim_return_weighted_sum_only,
)
from leaspy.variables.distributions import Normal
from leaspy.variables.specs import (
    LVL_FT,
    Collect,
    LinkedVariable,
    ModelParameter,
    VariableInterface,
    VariableName,
)
from leaspy.variables.state import State

from ._base import ObservationModel

__all__ = [
    "GaussianObservationModel",
    "FullGaussianObservationModel",
]


class GaussianObservationModel(ObservationModel):
    """
    Specialized `ObservationModel` for noisy observations with Gaussian residuals assumption.

    Parameters
    ----------
    name : :obj:`str`
        The name of the observed variable (used to name the data variable & the
        attachment term related to this observation).
    getter : function :class:`.Dataset` -> :class:`.WeightedTensor`
        How to retrieve the observed values from the :class:`.Dataset`
        (as a :class:`.WeightedTensor`): e.g. all values, subset of values
        - only x, y, z features, one-hot encoded features, ...
    loc : :obj:`str`
        The name of the variable representing the mean (location) of the Gaussian.
    scale : :obj:`str`
        The name of the variable representing the standard deviation (scale)
        of the Gaussian (`noise_std`).
    **extra_vars : VariableInterface
        Additional variables required by the model.

    Notes
    -----
    - The model uses `leaspy.variables.distributions.Normal` internally for
      computing the log-likelihood and related operations.
    """

    def __init__(
        self,
        name: VariableName,
        getter: Callable[[Dataset], WeightedTensor],
        loc: VariableName,
        scale: VariableName,
        **extra_vars: VariableInterface,
    ):
        # The residuals distribution is fully determined by the two variable
        # names holding its location and scale.
        residuals_distribution = Normal(loc, scale)
        super().__init__(name, getter, residuals_distribution, extra_vars=extra_vars)
class FullGaussianObservationModel(GaussianObservationModel):
    """
    Specialized `GaussianObservationModel` when all data share the same observation model, with default naming.

    The default naming is:

    - 'y' for observations
    - 'model' for model predictions
    - 'noise_std' for scale of residuals

    We also provide a convenient factory `with_noise_std_as_model_parameter` for the
    most common case, which corresponds to `noise_std` directly being a
    `ModelParameter` (it could also be a `PopulationLatentVariable` with positive
    support). Whether the scale of residuals is scalar or diagonal depends on the
    `dimension` argument of this factory.
    """

    # Minimum variance tolerated when converting the residuals variance into a
    # standard deviation (forwarded as `tol` to `compute_std_from_variance`).
    tol_noise_variance = 1e-5

    def __init__(self, noise_std: VariableInterface, **extra_vars: VariableInterface):
        super().__init__(
            name="y",
            getter=self.y_getter,
            loc="model",
            scale="noise_std",
            noise_std=noise_std,
            **extra_vars,
        )
[docs] @staticmethod def y_getter(dataset: Dataset) -> WeightedTensor: """ Extracts the observation values and mask from a dataset. Parameters ---------- dataset : :class:`.Dataset` A dataset object containing 'values' and 'mask' attributes Returns ------- :class:`.WeightedTensor` A tensor containing the observed values and a boolean mask used as weights for likekelihood and loss computations Raises ------ AssertionError If either `dataset.values`or `dataset.mask`is `None`. """ assert dataset.values is not None assert dataset.mask is not None return WeightedTensor(dataset.values, weight=dataset.mask.to(torch.bool))
[docs] @classmethod def noise_std_suff_stats(cls) -> dict[VariableName, LinkedVariable]: """ Dictionary of sufficient statistics needed for `noise_std` (when directly a model parameter). Returns ------- :obj:`dict` [ :class:`~leaspy.variables.specs.VariableName`, :class:`~leaspy.variables.specs.LinkedVariable`] A dictionary containing the sufficient statistics: - `"y_x_model"`: Product of the observed values (`"y"`) and the model predictions (`"model"`). - `"model_x_model"`: Squared values of the model predictions (`"model"`). """ return dict( y_x_model=LinkedVariable(Prod("y", "model")), model_x_model=LinkedVariable(Sqr("model")), )
[docs] @classmethod def scalar_noise_std_update( cls, *, state: State, y_x_model: WeightedTensor[float], model_x_model: WeightedTensor[float], ) -> torch.Tensor: """ Update rule for scalar `noise_std` (when directly a model parameter), from state & sufficient statistics. Computes a common `noise_std` for all the features Parameters ---------- state: :class:`State` A state dictionary containing precomputed values y_x_model : WeightedTensor[float] The weighted inner product between the observations and the model predictions. model_x_model : WeightedTensor[float] The weighted inner product of the model predictions with themselves. Returns ------- :class:`torch.Tensor` The updated scalar value of the `noise_std`. """ y_l2 = state["y_L2"] n_obs = state["n_obs"] # TODO? by linearity couldn't we only require `-2*y_x_model + model_x_model` as summary stat? # and couldn't we even collect the already summed version of it? s1 = sum_dim(y_x_model) s2 = sum_dim(model_x_model) noise_var = (y_l2 - 2 * s1 + s2) / n_obs.float() return compute_std_from_variance( noise_var, varname="noise_std", tol=cls.tol_noise_variance, )
[docs] @classmethod def diagonal_noise_std_update( cls, *, state: State, y_x_model: WeightedTensor[float], model_x_model: WeightedTensor[float], ) -> torch.Tensor: """ Update rule for feature-wise `noise_std` (when directly a model parameter), from state & sufficient statistics. Computes one `noise_std` per feature. Parameters ---------- state: :class:`State` A state dictionary containing precomputed values y_x_model : :class:`.WeightedTensor`[:obj:`float`] The weighted inner product between the observations and the model predictions. model_x_model : :class:`.WeightedTensor`[:obj:`float`] The weighted inner product of the model predictions with themselves. Returns ------- :class:`torch.Tensor` The updated value of the `noise_std` for each feature. """ y_l2_per_ft = state["y_L2_per_ft"] n_obs_per_ft = state["n_obs_per_ft"] # TODO: same remark as in `.scalar_noise_std_update()` # sum must be done after computation to use weights of y in model to mask missing data summed = sum_dim(-2 * y_x_model + model_x_model, but_dim=LVL_FT) noise_var = (y_l2_per_ft + summed) / n_obs_per_ft.float() return compute_std_from_variance( noise_var, varname="noise_std", tol=cls.tol_noise_variance, )
[docs] @classmethod def noise_std_specs(cls, dimension: int) -> ModelParameter: """ Default specifications of `noise_std` variable when directly modelled as a parameter (no latent population variable). Parameters ---------- dimension : :obj:`int` The dimension of the `noise_std`. - If `dimension == 1`, a scalar `noise_std` deviation is assumed. - If `dimension > 1`, feature-wise independent `noise_std` deviations are assumed (diagonal noise). Returns ------- ModelParameter The specification of the `noise_std`, including: - `shape`: tuple defining the parameter shape. - `suff_stats`: collected sufficient statistics needed for updates. - `update_rule`: method to update the parameter based on statistics. """ update_rule = ( cls.scalar_noise_std_update if dimension == 1 else cls.diagonal_noise_std_update ) return ModelParameter( shape=(dimension,), suff_stats=Collect(**cls.noise_std_suff_stats()), update_rule=update_rule, )
[docs] @classmethod def with_noise_std_as_model_parameter(cls, dimension: int): """ Default instance of `FullGaussianObservationModel` with `noise_std` (scalar or diagonal depending on `dimension`) being a `ModelParameter`. Parameters ---------- dimension : :obj:`int` The dimension of the `noise_std`. - If `dimension == 1`, a scalar `noise_std` is assumed. - If `dimension > 1`, feature-wise independent `noise_std` deviations are assumed (diagonal noise). Returns ------- FullGaussianObservationModel A configured instance with `noise_std` as a `ModelParameter`, along with the necessary sufficient statistics for inference. Raises ------ ValueError If `dimension` is not a positive integer. """ if not isinstance(dimension, int) or dimension < 1: raise ValueError( f"Dimension should be an integer >= 1. You provided {dimension}." ) if dimension == 1: extra_vars = { "y_L2": LinkedVariable( Sqr("y").then(wsum_dim_return_weighted_sum_only) ), "n_obs": LinkedVariable( Sqr("y").then(wsum_dim_return_sum_of_weights_only) ), } else: extra_vars = { "y_L2_per_ft": LinkedVariable( Sqr("y").then(wsum_dim_return_weighted_sum_only, but_dim=LVL_FT) ), "n_obs_per_ft": LinkedVariable( Sqr("y").then(wsum_dim_return_sum_of_weights_only, but_dim=LVL_FT) ), } return cls(noise_std=cls.noise_std_specs(dimension), **extra_vars)
    # Utility functions (not used directly elsewhere in the code)
[docs] @classmethod def compute_rmse( cls, *, y: WeightedTensor[float], model: WeightedTensor[float], ) -> torch.Tensor: """ Computes the Root Mean Square Error (RMSE) between predictions and observations. Parameters ---------- y : :class:`.WeightedTensor`[:obj:`float`] The observed target values with associated weights. model : :class:`.WeightedTensor`[:obj:`float`] The model predictions with the same shape and weighting scheme as `y`. Returns ------- :class:`torch.Tensor` A scalar tensor representing the RMSE between `model` and `y`. """ l2: WeightedTensor[float] = (model - y) ** 2 l2_sum, n_obs = wsum_dim(l2) return (l2_sum / n_obs.float()) ** 0.5
[docs] @classmethod def compute_rmse_per_ft( cls, *, y: WeightedTensor[float], model: WeightedTensor[float], ) -> torch.Tensor: """ Computes the Root Mean Square Error (RMSE) between predictions and observations separately for each feature. Parameters ---------- y : :class:`.WeightedTensor`[:obj:`float`] The observed target values with associated weights. model : :class:`.WeightedTensor`[:obj:`float`] The model predictions with the same shape and weighting scheme as `y`. Returns ------- :class:`torch.Tensor` A scalar tensor representing the RMSE between `model` and `y`. """ l2: WeightedTensor[float] = (model - y) ** 2 l2_sum_per_ft, n_obs_per_ft = wsum_dim(l2, but_dim=LVL_FT) return (l2_sum_per_ft / n_obs_per_ft.float()) ** 0.5
[docs] def to_string(self) -> str: """method for parameter saving""" if self.extra_vars["noise_std"].shape == (1,): return "gaussian-scalar" return "gaussian-diagonal"