Source code for mushroom_rl.environments.habitat_env

import warnings
import os

with warnings.catch_warnings():

    if 'VERBOSE_HABITAT' not in os.environ: # To suppress Habitat messages
        os.environ['MAGNUM_LOG'] = 'quiet'
        os.environ['GLOG_minloglevel'] = '2'
        os.environ['HABITAT_SIM_LOG'] = 'quiet'
    else:
        os.environ['GLOG_minloglevel'] = '0'
        os.environ['MAGNUM_LOG'] = 'verbose'
        os.environ['MAGNUM_GPU_VALIDATION'] = 'ON'

    warnings.filterwarnings("ignore", category=DeprecationWarning)
    import habitat
    from habitat_baselines.config.default import get_config
    from habitat_baselines.common.environments import get_env_class
    from habitat_baselines.utils.env_utils import make_env_fn
    from habitat.utils.visualizations.utils import observations_to_image
    from habitat.datasets.utils import get_action_shortest_path
    from habitat_sim.errors import GreedyFollowerError

import gym
import numpy as np

from mushroom_rl.core import Environment, MDPInfo
from mushroom_rl.environments import Gym
from mushroom_rl.rl_utils.spaces import Discrete, Box
from mushroom_rl.utils.viewer import ImageViewer


class HabitatNavigationWrapper(gym.Wrapper):
    """
    Use it for navigation tasks, where the agent has to go from point A to B.
    The action space is discrete: stop, turn left, turn right, move forward.
    How many degrees the agent turns and how far it moves forward are defined
    in the YAML file.
    'STOP' ends the episode, and the agent must execute it to get the success
    reward: if the agent reaches the goal but never issues 'STOP', it will not
    get the success reward.
    The observation is the agent's RGB view of what it sees in front of itself.
    We also add the agent's true (x,y) position to the 'info' dictionary.
    """

    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space['rgb']
        self._last_full_obs = None  # For rendering

    def reset(self):
        return np.asarray(self.env.reset()['rgb'])

    def get_position(self):
        return self.env._env._sim.get_agent_state().position

    def step(self, action):
        obs, rwd, done, info = self.env.step(**{'action': action[0]})
        info.update({'position': self.get_position()})
        self._last_full_obs = obs  # Keep the full observation dict for rendering
        obs = np.asarray(obs['rgb'])
        return obs, rwd, done, info

    def get_shortest_path(self):
        '''
        Returns observations and actions corresponding to the shortest path to
        the goal. If the goal cannot be reached within the episode step limit,
        the best path found (the one ending closest to the goal) is returned.
        '''
        max_steps = self.unwrapped._env._max_episode_steps - 1
        try:
            shortest_path = [
                get_action_shortest_path(
                    self.unwrapped._env._sim,
                    source_position=self.unwrapped._env._dataset.episodes[0].start_position,
                    source_rotation=self.unwrapped._env._dataset.episodes[0].start_rotation,
                    goal_position=self.unwrapped._env._dataset.episodes[0].goals[0].position,
                    success_distance=self.unwrapped._core_env_config.TASK.SUCCESS_DISTANCE,
                    max_episode_steps=max_steps,
                )
            ][0]

            self.reset()  # get_action_shortest_path changes the agent state

            actions = [p.action for p in shortest_path]
            obs = [self.unwrapped._env.sim.get_observations_at(
                p.position, p.rotation)['rgb'] for p in shortest_path]

            shortest_steps = len(actions)
            if shortest_steps == max_steps:
                print('WARNING! Shortest path not found within the given step '
                      'limit ({steps}). Returning best path.'.format(steps=max_steps))
            else:
                print('Shortest path found: {steps} steps.'.format(steps=shortest_steps))

            return obs, actions
        except GreedyFollowerError:
            print('WARNING! Cannot find shortest path (GreedyFollowerError).')
            return None, None

    def get_optimal_policy_return(self):
        '''
        Returns the undiscounted sum of rewards of the optimal policy.
        '''
        _, actions = self.get_shortest_path()
        self.reset()
        rwd_sum = 0.
        for a in actions:
            _, rwd, done, _ = self.step([a - 1])
            rwd_sum += rwd
            if done:
                break
        self.reset()
        return rwd_sum

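
# Illustrative sketch (not part of the original module): how a discrete
# navigation action is passed to HabitatNavigationWrapper. The index layout
# (0=STOP, 1=MOVE_FORWARD, 2=TURN_LEFT, 3=TURN_RIGHT) follows Habitat's
# default PointNav action space and is an assumption here; check the YAML
# config of your task for the actual mapping.
def _demo_navigation_step(env):
    wrapped = HabitatNavigationWrapper(env)
    obs = wrapped.reset()                     # (H, W, 3) RGB array
    obs, rwd, done, info = wrapped.step([1])  # index 1 -> MOVE_FORWARD
    print(obs.shape, rwd, done, info['position'])
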
class HabitatRearrangeWrapper(gym.Wrapper):
    """
    Use it for rearrange tasks, where the robot has to interact with objects.
    There are several 'rearrange' tasks, such as 'pick', 'place', 'open X',
    'close X', where X can be a door, the fridge, ... Each task has its own
    actions, observations, rewards, and terminal conditions. Please check the
    Habitat 2.0 paper for more details https://arxiv.org/pdf/2106.14405.pdf

    This wrapper, instead, uses a common observation / action space.
    The observation is the RGB image returned by the sensor mounted on the
    robot's head. We also return the end-effector position in the 'info'
    dictionary.
    The action is mixed:
     - The first elements of the action vector are continuous values for
       velocity control of the arm's joints.
     - The last element is a scalar value for picking / placing an object:
       if this scalar is positive, the gripper is not currently holding an
       object, and the end-effector is within 15cm of an object, then the
       object closest to the end-effector is grasped; if the scalar is
       negative and the gripper is carrying an object, the object is released.
     - The last element is a scalar value for stopping the robot. This action
       ends the episode, and allows the agent to get a positive reward if the
       task has been completed.
    """

    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.arm_ac_size = env.action_space['ARM_ACTION']['arm_action'].shape[0]
        self.grip_ac_size = env.action_space['ARM_ACTION']['grip_action'].shape[0]
        self.n_actions = self.arm_ac_size + self.grip_ac_size
        low = np.array([0.] * self.arm_ac_size + [-1.] * self.grip_ac_size)
        high = np.ones((self.arm_ac_size + self.grip_ac_size))
        self.action_space = Box(low=low, high=high, shape=(self.n_actions,))
        self.observation_space = self.env.observation_space['robot_head_rgb']
        self._last_full_obs = None  # For rendering

    def reset(self):
        return np.asarray(self.env.reset()['robot_head_rgb'])

    def get_ee_position(self):
        return self.env._env._sim.robot.ee_transform.translation

    def step(self, action):
        action = {'action': 'ARM_ACTION',
                  'action_args': {'arm_action': action[:-self.grip_ac_size],
                                  'grip_action': action[-self.grip_ac_size:]}}
        obs, rwd, done, info = self.env.step(**{'action': action})
        info.update({'ee_position': np.asarray(obs['ee_pos'])})
        self._last_full_obs = obs  # Keep the full observation dict for rendering
        obs = np.asarray(obs['robot_head_rgb'])
        return obs, rwd, done, info

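
# Illustrative sketch (not part of the original module): how a mixed arm /
# gripper action for HabitatRearrangeWrapper can be assembled. The number of
# arm joints depends on the robot defined in the YAML config, so it is read
# from the wrapper itself.
def _demo_rearrange_step(env):
    wrapped = HabitatRearrangeWrapper(env)
    obs = wrapped.reset()                            # (H, W, 3) head-camera RGB
    arm_velocities = np.zeros(wrapped.arm_ac_size)   # keep the arm still
    grip = np.ones(wrapped.grip_ac_size)             # positive value -> try to grasp
    action = np.concatenate([arm_velocities, grip])
    obs, rwd, done, info = wrapped.step(action)
    print(obs.shape, rwd, done, info['ee_position'])
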
class Habitat(Gym):
    """
    Interface for Habitat RL environments.

    This class is very generic and can be used for many Habitat tasks.
    Depending on the robot / task, you have to use different wrappers, since
    observation and action spaces may vary.
    See <MUSHROOM_RL PATH>/examples/habitat/ for more details.
    """

    def __init__(self, wrapper, config_file, base_config_file=None,
                 horizon=None, gamma=0.99, width=None, height=None):
        """
        Constructor. For more details on how to pass YAML configuration files,
        please see <MUSHROOM_RL PATH>/examples/habitat/README.md

        Args:
            wrapper (str): wrapper for converting observations and actions
                (e.g., HabitatRearrangeWrapper);
            config_file (str): path to the YAML file specifying the RL task
                configuration (see <HABITAT_LAB PATH>/habitat_baselines/configs/);
            base_config_file (str, None): path to an optional YAML file, used as
                'BASE_TASK_CONFIG_PATH' in the first YAML
                (see <HABITAT_LAB PATH>/configs/);
            horizon (int, None): the horizon;
            gamma (float, 0.99): the discount factor;
            width (int, None): width of the pixel observation. If None, the
                value specified in the config file is used;
            height (int, None): height of the pixel observation. If None, the
                value specified in the config file is used.

        """
        # MDP creation
        self._not_pybullet = False
        self._first = True

        if base_config_file is None:
            base_config_file = config_file

        config = get_config(config_paths=config_file,
                            opts=['BASE_TASK_CONFIG_PATH', base_config_file])

        config.defrost()

        if horizon is None:
            horizon = config.TASK_CONFIG.ENVIRONMENT.MAX_EPISODE_STEPS  # Get the default horizon
        config.TASK_CONFIG.ENVIRONMENT.MAX_EPISODE_STEPS = horizon + 1  # Hack to ignore gym time limit

        # Overwrite the width / height of all RGB sensors in the task's SIMULATOR config
        for k in config['TASK_CONFIG']['SIMULATOR']:
            if 'rgb' in k.lower():
                if height is not None:
                    config['TASK_CONFIG']['SIMULATOR'][k]['HEIGHT'] = height
                if width is not None:
                    config['TASK_CONFIG']['SIMULATOR'][k]['WIDTH'] = width

        config.freeze()

        env_class = get_env_class(config.ENV_NAME)
        env = make_env_fn(env_class=env_class, config=config)
        env = globals()[wrapper](env)
        self.env = env

        self._img_size = env.observation_space.shape[0:2]

        # MDP properties
        dt = 1/10
        action_space = self.env.action_space
        observation_space = Box(
            low=0., high=255., shape=(3, self._img_size[1], self._img_size[0]))
        mdp_info = MDPInfo(observation_space, action_space, gamma, horizon, dt)

        if isinstance(action_space, Discrete):
            self._convert_action = lambda a: a[0]
        else:
            self._convert_action = lambda a: a

        self._viewer = ImageViewer((self._img_size[1], self._img_size[0]), dt)

        Environment.__init__(self, mdp_info)

    def reset(self, state=None):
        assert state is None, 'Cannot set Habitat state'
        obs = self._convert_observation(np.atleast_1d(self.env.reset()))
        return obs, {}

    def step(self, action):
        action = self._convert_action(action)
        obs, reward, absorbing, info = self.env.step(action)
        return self._convert_observation(np.atleast_1d(obs)), reward, absorbing, info

    def stop(self):
        self._viewer.close()

    def render(self, record=False):
        frame = observations_to_image(
            self.env._last_full_obs, self.env.unwrapped._env.get_metrics())

        self._viewer.display(frame)

        if record:
            return frame
        else:
            return None

    @staticmethod
    def _convert_observation(observation):
        # Convert pixel observations from (H, W, C) to (C, H, W)
        return observation.transpose((2, 0, 1))

    @staticmethod
    def root_path():
        return os.path.dirname(os.path.dirname(habitat.__file__))
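

# Illustrative usage sketch (not part of the original module). The YAML file
# name 'pointnav_apartment-0.yaml' is an assumption; any habitat_baselines RL
# config can be passed, as described in <MUSHROOM_RL PATH>/examples/habitat/README.md.
if __name__ == '__main__':
    mdp = Habitat(wrapper='HabitatNavigationWrapper',
                  config_file='pointnav_apartment-0.yaml',
                  horizon=100, gamma=0.99, width=64, height=64)
    print(mdp.info.observation_space.shape)  # (3, 64, 64) channel-first pixels
    print(mdp.info.action_space.n)           # number of discrete navigation actions
    obs, _ = mdp.reset()
    mdp.stop()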