import numpy as np
import mujoco
from mushroom_rl.rl_utils.spaces import Box
from mushroom_rl.environments.mujoco_envs.air_hockey.base import AirHockeyBase
from mushroom_rl.utils.mujoco import forward_kinematics
[docs]class AirHockeySingle(AirHockeyBase):
"""
Base class for single agent air hockey tasks.
"""
[docs] def __init__(self, gamma=0.99, horizon=120, env_noise=False, obs_noise=False, timestep=1 / 240.,
n_intermediate_steps=1, **viewer_params):
"""
Constructor.
"""
self.init_state = np.array([-0.9273, 0.9273, np.pi / 2])
super().__init__(gamma=gamma, horizon=horizon, env_noise=env_noise, n_agents=1, obs_noise=obs_noise,
timestep=timestep, n_intermediate_steps=n_intermediate_steps, **viewer_params)
# Remove z position and quaternion from puck pos
self.obs_helper.remove_obs("puck_pos", 2)
self.obs_helper.remove_obs("puck_pos", 3)
self.obs_helper.remove_obs("puck_pos", 4)
self.obs_helper.remove_obs("puck_pos", 5)
self.obs_helper.remove_obs("puck_pos", 6)
# Remove linear z velocity and angular velocity around x and y
self.obs_helper.remove_obs("puck_vel", 2)
self.obs_helper.remove_obs("puck_vel", 3)
self.obs_helper.remove_obs("puck_vel", 4)
self.obs_helper.add_obs("collision_robot_1_puck", 1, 0, 1)
self.obs_helper.add_obs("collision_short_sides_rim_puck", 1, 0, 1)
self._mdp_info.observation_space = Box(*self.obs_helper.get_obs_limits())
self.has_hit = False
self.has_bounce = False
[docs] def get_puck(self, obs):
"""
Getting the puck properties from the observations
Args:
obs: The current observation
Returns:
([pos_x, pos_y], [lin_vel_x, lin_vel_y], ang_vel_z)
"""
puck_pos = self.obs_helper.get_from_obs(obs, "puck_pos")
puck_lin_vel = self.obs_helper.get_from_obs(obs, "puck_vel")[1:]
puck_ang_vel = self.obs_helper.get_from_obs(obs, "puck_vel")[0]
return puck_pos, puck_lin_vel, puck_ang_vel
[docs] def get_ee(self):
"""
Getting the ee properties from the current internal state
Returns:
([pos_x, pos_y, pos_z], [ang_vel_x, ang_vel_y, ang_vel_z, lin_vel_x, lin_vel_y, lin_vel_z])
"""
ee_pos = self._read_data("robot_1/ee_pos")
ee_vel = self._read_data("robot_1/ee_vel")
return ee_pos, ee_vel
[docs] def _modify_observation(self, obs):
new_obs = obs.copy()
self._puck_2d_in_robot_frame(self.obs_helper.get_from_obs(new_obs, "puck_pos"), self.agents[0]["frame"])
self._puck_2d_in_robot_frame(self.obs_helper.get_from_obs(new_obs, "puck_vel"), self.agents[0]["frame"], type='vel')
if self.obs_noise:
self.obs_helper.get_from_obs(new_obs, "puck_pos")[:] += np.random.randn(2) * 0.001
return new_obs
[docs] def setup(self, obs):
self.has_hit = False
self.has_bounce = False
for i in range(3):
self._data.joint("planar_robot_1/joint_" + str(i+1)).qpos = self.init_state[i]
super().setup(obs)
mujoco.mj_fwdPosition(self._model, self._data)
[docs] def _simulation_post_step(self):
if not self.has_hit:
self.has_hit = self._check_collision("puck", "robot_1/ee")
if not self.has_bounce:
self.has_bounce = self._check_collision("puck", "rim_short_sides")
[docs] def _create_observation(self, state):
obs = super(AirHockeySingle, self)._create_observation(state)
return np.append(obs, [self.has_hit, self.has_bounce])
[docs] def _create_info_dictionary(self, obs):
constraints = {}
q_pos = self.obs_helper.get_joint_pos_from_obs(obs)
q_vel = self.obs_helper.get_joint_vel_from_obs(obs)
x_pos, _ = forward_kinematics(self.robot_model, self.robot_data, q_pos, "planar_robot_1/body_ee")
# Translate to table space
ee_pos = x_pos + self.agents[0]["frame"][:3, 3]
# ee_constraint: force the ee to stay within the bounds of the table
# 1 Dimension on x direction: x > x_lb
# 2 Dimension on y direction: y > y_lb, y < y_ub
x_lb = - (self.env_spec['table']['length'] / 2 + self.env_spec['mallet']['radius'])
y_lb = - (self.env_spec['table']['width'] / 2 - self.env_spec['mallet']['radius'])
y_ub = (self.env_spec['table']['width'] / 2 - self.env_spec['mallet']['radius'])
constraints["ee_constraints"] = np.array([-ee_pos[0] + x_lb,
-ee_pos[1] + y_lb, ee_pos[1] - y_ub])
# joint_pos_constraint: stay within the robots joint position limits
constraints["joint_pos_constraints"] = np.zeros(6)
constraints["joint_pos_constraints"][:3] = q_vel - self.obs_helper.get_joint_pos_limits()[1]
constraints["joint_pos_constraints"][3:] = self.obs_helper.get_joint_pos_limits()[0] - q_vel
# joint_vel_constraint: stay within the robots joint velocity limits
constraints["joint_vel_constraints"] = np.zeros(6)
constraints["joint_vel_constraints"][:3] = q_vel - self.obs_helper.get_joint_vel_limits()[1]
constraints["joint_vel_constraints"][3:] = self.obs_helper.get_joint_vel_limits()[0] - q_vel
return constraints