Source code for mushroom.environments.generators.taxi

import numpy as np
from sklearn.utils.extmath import cartesian

from mushroom.environments.finite_mdp import FiniteMDP


def generate_taxi(grid, prob=.9, rew=(0, 1, 3, 15), gamma=.99, horizon=np.inf):
    """
    Build the Taxi finite MDP from a grid description stored in a text file.

    The file describes a rectangular grid world, one character per cell:
    'S' is a starting cell of the agent;
    'G' is the goal cell;
    '.' is a normal cell;
    'F' is a passenger cell: when the agent steps on it, the passenger is
    picked up;
    '#' is a wall: when the agent is supposed to step on a wall, it
    remains in its current cell instead.

    The initial state distribution is uniform over the provided starting
    cells. The episode terminates when the agent reaches the goal. The
    reward is always 0, except when reaching the goal, where it depends on
    the number of collected passengers. Each action succeeds with a given
    probability; when it fails, the agent moves in one of the two
    directions perpendicular to the intended one.

    This problem is inspired from:
    "Bayesian Q-Learning". Dearden R. et al.. 1998.

    Args:
        grid (str): the path of the file containing the grid structure;
        prob (float, .9): probability of success of an action;
        rew (tuple, (0, 1, 3, 15)): rewards obtained in goal states, indexed
            by the number of collected passengers; it must have one entry
            more than the number of 'F' cells in the grid;
        gamma (float, .99): discount factor;
        horizon (int, np.inf): the horizon.

    Returns:
        A FiniteMDP object built with the provided parameters.

    """
    grid_map, cell_list, passenger_list = parse_grid(grid)

    # One reward is needed for each possible count of collected passengers,
    # from zero up to the number of passenger cells.
    n_passenger_cells = len(np.argwhere(np.array(grid_map) == 'F'))
    assert len(rew) == n_passenger_cells + 1

    transition = compute_probabilities(grid_map, cell_list, passenger_list,
                                       prob)
    reward = compute_reward(grid_map, cell_list, passenger_list, rew)
    initial_distribution = compute_mu(grid_map, cell_list, passenger_list)

    return FiniteMDP(transition, reward, initial_distribution, gamma, horizon)
def parse_grid(grid):
    """
    Parse the grid file.

    Every character of the file is read in order. The markers '#', '.',
    'S', 'G' and 'F' each describe one cell of the current row and a newline
    closes the row. A final row that is not terminated by a newline is
    kept as well.

    Args:
        grid (str): the path of the file containing the grid structure.

    Returns:
        A tuple of three lists: the grid structure as a list of rows, each
        row being a list of cell markers; the [row, column] coordinates of
        all non-wall cells; the [row, column] coordinates of the passenger
        ('F') cells.

    Raises:
        AssertionError: if the file contains no 'S' or no 'G' marker;
        ValueError: if the file contains a character that is neither a
            known marker nor a newline.

    """
    grid_map = list()
    cell_list = list()
    passenger_list = list()

    with open(grid, 'r') as f:
        m = f.read()

    # Kept as an assert so that callers observe the same exception type as
    # before.
    assert 'S' in m and 'G' in m

    row = list()
    row_idx = 0
    col_idx = 0
    for c in m:
        if c in ('#', '.', 'S', 'G', 'F'):
            row.append(c)
            if c != '#':
                # Every non-wall cell is a cell the agent can occupy.
                cell_list.append([row_idx, col_idx])
                if c == 'F':
                    passenger_list.append([row_idx, col_idx])
            col_idx += 1
        elif c == '\n':
            grid_map.append(row)
            row = list()
            row_idx += 1
            col_idx = 0
        else:
            raise ValueError('Unknown marker.')

    # A file that does not end with a newline leaves its last row pending:
    # its cells are already in cell_list, so the row must be in grid_map too.
    if row:
        grid_map.append(row)

    return grid_map, cell_list, passenger_list
def compute_probabilities(grid_map, cell_list, passenger_list, prob):
    """
    Compute the transition probability matrix.

    A state index is the index of the agent cell in ``cell_list`` plus
    ``len(cell_list)`` times the index of the row of collected-passenger
    flags. States located on the goal, and states located on a passenger
    cell whose passenger is not marked as collected, have no outgoing
    transitions.

    Args:
        grid_map (list): list containing the grid structure;
        cell_list (list): list of non-wall cells;
        passenger_list (list): list of passenger cells;
        prob (float): probability of success of an action.

    Returns:
        The transition probability matrix, of shape
        (n_states, 4, n_states).

    """
    layout = np.array(grid_map)
    cells = np.array(cell_list)
    n_cells = len(cell_list)
    n_passengers = len(passenger_list)
    n_states = n_cells * 2**n_passengers
    p = np.zeros((n_states, 4, n_states))
    directions = [[-1, 0], [1, 0], [0, -1], [0, 1]]
    passenger_states = cartesian([[0, 1]] * n_passengers)

    def landing_index(source, target, layer, picked_up):
        # State index reached when trying to move onto `target`; the agent
        # stays in `source` when `target` is not a walkable cell.
        hit = np.where((cells == target).all(axis=1))[0]
        if hit.size == 0:
            return source

        assert hit.size == 1

        if layout[tuple(target)] == 'F' and target.tolist() not in picked_up:
            # Stepping on a passenger that is still waiting: flag it as
            # collected and move to the matching passenger configuration.
            pickup = np.zeros(n_passengers)
            pickup[np.where((target == passenger_list).all(axis=1))[0]] = 1
            new_flags = passenger_states[layer] + pickup
            new_layer = np.where(
                (passenger_states == new_flags).all(axis=1))[0]
            hit += n_cells * new_layer
        else:
            hit += n_cells * layer

        return hit

    for i in range(n_states):
        layer = i // n_cells
        picked_up = np.array(passenger_list)[
            np.argwhere(passenger_states[layer] == 1).ravel()].tolist()
        state = cells[i % n_cells]
        marker = layout[tuple(state)]

        # Only '.', 'S' and 'F' cells have outgoing transitions.
        if marker not in ['.', 'S', 'F']:
            continue
        # Standing on a passenger cell without having collected that
        # passenger: this state is left without transitions.
        if marker == 'F' and state.tolist() not in picked_up:
            continue

        for a, direction in enumerate(directions):
            j = landing_index(i, state + direction, layer, picked_up)
            p[i, a, j] = prob

            # The two unit moves perpendicular to the intended direction
            # share the failure probability equally.
            along = np.abs(direction)
            for d in (1 - along, along - 1):
                k = landing_index(i, state + d, layer, picked_up)
                p[i, a, k] += (1. - prob) * .5

    return p
def compute_reward(grid_map, cell_list, passenger_list, rew):
    """
    Compute the reward matrix.

    For every goal cell and every action, the transition from the cell that
    the action moves away from into the goal receives
    ``rew[number of collected passengers]``, for every passenger
    configuration. All the other entries are 0.

    Args:
        grid_map (list): list containing the grid structure;
        cell_list (list): list of non-wall cells;
        passenger_list (list): list of passenger cells;
        rew (tuple): rewards obtained in goal states, indexed by the number
            of collected passengers.

    Returns:
        The reward matrix, of shape (n_states, 4, n_states).

    """
    g = np.array(grid_map)
    c = np.array(cell_list)
    n_cells = len(cell_list)
    n_states = n_cells * 2**len(passenger_list)
    r = np.zeros((n_states, 4, n_states))
    directions = [[-1, 0], [1, 0], [0, -1], [0, 1]]
    passenger_states = cartesian([[0, 1]] * len(passenger_list))

    for goal in np.argwhere(g == 'G'):
        # The goal position in cell_list does not depend on the action or
        # on the passenger configuration, so it is looked up once.
        goal_cell = np.where((c == goal).all(axis=1))[0]

        for a, direction in enumerate(directions):
            # Match the whole [row, column] pair: `x in ndarray` is true as
            # soon as any single coordinate matches, which is not a
            # membership test for a cell.
            prev_cell = np.where((c == goal - direction).all(axis=1))[0]
            if prev_cell.size == 0:
                continue

            # NOTE(review): only the action pointing at the goal is
            # rewarded; entering the goal through a perpendicular slip of
            # another action keeps reward 0 -- confirm this is intended.
            for i, collected in enumerate(passenger_states):
                offset = n_cells * i
                r[prev_cell + offset, a, goal_cell + offset] = rew[
                    np.sum(collected)]

    return r
def compute_mu(grid_map, cell_list, passenger_list):
    """
    Compute the initial states distribution.

    The probability mass is split evenly among the 'S' cells of the grid,
    using the state indices of those cells in ``cell_list``; every other
    state has probability 0.

    Args:
        grid_map (list): list containing the grid structure;
        cell_list (list): list of non-wall cells;
        passenger_list (list): list of passenger cells.

    Returns:
        The initial states distribution, a vector with one entry per state.

    """
    layout = np.array(grid_map)
    cells = np.array(cell_list)
    start_cells = np.argwhere(layout == 'S')

    mu = np.zeros(len(cell_list) * 2**len(passenger_list))
    for start in start_cells:
        same_cell = (cells == start).all(axis=1)
        mu[np.where(same_cell)[0]] = 1. / len(start_cells)

    return mu