Source code for domino._slice.domino

from __future__ import annotations

import warnings
from functools import wraps
from typing import Union

import meerkat as mk
import numpy as np
import sklearn.cluster as cluster
from scipy import linalg
from scipy.special import logsumexp
from sklearn.decomposition import PCA
from sklearn.exceptions import ConvergenceWarning
from sklearn.mixture import GaussianMixture
from sklearn.mixture._base import _check_X, check_random_state
from sklearn.mixture._gaussian_mixture import (
    _compute_precision_cholesky,
    _estimate_gaussian_covariances_diag,
    _estimate_gaussian_covariances_full,
    _estimate_gaussian_covariances_spherical,
    _estimate_gaussian_covariances_tied,
)
from sklearn.preprocessing import label_binarize
from sklearn.utils.validation import check_is_fitted
from tqdm.auto import tqdm

from domino.utils import unpack_args

from .abstract import Slicer


[docs]class DominoSlicer(Slicer): r""" Slice Discovery based on the Domino Mixture Model. Discover slices by jointly modeling a mixture of input embeddings (e.g. activations from a trained model), class labels, and model predictions. This encourages slices that are homogeneous with respect to error type (e.g. all false positives). Examples -------- Suppose you've trained a model and stored its predictions on a dataset in a `Meerkat DataPanel <https://github.com/robustness-gym/meerkat>`_ with columns "emb", "target", and "pred_probs". After loading the DataPanel, you can discover underperforming slices of the validation dataset with the following: .. code-block:: python from domino import DominoSlicer dp = ... # Load dataset into a Meerkat DataPanel # split dataset valid_dp = dp.lz[dp["split"] == "valid"] test_dp = dp.lz[dp["split"] == "test"] domino = DominoSlicer() domino.fit( data=valid_dp, embeddings="emb", targets="target", pred_probs="pred_probs" ) dp["domino_slices"] = domino.transform( data=test_dp, embeddings="emb", targets="target", pred_probs="pred_probs" ) Args: n_slices (int, optional): The number of slices to discover. Defaults to 5. covariance_type (str, optional): The type of covariance parameter :math:`\mathbf{\Sigma}` to use. Same as in sklearn.mixture.GaussianMixture. Defaults to "diag", which is recommended. n_pca_components (Union[int, None], optional): The number of PCA components to use. If ``None``, then no PCA is performed. Defaults to 128. n_mixture_components (int, optional): The number of clusters in the mixture model, :math:`\bar{k}`. This differs from ``n_slices`` in that the ``DominoSDM`` only returns the top ``n_slices`` with the highest error rate of the ``n_mixture_components``. Defaults to 25. y_log_likelihood_weight (float, optional): The weight :math:`\gamma` applied to the :math:`P(Y=y_{i} | S=s)` term in the log likelihood during the E-step. Defaults to 1. y_hat_log_likelihood_weight (float, optional): The weight :math:`\hat{\gamma}` applied to the :math:`P(\hat{Y} = h_\theta(x_i) | S=s)` term in the log likelihood during the E-step. Defaults to 1. max_iter (int, optional): The maximum number of iterations to run. Defaults to 100. init_params (str, optional): The initialization method to use. Options are the same as in sklearn.mixture.GaussianMixture plus one addition, "confusion". If "confusion", the clusters are initialized such that almost all of the examples in a cluster come from same cell in the confusion matrix. See Notes below for more details. Defaults to "confusion". confusion_noise (float, optional): Only used if ``init_params="confusion"``. The scale of noise added to the confusion matrix initialization. See notes below for more details. Defaults to 0.001. random_state (Union[int, None], optional): The random seed to use when initializing the parameters. Notes ----- The mixture model is an extension of a standard Gaussian Mixture Model. The model is based on the assumption that data is generated according to the following generative process. * Each example belongs to one of :math:`\bar{k}` slices. This slice :math:`S` is sampled from a categorical distribution :math:`S \sim Cat(\mathbf{p}_S)` with parameter :math:`\mathbf{p}_S \in\{\mathbf{p} \in \mathbb{R}_+^{\bar{k}} : \sum_{i = 1}^{\bar{k}} p_i = 1\}` (see ``DominoSDM.mm.weights_``). * Given the slice :math:`S'`, the embeddings are normally distributed :math:`Z | S \sim \mathcal{N}(\mathbf{\mu}, \mathbf{\Sigma}`) with parameters mean :math:`\mathbf{\mu} \in \mathbb{R}^d` (see ``DominoSDM.mm.means_``) and :math:`\mathbf{\Sigma} \in \mathbb{S}^{d}_{++}` (see ``DominoSDM.mm.covariances_``; normally this parameter is constrained to the set of symmetric positive definite :math:`d \\times d` matrices, however the argument ``covariance_type`` allows for other constraints). * Given the slice, the labels vary as a categorical :math:`Y |S \sim Cat(\mathbf{p})` with parameter :math:`\mathbf{p} \in \{\mathbf{p} \in \mathbb{R}^c_+ : \sum_{i = 1}^c p_i = 1\}` (see ``DominoSDM.mm.y_probs``). * Given the slice, the model predictions also vary as a categorical :math:`\hat{Y} | S \sim Cat(\mathbf{\hat{p}})` with parameter :math:`\mathbf{\hat{p}} \in \{\mathbf{\hat{p}} \in \mathbb{R}^c_+ : \sum_{i = 1}^c \hat{p}_i = 1\}` (see ``DominoSDM.mm.y_hat_probs``). The mixture model is, thus, parameterized by :math:`\phi = [\mathbf{p}_S, \mu, \Sigma, \mathbf{p}, \mathbf{\hat{p}}]` corresponding to the attributes ``weights_, means_, covariances_, y_probs, y_hat_probs`` respectively. The log-likelihood over the :math:`n` examples in the validation dataset :math:`D_v` is given as followsand maximized using expectation-maximization: .. math:: \ell(\phi) = \sum_{i=1}^n \log \sum_{s=1}^{\hat{k}} P(S=s)P(Z=z_i| S=s) P( Y=y_i| S=s)P(\hat{Y} = h_\theta(x_i) | S=s) We include two optional hyperparameters :math:`\gamma, \hat{\gamma} \in \mathbb{R}_+` (see ``y_log_liklihood_weight`` and ``y_hat_log_likelihood_weight`` below) that balance the importance of modeling the class labels and predictions against the importance of modeling the embedding. The modified log-likelihood over :math:`n` examples is given as follows: .. math:: \ell(\phi) = \sum_{i=1}^n \log \sum_{s=1}^{\hat{k}} P(S=s)P(Z=z_i| S=s) P( Y=y_i| S=s)^\gamma P(\hat{Y} = h_\theta(x_i) | S=s)^{\hat{\gamma}} .. attention:: Although we model the prediction :math:`\hat{Y}` as a categorical random variable, in practice predictions are sometimes "soft" (e.g. the output of a softmax layer is a probability distribution over labels, not a single label). In these cases, the prediction :math:`\hat{Y}` is technically a dirichlet random variable (i.e. a distribution over distributions). However, to keep the implementation simple while still leveraging the extra information provided by "soft" predictions, we naïvely plug the "soft" predictions directly into the categorical PMF in the E-step and the update in the M-step. Specifically, during the E-step, instead of computing the categorical PMF :math:`P(\hat{Y}=\hat{y_i} | S=s)` we compute :math:`\sum_{j=1}^c \hat{y_i}(j) P(\hat{Y}=j | S=s)` where :math:`\hat{y_i}(j)` is the "soft" prediction for class :math:`j` (we can think of this like we're marginalizing out the uncertainty in the prediction). During the M-step, we compute a "soft" update for the categorical parameters :math:`p_j^{(s)} = \sum_{i=1}^n Q(s,i) \hat{y_i}(j)` where :math:`Q(s,i)` is the "responsibility" of slice :math:`s` towards the data point :math:`i`. When using ``"confusion"`` initialization, each slice $s^{(j)}$ is assigned a :math:`y^{(j)}\in \mathcal{Y}` and :math:`\hat{y}^{(j)} \in \mathcal{Y}` (*i.e.* each slice is assigned a cell in the confusion matrix). This is typically done in a round-robin fashion so that there are at least :math:`\floor{\hat{k} / {|\mathcal{Y}|^2}}` slices assigned to each cell in the confusion matrix. Then, we fill in the initial responsibility matrix :math:`Q \in \mathbb{R}^{n \times \hat{k}}`, where each cell :math:`Q_{ij}` corresponds to our model's initial estimate of :math:`P(S=s^{(j)}|Y=y_i, \hat{Y}=\hat{y}_i)`. We do this according to .. math:: \bar{Q}_{ij} \leftarrow \begin{cases} 1 + \epsilon & y_i=y^{(j)} \land \hat{y}_i = \hat{y}^{(j)} \\ \epsilon & \text{otherwise} \end{cases} .. math:: Q_{ij} \leftarrow \frac{\bar{Q}_{ij} } {\sum_{l=1}^{\hat{k}} \bar{Q}_{il}} where :math:`\epsilon` is random noise which ensures that slices assigned to the same confusion matrix cell won't have the exact same initialization. We sample :math:`\epsilon` uniformly from the range ``(0, confusion_noise]``. """ def __init__( self, n_slices: int = 5, covariance_type: str = "diag", n_pca_components: Union[int, None] = 128, n_mixture_components: int = 25, y_log_likelihood_weight: float = 1, y_hat_log_likelihood_weight: float = 1, max_iter: int = 100, init_params: str = "confusion", confusion_noise: float = 1e-3, random_state: int = None, ): super().__init__(n_slices=n_slices) self.config.covariance_type = covariance_type self.config.n_pca_components = n_pca_components self.config.n_mixture_components = n_mixture_components self.config.init_params = init_params self.config.confusion_noise = confusion_noise self.config.y_log_likelihood_weight = y_log_likelihood_weight self.config.y_hat_log_likelihood_weight = y_hat_log_likelihood_weight self.config.max_iter = max_iter if self.config.n_pca_components is None: self.pca = None else: self.pca = PCA(n_components=self.config.n_pca_components) self.mm = DominoMixture( n_components=self.config.n_mixture_components, y_log_likelihood_weight=self.config.y_log_likelihood_weight, y_hat_log_likelihood_weight=self.config.y_hat_log_likelihood_weight, covariance_type=self.config.covariance_type, init_params=self.config.init_params, max_iter=self.config.max_iter, confusion_noise=self.config.confusion_noise, random_state=random_state, ) def fit( self, data: Union[dict, mk.DataPanel] = None, embeddings: Union[str, np.ndarray] = "embedding", targets: Union[str, np.ndarray] = "target", pred_probs: Union[str, np.ndarray] = "pred_probs", ) -> DominoSlicer: """ Fit the mixture model to data. Args: data (mk.DataPanel, optional): A `Meerkat DataPanel` with columns for embeddings, targets, and prediction probabilities. The names of the columns can be specified with the ``embeddings``, ``targets``, and ``pred_probs`` arguments. Defaults to None. embeddings (Union[str, np.ndarray], optional): The name of a colum in ``data`` holding embeddings. If ``data`` is ``None``, then an np.ndarray of shape (n_samples, dimension of embedding). Defaults to "embedding". targets (Union[str, np.ndarray], optional): The name of a column in ``data`` holding class labels. If ``data`` is ``None``, then an np.ndarray of shape (n_samples,). Defaults to "target". pred_probs (Union[str, np.ndarray], optional): The name of a column in ``data`` holding model predictions (can either be "soft" probability scores or "hard" 1-hot encoded predictions). If ``data`` is ``None``, then an np.ndarray of shape (n_samples, n_classes) or (n_samples,) in the binary case. Defaults to "pred_probs". Returns: DominoSDM: Returns a fit instance of DominoSDM. """ embeddings, targets, pred_probs = unpack_args( data, embeddings, targets, pred_probs ) if self.pca is not None: self.pca.fit(X=embeddings) embeddings = self.pca.transform(X=embeddings) self.mm.fit(X=embeddings, y=targets, y_hat=pred_probs) self.slice_cluster_indices = ( -np.abs((self.mm.y_hat_probs - self.mm.y_probs).max(axis=1)) ).argsort()[: self.config.n_slices] return self def predict( self, data: Union[dict, mk.DataPanel] = None, embeddings: Union[str, np.ndarray] = "embedding", targets: Union[str, np.ndarray] = "target", pred_probs: Union[str, np.ndarray] = "pred_probs", ) -> np.ndarray: """ Get probabilistic slice membership for data using a fit mixture model. .. caution:: Must call ``DominoSlicer.fit`` prior to calling ``DominoSlicer.predict``. Args: data (mk.DataPanel, optional): A `Meerkat DataPanel` with columns for embeddings, targets, and prediction probabilities. The names of the columns can be specified with the ``embeddings``, ``targets``, and ``pred_probs`` arguments. Defaults to None. embeddings (Union[str, np.ndarray], optional): The name of a colum in ``data`` holding embeddings. If ``data`` is ``None``, then an np.ndarray of shape (n_samples, dimension of embedding). Defaults to "embedding". targets (Union[str, np.ndarray], optional): The name of a column in ``data`` holding class labels. If ``data`` is ``None``, then an np.ndarray of shape (n_samples,). Defaults to "target". pred_probs (Union[str, np.ndarray], optional): The name of a column in ``data`` holding model predictions (can either be "soft" probability scores or "hard" 1-hot encoded predictions). If ``data`` is ``None``, then an np.ndarray of shape (n_samples, n_classes) or (n_samples,) in the binary case. Defaults to "pred_probs". Returns: np.ndarray: A binary ``np.ndarray`` of shape (n_samples, n_slices) where values are either 1 or 0. """ probs = self.predict_proba( data=data, embeddings=embeddings, targets=targets, pred_probs=pred_probs, ) preds = np.zeros_like(probs, dtype=np.int32) preds[np.arange(preds.shape[0]), probs.argmax(axis=-1)] = 1 return preds def predict_proba( self, data: Union[dict, mk.DataPanel] = None, embeddings: Union[str, np.ndarray] = "embedding", targets: Union[str, np.ndarray] = "target", pred_probs: Union[str, np.ndarray] = "pred_probs", ) -> np.ndarray: """ Get probabilistic slice membership for data using a fit mixture model. .. caution:: Must call ``DominoSlicer.fit`` prior to calling ``DominoSlicer.predict_proba``. Args: data (mk.DataPanel, optional): A `Meerkat DataPanel` with columns for embeddings, targets, and prediction probabilities. The names of the columns can be specified with the ``embeddings``, ``targets``, and ``pred_probs`` arguments. Defaults to None. embeddings (Union[str, np.ndarray], optional): The name of a colum in ``data`` holding embeddings. If ``data`` is ``None``, then an np.ndarray of shape (n_samples, dimension of embedding). Defaults to "embedding". targets (Union[str, np.ndarray], optional): The name of a column in ``data`` holding class labels. If ``data`` is ``None``, then an np.ndarray of shape (n_samples,). Defaults to "target". pred_probs (Union[str, np.ndarray], optional): The name of a column in ``data`` holding model predictions (can either be "soft" probability scores or "hard" 1-hot encoded predictions). If ``data`` is ``None``, then an np.ndarray of shape (n_samples, n_classes) or (n_samples,) in the binary case. Defaults to "pred_probs". Returns: np.ndarray: A ``np.ndarray`` of shape (n_samples, n_slices) where values in are in range [0,1] and rows sum to 1. """ embeddings, targets, pred_probs = unpack_args( data, embeddings, targets, pred_probs ) if self.pca is not None: embeddings = self.pca.transform(X=embeddings) clusters = self.mm.predict_proba(embeddings, y=targets, y_hat=pred_probs) return clusters[:, self.slice_cluster_indices]
class DominoMixture(GaussianMixture): @wraps(GaussianMixture.__init__) def __init__( self, *args, y_log_likelihood_weight: float = 1, y_hat_log_likelihood_weight: float = 1, confusion_noise: float = 1e-3, **kwargs, ): self.y_log_likelihood_weight = y_log_likelihood_weight self.y_hat_log_likelihood_weight = y_hat_log_likelihood_weight self.confusion_noise = confusion_noise super().__init__(*args, **kwargs) def _initialize_parameters(self, X, y, y_hat, random_state): """Initialize the model parameters. Parameters ---------- X : array-like of shape (n_samples, n_features) random_state : RandomState A random number generator instance that controls the random seed used for the method chosen to initialize the parameters. """ n_samples, _ = X.shape if self.init_params == "kmeans": resp = np.zeros((n_samples, self.n_components)) label = ( cluster.KMeans( n_clusters=self.n_components, n_init=1, random_state=random_state ) .fit(X) .labels_ ) resp[np.arange(n_samples), label] = 1 elif self.init_params == "random": resp = random_state.rand(n_samples, self.n_components) resp /= resp.sum(axis=1)[:, np.newaxis] elif self.init_params == "confusion": num_classes = y.shape[-1] if self.n_components < num_classes ** 2: raise ValueError( "Can't use parameter init 'error' when " "`n_components` < `num_classes **2`" ) resp = np.matmul(y[:, :, np.newaxis], y_hat[:, np.newaxis, :]).reshape( len(y), -1 ) resp = np.concatenate( [resp] * ( int(self.n_components / (num_classes ** 2)) + (self.n_components % (num_classes ** 2) > 0) ), axis=1, )[:, : self.n_components] resp /= resp.sum(axis=1)[:, np.newaxis] resp += ( random_state.rand(n_samples, self.n_components) * self.confusion_noise ) resp /= resp.sum(axis=1)[:, np.newaxis] else: raise ValueError( "Unimplemented initialization method '%s'" % self.init_params ) self._initialize(X, y, y_hat, resp) def _initialize(self, X, y, y_hat, resp): """Initialization of the Gaussian mixture parameters. Parameters ---------- X : array-like of shape (n_samples, n_features) resp : array-like of shape (n_samples, n_components) """ n_samples, _ = X.shape weights, means, covariances, y_probs, y_hat_probs = _estimate_parameters( X, y, y_hat, resp, self.reg_covar, self.covariance_type ) weights /= n_samples self.weights_ = weights if self.weights_init is None else self.weights_init self.means_ = means if self.means_init is None else self.means_init self.y_probs, self.y_hat_probs = y_probs, y_hat_probs if self.precisions_init is None: self.covariances_ = covariances self.precisions_cholesky_ = _compute_precision_cholesky( covariances, self.covariance_type ) elif self.covariance_type == "full": self.precisions_cholesky_ = np.array( [ linalg.cholesky(prec_init, lower=True) for prec_init in self.precisions_init ] ) elif self.covariance_type == "tied": self.precisions_cholesky_ = linalg.cholesky( self.precisions_init, lower=True ) else: self.precisions_cholesky_ = self.precisions_init def fit(self, X, y, y_hat): self.fit_predict(X, y, y_hat) return self def _preprocess_ys(self, y: np.ndarray = None, y_hat: np.ndarray = None): if y is not None: y = label_binarize(y, classes=np.arange(np.max(y) + 1)) if y.shape[-1] == 1: # binary targets transform to a column vector with label_binarize y = np.array([1 - y[:, 0], y[:, 0]]).T if y_hat is not None: if len(y_hat.shape) == 1: y_hat = np.array([1 - y_hat, y_hat]).T return y, y_hat def fit_predict(self, X, y, y_hat): y, y_hat = self._preprocess_ys(y, y_hat) X = _check_X(X, self.n_components, ensure_min_samples=2) self._check_n_features(X, reset=True) self._check_initial_parameters(X) # if we enable warm_start, we will have a unique initialisation do_init = not (self.warm_start and hasattr(self, "converged_")) n_init = self.n_init if do_init else 1 max_lower_bound = -np.infty self.converged_ = False random_state = check_random_state(self.random_state) n_samples, _ = X.shape for init in range(n_init): self._print_verbose_msg_init_beg(init) if do_init: self._initialize_parameters(X, y, y_hat, random_state) lower_bound = -np.infty if do_init else self.lower_bound_ for n_iter in tqdm(range(1, self.max_iter + 1), colour="#f17a4a"): prev_lower_bound = lower_bound log_prob_norm, log_resp = self._e_step(X, y, y_hat) self._m_step(X, y, y_hat, log_resp) lower_bound = self._compute_lower_bound(log_resp, log_prob_norm) change = lower_bound - prev_lower_bound self._print_verbose_msg_iter_end(n_iter, change) if abs(change) < self.tol: self.converged_ = True break self._print_verbose_msg_init_end(lower_bound) if lower_bound > max_lower_bound: max_lower_bound = lower_bound best_params = self._get_parameters() best_n_iter = n_iter if not self.converged_: warnings.warn( "Initialization %d did not converge. " "Try different init parameters, " "or increase max_iter, tol " "or check for degenerate data." % (init + 1), ConvergenceWarning, ) self._set_parameters(best_params) self.n_iter_ = best_n_iter self.lower_bound_ = max_lower_bound # Always do a final e-step to guarantee that the labels returned by # fit_predict(X) are always consistent with fit(X).predict(X) # for any value of max_iter and tol (and any random_state). _, log_resp = self._e_step(X, y, y_hat) return log_resp.argmax(axis=1) def predict_proba( self, X: np.ndarray, y: np.ndarray = None, y_hat: np.ndarray = None ): y, y_hat = self._preprocess_ys(y, y_hat) check_is_fitted(self) X = _check_X(X, None, self.means_.shape[1]) _, log_resp = self._estimate_log_prob_resp(X, y, y_hat) return np.exp(log_resp) def _m_step(self, X, y, y_hat, log_resp): """M step. Parameters ---------- X : array-like of shape (n_samples, n_features) log_resp : array-like of shape (n_samples, n_components) Logarithm of the posterior probabilities (or responsibilities) of the point of each sample in X. """ resp = np.exp(log_resp) n_samples, _ = X.shape ( self.weights_, self.means_, self.covariances_, self.y_probs, self.y_hat_probs, ) = _estimate_parameters( X, y, y_hat, resp, self.reg_covar, self.covariance_type ) self.weights_ /= n_samples self.precisions_cholesky_ = _compute_precision_cholesky( self.covariances_, self.covariance_type ) def _e_step(self, X, y, y_hat): """E step. Parameters ---------- X : array-like of shape (n_samples, n_features) Returns ------- log_prob_norm : float Mean of the logarithms of the probabilities of each sample in X log_responsibility : array, shape (n_samples, n_components) Logarithm of the posterior probabilities (or responsibilities) of the point of each sample in X. """ log_prob_norm, log_resp = self._estimate_log_prob_resp(X, y, y_hat) return np.mean(log_prob_norm), log_resp def _estimate_log_prob_resp(self, X, y=None, y_hat=None): """Estimate log probabilities and responsibilities for each sample. Compute the log probabilities, weighted log probabilities per component and responsibilities for each sample in X with respect to the current state of the model. Parameters ---------- X : array-like of shape (n_samples, n_features) Returns ------- log_prob_norm : array, shape (n_samples,) log p(X) log_responsibilities : array, shape (n_samples, n_components) logarithm of the responsibilities """ weighted_log_prob = self._estimate_weighted_log_prob(X, y, y_hat) log_prob_norm = logsumexp(weighted_log_prob, axis=1) with np.errstate(under="ignore"): # ignore underflow log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis] return log_prob_norm, log_resp def _estimate_weighted_log_prob(self, X, y=None, y_hat=None): log_prob = self._estimate_log_prob(X) + self._estimate_log_weights() if y is not None: log_prob += self._estimate_y_log_prob(y) * self.y_log_likelihood_weight if y_hat is not None: log_prob += ( self._estimate_y_hat_log_prob(y_hat) * self.y_hat_log_likelihood_weight ) return log_prob def _get_parameters(self): return ( self.weights_, self.means_, self.covariances_, self.y_probs, self.y_hat_probs, self.precisions_cholesky_, ) def _set_parameters(self, params): ( self.weights_, self.means_, self.covariances_, self.y_probs, self.y_hat_probs, self.precisions_cholesky_, ) = params # Attributes computation _, n_features = self.means_.shape if self.covariance_type == "full": self.precisions_ = np.empty(self.precisions_cholesky_.shape) for k, prec_chol in enumerate(self.precisions_cholesky_): self.precisions_[k] = np.dot(prec_chol, prec_chol.T) elif self.covariance_type == "tied": self.precisions_ = np.dot( self.precisions_cholesky_, self.precisions_cholesky_.T ) else: self.precisions_ = self.precisions_cholesky_ ** 2 def _n_parameters(self): """Return the number of free parameters in the model.""" return super()._n_parameters() + 2 * self.n_components def _estimate_y_log_prob(self, y): """Estimate the Gaussian distribution parameters. Parameters ---------- y: array-like of shape (n_samples, n_classes) y_hat: array-like of shpae (n_samples, n_classes) """ # add epsilon to avoid "RuntimeWarning: divide by zero encountered in log" return np.log(np.dot(y, self.y_probs.T) + np.finfo(self.y_probs.dtype).eps) def _estimate_y_hat_log_prob(self, y_hat): """Estimate the Gaussian distribution parameters. Parameters ---------- y: array-like of shape (n_samples, n_classes) y_hat: array-like of shpae (n_samples, n_classes) """ # add epsilon to avoid "RuntimeWarning: divide by zero encountered in log" return np.log( np.dot(y_hat, self.y_hat_probs.T) + np.finfo(self.y_hat_probs.dtype).eps ) def _estimate_parameters(X, y, y_hat, resp, reg_covar, covariance_type): """Estimate the Gaussian distribution parameters. Parameters ---------- X : array-like of shape (n_samples, n_features) The input data array. y: array-like of shape (n_samples, n_classes) y_hat: array-like of shpae (n_samples, n_classes) resp : array-like of shape (n_samples, n_components) The responsibilities for each data sample in X. reg_covar : float The regularization added to the diagonal of the covariance matrices. covariance_type : {'full', 'tied', 'diag', 'spherical'} The type of precision matrices. Returns ------- nk : array-like of shape (n_components,) The numbers of data samples in the current components. means : array-like of shape (n_components, n_features) The centers of the current components. covariances : array-like The covariance matrix of the current components. The shape depends of the covariance_type. """ nk = resp.sum(axis=0) + 10 * np.finfo(resp.dtype).eps # (n_components, ) means = np.dot(resp.T, X) / nk[:, np.newaxis] covariances = { "full": _estimate_gaussian_covariances_full, "tied": _estimate_gaussian_covariances_tied, "diag": _estimate_gaussian_covariances_diag, "spherical": _estimate_gaussian_covariances_spherical, }[covariance_type](resp, X, nk, means, reg_covar) y_probs = np.dot(resp.T, y) / nk[:, np.newaxis] # (n_components, n_classes) y_hat_probs = np.dot(resp.T, y_hat) / nk[:, np.newaxis] # (n_components, n_classes) return nk, means, covariances, y_probs, y_hat_probs