Source code for mushroom_rl.distributions.gaussian

import numpy as np
from .distribution import Distribution
from scipy.stats import multivariate_normal
from scipy.optimize import minimize

class GaussianDistribution(Distribution):
    """
    Gaussian distribution with fixed covariance matrix. The parameters
    vector represents only the mean.

    """
    def __init__(self, mu, sigma):
        """
        Constructor.

        Args:
            mu (np.ndarray): initial mean of the distribution;
            sigma (np.ndarray): covariance matrix of the distribution.

        """
        self._mu = mu
        self._sigma = sigma
        self._inv_sigma = np.linalg.inv(sigma)

        self._add_save_attr(
            _mu='numpy',
            _sigma='numpy',
            _inv_sigma='numpy'
        )
    def sample(self):
        return np.random.multivariate_normal(self._mu, self._sigma)

    def log_pdf(self, theta):
        return multivariate_normal.logpdf(theta, self._mu, self._sigma)

    def __call__(self, theta):
        return multivariate_normal.pdf(theta, self._mu, self._sigma)

    def entropy(self):
        n_dims = len(self._mu)
        sigma = self._sigma
        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)

        return GaussianDistribution._entropy(logdet_sigma, n_dims)

    def mle(self, theta, weights=None):
        if weights is None:
            self._mu = np.mean(theta, axis=0)
        else:
            self._mu = weights.dot(theta) / np.sum(weights)
    def con_wmle(self, theta, weights, eps, *args):
        n_dims = len(self._mu)
        mu = self._mu
        sigma = self._sigma

        # Find the Lagrange multiplier eta that keeps the KL divergence
        # between the updated and the current distribution below eps.
        eta_start = np.array([1000])
        res = minimize(GaussianDistribution._lagrangian_eta, eta_start,
                       bounds=((np.finfo(np.float32).eps, np.inf),),
                       args=(weights, theta, mu, sigma, n_dims, eps),
                       method='SLSQP')
        eta_opt = res.x[0]

        self._mu = GaussianDistribution._compute_mu_from_lagrangian(
            weights, theta, mu, eta_opt)
    def diff_log(self, theta):
        delta = theta - self._mu
        g = self._inv_sigma.dot(delta)

        return g

    def get_parameters(self):
        return self._mu

    def set_parameters(self, rho):
        self._mu = rho
    @property
    def parameters_size(self):
        return len(self._mu)

    @staticmethod
    def _compute_mu_from_lagrangian(weights, theta, mu, eta):
        weights_sum = np.sum(weights)
        mu_new = (weights @ theta + eta * mu) / (weights_sum + eta)

        return mu_new

    @staticmethod
    def _kl_constraint(mu, mu_new, sigma, sigma_new, sigma_inv, sigma_new_inv,
                       logdet_sigma, logdet_sigma_new, n_dims):
        return 0.5 * (np.trace(sigma_new_inv @ sigma) - n_dims
                      + logdet_sigma_new - logdet_sigma
                      + (mu_new - mu).T @ sigma_new_inv @ (mu_new - mu))

    @staticmethod
    def _entropy(logdet_sigma, n_dims):
        c = n_dims * np.log(2 * np.pi)

        return 0.5 * (logdet_sigma + c + n_dims)

    @staticmethod
    def _lagrangian_eta(lag_array, weights, theta, mu, sigma, n_dims, eps):
        # Lagrangian of the KL-constrained weighted maximum-likelihood
        # problem, evaluated at the candidate multiplier eta.
        eta = lag_array[0]

        mu_new = GaussianDistribution._compute_mu_from_lagrangian(
            weights, theta, mu, eta)

        sigma_inv = np.linalg.inv(sigma)
        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)
        c = n_dims * np.log(2 * np.pi)

        sum1 = np.sum([w_i * (-0.5 * (theta_i - mu_new)[:, np.newaxis].T
                              @ sigma_inv @ (theta_i - mu_new)[:, np.newaxis]
                              - 0.5 * logdet_sigma - 0.5 * c)
                       for w_i, theta_i in zip(weights, theta)])

        sum2 = eta * (eps - GaussianDistribution._kl_constraint(
            mu, mu_new, sigma, sigma, sigma_inv, sigma_inv,
            logdet_sigma, logdet_sigma, n_dims))

        return sum1 + sum2
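
# A minimal usage sketch (not part of the mushroom_rl source). It assumes a
# hypothetical 2-dimensional parameter space and made-up weights, and shows
# the usual sample / weighted-MLE cycle of a black-box policy search step.
def _example_gaussian_usage():
    mu = np.zeros(2)
    sigma = 1e-1 * np.eye(2)
    dist = GaussianDistribution(mu, sigma)

    # Draw candidate parameter vectors and weight them, e.g. by the return
    # obtained when evaluating each candidate (weights here are illustrative).
    theta = np.array([dist.sample() for _ in range(10)])
    weights = np.exp(-np.linalg.norm(theta, axis=1))

    # Weighted maximum-likelihood update of the mean (covariance stays fixed).
    dist.mle(theta, weights)

    return dist.get_parameters()
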
class GaussianDiagonalDistribution(Distribution):
    """
    Gaussian distribution with diagonal covariance matrix. The parameters
    vector represents the mean and the standard deviation for each dimension.

    """
    def __init__(self, mu, std):
        """
        Constructor.

        Args:
            mu (np.ndarray): initial mean of the distribution;
            std (np.ndarray): initial vector of standard deviations for each
                variable of the distribution.

        """
        assert(len(std.shape) == 1)
        self._mu = mu
        self._std = std

        self._add_save_attr(
            _mu='numpy',
            _std='numpy'
        )
    def sample(self):
        sigma = np.diag(self._std**2)
        return np.random.multivariate_normal(self._mu, sigma)

    def log_pdf(self, theta):
        sigma = np.diag(self._std**2)
        return multivariate_normal.logpdf(theta, self._mu, sigma)

    def __call__(self, theta):
        sigma = np.diag(self._std**2)
        return multivariate_normal.pdf(theta, self._mu, sigma)

    def entropy(self):
        n_dims = len(self._mu)
        sigma = np.diag(self._std**2)
        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)

        return GaussianDiagonalDistribution._entropy(logdet_sigma, n_dims)

    def mle(self, theta, weights=None):
        if weights is None:
            self._mu = np.mean(theta, axis=0)
            self._std = np.std(theta, axis=0)
        else:
            sumD = np.sum(weights)
            sumD2 = np.sum(weights**2)
            Z = sumD - sumD2 / sumD

            self._mu = weights.dot(theta) / sumD

            delta2 = (theta - self._mu)**2
            self._std = np.sqrt(weights.dot(delta2) / Z)
    def con_wmle(self, theta, weights, eps, kappa):
        n_dims = len(self._mu)
        mu = self._mu
        sigma = self._std

        # Find the Lagrange multipliers eta (KL constraint, bound eps) and
        # omega (entropy constraint, bound kappa).
        eta_omg_start = np.array([1000, 0])
        res = minimize(GaussianDiagonalDistribution._lagrangian_eta_omega,
                       eta_omg_start,
                       bounds=((np.finfo(np.float32).eps, np.inf),
                               (np.finfo(np.float32).eps, np.inf)),
                       args=(weights, theta, mu, sigma, n_dims, eps, kappa),
                       method='SLSQP')
        eta_opt, omg_opt = res.x[0], res.x[1]

        self._mu, self._std = \
            GaussianDiagonalDistribution._compute_mu_sigma_from_lagrangian(
                weights, theta, mu, sigma, eta_opt, omg_opt)
    def diff_log(self, theta):
        n_dims = len(self._mu)
        sigma = self._std**2

        g = np.empty(self.parameters_size)

        delta = theta - self._mu

        g_mean = delta / sigma
        g_std = delta**2 / (self._std**3) - 1 / self._std

        g[:n_dims] = g_mean
        g[n_dims:] = g_std

        return g

    def get_parameters(self):
        rho = np.empty(self.parameters_size)
        n_dims = len(self._mu)

        rho[:n_dims] = self._mu
        rho[n_dims:] = self._std

        return rho

    def set_parameters(self, rho):
        n_dims = len(self._mu)
        self._mu = rho[:n_dims]
        self._std = rho[n_dims:]
    @property
    def parameters_size(self):
        return 2 * len(self._mu)

    @staticmethod
    def _compute_mu_sigma_from_lagrangian(weights, theta, mu, sigma, eta, omg):
        weights_sum = np.sum(weights)

        mu_new = (weights @ theta + eta * mu) / (weights_sum + eta)

        sigma_new = np.sqrt(
            (np.sum([w_i * (theta_i - mu_new)**2
                     for theta_i, w_i in zip(theta, weights)], axis=0)
             + eta * sigma**2 + eta * (mu_new - mu)**2)
            / (weights_sum + eta - omg))

        return mu_new, sigma_new

    @staticmethod
    def _kl_constraint(mu, mu_new, sigma, sigma_new, sigma_inv, sigma_new_inv,
                       logdet_sigma, logdet_sigma_new, n_dims):
        return 0.5 * (np.trace(sigma_new_inv @ sigma) - n_dims
                      + logdet_sigma_new - logdet_sigma
                      + (mu_new - mu).T @ sigma_new_inv @ (mu_new - mu))

    @staticmethod
    def _entropy(logdet_sigma, n_dims):
        c = n_dims * np.log(2 * np.pi)

        return 0.5 * (logdet_sigma + c + n_dims)

    @staticmethod
    def _lagrangian_eta_omega(lag_array, weights, theta, mu, sigma, n_dims,
                              eps, kappa):
        eta, omg = lag_array[0], lag_array[1]

        mu_new, sigma_new = \
            GaussianDiagonalDistribution._compute_mu_sigma_from_lagrangian(
                weights, theta, mu, sigma, eta, omg)

        sigma = np.diag(sigma**2)
        sigma_new = np.diag(sigma_new**2)

        sigma_inv = np.linalg.inv(sigma)
        sigma_new_inv = np.linalg.inv(sigma_new)

        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)
        (sign_sigma_new, logdet_sigma_new) = np.linalg.slogdet(sigma_new)

        c = n_dims * np.log(2 * np.pi)

        sum1 = np.sum([w_i * (-0.5 * (theta_i - mu_new)[:, np.newaxis].T
                              @ sigma_new_inv
                              @ (theta_i - mu_new)[:, np.newaxis]
                              - 0.5 * logdet_sigma_new - c / 2)
                       for w_i, theta_i in zip(weights, theta)])

        sum2 = eta * (eps - GaussianDiagonalDistribution._kl_constraint(
            mu, mu_new, sigma, sigma_new, sigma_inv, sigma_new_inv,
            logdet_sigma, logdet_sigma_new, n_dims))

        sum3 = omg * (GaussianDiagonalDistribution._entropy(logdet_sigma_new,
                                                            n_dims)
                      - (GaussianDiagonalDistribution._entropy(logdet_sigma,
                                                               n_dims)
                         - kappa))

        return sum1 + sum2 + sum3
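
# A minimal usage sketch (not part of the mushroom_rl source). It assumes
# made-up weights and shows the constrained weighted MLE update, where eps
# bounds the KL divergence of the update and kappa bounds the entropy loss.
def _example_diagonal_usage():
    dist = GaussianDiagonalDistribution(np.zeros(2), np.ones(2))

    theta = np.array([dist.sample() for _ in range(20)])
    weights = np.exp(-np.linalg.norm(theta, axis=1))  # illustrative weights

    # KL-constrained (eps) and entropy-constrained (kappa) update of the mean
    # and of the per-dimension standard deviations.
    dist.con_wmle(theta, weights, eps=0.5, kappa=1e-2)

    return dist.get_parameters()
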
class GaussianCholeskyDistribution(Distribution):
    """
    Gaussian distribution with full covariance matrix. The parameters
    vector represents the mean and the Cholesky decomposition of the
    covariance matrix. This parametrization enforces the covariance matrix
    to be positive definite.

    """

    def __init__(self, mu, sigma):
        """
        Constructor.

        Args:
            mu (np.ndarray): initial mean of the distribution;
            sigma (np.ndarray): initial covariance matrix of the distribution.

        """
        self._mu = mu
        self._chol_sigma = np.linalg.cholesky(sigma)

        self._add_save_attr(
            _mu='numpy',
            _chol_sigma='numpy'
        )
    def sample(self):
        sigma = self._chol_sigma.dot(self._chol_sigma.T)
        return np.random.multivariate_normal(self._mu, sigma)

    def log_pdf(self, theta):
        sigma = self._chol_sigma.dot(self._chol_sigma.T)
        return multivariate_normal.logpdf(theta, self._mu, sigma)

    def __call__(self, theta):
        sigma = self._chol_sigma.dot(self._chol_sigma.T)
        return multivariate_normal.pdf(theta, self._mu, sigma)

    def entropy(self):
        n_dims = len(self._mu)
        sigma = self._chol_sigma.dot(self._chol_sigma.T)
        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)

        return GaussianCholeskyDistribution._entropy(logdet_sigma, n_dims)

    def mle(self, theta, weights=None):
        if weights is None:
            self._mu = np.mean(theta, axis=0)
            sigma = np.cov(theta, rowvar=False)
        else:
            sumD = np.sum(weights)
            sumD2 = np.sum(weights**2)
            Z = sumD - sumD2 / sumD

            self._mu = weights.dot(theta) / sumD

            delta = theta - self._mu

            sigma = delta.T.dot(np.diag(weights)).dot(delta) / Z

        self._chol_sigma = np.linalg.cholesky(sigma)
    def con_wmle(self, theta, weights, eps, kappa):
        n_dims = len(self._mu)
        mu = self._mu
        sigma = self._chol_sigma.dot(self._chol_sigma.T)

        eta_omg_start = np.array([1000, 0])
        res = minimize(GaussianCholeskyDistribution._lagrangian_eta_omega,
                       eta_omg_start,
                       bounds=((np.finfo(np.float32).eps, np.inf),
                               (np.finfo(np.float32).eps, np.inf)),
                       args=(weights, theta, mu, sigma, n_dims, eps, kappa),
                       method='SLSQP')
        eta_opt, omg_opt = res.x[0], res.x[1]

        mu_new, sigma_new = \
            GaussianCholeskyDistribution._compute_mu_sigma_from_lagrangian(
                weights, theta, mu, sigma, eta_opt, omg_opt)

        self._mu, self._chol_sigma = mu_new, np.linalg.cholesky(sigma_new)
    def diff_log(self, theta):
        n_dims = len(self._mu)
        inv_chol = np.linalg.inv(self._chol_sigma)
        inv_sigma = inv_chol.T.dot(inv_chol)

        g = np.empty(self.parameters_size)

        delta = theta - self._mu

        g_mean = inv_sigma.dot(delta)

        delta_a = np.reshape(delta, (-1, 1))
        delta_b = np.reshape(delta, (1, -1))

        S = inv_chol.dot(delta_a).dot(delta_b).dot(inv_sigma)

        g_cov = S - np.diag(np.diag(inv_chol))

        g[:n_dims] = g_mean
        g[n_dims:] = g_cov.T[np.tril_indices(n_dims)]

        return g

    def get_parameters(self):
        rho = np.empty(self.parameters_size)
        n_dims = len(self._mu)

        rho[:n_dims] = self._mu
        rho[n_dims:] = self._chol_sigma[np.tril_indices(n_dims)]

        return rho

    def set_parameters(self, rho):
        n_dims = len(self._mu)
        self._mu = rho[:n_dims]
        self._chol_sigma[np.tril_indices(n_dims)] = rho[n_dims:]
    @property
    def parameters_size(self):
        n_dims = len(self._mu)

        return 2 * n_dims + (n_dims * n_dims - n_dims) // 2

    @staticmethod
    def _compute_mu_sigma_from_lagrangian(weights, theta, mu, sigma, eta, omg):
        weights_sum = np.sum(weights)

        mu_new = (weights @ theta + eta * mu) / (weights_sum + eta)

        sigmawa = (theta - mu_new).T @ np.diag(weights) @ (theta - mu_new)
        sigma_new = (sigmawa + eta * sigma
                     + eta * (mu_new - mu)[:, np.newaxis]
                     @ (mu_new - mu)[:, np.newaxis].T) \
            / (weights_sum + eta - omg)

        return mu_new, sigma_new

    @staticmethod
    def _kl_constraint(mu, mu_new, sigma, sigma_new, sigma_inv, sigma_new_inv,
                       logdet_sigma, logdet_sigma_new, n_dims):
        return 0.5 * (np.trace(sigma_new_inv @ sigma) - n_dims
                      + logdet_sigma_new - logdet_sigma
                      + (mu_new - mu).T @ sigma_new_inv @ (mu_new - mu))

    @staticmethod
    def _entropy(logdet_sigma, n_dims):
        c = n_dims * np.log(2 * np.pi)

        return 0.5 * (logdet_sigma + c + n_dims)

    @staticmethod
    def _lagrangian_eta_omega(lag_array, weights, theta, mu, sigma, n_dims,
                              eps, kappa):
        eta, omg = lag_array[0], lag_array[1]

        mu_new, sigma_new = \
            GaussianCholeskyDistribution._compute_mu_sigma_from_lagrangian(
                weights, theta, mu, sigma, eta, omg)

        sigma_inv = np.linalg.inv(sigma)
        sigma_new_inv = np.linalg.inv(sigma_new)

        (sign_sigma, logdet_sigma) = np.linalg.slogdet(sigma)
        (sign_sigma_new, logdet_sigma_new) = np.linalg.slogdet(sigma_new)

        c = n_dims * np.log(2 * np.pi)

        sum1 = np.sum([w_i * (-0.5 * (theta_i - mu_new)[:, np.newaxis].T
                              @ sigma_new_inv
                              @ (theta_i - mu_new)[:, np.newaxis]
                              - 0.5 * logdet_sigma_new - 0.5 * c)
                       for w_i, theta_i in zip(weights, theta)])

        sum2 = eta * (eps - GaussianCholeskyDistribution._kl_constraint(
            mu, mu_new, sigma, sigma_new, sigma_inv, sigma_new_inv,
            logdet_sigma, logdet_sigma_new, n_dims))

        sum3 = omg * (GaussianCholeskyDistribution._entropy(logdet_sigma_new,
                                                            n_dims)
                      - (GaussianCholeskyDistribution._entropy(logdet_sigma,
                                                               n_dims)
                         - kappa))

        return sum1 + sum2 + sum3
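
# A minimal usage sketch (not part of the mushroom_rl source). It assumes
# hypothetical samples with uniform weights and shows that the full
# covariance is fitted through its Cholesky factor, keeping it positive
# definite.
def _example_cholesky_usage():
    dist = GaussianCholeskyDistribution(np.zeros(2), np.eye(2))

    theta = np.array([dist.sample() for _ in range(50)])
    weights = np.ones(len(theta))  # uniform weights, purely illustrative

    # Weighted MLE of the mean and of the full covariance matrix.
    dist.mle(theta, weights)

    # Parameters are the mean followed by the lower-triangular Cholesky factor.
    return dist.get_parameters()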