import mujoco_py
from mujoco_py import functions as mj_fun
import numpy as np
from enum import Enum
from mushroom_rl.environments import Environment, MDPInfo
from mushroom_rl.utils.spaces import Box
import glfw
[docs]class ObservationType(Enum):
"""
An enum indicating the type of data that should be added to the observation
of the environment, can be Joint-/Body-/Site- positions and velocities.
"""
__order__ = "BODY_POS BODY_VEL JOINT_POS JOINT_VEL SITE_POS SITE_VEL"
BODY_POS = 0
BODY_VEL = 1
JOINT_POS = 2
JOINT_VEL = 3
SITE_POS = 4
SITE_VEL = 5
[docs]class MuJoCo(Environment):
"""
Class to create a Mushroom environment using the MuJoCo simulator.
"""
[docs] def __init__(self, file_name, actuation_spec, observation_spec, gamma,
horizon, n_substeps=1, n_intermediate_steps=1, additional_data_spec=None,
collision_groups=None):
"""
Constructor.
Args:
file_name (string): The path to the XML file with which the
environment should be created;
actuation_spec (list): A list specifying the names of the joints
which should be controllable by the agent. Can be left empty
when all actuators should be used;
observation_spec (list): A list containing the names of data that
should be made available to the agent as an observation and
their type (ObservationType). An entry in the list is given by:
(name, type);
gamma (float): The discounting factor of the environment;
horizon (int): The maximum horizon for the environment;
n_substeps (int): The number of substeps to use by the MuJoCo
simulator. An action given by the agent will be applied for
n_substeps before the agent receives the next observation and
can act accordingly;
n_intermediate_steps (int): The number of steps between every action
taken by the agent. Similar to n_substeps but allows the user
to modify, control and access intermediate states.
additional_data_spec (list): A list containing the data fields of
interest, which should be read from or written to during
simulation. The entries are given as the following tuples:
(key, name, type) key is a string for later referencing in the
"read_data" and "write_data" methods. The name is the name of
the object in the XML specification and the type is the
ObservationType;
collision_groups (list): A list containing groups of geoms for
which collisions should be checked during simulation via
``check_collision``. The entries are given as:
``(key, geom_names)``, where key is a string for later
referencing in the "check_collision" method, and geom_names is
a list of geom names in the XML specification.
"""
# Create the simulation
self._sim = self._load_simulation(file_name, n_substeps)
self._n_intermediate_steps = n_intermediate_steps
self._viewer = None
self._state = None
# Read the actuation spec and build the mapping between actions and ids
# as well as their limits
if len(actuation_spec) == 0:
self._action_indices = [i for i in range(0, len(self._sim.model._actuator_name2id))]
else:
self._action_indices = []
for name in actuation_spec:
self._action_indices.append(self._sim.model._actuator_name2id[name])
low = []
high = []
for index in self._action_indices:
if self._sim.model.actuator_ctrllimited[index]:
low.append(self._sim.model.actuator_ctrlrange[index][0])
high.append(self._sim.model.actuator_ctrlrange[index][1])
else:
low.append(-np.inf)
high.append(np.inf)
action_space = Box(np.array(low), np.array(high))
# Read the observation spec to build a mapping at every step. It is
# ensured that the values appear in the order they are specified.
if len(observation_spec) == 0:
raise AttributeError("No Environment observations were specified. "
"Add at least one observation to the observation_spec.")
else:
self._observation_map = observation_spec
# We can only specify limits for the joint positions, all other
# information can be potentially unbounded
low = []
high = []
for name, ot in self._observation_map:
obs_count = len(self._get_observation(name, ot))
if obs_count == 1 and ot == ObservationType.JOINT_POS:
joint_range = self._sim.model.jnt_range[self._sim.model._joint_name2id[name]]
if not(joint_range[0] == joint_range[1] == 0.0):
low.append(joint_range[0])
high.append(joint_range[1])
else:
low.extend(-np.inf)
high.extend(np.inf)
else:
low.extend([-np.inf] * obs_count)
high.extend([np.inf] * obs_count)
observation_space = Box(np.array(low), np.array(high))
# Pre-process the additional data to allow easier writing and reading
# to and from arrays in MuJoCo
self.additional_data = {}
if additional_data_spec is not None:
for key, name, ot in additional_data_spec:
self.additional_data[key] = (name, ot)
# Pre-process the collision groups for "fast" detection of contacts
self.collision_groups = {}
if collision_groups is not None:
for name, geom_names in collision_groups:
self.collision_groups[name] = {self._sim.model._geom_name2id[geom_name]
for geom_name in geom_names}
# Finally, we create the MDP information and call the constructor of
# the parent class
mdp_info = MDPInfo(observation_space, action_space, gamma, horizon)
super().__init__(mdp_info)
[docs] def seed(self, seed):
np.random.seed(seed)
[docs] def reset(self, state=None):
mj_fun.mj_resetData(self._sim.model, self._sim.data)
self._setup()
self._state = self._create_observation()
return self._state
[docs] def step(self, action):
cur_obs = self._state
action = self._preprocess_action(action)
self._step_init(cur_obs, action)
for i in range(self._n_intermediate_steps):
ctrl_action = self._compute_action(action)
self._sim.data.ctrl[self._action_indices] = ctrl_action
self._simulation_pre_step()
self._sim.step()
self._simulation_post_step()
self._state = self._create_observation()
self._step_finalize()
reward = self._reward(cur_obs, action, self._state)
return self._state, reward, self._is_absorbing(self._state), {}
def render(self):
if self._viewer is None:
self._viewer = mujoco_py.MjViewer(self._sim)
self._viewer.render()
[docs] def stop(self):
if self._viewer is not None:
v = self._viewer
self._viewer = None
glfw.destroy_window(v.window)
del v
def _get_observation(self, name, otype):
if otype == ObservationType.BODY_POS:
data = self._sim.data.get_body_xpos(name)
elif otype == ObservationType.BODY_VEL:
data = self._sim.data.get_body_xvelp(name)
elif otype == ObservationType.JOINT_POS:
data = self._sim.data.get_joint_qpos(name)
elif otype == ObservationType.JOINT_VEL:
data = self._sim.data.get_joint_qvel(name)
elif otype == ObservationType.SITE_POS:
data = self._sim.data.get_site_xpos(name)
elif otype == ObservationType.SITE_VEL:
data = self._sim.data.get_site_xvelp(name)
else:
raise ValueError('Invalid observation type')
if hasattr(data, "__len__"):
return data
else:
return [data]
def _create_observation(self):
data_obs = [self._get_observation(name, ot)
for name, ot in self._observation_map]
return np.concatenate(data_obs)
[docs] def _preprocess_action(self, action):
"""
Compute a transformation of the action provided to the
environment.
Args:
action (np.ndarray): numpy array with the actions
provided to the environment.
Returns:
The action to be used for the current step
"""
return action
[docs] def _step_init(self, state, action):
"""
Allows information to be initialized at the start of a step.
"""
pass
[docs] def _compute_action(self, action):
"""
Compute a transformation of the action at every intermediate step.
Useful to add control signals simulated directly in python.
Args:
action (np.ndarray): numpy array with the actions
provided at every step.
Returns:
The action to be set in the actual mujoco simulation.
"""
return action
[docs] def _simulation_pre_step(self):
"""
Allows information to be accesed and changed at every intermediate step
before taking a step in the mujoco simulation.
Can be usefull to apply an external force/torque to the specified bodies.
ex: apply a force over X to the torso:
force = [200, 0, 0]
torque = [0, 0, 0]
self.sim.data.xfrc_applied[self.sim.model._body_name2id["torso"],:] = force + torque
"""
pass
[docs] def _simulation_post_step(self):
"""
Allows information to be accesed at every intermediate step
after taking a step in the mujoco simulation.
Can be usefull to average forces over all intermediate steps.
"""
pass
[docs] def _step_finalize(self):
"""
Allows information to be accesed at the end of a step.
"""
pass
[docs] def _read_data(self, name):
"""
Read data form the MuJoCo data structure.
Args:
name (string): A name referring to an entry contained the
additional_data_spec list handed to the constructor.
Returns:
The desired data as a one-dimensional numpy array.
"""
data_id, otype = self.additional_data[name]
return np.array(self._get_observation(data_id, otype))
[docs] def _write_data(self, name, value):
"""
Write data to the MuJoCo data structure.
Args:
name (string): A name referring to an entry contained in the
additional_data_spec list handed to the constructor;
value (ndarray): The data that should be written.
"""
data_id, otype = self.additional_data[name]
if otype == ObservationType.JOINT_POS:
self._sim.data.set_joint_qpos(data_id, value)
elif otype == ObservationType.JOINT_VEL:
self._sim.data.set_joint_qvel(data_id, value)
else:
data_buffer = self._get_observation(data_id, otype)
data_buffer[:] = value
[docs] def _check_collision(self, group1, group2):
"""
Check for collision between the specified groups.
Args:
group1 (string): A name referring to an entry contained in the
collision_groups list handed to the constructor;
group2 (string): A name referring to an entry contained in the
collision_groups list handed to the constructor.
Returns:
A flag indicating whether a collision occurred between the given
groups or not.
"""
ids1 = self.collision_groups[group1]
ids2 = self.collision_groups[group2]
for coni in range(0, self._sim.data.ncon):
con = self._sim.data.contact[coni]
collision = con.geom1 in ids1 and con.geom2 in ids2
collision_trans = con.geom1 in ids2 and con.geom2 in ids1
if collision or collision_trans:
return True
return False
[docs] def _get_collision_force(self, group1, group2):
"""
Returns the collision force and torques between the specified groups.
Args:
group1 (string): A name referring to an entry contained in the
collision_groups list handed to the constructor;
group2 (string): A name referring to an entry contained in the
collision_groups list handed to the constructor.
Returns:
A 6D vector specifying the collision forces/torques[3D force + 3D torque]
between the given groups. Vector of 0's in case there was no collision.
http://mujoco.org/book/programming.html#siContact
"""
ids1 = self.collision_groups[group1]
ids2 = self.collision_groups[group2]
c_array = np.zeros(6, dtype=np.float64)
for con_i in range(0, self._sim.data.ncon):
con = self._sim.data.contact[con_i]
if (con.geom1 in ids1 and con.geom2 in ids2 or
con.geom1 in ids2 and con.geom2 in ids1):
mujoco_py.functions.mj_contactForce(self._sim.model, self._sim.data,
con_i, c_array)
return c_array
return c_array
[docs] def _reward(self, state, action, next_state):
"""
Compute the reward based on the given transition.
Args:
state (np.array): the current state of the system;
action (np.array): the action that is applied in the current state;
next_state (np.array): the state reached after applying the given
action.
Returns:
The reward as a floating point scalar value.
"""
raise NotImplementedError
[docs] def _is_absorbing(self, state):
"""
Check whether the given state is an absorbing state or not.
Args:
state (np.array): the state of the system.
Returns:
A boolean flag indicating whether this state is absorbing or not.
"""
raise NotImplementedError
[docs] def _setup(self):
"""
A function that allows to execute setup code after an environment
reset.
"""
pass
[docs] def _load_simulation(self, file_name, n_substeps):
"""
Load mujoco model. Can be overridden to provide custom load functions.
Args:
file_name (string): The path to the XML file with which the
environment should be created;
n_substeps (int): The number of substeps to use by the MuJoCo
simulator. An action given by the agent will be applied for
n_substeps before the agent receives the next observation and
can act accordingly.
Returns:
The loaded mujoco model.
"""
return mujoco_py.MjSim(mujoco_py.load_model_from_path(file_name),
nsubsteps=n_substeps)
@property
def _dt(self):
return self._sim.model.opt.timestep