@@ -788,3 +788,79 @@ class StochasticActionWrapper(ActionWrapper):
                 return self.np_random.integers(0, high=6)
             else:
                 return self.random_action
+
+
+class NoDeath(Wrapper):
+    """
+    Wrapper to prevent death in specific cells (e.g., lava cells).
+    Instead of dying, the agent will receive a negative reward.
+
+    Example:
+        >>> import gymnasium as gym
+        >>> from minigrid.wrappers import NoDeath
+        >>>
+        >>> env = gym.make("MiniGrid-LavaCrossingS9N1-v0")
+        >>> _, _ = env.reset(seed=2)
+        >>> _, _, _, _, _ = env.step(1)
+        >>> _, reward, term, *_ = env.step(2)
+        >>> reward, term
+        (0, True)
+        >>>
+        >>> env = NoDeath(env, no_death_types=("lava",), death_cost=-1.0)
+        >>> _, _ = env.reset(seed=2)
+        >>> _, _, _, _, _ = env.step(1)
+        >>> _, reward, term, *_ = env.step(2)
+        >>> reward, term
+        (-1.0, False)
+        >>>
+        >>>
+        >>> env = gym.make("MiniGrid-Dynamic-Obstacles-5x5-v0")
+        >>> _, _ = env.reset(seed=2)
+        >>> _, reward, term, *_ = env.step(2)
+        >>> reward, term
+        (-1, True)
+        >>>
+        >>> env = NoDeath(env, no_death_types=("ball",), death_cost=-1.0)
+        >>> _, _ = env.reset(seed=2)
+        >>> _, reward, term, *_ = env.step(2)
+        >>> reward, term
+        (-2.0, False)
+    """
+
+    def __init__(self, env, no_death_types: tuple[str, ...], death_cost: float = -1.0):
+        """A wrapper to prevent death in specific cells.
+
+        Args:
+            env: The environment to apply the wrapper
+            no_death_types: Tuple of strings identifying the death cell types
+            death_cost: The negative reward received in death cells
+
+        """
+        assert "goal" not in no_death_types, "goal cannot be a death cell"
+
+        super().__init__(env)
+        self.death_cost = death_cost
+        self.no_death_types = no_death_types
+    def step(self, action):
+        # In Dynamic-Obstacles, obstacles move after the agent moves,
+        # so we need to check for collision before self.env.step()
+        front_cell = self.grid.get(*self.front_pos)
+        going_to_death = (
+            action == self.actions.forward
+            and front_cell is not None
+            and front_cell.type in self.no_death_types
+        )
+
+        obs, reward, terminated, truncated, info = self.env.step(action)
+
+        # We also check if the agent stays in death cells (e.g., lava)
+        # without moving
+        current_cell = self.grid.get(*self.agent_pos)
+        in_death = current_cell is not None and current_cell.type in self.no_death_types
+
+        if terminated and (going_to_death or in_death):
+            terminated = False
+            reward += self.death_cost
+
+        return obs, reward, terminated, truncated, info
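
For reference, a minimal usage sketch (not part of the diff) showing the wrapper in a plain rollout loop, assuming this patch is applied so that `NoDeath` is importable from `minigrid.wrappers`; the environment id, seed, and random policy below are illustrative only:

```python
import gymnasium as gym

from minigrid.wrappers import NoDeath

# Stepping into lava now adds death_cost to the reward instead of
# terminating the episode.
env = gym.make("MiniGrid-LavaCrossingS9N1-v0")
env = NoDeath(env, no_death_types=("lava",), death_cost=-1.0)

obs, info = env.reset(seed=0)
for _ in range(50):
    obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
    # With the wrapper, termination can only come from reaching the goal;
    # lava contact is converted into a reward penalty.
    if terminated or truncated:
        obs, info = env.reset()
env.close()
```

The two checks in `step()` are complementary: the front-cell check runs before `self.env.step()` because in Dynamic-Obstacles the obstacles move during the step, so the colliding object may no longer be in the front cell afterwards, while the current-cell check afterwards catches the agent remaining in a death cell (e.g., lava) without moving.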