Source code for mushroom_rl.policy.gaussian_policy

import numpy as np

from .policy import ParametricPolicy
from scipy.stats import multivariate_normal


class AbstractGaussianPolicy(ParametricPolicy):
    """
    Abstract class of Gaussian policies.

    Both methods below rely on ``self._compute_multivariate_gaussian(state)``
    and use the first two values it returns as the mean vector and the
    covariance matrix of the action distribution.

    """
    def __call__(self, state, action):
        """
        Evaluate the probability density of an action in a state.

        Args:
            state: the state in which the policy is evaluated;
            action: the action whose density is requested.

        Returns:
            The multivariate normal pdf evaluated at ``action``.

        """
        gaussian = self._compute_multivariate_gaussian(state)
        # Only the mean and the covariance are needed; any further returned
        # value is ignored.
        mean, covariance = gaussian[:2]

        return multivariate_normal.pdf(action, mean, covariance)

    def draw_action(self, state):
        """
        Sample an action from the Gaussian distribution of a state.

        Args:
            state: the state in which the action is sampled.

        Returns:
            A sample drawn with ``np.random.multivariate_normal``.

        """
        gaussian = self._compute_multivariate_gaussian(state)
        mean, covariance = gaussian[:2]

        return np.random.multivariate_normal(mean, covariance)
class GaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy.
    This is a differentiable policy for continuous action spaces.
    The policy samples an action in every state following a gaussian
    distribution, where the mean is computed in the state and the covariance
    matrix is fixed.

    """
    def __init__(self, mu, sigma):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            sigma (np.ndarray): a square positive definite matrix representing
                the covariance matrix. The size of this matrix must be n x n,
                where n is the action dimensionality.

        """
        self._approximator = mu
        self._predict_params = dict()
        # The inverse is cached because diff_log needs it at every call.
        self._inv_sigma = np.linalg.inv(sigma)
        self._sigma = sigma

        # Declare the format used to store each attribute; the actual
        # handling is done by `_add_save_attr`, defined outside this file.
        self._add_save_attr(
            _approximator='mushroom',
            _predict_params='pickle',
            _inv_sigma='numpy',
            _sigma='numpy'
        )

    def set_sigma(self, sigma):
        """
        Setter.

        Args:
            sigma (np.ndarray): the new covariance matrix. Must be a square
                positive definite matrix.

        Raises:
            numpy.linalg.LinAlgError: if ``sigma`` cannot be inverted. In
                that case the previous covariance and its inverse are kept.

        """
        # Invert before touching the state: if the inversion fails, the
        # stored covariance and its cached inverse stay consistent.
        inv_sigma = np.linalg.inv(sigma)

        self._sigma = sigma
        self._inv_sigma = inv_sigma

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the weights of the mean regressor.

        Args:
            state: the state in which the gradient is computed;
            action: the action in which the gradient is computed.

        Returns:
            The gradient of the log-density evaluated in ``(state, action)``.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        # NOTE(review): `diff` is presumably the derivative of the mean
        # w.r.t. the regressor weights, shaped (n_weights, n_actions) -
        # confirm against the Regressor interface.
        j_mu = self._approximator.diff(state)

        # A 1-D result is turned into a single-column matrix so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        # The inverse covariance is symmetrized before being applied.
        g = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        return g

    def set_weights(self, weights):
        """
        Set the weights of the mean regressor.

        Args:
            weights: the new weights, forwarded to the regressor.

        """
        self._approximator.set_weights(weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor.

        """
        return self._approximator.get_weights()

    @property
    def weights_size(self):
        """
        Returns:
            The number of weights of the mean regressor.

        """
        return self._approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state: the state in which the mean is predicted.

        Returns:
            A tuple with the mean flattened to a 1-D array, the stored
            covariance matrix and its cached inverse.

        """
        # The regressor is queried on a batch containing only this state.
        mu = np.reshape(self._approximator.predict(np.expand_dims(state, axis=0), **self._predict_params), -1)

        return mu, self._sigma, self._inv_sigma
class DiagonalGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The Covariance matrix is constrained to be a diagonal matrix, where the
    diagonal is the squared standard deviation vector.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the gaussian policy, but the weights includes
    also the standard deviation.

    """
    def __init__(self, mu, std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (np.ndarray): a vector of standard deviations. The length of
                this vector must be equal to the action dimensionality.

        """
        self._approximator = mu
        self._predict_params = dict()
        self._std = std

        # Declare the format used to store each attribute; the actual
        # handling is done by `_add_save_attr`, defined outside this file.
        self._add_save_attr(
            _approximator='mushroom',
            _predict_params='pickle',
            _std='numpy'
        )

    def set_std(self, std):
        """
        Setter.

        Args:
            std (np.ndarray): the new vector of standard deviations. The
                length of this vector must be equal to the action
                dimensionality.

        """
        self._std = std

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights, i.e. the weights of the mean regressor followed
        by the standard deviations.

        Args:
            state: the state in which the gradient is computed;
            action: the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the mean weights and
            the gradient w.r.t. the standard deviations.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        # Compute mean derivative
        # NOTE(review): `diff` is presumably shaped (n_weights, n_actions) -
        # confirm against the Regressor interface.
        j_mu = self._approximator.diff(state)

        # A 1-D result is turned into a single-column matrix so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        g_mu = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        # Compute standard deviation derivative
        g_sigma = -1. / self._std + delta**2 / self._std**3

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights: a vector whose first ``weights_size`` entries of the
                mean regressor are its weights, and whose remaining entries
                are the standard deviations.

        """
        self._approximator.set_weights(
            weights[0:self._approximator.weights_size])
        self._std = weights[self._approximator.weights_size:]

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor concatenated with the
            standard deviations.

        """
        return np.concatenate((self._approximator.get_weights(), self._std),
                              axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The number of weights of the mean regressor plus the number of
            standard deviations.

        """
        return self._approximator.weights_size + self._std.size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state: the state in which the mean is predicted.

        Returns:
            A tuple with the mean flattened to a 1-D array, the diagonal
            covariance matrix and its inverse.

        """
        # The regressor is queried on a batch containing only this state.
        mu = np.reshape(self._approximator.predict(np.expand_dims(state, axis=0), **self._predict_params), -1)

        # Despite the name, `sigma` holds the variances (squared std).
        sigma = self._std**2

        return mu, np.diag(sigma), np.diag(1. / sigma)
class StateStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with a learnable, state-dependent standard deviation.
    The covariance matrix is diagonal: its diagonal is the squared standard
    deviation predicted in the current state, plus a small positive constant.
    This is a differentiable policy for continuous action spaces.
    It differs from the diagonal gaussian policy in that a parametric
    regressor, rather than a fixed vector, provides the standard deviation.

    """
    def __init__(self, mu, std, eps=1e-6):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (Regressor): the regressor representing the standard
                deviations w.r.t. the state. The output dimensionality of the
                regressor must be equal to the action dimensionality;
            eps (float, 1e-6): a positive constant added to the variance so
                that it is always greater than zero.

        """
        assert eps > 0

        self._mu_approximator = mu
        self._std_approximator = std
        self._predict_params = dict()
        self._eps = eps

        # Declare the format used to store each attribute; the actual
        # handling is done by `_add_save_attr`, defined outside this file.
        self._add_save_attr(
            _mu_approximator='mushroom',
            _std_approximator='mushroom',
            _predict_params='pickle',
            _eps='primitive'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights, i.e. the weights of the mean regressor followed
        by the weights of the standard deviation regressor.

        Args:
            state: the state in which the gradient is computed;
            action: the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the mean weights and
            the gradient w.r.t. the standard deviation weights.

        """
        mean, covariance, std = self._compute_multivariate_gaussian(state)
        variances = np.diag(covariance)
        residual = action - mean

        # Gradient w.r.t. the weights of the mean regressor.
        mean_jacobian = self._mu_approximator.diff(state)
        if len(mean_jacobian.shape) == 1:
            # Promote a flat derivative to a single-column matrix.
            mean_jacobian = np.expand_dims(mean_jacobian, axis=1)

        precision = np.diag(1 / variances)
        mean_grad = mean_jacobian.dot(precision).dot(residual.T)

        # Gradient w.r.t. the weights of the standard deviation regressor:
        # derivative of the log-density w.r.t. each std, chained with the
        # derivative of the regressor.
        std_sensitivity = (residual**2 - variances) * std / variances**2
        std_jacobian = np.atleast_2d(self._std_approximator.diff(state).T)
        std_grad = np.atleast_1d(std_sensitivity.dot(std_jacobian))

        return np.concatenate((mean_grad, std_grad), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights: a vector holding the weights of the mean regressor
                followed by the weights of the standard deviation regressor.

        """
        mean_part = weights[0:self._mu_approximator.weights_size]
        std_part = weights[self._mu_approximator.weights_size:]

        self._mu_approximator.set_weights(mean_part)
        self._std_approximator.set_weights(std_part)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor concatenated with the weights
            of the standard deviation regressor.

        """
        parts = (self._mu_approximator.get_weights(),
                 self._std_approximator.get_weights())

        return np.concatenate(parts, axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The total number of weights of the two regressors.

        """
        n_mean_weights = self._mu_approximator.weights_size
        n_std_weights = self._std_approximator.weights_size

        return n_mean_weights + n_std_weights

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state: the state in which mean and standard deviation are
                predicted.

        Returns:
            A tuple with the mean flattened to a 1-D array, the diagonal
            covariance matrix and the predicted standard deviation flattened
            to a 1-D array.

        """
        # Each regressor is queried on a batch containing only this state.
        mean_prediction = self._mu_approximator.predict(
            np.expand_dims(state, axis=0), **self._predict_params)
        mean = np.reshape(mean_prediction, -1)

        std_prediction = self._std_approximator.predict(
            np.expand_dims(state, axis=0), **self._predict_params)
        std = np.reshape(std_prediction, -1)

        # eps keeps every variance strictly positive.
        variances = std**2 + self._eps

        return mean, np.diag(variances), std
class StateLogStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The Covariance matrix is constrained to be a diagonal matrix, the diagonal
    is computed by an exponential transformation of the logarithm of the
    standard deviation computed in each state.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the State std gaussian policy, but here the
    regressor represents the logarithm of the standard deviation.

    """
    def __init__(self, mu, log_std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            log_std (Regressor): a regressor representing the logarithm of
                the standard deviation w.r.t. the state. The output
                dimensionality of the regressor must be equal to the action
                dimensionality.

        """
        self._mu_approximator = mu
        self._log_std_approximator = log_std
        self._predict_params = dict()

        # Declare the format used to store each attribute; the actual
        # handling is done by `_add_save_attr`, defined outside this file.
        self._add_save_attr(
            _mu_approximator='mushroom',
            _log_std_approximator='mushroom',
            _predict_params='pickle'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights, i.e. the weights of the mean regressor followed
        by the weights of the log standard deviation regressor.

        Args:
            state: the state in which the gradient is computed;
            action: the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the mean weights and
            the gradient w.r.t. the log standard deviation weights.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)
        diag_sigma = np.diag(sigma)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._mu_approximator.diff(state)

        # A 1-D result is turned into a single-column matrix so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        sigma_inv = np.diag(1 / diag_sigma)

        g_mu = j_mu.dot(sigma_inv).dot(delta.T)

        # Compute variance derivative
        # The derivative of the log-density w.r.t. each log-std is
        # delta**2 / variance - 1; the "- 1" term is applied below by
        # subtracting the column sums of the regressor derivative.
        w = delta**2 / diag_sigma
        j_sigma = np.atleast_2d(self._log_std_approximator.diff(state).T)
        g_sigma = np.atleast_1d(w.dot(j_sigma)) - np.sum(j_sigma, axis=0)

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights: a vector holding the weights of the mean regressor
                followed by the weights of the log standard deviation
                regressor.

        """
        mu_weights = weights[0:self._mu_approximator.weights_size]
        log_std_weights = weights[self._mu_approximator.weights_size:]

        self._mu_approximator.set_weights(mu_weights)
        self._log_std_approximator.set_weights(log_std_weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor concatenated with the weights
            of the log standard deviation regressor.

        """
        mu_weights = self._mu_approximator.get_weights()
        log_std_weights = self._log_std_approximator.get_weights()

        return np.concatenate((mu_weights, log_std_weights), axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The total number of weights of the two regressors.

        """
        return self._mu_approximator.weights_size + \
            self._log_std_approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state: the state in which mean and log standard deviation are
                predicted.

        Returns:
            A tuple with the mean flattened to a 1-D array and the diagonal
            covariance matrix.

        """
        # Each regressor is queried on a batch containing only this state.
        mu = np.reshape(self._mu_approximator.predict(
            np.expand_dims(state, axis=0), **self._predict_params), -1)

        log_std = np.reshape(self._log_std_approximator.predict(
            np.expand_dims(state, axis=0), **self._predict_params), -1)

        # Despite the name, `sigma` holds the variances: exp(log_std)**2.
        sigma = np.exp(log_std)**2

        return mu, np.diag(sigma)