"""This module defines the `AlgorithmType`, `AlgorithmName` and `AbstractAlgo` classes"""
import inspect
import random
import sys
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from enum import Enum
from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, Union
import numpy as np
import torch
from leaspy.exceptions import LeaspyAlgoInputError
from leaspy.io.data import Dataset
from .settings import AlgorithmSettings, OutputsSettings
__all__ = [
"BaseAlgorithm",
"IterativeAlgorithm",
"AlgorithmType",
"AlgorithmName",
"get_algorithm_type",
"get_algorithm_class",
"algorithm_factory",
"ReturnType",
"ModelType",
]
if TYPE_CHECKING:
    # Only needed to resolve the forward reference in the `ModelType` bound below.
    from leaspy.models import BaseModel

ModelType = TypeVar("ModelType", bound="BaseModel")
ReturnType = TypeVar("ReturnType")
class AlgorithmType(str, Enum):
"""The type of the algorithms."""
FIT = "fit"
PERSONALIZE = "personalize"
SIMULATE = "simulate"
class AlgorithmName(str, Enum):
"""The available algorithms in Leaspy."""
FIT_MCMC_SAEM = "mcmc_saem"
FIT_LME = "lme_fit"
PERSONALIZE_SCIPY_MINIMIZE = "scipy_minimize"
PERSONALIZE_MEAN_POSTERIOR = "mean_posterior"
PERSONALIZE_MODE_POSTERIOR = "mode_posterior"
PERSONALIZE_CONSTANT = "constant_prediction"
PERSONALIZE_LME = "lme_personalize"
SIMULATE = "simulate"
class BaseAlgorithm(ABC, Generic[ModelType, ReturnType]):
"""Base class containing common methods for all algorithm classes.
Parameters
----------
settings : :class:`~leaspy.algo.AlgorithmSettings`
The specifications of the algorithm as a :class:`~leaspy.algo.AlgorithmSettings` instance.
Attributes
----------
name : :class:`~leaspy.algo.base.AlgorithmName`
Name of the algorithm.
family : :class:`~leaspy.algo.base.AlgorithmType`
Family of the algorithm.
    deterministic : :obj:`bool`
        ``True`` if and only if the algorithm involves no randomness.
        Setting a seed has no effect on such algorithms.
algo_parameters : :obj:`dict`
Contains the algorithm's parameters. Those are controlled by
the :attr:`leaspy.algo.AlgorithmSettings.parameters` class attribute.
    seed : :obj:`int`, optional
        Seed used by the :mod:`random`, :mod:`numpy` and :mod:`torch` random number generators.
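
    Examples
    --------
    A minimal sketch of a concrete subclass (hypothetical names, for
    illustration only; real algorithms live in ``leaspy.algo.fit``,
    ``leaspy.algo.personalize`` and ``leaspy.algo.simulate``)::

        class MyConstantAlgo(BaseAlgorithm):
            name = AlgorithmName.PERSONALIZE_CONSTANT
            family = AlgorithmType.PERSONALIZE
            deterministic = True

            def set_output_manager(self, output_settings):
                pass  # this sketch produces no outputs

            def _run(self, model, dataset=None, **kwargs):
                ...  # actual computations would go here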
"""
    name: Optional[AlgorithmName] = None
    family: Optional[AlgorithmType] = None
deterministic: bool = False
def __init__(self, settings: AlgorithmSettings):
if settings.name != self.name:
raise LeaspyAlgoInputError(
f"Inconsistent naming: {settings.name} != {self.name}"
)
self.seed = settings.seed
        # Deep-copy settings.parameters: the algorithm may modify its parameters
        # (e.g. `n_burn_in_iter`), and we do not want the original settings to be
        # modified as well (so that they can safely be reused).
self.algo_parameters = deepcopy(settings.parameters)
self.output_manager = None
@abstractmethod
def set_output_manager(self, output_settings: OutputsSettings) -> None:
raise NotImplementedError
@staticmethod
def _initialize_seed(seed: Optional[int]):
"""Set :mod:`random`, :mod:`numpy` and :mod:`torch` seeds and display it (static method).
Notes - numpy seed is needed for reproducibility for the simulation algorithm which use the scipy kernel
density estimation function. Indeed, scipy use numpy random seed.
Parameters
----------
seed : int
The wanted seed
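
        Examples
        --------
        >>> BaseAlgorithm._initialize_seed(42)    # seeds random, numpy and torch
        >>> BaseAlgorithm._initialize_seed(None)  # no-op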
"""
if seed is not None:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
# TODO: use logger instead (level=INFO)
# print(f" ==> Setting seed to sdfsdf {seed}") # Silenced to reduce verbosity
def run(
self, model: ModelType, dataset: Optional[Dataset] = None, **kwargs
) -> ReturnType:
"""Main method, run the algorithm.
Parameters
----------
        model : :class:`~leaspy.models.BaseModel`
            The model to use.
        dataset : :class:`~leaspy.io.data.Dataset`, optional
            Contains all the subjects' observations with corresponding timepoints,
            in torch format to speed up computations.
Returns
-------
ReturnType:
Depends on algorithm class.
See Also
--------
:class:`.AbstractFitAlgo`
:class:`.AbstractPersonalizeAlgo`
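
        Examples
        --------
        A minimal sketch, assuming ``model`` and ``dataset`` are already
        instantiated :class:`~leaspy.models.BaseModel` and
        :class:`~leaspy.io.data.Dataset` objects (hypothetical variable
        names, for illustration only)::

            algo = algorithm_factory(AlgorithmSettings("scipy_minimize"))
            result = algo.run(model, dataset)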
"""
if self.algo_parameters is None:
raise LeaspyAlgoInputError(
f"The `{self.name}` algorithm was not properly created."
)
self._initialize_seed(self.seed)
time_beginning = time.time()
run_params = inspect.signature(self._run).parameters
run_kwargs = {}
if "dataset" in run_params:
run_kwargs["dataset"] = dataset
if "kwargs" in run_params or any(
p.kind == inspect.Parameter.VAR_KEYWORD for p in run_params.values()
):
run_kwargs.update(kwargs)
        # Call `_run` with only the arguments it expects
output = self._run(model, **run_kwargs)
duration_in_seconds = time.time() - time_beginning
if self.algo_parameters.get("progress_bar"):
print()
print(
f"\n{self.family.value.title()} with `{self.name}` took: {self._duration_to_str(duration_in_seconds)}"
)
return output
@abstractmethod
def _run(
self,
model: ModelType,
dataset: Optional[Dataset] = None,
**kwargs,
) -> ReturnType:
"""Run the algorithm (actual implementation), to be implemented in children classes.
Parameters
----------
        model : :class:`~leaspy.models.BaseModel`
            The model to use.
        dataset : :class:`~leaspy.io.data.Dataset`, optional
            Contains all the subjects' observations with corresponding timepoints,
            in torch format to speed up computations.
Returns
-------
ReturnType :
Depends on the algorithm.
See Also
--------
:class:`.AbstractFitAlgo`
:class:`.AbstractPersonalizeAlgo`
"""
raise NotImplementedError
def load_parameters(self, parameters: dict):
"""Update the algorithm's parameters by the ones in the given dictionary.
The keys in the input which does not belong to the algorithm's parameters are ignored.
Parameters
----------
parameters : :obj:`dict`
Contains the pairs (key, value) of the requested parameters
Examples
--------
>>> from leaspy.algo import AlgorithmSettings, algorithm_factory, OutputsSettings
>>> my_algo = algorithm_factory(AlgorithmSettings("mcmc_saem"))
>>> my_algo.algo_parameters
{'progress_bar': True,
'n_iter': 10000,
'n_burn_in_iter': 9000,
'n_burn_in_iter_frac': 0.9,
'burn_in_step_power': 0.8,
'random_order_variables': True,
'sampler_ind': 'Gibbs',
'sampler_ind_params': {'acceptation_history_length': 25,
'mean_acceptation_rate_target_bounds': [0.2, 0.4],
'adaptive_std_factor': 0.1},
'sampler_pop': 'Gibbs',
'sampler_pop_params': {'random_order_dimension': True,
'acceptation_history_length': 25,
'mean_acceptation_rate_target_bounds': [0.2, 0.4],
'adaptive_std_factor': 0.1},
'annealing': {'do_annealing': False,
'initial_temperature': 10,
'n_plateau': 10,
'n_iter': None,
'n_iter_frac': 0.5}}
>>> parameters = {'n_iter': 5000, 'n_burn_in_iter': 4000}
>>> my_algo.load_parameters(parameters)
>>> my_algo.algo_parameters
{'progress_bar': True,
'n_iter': 5000,
'n_burn_in_iter': 4000,
'n_burn_in_iter_frac': 0.9,
'burn_in_step_power': 0.8,
'random_order_variables': True,
'sampler_ind': 'Gibbs',
'sampler_ind_params': {'acceptation_history_length': 25,
'mean_acceptation_rate_target_bounds': [0.2, 0.4],
'adaptive_std_factor': 0.1},
'sampler_pop': 'Gibbs',
'sampler_pop_params': {'random_order_dimension': True,
'acceptation_history_length': 25,
'mean_acceptation_rate_target_bounds': [0.2, 0.4],
'adaptive_std_factor': 0.1},
'annealing': {'do_annealing': False,
'initial_temperature': 10,
'n_plateau': 10,
'n_iter': None,
'n_iter_frac': 0.5}}
"""
for k, v in parameters.items():
            if k in self.algo_parameters:
                previous_v = self.algo_parameters[k]
                # TODO? log it instead (level=INFO or DEBUG)
                print(f"Replacing {k} parameter: {previous_v} -> {v}")
self.algo_parameters[k] = v
@staticmethod
def _duration_to_str(seconds: float, *, seconds_fmt=".2f") -> str:
"""
        Convert a duration in seconds to a string giving the time in hours,
        minutes and seconds, such as ``1h 2m 5.50s``.

        If less than one hour, hours are omitted; if less than one minute,
        minutes are omitted as well.

        Parameters
        ----------
        seconds : :obj:`float`
            Computation time, in seconds.
        seconds_fmt : :obj:`str`, optional
            Format specifier for the seconds part (default ``".2f"``).

        Returns
        -------
        :obj:`str`
            The duration formatted in hours, minutes and seconds.
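
        Examples
        --------
        >>> BaseAlgorithm._duration_to_str(3725.5)
        '1h 2m 5.50s'
        >>> BaseAlgorithm._duration_to_str(125.5)
        '2m 5.50s'
        >>> BaseAlgorithm._duration_to_str(42.0)
        '42.00s'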
"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60 # float
res = ""
if m:
if h:
res += f"{h}h "
res += f"{m}m "
res += f"{s:{seconds_fmt}}s"
return res
def __str__(self) -> str:
out = "=== ALGO ===\n"
out += f"Instance of {self.name} algo"
if hasattr(self, "algorithm_device"):
out += f" [{self.algorithm_device.upper()}]"
return out
class IterativeAlgorithm(BaseAlgorithm[ModelType, ReturnType]):
    """Base class for algorithms that run over a given number of iterations and may display a progress bar."""
def __init__(self, settings: AlgorithmSettings):
super().__init__(settings)
self.current_iteration: int = 0
@staticmethod
def _display_progress_bar(
iteration: int, n_iter: int, suffix: str, n_step_default: int = 50
):
"""
        Display a progress bar while the algorithm is running, based on `sys.stdout`.
        TODO: use tqdm instead?
Parameters
----------
        iteration : :obj:`int`
            Current iteration of the algorithm, or ``-1`` to initialize the bar.
            The final iteration should be ``n_iter - 1``.
        n_iter : :obj:`int`
            Total number of iterations of the algorithm.
suffix : :obj:`str`
Used to differentiate types of algorithms:
* for fit algorithms: ``suffix = 'iterations'``
* for personalization algorithms: ``suffix = 'subjects'``.
        n_step_default : :obj:`int`, default 50
            Maximum number of cells in the progress bar.
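
        Examples
        --------
        A sketch of the rendered line: with ``n_iter=4`` and
        ``suffix='iterations'`` the bar has ``min(50, 4) = 4`` cells,
        so after the second iteration it reads::

            |##--| 2/4 iterations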
"""
        n_step = min(n_step_default, n_iter)
        if n_step == 0:
            return  # nothing to display for an empty run (avoids a division by zero below)
if iteration == -1:
sys.stdout.write("\r")
sys.stdout.write("|" + "-" * n_step + "| 0/%d " % n_iter + suffix)
sys.stdout.flush()
else:
print_every_iter = n_iter // n_step
iteration_plus_1 = iteration + 1
display = iteration_plus_1 % print_every_iter
if display == 0:
nbar = iteration_plus_1 // print_every_iter
sys.stdout.write("\r")
sys.stdout.write(
f"|{'#' * nbar}{'-' * (n_step - nbar)}| {iteration_plus_1}/{n_iter} {suffix}"
)
sys.stdout.flush()
def _get_progress_str(self) -> Optional[str]:
# TODO in a special mixin for sequential algos with nb of iters (MCMC fit, MCMC personalize)
if not hasattr(self, "current_iteration"):
return None
return f"Iteration {self.current_iteration} / {self.algo_parameters['n_iter']}"
def __str__(self) -> str:
out = super().__str__()
progress_str = self._get_progress_str()
if progress_str:
out += "\n" + progress_str
return out
def get_algorithm_type(name: Union[str, AlgorithmName]) -> AlgorithmType:
"""Return the algorithm type.
Parameters
----------
name : :obj:`str` or :class:`~leaspy.algo.base.AlgorithmName`
The name of the algorithm.
Returns
-------
    algorithm type : :class:`~leaspy.algo.base.AlgorithmType`
        The type of the given algorithm.
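
    Examples
    --------
    >>> get_algorithm_type("mcmc_saem")
    <AlgorithmType.FIT: 'fit'>
    >>> get_algorithm_type(AlgorithmName.PERSONALIZE_LME)
    <AlgorithmType.PERSONALIZE: 'personalize'>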
"""
name = AlgorithmName(name)
if name in (AlgorithmName.FIT_LME, AlgorithmName.FIT_MCMC_SAEM):
return AlgorithmType.FIT
if name == AlgorithmName.SIMULATE:
return AlgorithmType.SIMULATE
if name in (
AlgorithmName.PERSONALIZE_SCIPY_MINIMIZE,
AlgorithmName.PERSONALIZE_MEAN_POSTERIOR,
AlgorithmName.PERSONALIZE_MODE_POSTERIOR,
AlgorithmName.PERSONALIZE_CONSTANT,
AlgorithmName.PERSONALIZE_LME,
):
return AlgorithmType.PERSONALIZE
def get_algorithm_class(name: Union[str, AlgorithmName]) -> Type[BaseAlgorithm]:
"""Return the algorithm class.
Parameters
----------
name : :obj:`str` or :class:`~leaspy.algo.base.AlgorithmName`
The name of the algorithm.
Returns
-------
    algorithm class : Type[:class:`~leaspy.algo.base.BaseAlgorithm`]
        The class implementing the requested algorithm.
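
    Examples
    --------
    >>> get_algorithm_class("scipy_minimize").__name__
    'ScipyMinimizeAlgorithm'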
"""
name = AlgorithmName(name)
if name == AlgorithmName.FIT_MCMC_SAEM:
from .fit import TensorMcmcSaemAlgorithm
return TensorMcmcSaemAlgorithm
if name == AlgorithmName.FIT_LME:
from .fit import LMEFitAlgorithm
return LMEFitAlgorithm
if name == AlgorithmName.PERSONALIZE_SCIPY_MINIMIZE:
from .personalize import ScipyMinimizeAlgorithm
return ScipyMinimizeAlgorithm
if name == AlgorithmName.PERSONALIZE_MEAN_POSTERIOR:
from .personalize import MeanPosteriorAlgorithm
return MeanPosteriorAlgorithm
if name == AlgorithmName.PERSONALIZE_MODE_POSTERIOR:
from .personalize import ModePosteriorAlgorithm
return ModePosteriorAlgorithm
if name == AlgorithmName.PERSONALIZE_CONSTANT:
from .personalize import ConstantPredictionAlgorithm
return ConstantPredictionAlgorithm
if name == AlgorithmName.PERSONALIZE_LME:
from .personalize import LMEPersonalizeAlgorithm
return LMEPersonalizeAlgorithm
if name == AlgorithmName.SIMULATE:
from .simulate import SimulationAlgorithm
return SimulationAlgorithm
def algorithm_factory(settings: AlgorithmSettings) -> BaseAlgorithm:
"""Return the requested algorithm based on the provided settings.
Parameters
----------
    settings : :class:`~leaspy.algo.AlgorithmSettings`
        The algorithm settings.
Returns
-------
    algorithm : child class of :class:`~leaspy.algo.base.BaseAlgorithm`
        The requested algorithm instance, consistent with its declared algorithm family.
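
    Examples
    --------
    >>> from leaspy.algo import AlgorithmSettings, algorithm_factory
    >>> algo = algorithm_factory(AlgorithmSettings("mcmc_saem"))
    >>> algo.family
    <AlgorithmType.FIT: 'fit'>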
"""
algorithm = get_algorithm_class(settings.name)(settings)
algorithm.set_output_manager(settings.logs)
return algorithm