Source code for mushroom_rl.core.core

from mushroom_rl.core.dataset import Dataset
from mushroom_rl.utils.record import VideoRecorder

from ._impl import CoreLogic


class Core(object):
    """
    Implements the functions to run a generic algorithm.

    """
    def __init__(self, agent, env, callbacks_fit=None, callback_step=None, record_dictionary=None):
        """
        Constructor.

        Args:
            agent (Agent): the agent moving according to a policy;
            env (Environment): the environment in which the agent moves;
            callbacks_fit (list): list of callbacks to execute at the end of each fit;
            callback_step (Callback): callback to execute after each step;
            record_dictionary (dict, None): a dictionary of parameters for the recording; it may contain the
                ``recorder_class``, the ``fps`` and other keyword arguments to be passed to the constructor
                of the recorder class. By default, the ``VideoRecorder`` class is used, with the environment
                control frequency as frames per second.

        """
        self.agent = agent
        self.env = env
        self.callbacks_fit = callbacks_fit if callbacks_fit is not None else list()
        self.callback_step = callback_step if callback_step is not None else lambda x: None

        self._state = None
        self._policy_state = None
        self._current_theta = None
        self._episode_steps = None

        self._core_logic = CoreLogic()

        if record_dictionary is None:
            record_dictionary = dict()
        self._record = self._build_recorder_class(**record_dictionary)
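    # Usage sketch (illustrative, not part of the original source): `agent` and
    # `env` stand for any mushroom_rl Agent and Environment built elsewhere, and
    # the step callback is an arbitrary callable receiving the sample tuple.
    #
    #     core = Core(agent, env, callback_step=lambda sample: print(sample[2]))  # print rewards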
    def learn(self, n_steps=None, n_episodes=None, n_steps_per_fit=None, n_episodes_per_fit=None,
              render=False, record=False, quiet=False):
        """
        This function moves the agent in the environment and fits the policy using the collected samples.
        The agent can be moved for a given number of steps or a given number of episodes and, independently
        from this choice, the policy can be fitted after a given number of steps or a given number of
        episodes. The environment is reset at the beginning of the learning process.

        Args:
            n_steps (int, None): number of steps to move the agent;
            n_episodes (int, None): number of episodes to move the agent;
            n_steps_per_fit (int, None): number of steps between each fit of the policy;
            n_episodes_per_fit (int, None): number of episodes between each fit of the policy;
            render (bool, False): whether to render the environment or not;
            record (bool, False): whether to record a video of the environment or not. If True, the render
                flag must also be set to True;
            quiet (bool, False): whether to show the progress bar or not.

        """
        assert (render and record) or (not record), "To record, the render flag must be set to True"

        self._core_logic.initialize_learn(n_steps_per_fit, n_episodes_per_fit)

        dataset = Dataset.generate(self.env.info, self.agent.info, n_steps_per_fit, n_episodes_per_fit)

        self._run(dataset, n_steps, n_episodes, render, quiet, record)
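    # Example call (illustrative; the numbers are arbitrary): collect 10000 steps,
    # fitting the policy after every single step, with no rendering:
    #
    #     core.learn(n_steps=10000, n_steps_per_fit=1)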
    def evaluate(self, initial_states=None, n_steps=None, n_episodes=None, render=False, quiet=False,
                 record=False):
        """
        This function moves the agent in the environment using its policy. The agent is moved for a provided
        number of steps, a provided number of episodes, or from a set of initial states for the whole
        episode. The environment is reset at the beginning of the evaluation.

        Args:
            initial_states (np.ndarray, None): the starting states of each episode;
            n_steps (int, None): number of steps to move the agent;
            n_episodes (int, None): number of episodes to move the agent;
            render (bool, False): whether to render the environment or not;
            quiet (bool, False): whether to show the progress bar or not;
            record (bool, False): whether to record a video of the environment or not. If True, the render
                flag must also be set to True.

        Returns:
            The collected dataset.

        """
        assert (render and record) or (not record), "To record, the render flag must be set to True"

        self._core_logic.initialize_evaluate()

        n_episodes_dataset = len(initial_states) if initial_states is not None else n_episodes
        dataset = Dataset.generate(self.env.info, self.agent.info, n_steps, n_episodes_dataset)

        return self._run(dataset, n_steps, n_episodes, render, quiet, record, initial_states)
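    # Example call (illustrative): run 10 evaluation episodes and compute the
    # discounted return per episode, assuming the ``compute_J`` helper of the
    # ``Dataset`` class:
    #
    #     dataset = core.evaluate(n_episodes=10)
    #     returns = dataset.compute_J(core.env.info.gamma)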
    def _run(self, dataset, n_steps, n_episodes, render, quiet, record, initial_states=None):
        self._core_logic.initialize_run(n_steps, n_episodes, initial_states, quiet)

        last = True
        while self._core_logic.move_required():
            if last:  # the previous episode is over: reset the environment and the agent
                self._reset(initial_states)

                if self.agent.info.is_episodic:
                    dataset.append_theta(self._current_theta)

            sample, step_info = self._step(render, record)

            self.callback_step(sample)
            self._core_logic.after_step(sample[5])  # sample[5] is the last step flag

            dataset.append(sample, step_info)

            if self._core_logic.fit_required():
                self.agent.fit(dataset)
                self._core_logic.after_fit()

                for c in self.callbacks_fit:
                    c(dataset)

                dataset.clear()

            last = sample[5]

        self.agent.stop()
        self.env.stop()

        self._end(record)

        return dataset
    def _step(self, render, record):
        """
        Single step.

        Args:
            render (bool): whether to render or not;
            record (bool): whether to record the rendered frame or not.

        Returns:
            A tuple containing the previous state, the action sampled by the agent, the reward obtained, the
            reached state, the absorbing flag of the reached state, the last step flag and the policy states,
            together with the step info dictionary returned by the environment.

        """
        action, policy_next_state = self.agent.draw_action(self._state, self._policy_state)
        next_state, reward, absorbing, step_info = self.env.step(action)

        if render:
            frame = self.env.render(record)

            if record:
                self._record(frame)

        self._episode_steps += 1

        last = self._episode_steps >= self.env.info.horizon or absorbing

        state = self._state
        policy_state = self._policy_state
        next_state = self._preprocess(next_state)
        self._state = next_state
        self._policy_state = policy_next_state

        return (state, action, reward, next_state, absorbing, last, policy_state, policy_next_state), step_info
    def _reset(self, initial_states):
        """
        Reset the state of the agent.

        """
        initial_state = self._core_logic.get_initial_state(initial_states)

        state, episode_info = self.env.reset(initial_state)
        self._policy_state, self._current_theta = self.agent.episode_start(state, episode_info)
        self._state = self._preprocess(state)

        self.agent.next_action = None
        self._episode_steps = 0
    def _end(self, record):
        # Clear the transient state of the run
        self._state = None
        self._policy_state = None
        self._current_theta = None
        self._episode_steps = None

        if record:
            self._record.stop()

        self._core_logic.terminate_run()
    def _preprocess(self, state):
        """
        Method to apply state preprocessors.

        Args:
            state (np.ndarray): the state to be preprocessed.

        Returns:
            The preprocessed state.

        """
        for p in self.agent.core_preprocessors:
            p.update(state)
            state = p(state)

        return state
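    # Sketch of the preprocessor interface assumed above (hedged: only the
    # ``update``/``__call__`` pair is implied by the loop; the class below is a
    # hypothetical example, not part of mushroom_rl):
    #
    #     class MinMaxScaler:
    #         def __init__(self, low, high):
    #             self._low = low
    #             self._high = high
    #
    #         def update(self, state):
    #             pass  # stateless preprocessor: nothing to update
    #
    #         def __call__(self, state):
    #             return (state - self._low) / (self._high - self._low)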
    def _build_recorder_class(self, recorder_class=None, fps=None, **kwargs):
        """
        Method to create a video recorder class.

        Args:
            recorder_class (class): the class used to record the video. By default, the ``VideoRecorder``
                class from mushroom is used. The class must implement the ``__call__`` and ``stop`` methods;
            fps (int): number of frames per second of the recorded video. By default, the environment control
                frequency, i.e. the inverse of the environment timestep, is used.

        Returns:
            The recorder object.

        """
        if not recorder_class:
            recorder_class = VideoRecorder

        if not fps:
            fps = int(1 / self.env.info.dt)

        return recorder_class(fps=fps, **kwargs)
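    # Sketch of a custom recorder satisfying the interface documented above
    # (hedged: ``FrameCounter`` is a hypothetical example). It only needs a
    # constructor accepting ``fps``, a ``__call__`` receiving one rendered
    # frame, and a ``stop`` method:
    #
    #     class FrameCounter:
    #         def __init__(self, fps):
    #             self.fps = fps
    #             self.n_frames = 0
    #
    #         def __call__(self, frame):
    #             self.n_frames += 1  # a real recorder would encode `frame` here
    #
    #         def stop(self):
    #             print(f'{self.n_frames} frames at {self.fps} fps')
    #
    #     core = Core(agent, env, record_dictionary=dict(recorder_class=FrameCounter))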