# Source code for mushroom_rl.policy.gaussian_policy

import numpy as np

from .policy import ParametricPolicy
from scipy.stats import multivariate_normal

class AbstractGaussianPolicy(ParametricPolicy):
    """
    Abstract class of Gaussian policies.

    Subclasses must implement ``_compute_multivariate_gaussian``, which
    returns a tuple whose first two elements are the mean vector and the
    covariance matrix of the action distribution in the given state. Any
    further element of the tuple is ignored by the methods of this class.

    """
    def __call__(self, state, action):
        """
        Compute the probability density of an action in a state.

        Args:
            state (np.ndarray): the state where the policy is evaluated;
            action (np.ndarray): the action whose density is computed.

        Returns:
            The value of the multivariate normal probability density
            function, with the mean and covariance computed in ``state``,
            evaluated at ``action``.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)[:2]

        return multivariate_normal.pdf(action, mu, sigma)

    def draw_action(self, state):
        """
        Sample an action in a state.

        Args:
            state (np.ndarray): the state where the action is sampled.

        Returns:
            An action drawn from the multivariate normal distribution whose
            mean and covariance are computed in ``state``.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)[:2]

        return np.random.multivariate_normal(mu, sigma)

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state where the distribution is
                computed.

        Returns:
            A tuple whose first two elements are the mean vector and the
            covariance matrix.

        """
        raise NotImplementedError

class GaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy.
    This is a differentiable policy for continuous action spaces.
    The policy samples an action in every state following a gaussian
    distribution, where the mean is computed in the state and the covariance
    matrix is fixed.

    """
    def __init__(self, mu, sigma):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            sigma (np.ndarray): a square positive definite matrix representing
                the covariance matrix. The size of this matrix must be n x n,
                where n is the action dimensionality.

        """
        self._approximator = mu
        # The inverse is cached because the covariance is fixed between
        # calls to ``set_sigma``.
        self._inv_sigma = np.linalg.inv(sigma)
        self._sigma = sigma

        # NOTE(review): the opening of this call was missing from the
        # extracted source; ``_add_save_attr`` is the attribute-registration
        # helper assumed to be inherited from the base policy - confirm.
        self._add_save_attr(
            _approximator='mushroom',
            _inv_sigma='numpy',
            _sigma='numpy'
        )

    def set_sigma(self, sigma):
        """
        Setter.

        Args:
            sigma (np.ndarray): the new covariance matrix. Must be a square
                positive definite matrix.

        """
        self._sigma = sigma
        self._inv_sigma = np.linalg.inv(sigma)

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the action density with
        respect to the weights of the mean regressor.

        Args:
            state (np.ndarray): the state where the gradient is computed;
            action (np.ndarray): the action where the gradient is computed.

        Returns:
            The gradient ``J_mu * 0.5 * (S^-1 + S^-T) * (action - mu)``,
            where ``J_mu`` is the derivative returned by the regressor and
            ``S`` is the covariance matrix.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        j_mu = self._approximator.diff(state)

        # A 1-D derivative is promoted to a column so that the matrix
        # products below are well formed.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        # The inverse covariance is symmetrized explicitly, so the result
        # does not rely on ``inv_sigma`` being numerically symmetric.
        g = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        return g

    def set_weights(self, weights):
        """
        Set the weights of the mean regressor.

        Args:
            weights (np.ndarray): the new weights.

        """
        self._approximator.set_weights(weights)

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor.

        """
        return self._approximator.get_weights()

    @property
    def weights_size(self):
        """
        Returns:
            The number of weights of the mean regressor.

        """
        return self._approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state where the mean is computed.

        Returns:
            The flattened mean predicted by the regressor, the covariance
            matrix and its inverse.

        """
        # The regressor is queried with a leading batch axis of size one and
        # its prediction is flattened to a vector.
        mu = np.reshape(
            self._approximator.predict(np.expand_dims(state, axis=0)), -1)

        return mu, self._sigma, self._inv_sigma

class DiagonalGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The covariance matrix is constrained to be a diagonal matrix, where the
    diagonal is the squared standard deviation vector.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the gaussian policy, but the weights also
    include the standard deviation.

    """
    def __init__(self, mu, std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (np.ndarray): a vector of standard deviations. The length of
                this vector must be equal to the action dimensionality.

        """
        self._approximator = mu
        self._std = std

        # NOTE(review): the opening of this call was missing from the
        # extracted source; ``_add_save_attr`` is the attribute-registration
        # helper assumed to be inherited from the base policy - confirm.
        self._add_save_attr(
            _approximator='mushroom',
            _std='numpy'
        )

    def set_std(self, std):
        """
        Setter.

        Args:
            std (np.ndarray): the new vector of standard deviations. The
                length of this vector must be equal to the action
                dimensionality.

        """
        self._std = std

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the action density with
        respect to the policy weights.

        Args:
            state (np.ndarray): the state where the gradient is computed;
            action (np.ndarray): the action where the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the standard deviations, in
            the same order used by ``get_weights``.

        """
        mu, _, inv_sigma = self._compute_multivariate_gaussian(state)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._approximator.diff(state)

        # A 1-D derivative is promoted to a column so that the matrix
        # products below are well formed.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        g_mu = .5 * j_mu.dot(inv_sigma + inv_sigma.T).dot(delta.T)

        # Compute standard deviation derivative:
        # d/d(std) [-log(std) - delta^2 / (2 std^2)]
        g_sigma = -1. / self._std + delta**2 / self._std**3

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the weights of the mean regressor followed
                by the vector of standard deviations.

        """
        n_mu_weights = self._approximator.weights_size

        self._approximator.set_weights(weights[0:n_mu_weights])
        self._std = weights[n_mu_weights:]

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the vector of
            standard deviations.

        """
        return np.concatenate((self._approximator.get_weights(), self._std),
                              axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The number of weights of the mean regressor plus the number of
            standard deviations.

        """
        return self._approximator.weights_size + self._std.size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state where the mean is computed.

        Returns:
            The flattened mean predicted by the regressor, the diagonal
            covariance matrix and its inverse.

        """
        mu = np.reshape(
            self._approximator.predict(np.expand_dims(state, axis=0)), -1)

        # The variances are the squared standard deviations.
        sigma = self._std**2

        return mu, np.diag(sigma), np.diag(1. / sigma)

class StateStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The covariance matrix is constrained to be a diagonal matrix, where the
    diagonal is the squared standard deviation, which is computed for each
    state.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the diagonal gaussian policy, but a parametric
    regressor is used to compute the standard deviation, so the standard
    deviation depends on the current state.

    """
    def __init__(self, mu, std, eps=1e-6):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            std (Regressor): the regressor representing the standard
                deviations w.r.t. the state. The output dimensionality of the
                regressor must be equal to the action dimensionality;
            eps (float, 1e-6): a positive constant added to the variance to
                ensure that it is always greater than zero.

        """
        assert eps > 0

        self._mu_approximator = mu
        self._std_approximator = std
        self._eps = eps

        # NOTE(review): the opening of this call was missing from the
        # extracted source; ``_add_save_attr`` is the attribute-registration
        # helper assumed to be inherited from the base policy - confirm.
        self._add_save_attr(
            _mu_approximator='mushroom',
            _std_approximator='mushroom',
            _eps='primitive'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the action density with
        respect to the policy weights.

        Args:
            state (np.ndarray): the state where the gradient is computed;
            action (np.ndarray): the action where the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the weights of the standard
            deviation regressor, in the same order used by ``get_weights``.

        """
        mu, sigma, std = self._compute_multivariate_gaussian(state)
        diag_sigma = np.diag(sigma)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._mu_approximator.diff(state)

        # A 1-D derivative is promoted to a column so that the matrix
        # products below are well formed.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        sigma_inv = np.diag(1 / diag_sigma)

        g_mu = j_mu.dot(sigma_inv).dot(delta.T)

        # Compute variance derivative. With sigma = std^2 + eps, the
        # derivative of the log density w.r.t. std is
        # std * (delta^2 - sigma) / sigma^2; the chain rule through the
        # regressor derivative gives the gradient w.r.t. its weights.
        w = (delta**2 - diag_sigma) * std / diag_sigma**2
        j_sigma = np.atleast_2d(self._std_approximator.diff(state).T)
        g_sigma = np.atleast_1d(w.dot(j_sigma))

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the weights of the mean regressor followed
                by the weights of the standard deviation regressor.

        """
        n_mu_weights = self._mu_approximator.weights_size

        self._mu_approximator.set_weights(weights[0:n_mu_weights])
        self._std_approximator.set_weights(weights[n_mu_weights:])

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the weights of the
            standard deviation regressor.

        """
        mu_weights = self._mu_approximator.get_weights()
        std_weights = self._std_approximator.get_weights()

        return np.concatenate((mu_weights, std_weights), axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The total number of weights of the two regressors.

        """
        return self._mu_approximator.weights_size + \
            self._std_approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state where the mean and the standard
                deviation are computed.

        Returns:
            The flattened mean, the diagonal covariance matrix and the
            flattened standard deviation predicted by the regressor. Note
            that the third element is the standard deviation, not the
            inverse of the covariance matrix.

        """
        batched_state = np.expand_dims(state, axis=0)

        mu = np.reshape(self._mu_approximator.predict(batched_state), -1)
        std = np.reshape(self._std_approximator.predict(batched_state), -1)

        # eps keeps every variance strictly positive even when the
        # regressor predicts a zero standard deviation.
        sigma = std**2 + self._eps

        return mu, np.diag(sigma), std

class StateLogStdGaussianPolicy(AbstractGaussianPolicy):
    """
    Gaussian policy with learnable standard deviation.
    The covariance matrix is constrained to be a diagonal matrix, the
    diagonal is computed by an exponential transformation of the logarithm of
    the standard deviation computed in each state.
    This is a differentiable policy for continuous action spaces.
    This policy is similar to the State std gaussian policy, but here the
    regressor represents the logarithm of the standard deviation.

    """
    def __init__(self, mu, log_std):
        """
        Constructor.

        Args:
            mu (Regressor): the regressor representing the mean w.r.t. the
                state;
            log_std (Regressor): a regressor representing the logarithm of the
                standard deviation w.r.t. the state. The output dimensionality
                of the regressor must be equal to the action dimensionality.

        """
        self._mu_approximator = mu
        self._log_std_approximator = log_std

        # NOTE(review): the opening of this call was missing from the
        # extracted source; ``_add_save_attr`` is the attribute-registration
        # helper assumed to be inherited from the base policy - confirm.
        self._add_save_attr(
            _mu_approximator='mushroom',
            _log_std_approximator='mushroom'
        )

    def diff_log(self, state, action):
        """
        Compute the gradient of the logarithm of the action density with
        respect to the policy weights.

        Args:
            state (np.ndarray): the state where the gradient is computed;
            action (np.ndarray): the action where the gradient is computed.

        Returns:
            The concatenation of the gradient w.r.t. the weights of the mean
            regressor and the gradient w.r.t. the weights of the log
            standard deviation regressor, in the same order used by
            ``get_weights``.

        """
        mu, sigma = self._compute_multivariate_gaussian(state)
        diag_sigma = np.diag(sigma)

        delta = action - mu

        # Compute mean derivative
        j_mu = self._mu_approximator.diff(state)

        # A 1-D derivative is promoted to a column so that the matrix
        # products below are well formed.
        if len(j_mu.shape) == 1:
            j_mu = np.expand_dims(j_mu, axis=1)

        sigma_inv = np.diag(1 / diag_sigma)

        g_mu = j_mu.dot(sigma_inv).dot(delta.T)

        # Compute variance derivative. With sigma = exp(2 * log_std), the
        # derivative of the log density w.r.t. log_std is
        # delta^2 / sigma - 1; the chain rule through the regressor
        # derivative gives the gradient w.r.t. its weights.
        w = delta**2 / diag_sigma
        j_sigma = np.atleast_2d(self._log_std_approximator.diff(state).T)
        g_sigma = np.atleast_1d(w.dot(j_sigma)) - np.sum(j_sigma, axis=0)

        return np.concatenate((g_mu, g_sigma), axis=0)

    def set_weights(self, weights):
        """
        Set the policy weights.

        Args:
            weights (np.ndarray): the weights of the mean regressor followed
                by the weights of the log standard deviation regressor.

        """
        n_mu_weights = self._mu_approximator.weights_size

        self._mu_approximator.set_weights(weights[0:n_mu_weights])
        self._log_std_approximator.set_weights(weights[n_mu_weights:])

    def get_weights(self):
        """
        Returns:
            The weights of the mean regressor followed by the weights of the
            log standard deviation regressor.

        """
        mu_weights = self._mu_approximator.get_weights()
        log_std_weights = self._log_std_approximator.get_weights()

        return np.concatenate((mu_weights, log_std_weights), axis=0)

    @property
    def weights_size(self):
        """
        Returns:
            The total number of weights of the two regressors.

        """
        return self._mu_approximator.weights_size + \
            self._log_std_approximator.weights_size

    def _compute_multivariate_gaussian(self, state):
        """
        Compute the parameters of the action distribution in a state.

        Args:
            state (np.ndarray): the state where the mean and the log
                standard deviation are computed.

        Returns:
            The flattened mean predicted by the regressor and the diagonal
            covariance matrix.

        """
        batched_state = np.expand_dims(state, axis=0)

        mu = np.reshape(self._mu_approximator.predict(batched_state), -1)
        log_std = np.reshape(
            self._log_std_approximator.predict(batched_state), -1)

        # The variance is the square of the exponentiated log standard
        # deviation, hence always positive.
        sigma = np.exp(log_std)**2

        return mu, np.diag(sigma)