import warnings
from typing import Optional
import numpy as np
import pandas as pd
import torch
from leaspy.exceptions import LeaspyInputError
from leaspy.utils.distributions import discrete_sf_from_pdf
from leaspy.utils.typing import FeatureType, KwargsType
from .data import Data
from .individual_data import IndividualData
__all__ = ["Dataset"]
class Dataset:
    """
    Data container based on :class:`torch.Tensor`, used to run algorithms.

    Parameters
    ----------
    data : :class:`~leaspy.io.data.Data`
        Create `Dataset` from `Data` object
    no_warning : :obj:`bool`, default False
        Whether to deactivate warnings that are emitted by methods of this dataset instance.
        We may want to deactivate them because we rebuild a dataset per individual in scipy minimize.
        Indeed, all relevant warnings certainly occurred for the overall dataset.

    Attributes
    ----------
    headers : :obj:`list` [:obj:`str`]
        Features names
    dimension : :obj:`int`
        Number of features
    n_individuals : :obj:`int`
        Number of individuals
    indices : :obj:`list` [:class:`~leaspy.utils.typing.IDType`]
        Order of patients
    event_time : :obj:`torch.FloatTensor`
        Time of an event; if the event is censored, the time corresponds to the last patient observation
    event_bool : :obj:`torch.BoolTensor`
        Boolean to indicate if an event is censored or not: 1 observed, 0 censored
    n_visits_per_individual : :obj:`list` [:obj:`int`]
        Number of visits per individual
    n_visits_max : :obj:`int`
        Maximum number of visits for one individual
    n_visits : :obj:`int`
        Total number of visits
    n_observations_per_ind_per_ft : :obj:`torch.LongTensor`, shape (n_individuals, dimension)
        Number of observations (not taking into account missing values) per individual per feature
    n_observations_per_ft : :obj:`torch.LongTensor`, shape (dimension,)
        Total number of observations per feature
    n_observations : :obj:`int`
        Total number of observations
    timepoints : :obj:`torch.FloatTensor`, shape (n_individuals, n_visits_max)
        Ages of patients at their different visits
    values : :obj:`torch.FloatTensor`, shape (n_individuals, n_visits_max, dimension)
        Values of patients for each visit for each feature
    mask : :obj:`torch.FloatTensor`, shape (n_individuals, n_visits_max, dimension)
        Binary mask associated to values.
        If 1: value is meaningful.
        If 0: value is meaningless (either was nan or does not correspond to a real visit - only here for padding).
    L2_norm_per_ft : :obj:`torch.FloatTensor`, shape (dimension,)
        Sum of all non-nan squared values, feature per feature
    L2_norm : scalar :obj:`torch.FloatTensor`
        Sum of all non-nan squared values
    no_warning : :obj:`bool`, default False
        Whether to deactivate warnings that are emitted by methods of this dataset instance.
        We may want to deactivate them because we rebuild a dataset per individual in scipy minimize.
        Indeed, all relevant warnings certainly occurred for the overall dataset.
    _one_hot_encoding : :obj:`dict` [:obj:`bool`, :obj:`torch.LongTensor`]
        Values of patients for each visit for each feature, but tensorized into a one-hot encoding (pdf or sf).
        Shapes of tensors are (n_individuals, n_visits_max, dimension, max_ordinal_level [-1 when `sf=True`])

    Raises
    ------
    :exc:`.LeaspyInputError`
        if data, model or algo are not compatible together.
    """
def __init__(self, data: Data, *, no_warning: bool = False):
    """Build the tensorized dataset from a :class:`~leaspy.io.data.Data` object."""
    # Patients information
    self.n_individuals = data.n_individuals
    self.indices = list(data.individuals.keys())

    # Longitudinal outcome information
    self.headers: list[FeatureType] = data.headers
    self.dimension: int = data.dimension
    self.n_visits: int = data.n_visits
    self.timepoints: Optional[torch.FloatTensor] = None
    self.values: Optional[torch.FloatTensor] = None
    self.mask: Optional[torch.FloatTensor] = None
    self.n_observations: Optional[int] = None
    self.n_observations_per_ft: Optional[torch.LongTensor] = None
    self.n_observations_per_ind_per_ft: Optional[torch.LongTensor] = None
    self.n_visits_per_individual: Optional[list[int]] = None
    self.n_visits_max: Optional[int] = None

    # Event information
    self.event_time_name: Optional[str] = data.event_time_name
    self.event_bool_name: Optional[str] = data.event_bool_name
    self.event_time: Optional[torch.FloatTensor] = None
    # NOTE(review): annotated IntTensor but `_construct_events` fills it with a bool tensor — confirm intended dtype
    self.event_bool: Optional[torch.IntTensor] = None

    # Covariate information
    self.covariate_names: Optional[list[str]] = data.covariate_names
    self.covariates: Optional[torch.IntTensor] = None

    # internally used by ordinal models only (cache)
    self._one_hot_encoding: Optional[dict[bool, torch.LongTensor]] = None

    self.L2_norm_per_ft: Optional[torch.FloatTensor] = None
    self.L2_norm: Optional[torch.FloatTensor] = None

    # Only tensorize what the data actually contains
    if data.dimension:
        self._construct_values(data)
        self._construct_timepoints(data)
        self._compute_L2_norm()
    if self.event_time_name:
        self._construct_events(data)
    if self.covariate_names:
        self._construct_covariates(data)

    self.no_warning = no_warning
def _construct_values(self, data: Data):
    """
    Construct the values tensor and the mask tensor from the data.

    The values tensor is of shape (n_individuals, n_visits_max, dimension);
    individuals with fewer visits than `n_visits_max` are zero-padded and
    masked out.

    Parameters
    ----------
    data : :class:`~leaspy.io.data.Data`
        The data from which to construct the values and mask tensors.
    """
    self.n_visits_per_individual = [len(individual.timepoints) for individual in data]
    # `default=0` handles the empty-dataset case
    self.n_visits_max = max(self.n_visits_per_individual, default=0)

    values = torch.zeros((self.n_individuals, self.n_visits_max, self.dimension))
    padding_mask = torch.zeros_like(values)
    for i, individual in enumerate(data):
        nb_visits = len(individual.timepoints)
        if nb_visits == 0:
            # nothing to copy for an individual without any visit
            continue
        # np.array() first: creating a tensor from a list of numpy 1D arrays is extremely slow
        # TODO: IndividualData.observations is really badly constructed (list of numpy 1D arrays), we should change this...
        indiv_values = torch.tensor(np.array(individual.observations), dtype=torch.float32)
        values[i, :nb_visits, :] = indiv_values
        padding_mask[i, :nb_visits, :] = 1.0

    # mask must be 0 both on missing (nan) values and on padded (fictive) visits
    # (padding was done with 0, not nan, so the nan-mask alone would keep fictive visits)
    nan_locations = torch.isnan(values)
    mask = padding_mask * (~nan_locations).float()
    values[nan_locations] = 0.0  # Set values of missing values to 0.

    self.values = values
    self.mask = mask

    # number of non-nan observations (different levels of aggregation)
    self.n_observations_per_ind_per_ft = mask.sum(dim=1).int()
    self.n_observations_per_ft = self.n_observations_per_ind_per_ft.sum(dim=0)
    self.n_observations = self.n_observations_per_ft.sum().item()
def _construct_timepoints(self, data: Data):
    """
    Construct the timepoints tensor from the data.

    Visits beyond an individual's actual number of visits remain zero-padded.

    Parameters
    ----------
    data : :class:`~leaspy.io.data.Data`
        The data from which to construct the timepoints tensor.
    """
    self.timepoints = torch.zeros((self.n_individuals, self.n_visits_max))
    # single pass over the data (no intermediate list of visit counts needed)
    for i, individual in enumerate(data):
        nb_visits = len(individual.timepoints)
        self.timepoints[i, :nb_visits] = torch.tensor(individual.timepoints)
def _construct_events(self, data: Data):
    """
    Construct the event time and event boolean tensors from the data.

    Parameters
    ----------
    data : :class:`~leaspy.io.data.Data`
        The data from which to construct the event time and event boolean tensors.
    """
    # np.array() first to avoid the slow list-of-arrays -> tensor path
    self.event_time = torch.tensor(
        np.array([individual.event_time for individual in data]), dtype=torch.double
    )
    # True: event observed, False: censored
    self.event_bool = torch.tensor(
        np.array([individual.event_bool for individual in data]), dtype=torch.bool
    )
def _construct_covariates(self, data: Data):
    """
    Construct the covariates tensor from the data.

    Parameters
    ----------
    data : :class:`~leaspy.io.data.Data`
        The data from which to construct the covariates tensor.
    """
    # covariates are stored as integers (one row per individual)
    self.covariates = torch.tensor(
        np.array([individual.covariates for individual in data]), dtype=torch.int
    )
def _compute_L2_norm(self):
"""
Compute the L2 norm of the values tensor, feature per feature and overall.
The L2 norm is the sum of the squared values, ignoring nans.
"""
self.L2_norm_per_ft = torch.sum(
self.mask.float() * self.values * self.values, dim=(0, 1)
) # 1D tensor of shape (dimension,)
self.L2_norm = self.L2_norm_per_ft.sum() # sum on all features
def get_times_patient(self, i: int) -> torch.FloatTensor:
    """
    Get ages for patient number ``i``.

    Parameters
    ----------
    i : :obj:`int`
        The index of the patient (<!> not its identifier)

    Returns
    -------
    :obj:`torch.Tensor`, shape (n_obs_of_patient,)
        Contains float
    """
    # visits beyond the patient's actual visit count are zero-padding: exclude them
    return self.timepoints[i, : self.n_visits_per_individual[i]]
def get_event_patient(self, idx_patient: int) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Get ages at event for patient number ``idx_patient``.

    Parameters
    ----------
    idx_patient : :obj:`int`
        The index of the patient (<!> not its identifier)

    Returns
    -------
    :obj:`tuple` [:obj:`torch.Tensor`, :obj:`torch.Tensor`] , shape (n_obs_of_patient,)
        Contains float

    Raises
    ------
    :exc:`.ValueError`
        If the dataset has no event.
    """
    if self.event_time is not None and self.event_bool is not None:
        return self.event_time[idx_patient], self.event_bool[idx_patient]
    raise ValueError("Dataset has no event. Please verify your data.")
def get_covariates_patient(self, idx_patient: int) -> torch.IntTensor:
    """
    Get covariates for patient number ``idx_patient``.

    Parameters
    ----------
    idx_patient : :obj:`int`
        The index of the patient (<!> not its identifier)

    Returns
    -------
    :obj:`torch.Tensor`, shape (n_obs_of_patient,)
        Contains float

    Raises
    ------
    :exc:`.ValueError`
        If the dataset has no covariates.
    """
    if self.covariates is not None:
        return self.covariates[idx_patient]
    raise ValueError("Dataset has no covariates. Please verify your data.")
def get_values_patient(self, i: int, *, adapt_for_model=None) -> torch.FloatTensor:
    """
    Get values for patient number ``i``, with nans.

    Parameters
    ----------
    i : :obj:`int`
        The index of the patient (<!> not its identifier)
    adapt_for_model : None, default or :class:`~leaspy.models.mcmc_saem_compatible.McmcSaemCompatibleModel`
        The values returned are suited for this model.
        In particular:
        * For model with `noise_model='ordinal'` will return one-hot-encoded values [P(X = l), l=0..ordinal_max_level]
        * For model with `noise_model='ordinal_ranking'` will return survival function values [P(X > l), l=0..ordinal_max_level-1]
        If None, we return the raw values, whatever the model is.

    Returns
    -------
    :obj:`torch.Tensor`, shape (n_obs_of_patient, dimension [, extra_dimension_for_ordinal_models])
        Contains float or nans
    """
    # default case (raw values whatever the model)
    values_to_pick_from = self.values
    # positions of masked (missing) values for this patient's real visits
    nans = self.mask[i, : self.n_visits_per_individual[i], :] == 0

    # customization when ordinal model
    if adapt_for_model is not None and getattr(adapt_for_model, "is_ordinal", False):
        # directly fetch the one-hot encoded values (pdf or sf depending on precise `noise_model`)
        values_to_pick_from = self.get_one_hot_encoding(
            sf=adapt_for_model.is_ordinal_ranking,
            ordinal_infos=adapt_for_model.ordinal_infos,
        ).float()

    # restrict to the right individual; clone so the stored tensors are never mutated
    values_with_nans = (
        values_to_pick_from[i, : self.n_visits_per_individual[i], ...]
        .clone()
        .detach()
    )
    values_with_nans[nans, ...] = float("nan")
    return values_with_nans
def to_pandas(self, apply_headers: bool = False) -> pd.DataFrame:
    """
    Convert dataset to a `DataFrame` with ['ID', 'TIME'] index, with all covariates, events and repeated measures if
    apply_headers is False, and only the repeated measures otherwise.

    Parameters
    ----------
    apply_headers : :obj:`bool`
        Enable to select only the columns that are needed for leaspy fit (headers attribute)

    Returns
    -------
    :obj:`pandas.DataFrame`
        DataFrame with index ['ID', 'TIME'] and columns corresponding to the features, events and covariates.

    Raises
    ------
    :exc:`.LeaspyInputError`
        If the index of the DataFrame is not unique or contains invalid values.
    """
    to_concat = []
    for i, idx in enumerate(self.indices):
        # rebuild one IndividualData per patient, then serialize it to a frame
        ind_pat = IndividualData(idx)
        if self.event_time is not None:
            pat_event_time, pat_event_bool = self.get_event_patient(i)
            ind_pat.add_event(
                pat_event_time.cpu().tolist(), pat_event_bool.cpu().tolist()
            )
        if self.covariates is not None:
            pat_covariates = self.get_covariates_patient(i)
            ind_pat.add_covariates(pat_covariates.cpu().tolist())
        if self.values is not None:
            times = self.get_times_patient(i).cpu().numpy()
            observations = self.get_values_patient(i).cpu().numpy()
            ind_pat.add_observations(times, observations)
        to_concat.append(
            ind_pat.to_frame(
                self.headers,
                self.event_time_name,
                self.event_bool_name,
                self.covariate_names,
            )
        )
    df = pd.concat(to_concat).sort_index()
    if apply_headers:
        df = df[self.headers]
    if not df.index.is_unique:
        raise LeaspyInputError("Index of DataFrame is not unique.")
    if not df.index.to_frame().notnull().all(axis=None):
        raise LeaspyInputError("Index of DataFrame contains invalid values.")
    return df
def move_to_device(self, device: torch.device) -> None:
    """
    Move the dataset to the specified device.

    Parameters
    ----------
    device : :obj:`torch.device`
        The device on which all tensor attributes are moved.
    """
    # move every plain tensor attribute
    for attribute_name in dir(self):
        if attribute_name.startswith("__"):
            continue
        attribute = getattr(self, attribute_name)
        if isinstance(attribute, torch.Tensor):
            setattr(self, attribute_name, attribute.to(device))

    ## we have to manually put other (non-tensor) variables to the new device
    # Dictionary of one-hot encoded values
    if self._one_hot_encoding is not None:
        self._one_hot_encoding = {
            k: t.to(device) for k, t in self._one_hot_encoding.items()
        }
def get_one_hot_encoding(
    self, *, sf: bool, ordinal_infos: KwargsType
) -> torch.LongTensor:
    """
    Build the one-hot encoding of ordinal data once and for all and return it.

    Both the pdf and sf encodings are computed on first call and cached in
    ``self._one_hot_encoding`` for fast look-up afterwards.

    Parameters
    ----------
    sf : :obj:`bool`
        Whether the vector should be the survival function [1(X > l), l=0..max_level-1]
        instead of the probability density function [1(X=l), l=0..max_level]
    ordinal_infos : :class:`~leaspy.utils.typing.KwargsType`
        All the hyperparameters concerning ordinal modelling (in particular maximum level per features)

    Returns
    -------
    :obj:`torch.LongTensor`
        One-hot encoding of data values.

    Raises
    ------
    :exc:`.LeaspyInputError`
        If the values are not non-negative integers or if the features in `ordinal_infos` are not consistent with the dataset headers.
    """
    if self._one_hot_encoding is not None:
        return self._one_hot_encoding[sf]

    ## Check the data & construct the one-hot encodings once for all for fast look-up afterwards
    # Check for values different than non-negative integers
    if (self.values != self.values.round()).any() or (self.values < 0).any():
        raise LeaspyInputError(
            "Please make sure your data contains only integers >= 0 when using ordinal noise modelling."
        )
    # First of all check consistency of features given in ordinal_infos compared to the ones in the dataset (names & order!)
    ordinal_feat_names = list(ordinal_infos["max_levels"])
    if ordinal_feat_names != self.headers:
        raise LeaspyInputError(
            f"Features stored in ordinal model ({ordinal_feat_names}) are not consistent with features in data ({self.headers})"
        )

    # Now check that integers are within the expected range, per feature [0, max_level_ft]
    # (masked values are encoded by 0 at this point)
    vals = self.values.long()
    vals_issues = {
        "unexpected": [],
        "missing": [],
    }
    # sentinel for masked values, hoisted out of the loop
    # (guaranteed not to be part of input from first check: all values >= 0)
    masked_sentinel = torch.tensor(-1)
    for ft_i, (ft, max_level_ft) in enumerate(ordinal_infos["max_levels"].items()):
        expected_codes = set(range(0, max_level_ft + 1))  # max level is included
        vals_ft = vals[:, :, ft_i]
        if not self.no_warning:
            actual_vals_ft = vals_ft.where(
                self.mask[:, :, ft_i].bool(), masked_sentinel
            )
            actual_codes = set(actual_vals_ft.unique().tolist()).difference({-1})
            unexpected_codes = sorted(actual_codes.difference(expected_codes))
            missing_codes = sorted(expected_codes.difference(actual_codes))
            if unexpected_codes:
                vals_issues["unexpected"].append(
                    f"- {ft} [[0..{max_level_ft}]]: {unexpected_codes} were unexpected"
                )
            if missing_codes:
                vals_issues["missing"].append(
                    f"- {ft} [[0..{max_level_ft}]]: {missing_codes} are missing"
                )
        # clip the values (per feature)
        # we must keep this even if no_warning enabled
        vals[:, :, ft_i] = vals_ft.clamp(0, max_level_ft)

    if not self.no_warning and len(vals_issues["unexpected"]):
        warnings.warn(
            "Some features have unexpected codes (they were clipped to the maximum known level):\n"
            + "\n".join(vals_issues["unexpected"])
        )
    if not self.no_warning and len(vals_issues["missing"]):
        warnings.warn(
            "Some features have missing codes:\n"
            + "\n".join(vals_issues["missing"])
        )

    # one-hot encode all the values after the checks & clipping
    vals_pdf = torch.nn.functional.one_hot(
        vals, num_classes=ordinal_infos["max_level"] + 1
    )
    # build the survival function by simple (1 - cumsum) and remove the useless P(X >= 0) = 1
    vals_sf = discrete_sf_from_pdf(vals_pdf)
    # cache the values to retrieve them fast afterwards
    self._one_hot_encoding = {False: vals_pdf, True: vals_sf}
    return self._one_hot_encoding[sf]