Source code for leaspy.algo.personalize.lme_personalize

import warnings

import numpy as np
import statsmodels.api as sm

from leaspy.io.data import Dataset
from leaspy.io.outputs.individual_parameters import IndividualParameters
from leaspy.models import LMEModel

from ..base import AlgorithmName
from .base import PersonalizeAlgorithm

__all__ = ["LMEPersonalizeAlgorithm"]


[docs] class LMEPersonalizeAlgorithm(PersonalizeAlgorithm[LMEModel, IndividualParameters]): r"""Personalization algorithm associated to :class:`~leaspy.models.LMEModel`. Parameters ---------- settings : :class:`.AlgorithmSettings` Algorithm settings (none yet). Most LME parameters are defined within LME model and LME fit algorithm. Attributes ---------- name : ``'lme_personalize'`` """ name: AlgorithmName = AlgorithmName.PERSONALIZE_LME deterministic: bool = True def _compute_individual_parameters( self, model: LMEModel, dataset: Dataset, **kwargs ) -> IndividualParameters: individual_parameters = IndividualParameters() residuals = [] if model.features != dataset.headers: raise ValueError( "Your data and the model you are using for personalisation do not have the same headers." ) for individual in range(dataset.n_individuals): idx = dataset.indices[individual] times = dataset.get_times_patient(individual) values = dataset.get_values_patient(individual).numpy() ind_ip, ind_residuals = self._get_individual_random_effects_and_residuals( model, times, values ) residuals.append(ind_residuals) individual_parameters.add_individual_parameters(str(idx), ind_ip) return individual_parameters @staticmethod def _remove_nans(values, times): values = values.flatten() mask = ~np.isnan(values) values = values[mask] times = times[mask] return values, times @classmethod def _get_individual_random_effects_and_residuals( cls, model: LMEModel, times, values ): values, times = cls._remove_nans(values, times) ages_norm = (times - model.parameters["ages_mean"]) / model.parameters[ "ages_std" ] X = sm.add_constant(ages_norm, prepend=True, has_constant="add") residuals = values - X @ model.parameters["fe_params"] cov_re_unscaled_inv = model.parameters["cov_re_unscaled_inv"] if not model.with_random_slope_age: # only valid with random intercept ("Z"=[1,...,1] and cov_re is a scalar) n = len(values) # number of effective observations random_intercept = np.sum(residuals) / (n + cov_re_unscaled_inv.item()) re_d = {"random_intercept": random_intercept} residuals = residuals - random_intercept else: # valid anytime (exog_re = X) re = cls._generic_get_random_effects( residuals, X, cov_re_unscaled_inv ).squeeze() re_d = {"random_intercept": re[0], "random_slope_age": re[1]} residuals = residuals - X @ re return re_d, residuals @staticmethod def _generic_get_random_effects(resid, Z, cov_re_unscaled_inv): """ The conditional means of random effects given the data. cf. http://sia.webpopix.org/lme.html#estimation-of-the-random-effects Parameters ---------- resid : :class:`numpy.ndarray` (n_i,) endog - fixed_effects * exog Z : :class:`numpy.ndarray` (n_i, k_re) exog_re cov_re_unscaled_inv : :class:`numpy.ndarray` (k_re, k_re) inverse Returns ------- random_effects : :class:`numpy.ndarray` (k_re,) For a given individual """ tZZ = np.dot(Z.T, Z) G = np.linalg.inv(tZZ + cov_re_unscaled_inv) return np.dot(G, np.dot(Z.T, resid)) # less costly to multiply in this order
[docs] def set_output_manager(self, output_settings): """ Not implemented. """ if output_settings is not None: warnings.warn( "Settings logs in lme personalize algorithm is not supported." ) return