Source code for leaspy.models.utilities

import re
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch

from leaspy.exceptions import LeaspyConvergenceError
from leaspy.utils.functional import (
    Identity,
    MatMul,
    Mean,
    NamedInputFunction,
    Prod,
    Sqr,
    Std,
    Sum,
    SumDim,
    get_named_parameters,
)
from leaspy.utils.weighted_tensor import WeightedTensor

__all__ = [
    "tensorize_2D",
    "val_to_tensor",
    "serialize_tensor",
    "is_array_like",
    "tensor_to_list",
    "compute_std_from_variance",
    "compute_patient_slopes_distribution",
    "compute_linear_regression_subjects",
    "compute_patient_values_distribution",
    "compute_patient_time_distribution",
    "get_log_velocities",
    "torch_round",
    "compute_ind_param_mean_from_suff_stats_mixture",
    "compute_ind_param_std_from_suff_stats_mixture",
    "compute_ind_param_std_from_suff_stats_mixture_burn_in",
    "compute_probs_from_state",
]


def tensorize_2D(x, unsqueeze_dim: int, dtype=torch.float32) -> torch.Tensor:
    """Convert a scalar or array_like into an, at least 2D, dtype tensor.

    Parameters
    ----------
    x : scalar or array_like
        Element to be tensorized.

    unsqueeze_dim : :obj:`int`
        Dimension to be unsqueezed (0 or -1).
        Meaningful for 1D array-like only (for scalar or vector
        of length 1 it has no matter).

    Returns
    -------
    :class:`torch.Tensor`, at least 2D

    Examples
    --------
    >>> tensorize_2D([1, 2], 0) == tensor([[1, 2]])
    >>> tensorize_2D([1, 2], -1) == tensor([[1], [2])
    """
    # convert to torch.Tensor if not the case
    if not isinstance(x, torch.Tensor):
        x = torch.tensor(x, dtype=dtype)
    # convert dtype if needed
    if x.dtype != dtype:
        x = x.to(dtype)
    # if tensor is less than 2-dimensional add dimensions
    while x.dim() < 2:
        x = x.unsqueeze(dim=unsqueeze_dim)
    # postcondition: x.dim() >= 2
    return x


def val_to_tensor(val, shape: Optional[tuple] = None):
    if not isinstance(val, (torch.Tensor, WeightedTensor)):
        val = torch.tensor(val)
    if shape is not None:
        val = val.view(shape)  # no expansion here
    return val


def serialize_tensor(v, *, indent: str = "", sub_indent: str = "") -> str:
    """Nice serialization of floats, torch tensors (or numpy arrays)."""
    from torch._tensor_str import PRINT_OPTS as torch_print_opts

    if isinstance(v, (str, bool, int)):
        return str(v)
    if isinstance(v, np.ndarray):
        return str(v.tolist())
    if isinstance(v, float) or getattr(v, "ndim", -1) == 0:
        # for 0D tensors / arrays the default behavior is to print all digits...
        # change this!
        return f"{v:.{1 + torch_print_opts.precision}g}"
    if isinstance(v, (list, frozenset, set, tuple)):
        try:
            return serialize_tensor(
                torch.tensor(list(v)), indent=indent, sub_indent=sub_indent
            )
        except Exception:
            return str(v)
    if isinstance(v, dict):
        if not len(v):
            return ""
        subs = [
            f"{p} : "
            + serialize_tensor(vp, indent="  ", sub_indent=" " * len(f"{p} : ["))
            for p, vp in v.items()
        ]
        lines = [indent + _ for _ in "\n".join(subs).split("\n")]
        return "\n" + "\n".join(lines)
    # torch.tensor, np.array, ...
    # in particular you may use `torch.set_printoptions` and `np.set_printoptions` globally
    # to tune the number of decimals when printing tensors / arrays
    v_repr = str(v)
    # remove tensor prefix & possible device/size/dtype suffixes
    v_repr = re.sub(r"^[^\(]+\(", "", v_repr)
    v_repr = re.sub(r"(?:, device=.+)?(?:, size=.+)?(?:, dtype=.+)?\)$", "", v_repr)
    # adjust justification
    return re.sub(r"\n[ ]+([^ ])", rf"\n{sub_indent}\1", v_repr)


def is_array_like(v: Any) -> bool:
    try:
        len(v)  # exclude np.array(scalar) or torch.tensor(scalar)
        return hasattr(v, "__getitem__")  # exclude set
    except Exception:
        return False


[docs] def tensorize_2D(x, unsqueeze_dim: int, dtype=torch.float32) -> torch.Tensor: """Convert a scalar or array_like into an, at least 2D, dtype tensor. Parameters ---------- x : scalar or array_like Element to be tensorized. unsqueeze_dim : :obj:`int` Dimension to be unsqueezed (0 or -1). Meaningful for 1D array-like only (for scalar or vector of length 1 it has no matter). Returns ------- :class:`torch.Tensor`, at least 2D Examples -------- >>> tensorize_2D([1, 2], 0) == tensor([[1, 2]]) >>> tensorize_2D([1, 2], -1) == tensor([[1], [2]) """ # convert to torch.Tensor if not the case if not isinstance(x, torch.Tensor): x = torch.tensor(x, dtype=dtype) # convert dtype if needed if x.dtype != dtype: x = x.to(dtype) # if tensor is less than 2-dimensional add dimensions while x.dim() < 2: x = x.unsqueeze(dim=unsqueeze_dim) # postcondition: x.dim() >= 2 return x
[docs] def val_to_tensor(val, shape: Optional[tuple] = None): if not isinstance(val, (torch.Tensor, WeightedTensor)): val = torch.tensor(val) if shape is not None: val = val.view(shape) # no expansion here return val
[docs] def serialize_tensor(v, *, indent: str = "", sub_indent: str = "") -> str: """Nice serialization of floats, torch tensors (or numpy arrays).""" from torch._tensor_str import PRINT_OPTS as torch_print_opts if isinstance(v, (str, bool, int)): return str(v) if isinstance(v, np.ndarray): return str(v.tolist()) if isinstance(v, float) or getattr(v, "ndim", -1) == 0: # for 0D tensors / arrays the default behavior is to print all digits... # change this! return f"{v:.{1 + torch_print_opts.precision}g}" if isinstance(v, (list, frozenset, set, tuple)): try: return serialize_tensor( torch.tensor(list(v)), indent=indent, sub_indent=sub_indent ) except Exception: return str(v) if isinstance(v, dict): if not len(v): return "" subs = [ f"{p} : " + serialize_tensor(vp, indent=" ", sub_indent=" " * len(f"{p} : [")) for p, vp in v.items() ] lines = [indent + _ for _ in "\n".join(subs).split("\n")] return "\n" + "\n".join(lines) # torch.tensor, np.array, ... # in particular you may use `torch.set_printoptions` and `np.set_printoptions` globally # to tune the number of decimals when printing tensors / arrays v_repr = str(v) # remove tensor prefix & possible device/size/dtype suffixes v_repr = re.sub(r"^[^\(]+\(", "", v_repr) v_repr = re.sub(r"(?:, device=.+)?(?:, size=.+)?(?:, dtype=.+)?\)$", "", v_repr) # adjust justification return re.sub(r"\n[ ]+([^ ])", rf"\n{sub_indent}\1", v_repr)
[docs] def is_array_like(v: Any) -> bool: try: len(v) # exclude np.array(scalar) or torch.tensor(scalar) return hasattr(v, "__getitem__") # exclude set except Exception: return False
[docs] def tensor_to_list(x: Union[list, torch.Tensor]) -> list: """ Convert input tensor to list. Parameters ---------- x : :obj:`list` or :obj:`torch.Tensor` Input tensor or list to be converted. Returns ------- :obj:`list` List converted from tensor input, or original list if input was not a tensor. Raises ------ :exc:`NotImplementedError` If the input is a `WeightedTensor`, as this functionality is not yet implemented. """ if isinstance(x, (np.ndarray, torch.Tensor)): return x.tolist() if isinstance(x, WeightedTensor): raise NotImplementedError("TODO") return x
[docs] def compute_std_from_variance( variance: torch.Tensor, varname: str, tol: float = 1e-5, ) -> torch.Tensor: """ Check that variance is strictly positive and return its square root, otherwise fail with a convergence error. If variance is multivariate check that all components are strictly positive. TODO? a full Bayesian setting with good priors on all variables should prevent such convergence issues. Parameters ---------- variance : :obj:`torch.Tensor` The variance we would like to convert to a std-dev. varname : :obj:`str` The name of the variable. tol : :obj:`float`, optional The lower bound on variance, under which the converge error is raised. Default=1e-5. Returns ------- :obj: `torch.Tensor` : The standard deviation from the variance. Raises ------ :exc:`.LeaspyConvergenceError` If the variance is less than the specified tolerance, indicating a convergence issue. """ if (variance < tol).any(): raise LeaspyConvergenceError( f"The parameter '{varname}' collapsed to zero, which indicates a convergence issue.\n" "Start by investigating what happened in the logs of your calibration and try to double check:" "\n- your training dataset (not enough subjects and/or visits? too much missing data?)" "\n- the hyperparameters of your Leaspy model (`source_dimension` too low or too high? " "observation model not suited to your data?)" "\n- the hyperparameters of your calibration algorithm" ) return variance.sqrt()
def compute_ind_param_std_from_suff_stats( state: Dict[str, torch.Tensor], ip_values: torch.Tensor, ip_sqr_values: torch.Tensor, *, ip_name: str, dim: int, **kws, ): """ Maximization rule, from the sufficient statistics, of the standard-deviation of Gaussian prior for individual latent variables. Parameters ---------- state : Dict[str, torch.Tensor] ip_values : torch.Tensor ip_sqr_values : torch.Tensor ip_name : str dim : int """ ip_old_mean = state[f"{ip_name}_mean"] ip_cur_mean = torch.mean(ip_values, dim=dim) ip_var_update = torch.mean(ip_sqr_values, dim=dim) - 2 * ip_old_mean * ip_cur_mean ip_var = ip_var_update + ip_old_mean**2 return compute_std_from_variance(ip_var, varname=f"{ip_name}_std", **kws)
[docs] def compute_ind_param_mean_from_suff_stats_mixture( state: Dict[str, torch.Tensor], *, ip_name: str, ) -> torch.Tensor: ind_var = state[f"{ip_name}"] nll_regul_ind_sum_ind = state["nll_regul_ind_sum_ind"].value nll_cluster = -nll_regul_ind_sum_ind probs_ind = torch.nn.Softmax(dim=1)(torch.clamp(nll_cluster, -100.0)) if ip_name == "sources": # special treatment due to the extra dimension ind_var_expanded = ind_var.unsqueeze(-1) probs_expanded = probs_ind.unsqueeze(1) result = ind_var_expanded * probs_expanded else: result = probs_ind * ind_var result = result.sum(dim=0) / probs_ind.sum(dim=0) return result
[docs] def compute_ind_param_std_from_suff_stats_mixture( state: Dict[str, torch.Tensor], ip_values: torch.Tensor, ip_sqr_values: torch.Tensor, *, ip_name: str, dim: int, **kws, ): ip_old_mean = state[f"{ip_name}_mean"] ip_cur_mean = torch.mean(ip_values, dim=0) ip_var_update = torch.mean(ip_sqr_values, dim=0) - 2 * ip_old_mean * ip_cur_mean ip_var = ip_var_update + ip_old_mean**2 std = ip_var.sqrt() nll_regul_ind_sum_ind = state["nll_regul_ind_sum_ind"].value nll_cluster = -nll_regul_ind_sum_ind probs_ind = torch.nn.Softmax(dim=1)(torch.clamp(nll_cluster, -100.0)) result = (probs_ind * std).sum(dim=0) / probs_ind.sum(dim=0) return result
[docs] def compute_ind_param_std_from_suff_stats_mixture_burn_in( state: Dict[str, torch.Tensor], *, ip_name: str, ) -> torch.Tensor: ind_var = state[f"{ip_name}"].std(dim=0) nll_regul_ind_sum_ind = state["nll_regul_ind_sum_ind"].value nll_cluster = -nll_regul_ind_sum_ind probs_ind = torch.nn.Softmax(dim=1)(torch.clamp(nll_cluster, -100.0)) result = (probs_ind * ind_var).sum(dim=0) / probs_ind.sum(dim=0) return result
[docs] def compute_probs_from_state( state: Dict[str, torch.Tensor], ) -> torch.Tensor: nll_regul_ind_sum_ind = state["nll_regul_ind_sum_ind"].value n_inds = nll_regul_ind_sum_ind.shape[0] nll_cluster = -nll_regul_ind_sum_ind probs_ind = torch.nn.Softmax(dim=1)(torch.clamp(nll_cluster, -100.0)) return probs_ind.sum(dim=0) / n_inds
[docs] def compute_patient_slopes_distribution( df: pd.DataFrame, *, max_inds: Optional[int] = None, ) -> Tuple[torch.Tensor, torch.Tensor]: """ Compute linear regression slopes and their standard deviations for each feature. Parameters ---------- df : :obj:`pd.DataFrame` DataFrame containing individual scores. max_inds : :obj:`int`, optional Restrict computation to first `max_inds` individuals. Default="None" Returns ------- :obj:`Tuple`[:obj:`torch.Tensor`, :obj:`torch.Tensor`]: Tuple with : - [0] : torch.Tensor of shape (n_features,) - Regression slopes - [1] : torch.Tensor of shape (n_features,) - Standard deviation of the slopes """ d_regress_params = compute_linear_regression_subjects(df, max_inds=max_inds) slopes_mu, slopes_sigma = [], [] for ft, df_regress_ft in d_regress_params.items(): slopes_mu.append(df_regress_ft["slope"].mean()) slopes_sigma.append(df_regress_ft["slope"].std()) return torch.tensor(slopes_mu), torch.tensor(slopes_sigma)
[docs] def compute_linear_regression_subjects( df: pd.DataFrame, *, max_inds: Optional[int] = None, ) -> Dict[str, pd.DataFrame]: """ Linear Regression on each feature to get intercept & slopes Parameters ---------- df : :obj:`pd.DataFrame` Contains the individual scores. max_inds : :obj:`int`, optional Restrict computation to first `max_inds` individuals. Default="None" Returns ------- :obj: `Dict`[:obj:`str`, :obj:`pd.DataFrame`]: Dictionary with : - keys : feature names - values : DataFrame with : - index : Individual IDs - columns : 'intercept', 'slope' """ regression_parameters = {} for feature, data in df.items(): data = data.dropna() n_visits = data.groupby("ID").size() indices_train = n_visits[n_visits >= 2].index if max_inds: indices_train = indices_train[:max_inds] data_train = data.loc[indices_train] regression_parameters[feature] = ( data_train.groupby("ID").apply(_linear_regression_against_time).unstack(-1) ) return regression_parameters
def _linear_regression_against_time(data: pd.Series) -> Dict[str, float]: """ Return intercept & slope of a linear regression of series values against time (present in series index). Parameters ---------- data : :obj:`pd.Series` Series with time index and values to regress Returns ------- :obj: `Dict`[:obj:`str`, :obj: `float`]: Dictionary with: - keys : 'intercept', 'slope' - values : intercept & slope of the linear regression """ from scipy.stats import linregress y = data.values t = data.index.get_level_values("TIME").values slope, intercept, r_value, p_value, std_err = linregress(t, y) return {"intercept": intercept, "slope": slope}
[docs] def compute_patient_values_distribution( df: pd.DataFrame, ) -> Tuple[torch.Tensor, torch.Tensor]: """ Returns means and standard deviations for the features of the given dataset values. Parameters ---------- df : :obj:`pd.DataFrame` Contains the individual scores. Returns ------- :obj: Tuple[:obj:`torch.Tensor`, :obj:`torch.Tensor`]: Tuple with: - [0] : torch.Tensor of shape (n_features,) - Means of the features - [1] : torch.Tensor of shape (n_features,) - Standard deviations of the features """ return torch.tensor(df.mean().values), torch.tensor(df.std().values)
[docs] def compute_patient_time_distribution( df: pd.DataFrame, ) -> Tuple[torch.Tensor, torch.Tensor]: """ Returns mu / sigma of given dataset times. Parameters ---------- df : :obj:`pd.DataFrame` Contains the individual scores. Returns ------- :obj:`Tuple`[:obj:`torch.Tensor`, :obj:`torch.Tensor`]: Tuple with: - [0] : torch.Tensor - Mean of the times - [1] : torch.Tensor - Standard deviation of the times """ times = df.index.get_level_values("TIME").values return torch.tensor([times.mean()]), torch.tensor([times.std()])
[docs] def get_log_velocities( velocities: torch.Tensor, features: List[str], *, min: float = 1e-2 ) -> torch.Tensor: """ Get the log of the velocities, clamping them to `min` if negative. Parameters ---------- velocities : :obj:`torch.Tensor` The velocities to be clamped and logged. features : :obj:`List`[:obj:`str`] The names of the features corresponding to the velocities. min : :obj:`float`, optional The minimum value to clamp the velocities to. Default=1e-2 Returns ------- :obj:`torch.Tensor` : The log of the clamped velocities. Raises ------ :obj:`Warning` If some negative velocities are provided. """ neg_velocities = velocities <= 0 if neg_velocities.any(): warnings.warn( f"Mean slope of individual linear regressions made at initialization is negative for " f"{[f for f, vel in zip(features, velocities) if vel <= 0]}: not properly handled in model..." ) return velocities.clamp(min=min).log()
[docs] def torch_round(t: torch.FloatTensor, *, tol: float = 1 << 16) -> torch.FloatTensor: """ Multiplies the tensor by `tol`, applies standard rounding, then scales back. This effectively rounds values to the nearest multiple of `1.0 / tol`. Parameters ---------- t : :obj:`torch.FloatTensor` The tensor to be rounded. tol : :obj:`float`, optional The tolerance factor controlling rounding precision (higher = finer rounding). Default=1 << 16 (65536). This corresponds to rounding to ~ 10**-4.8. Returns ------- :obj:`torch.FloatTensor` : The rounded tensor with the same shape as input `t`. """ return (t * tol).round() * (1.0 / tol)