Source code for mushroom_rl.algorithms.policy_search.policy_gradient.gpomdp

import numpy as np

from mushroom_rl.algorithms.policy_search.policy_gradient import PolicyGradient


[docs]class GPOMDP(PolicyGradient): """ GPOMDP algorithm. "Infinite-Horizon Policy-Gradient Estimation". Baxter J. and Bartlett P. L.. 2001. """
[docs] def __init__(self, mdp_info, policy, learning_rate, features=None): super().__init__(mdp_info, policy, learning_rate, features) self.sum_d_log_pi = None self.list_sum_d_log_pi = list() self.list_sum_d_log_pi_ep = list() self.list_reward = list() self.list_reward_ep = list() self.baseline_num = list() self.baseline_den = list() self.step_count = 0 self._add_save_attr( sum_d_log_pi='numpy', list_sum_d_log_pi='pickle', list_sum_d_log_pi_ep='pickle', list_reward='pickle', list_reward_ep='pickle', baseline_num='pickle', baseline_den='pickle', step_count='numpy' ) # Ignore divide by zero np.seterr(divide='ignore', invalid='ignore')
[docs] def _compute_gradient(self, J): gradient = np.zeros(self.policy.weights_size) n_episodes = len(self.list_sum_d_log_pi_ep) for i in range(n_episodes): list_sum_d_log_pi = self.list_sum_d_log_pi_ep[i] list_reward = self.list_reward_ep[i] n_steps = len(list_sum_d_log_pi) for t in range(n_steps): step_grad = list_sum_d_log_pi[t] step_reward = list_reward[t] baseline = self.baseline_num[t] / self.baseline_den[t] baseline[np.logical_not(np.isfinite(baseline))] = 0. gradient += (step_reward - baseline) * step_grad gradient /= n_episodes self.list_reward_ep = list() self.list_sum_d_log_pi_ep = list() self.baseline_num = list() self.baseline_den = list() return gradient,
[docs] def _step_update(self, x, u, r): discounted_reward = self.df * r self.list_reward.append(discounted_reward) d_log_pi = self.policy.diff_log(x, u) self.sum_d_log_pi += d_log_pi self.list_sum_d_log_pi.append(self.sum_d_log_pi) squared_sum_d_log_pi = np.square(self.sum_d_log_pi) if self.step_count < len(self.baseline_num): self.baseline_num[ self.step_count] += discounted_reward * squared_sum_d_log_pi self.baseline_den[self.step_count] += squared_sum_d_log_pi else: self.baseline_num.append(discounted_reward * squared_sum_d_log_pi) self.baseline_den.append(squared_sum_d_log_pi) self.step_count += 1
[docs] def _episode_end_update(self): self.list_reward_ep.append(self.list_reward) self.list_reward = list() self.list_sum_d_log_pi_ep.append(self.list_sum_d_log_pi) self.list_sum_d_log_pi = list()
[docs] def _init_update(self): self.sum_d_log_pi = np.zeros(self.policy.weights_size) self.list_sum_d_log_pi = list() self.step_count = 0