import numpy as np
from .policy import ParametricPolicy
from scipy.stats import multivariate_normal
class AbstractGaussianPolicy(ParametricPolicy):
    """
    Abstract class of Gaussian policies.

    The methods of this class rely on ``_compute_multivariate_gaussian``,
    which is not defined here: it is called with the state and must return a
    tuple whose first two elements are the mean vector and the covariance
    matrix of the action distribution in that state.

    """
    def __call__(self, state, action):
        """
        Compute the probability density of an action in a state.

        Args:
            state (np.ndarray): the state in which the density is evaluated;
            action (np.ndarray): the action whose density is evaluated.

        Returns:
            The value of the multivariate normal pdf, with the mean and the
            covariance computed in ``state``, evaluated at ``action``.

        """
        # Only the first two returned elements are needed here; some
        # implementations return a third one used by ``diff_log``.
        mu, sigma = self._compute_multivariate_gaussian(state)[:2]

        return multivariate_normal.pdf(action, mu, sigma)

    def draw_action(self, state):
        """
        Sample an action in a state.

        Args:
            state (np.ndarray): the state in which the action is sampled.

        Returns:
            A sample drawn with ``np.random.multivariate_normal`` using the
            mean and the covariance computed in ``state``.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)[:2]

        return np.random.multivariate_normal(mu, sigma)
class GaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy.
    This is a differentiable policy for continuous action spaces.
    The policy samples an action in every state following a gaussian
    distribution, where the mean is computed in the state and the covariance
    matrix is fixed.

    """
    def __init__(self, mu, sigma):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            sigma (np.ndarray): a square positive definite matrix representing
                the covariance matrix. The size of this matrix must be n x n,
                where n is the action dimensionality.

        Raises:
            np.linalg.LinAlgError: raised by ``np.linalg.inv`` if ``sigma``
                is not square or is singular.

        """
        self._approximator = mu
        # The inverse is computed once and cached, since the covariance is
        # fixed and ``diff_log`` needs it at every call.
        self._inv_sigma = np.linalg.inv(sigma)
        self._sigma = sigma

        self._add_save_attr(
            _approximator='mushroom',
            _inv_sigma='numpy',
            _sigma='numpy'
        )

    def set_sigma(self, sigma):
        """
        Setter.

        Args:
            sigma (np.ndarray): the new covariance matrix. Must be a square
                positive definite matrix.

        """
        self._sigma = sigma
        # Keep the cached inverse consistent with the new covariance.
        self._inv_sigma = np.linalg.inv(sigma)

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the weights of the mean regressor.

        Args:
            state (np.ndarray): the state in which the gradient is computed;
            action (np.ndarray): the action in which the gradient is computed.

        Returns:
            The product ``J * 0.5 * (S^-1 + S^-T) * (action - mu)``, where
            ``J`` is the value returned by the ``diff`` method of the mean
            regressor and ``S^-1`` is the inverse of the covariance matrix.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        j_mu = self._approximator.diff(state)

        # A one-dimensional derivative is turned into a column so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        # The inverse covariance is symmetrized explicitly, so the result
        # does not depend on ``inv_sigma`` being exactly symmetric.
        g = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        return g

    def set_weights(self, weights):
        """
        Set the weights of the mean regressor.

        Args:
            weights (np.ndarray): the weights forwarded to the regressor.

        """
        self._approximator.set_weights(weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor.

        """
        return self._approximator.get_weights()

    @property
    def weights_size(self):
        """
        Returns:
            The ``weights_size`` of the mean regressor.

        """
        return self._approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state; a leading axis is added before
                it is passed to the regressor.

        Returns:
            The flattened regressor prediction (the mean), the covariance
            matrix and its cached inverse.

        """
        mu = np.reshape(self._approximator.predict(np.expand_dims(state,
                                                                  axis=0)), -1)

        return mu, self._sigma, self._inv_sigma
class DiagonalGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The Covariance matrix is
    constrained to be a diagonal matrix, where the diagonal is the squared
    standard deviation vector.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the gaussian policy, but the weights include
    also the standard deviation.

    """
    def __init__(self, mu, std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (np.ndarray): a vector of standard deviations. The length of
                this vector must be equal to the action dimensionality.

        """
        self._approximator = mu
        self._std = std

        self._add_save_attr(
            _approximator='mushroom',
            _std='numpy'
        )

    def set_std(self, std):
        """
        Setter.

        Args:
            std (np.ndarray): the new vector of standard deviations. Its
                length must be equal to the action dimensionality and its
                entries must be non-zero, since the policy divides by them.

        """
        self._std = std

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights.

        Args:
            state (np.ndarray): the state in which the gradient is computed;
            action (np.ndarray): the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the standard deviation vector,
            in the same order used by ``get_weights``.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._approximator.diff(state)

        # A one-dimensional derivative is turned into a column so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        g_mu = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        # Compute standard deviation derivative: element-wise derivative of
        # the log-density of a diagonal gaussian w.r.t. each std entry.
        g_sigma = -1. / self._std + delta**2 / self._std**3

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the first ``weights_size`` entries of the
                mean regressor are forwarded to it; the remaining entries
                become the new standard deviation vector.

        """
        self._approximator.set_weights(
            weights[0:self._approximator.weights_size])
        # NOTE(review): this stores a slice of ``weights``; for a numpy array
        # that is a view, so later in-place changes to ``weights`` would also
        # change the standard deviation.
        self._std = weights[self._approximator.weights_size:]

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the standard
            deviation vector.

        """
        return np.concatenate((self._approximator.get_weights(), self._std),
                              axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The ``weights_size`` of the mean regressor plus the number of
            elements of the standard deviation vector.

        """
        return self._approximator.weights_size + self._std.size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state; a leading axis is added before
                it is passed to the regressor.

        Returns:
            The flattened regressor prediction (the mean), the diagonal
            covariance matrix built from the squared standard deviations,
            and its inverse.

        """
        mu = np.reshape(self._approximator.predict(np.expand_dims(state,
                                                                  axis=0)), -1)

        sigma = self._std**2

        # The covariance is diagonal, so its inverse is the diagonal matrix
        # of the reciprocals.
        return mu, np.diag(sigma), np.diag(1. / sigma)
class StateStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The Covariance matrix is
    constrained to be a diagonal matrix, where the diagonal is the squared
    standard deviation, which is computed for each state.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the diagonal gaussian policy, but a parametric
    regressor is used to compute the standard deviation, so the standard
    deviation depends on the current state.

    """
    def __init__(self, mu, std, eps=1e-6):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (Regressor): the regressor representing the standard
                deviations w.r.t. the state. The output dimensionality of the
                regressor must be equal to the action dimensionality;
            eps (float, 1e-6): A positive constant added to the variance to
                ensure that it is always greater than zero.

        """
        assert(eps > 0)

        self._mu_approximator = mu
        self._std_approximator = std
        self._eps = eps

        self._add_save_attr(
            _mu_approximator='mushroom',
            _std_approximator='mushroom',
            _eps='primitive'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights.

        Args:
            state (np.ndarray): the state in which the gradient is computed;
            action (np.ndarray): the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the weights of the standard
            deviation regressor, in the same order used by ``get_weights``.

        """
        mu, sigma, std = self._compute_multivariate_gaussian(state)
        # ``sigma`` is a diagonal matrix: extract the vector of variances.
        diag_sigma = np.diag(sigma)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._mu_approximator.diff(state)

        # A one-dimensional derivative is turned into a column so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        sigma_inv = np.diag(1 / diag_sigma)

        g_mu = j_mu.dot(sigma_inv).dot(delta.T)

        # Compute variance derivative: ``w`` is the element-wise derivative
        # of the log-density w.r.t. each regressor output ``std``, given that
        # the variance is ``std**2 + eps``.
        w = (delta**2 - diag_sigma) * std / diag_sigma**2
        j_sigma = np.atleast_2d(self._std_approximator.diff(state).T)
        g_sigma = np.atleast_1d(w.dot(j_sigma))

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the first ``weights_size`` entries of the
                mean regressor are forwarded to it; the remaining entries
                are forwarded to the standard deviation regressor.

        """
        mu_weights = weights[0:self._mu_approximator.weights_size]
        std_weights = weights[self._mu_approximator.weights_size:]

        self._mu_approximator.set_weights(mu_weights)
        self._std_approximator.set_weights(std_weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the weights of the
            standard deviation regressor.

        """
        mu_weights = self._mu_approximator.get_weights()
        std_weights = self._std_approximator.get_weights()

        return np.concatenate((mu_weights, std_weights), axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The sum of the ``weights_size`` of the two regressors.

        """
        return self._mu_approximator.weights_size + \
            self._std_approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state; a leading axis is added before
                it is passed to the regressors.

        Returns:
            The flattened prediction of the mean regressor, the diagonal
            covariance matrix and the flattened prediction of the standard
            deviation regressor.

        """
        mu = np.reshape(self._mu_approximator.predict(
            np.expand_dims(state, axis=0)), -1)

        std = np.reshape(self._std_approximator.predict(
            np.expand_dims(state, axis=0)), -1)

        # ``eps`` keeps the variance strictly positive even when the
        # regressor outputs zero.
        sigma = std**2 + self._eps

        return mu, np.diag(sigma), std
class StateLogStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The Covariance matrix is
    constrained to be a diagonal matrix, the diagonal is computed by an
    exponential transformation of the logarithm of the standard deviation
    computed in each state.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the State std gaussian policy, but here the
    regressor represents the logarithm of the standard deviation.

    """
    def __init__(self, mu, log_std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            log_std (Regressor): a regressor representing the logarithm of the
                standard deviation w.r.t. the state. The output dimensionality
                of the regressor must be equal to the action dimensionality.

        """
        self._mu_approximator = mu
        self._log_std_approximator = log_std

        self._add_save_attr(
            _mu_approximator='mushroom',
            _log_std_approximator='mushroom'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the policy density w.r.t.
        the policy weights.

        Args:
            state (np.ndarray): the state in which the gradient is computed;
            action (np.ndarray): the action in which the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the weights of the log standard
            deviation regressor, in the same order used by ``get_weights``.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)
        # ``sigma`` is a diagonal matrix: extract the vector of variances.
        diag_sigma = np.diag(sigma)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._mu_approximator.diff(state)

        # A one-dimensional derivative is turned into a column so that the
        # matrix products below are well defined.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        sigma_inv = np.diag(1 / diag_sigma)

        g_mu = j_mu.dot(sigma_inv).dot(delta.T)

        # Compute variance derivative: the derivative of the log-density
        # w.r.t. each log-std output is ``delta**2 / sigma - 1``; the two
        # terms are propagated through the regressor derivative separately.
        w = delta**2 / diag_sigma
        j_sigma = np.atleast_2d(self._log_std_approximator.diff(state).T)
        g_sigma = np.atleast_1d(w.dot(j_sigma)) - np.sum(j_sigma, axis=0)

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the first ``weights_size`` entries of the
                mean regressor are forwarded to it; the remaining entries
                are forwarded to the log standard deviation regressor.

        """
        mu_weights = weights[0:self._mu_approximator.weights_size]
        log_std_weights = weights[self._mu_approximator.weights_size:]

        self._mu_approximator.set_weights(mu_weights)
        self._log_std_approximator.set_weights(log_std_weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the weights of the
            log standard deviation regressor.

        """
        mu_weights = self._mu_approximator.get_weights()
        log_std_weights = self._log_std_approximator.get_weights()

        return np.concatenate((mu_weights, log_std_weights), axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The sum of the ``weights_size`` of the two regressors.

        """
        return self._mu_approximator.weights_size + \
            self._log_std_approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state; a leading axis is added before
                it is passed to the regressors.

        Returns:
            The flattened prediction of the mean regressor and the diagonal
            covariance matrix.

        """
        mu = np.reshape(self._mu_approximator.predict(
            np.expand_dims(state, axis=0)), -1)

        log_std = np.reshape(self._log_std_approximator.predict(
            np.expand_dims(state, axis=0)), -1)

        # The regressor outputs log(std): exponentiate to get the standard
        # deviation, then square it to get the variance.
        sigma = np.exp(log_std)**2

        return mu, np.diag(sigma)