# dynamicobstacles.py
  1. from operator import add
  2. from typing import Optional
  3. from gymnasium.spaces import Discrete
  4. from minigrid.core.grid import Grid
  5. from minigrid.core.mission import MissionSpace
  6. from minigrid.core.world_object import Ball, Goal
  7. from minigrid.minigrid_env import MiniGridEnv
  8. class DynamicObstaclesEnv(MiniGridEnv):
  9. """
  10. <p>
  11. <img src="https://raw.githubusercontent.com/Farama-Foundation/Minigrid/master/figures/dynamic_obstacles.gif" alt="dynamic_obstacles" width="200px"/>
  12. </p>
  13. ### Description
  14. This environment is an empty room with moving obstacles.
  15. The goal of the agent is to reach the green goal square without colliding
  16. with any obstacle. A large penalty is subtracted if the agent collides with
  17. an obstacle and the episode finishes. This environment is useful to test
  18. Dynamic Obstacle Avoidance for mobile robots with Reinforcement Learning in
  19. Partial Observability.
  20. ### Mission Space
  21. "get to the green goal square"
  22. ### Action Space
  23. | Num | Name | Action |
  24. |-----|--------------|--------------|
  25. | 0 | left | Turn left |
  26. | 1 | right | Turn right |
  27. | 2 | forward | Move forward |
  28. | 3 | pickup | Unused |
  29. | 4 | drop | Unused |
  30. | 5 | toggle | Unused |
  31. | 6 | done | Unused |
  32. ### Observation Encoding
  33. - Each tile is encoded as a 3 dimensional tuple:
  34. `(OBJECT_IDX, COLOR_IDX, STATE)`
  35. - `OBJECT_TO_IDX` and `COLOR_TO_IDX` mapping can be found in
  36. [minigrid/minigrid.py](minigrid/minigrid.py)
  37. - `STATE` refers to the door state with 0=open, 1=closed and 2=locked
  38. ### Rewards
  39. A reward of '1' is given for success, and '0' for failure. A '-1' penalty is
  40. subtracted if the agent collides with an obstacle.
  41. ### Termination
  42. The episode ends if any one of the following conditions is met:
  43. 1. The agent reaches the goal.
  44. 2. The agent collides with an obstacle.
  45. 3. Timeout (see `max_steps`).
  46. ### Registered Configurations
  47. - `MiniGrid-Dynamic-Obstacles-5x5-v0`
  48. - `MiniGrid-Dynamic-Obstacles-Random-5x5-v0`
  49. - `MiniGrid-Dynamic-Obstacles-6x6-v0`
  50. - `MiniGrid-Dynamic-Obstacles-Random-6x6-v0`
  51. - `MiniGrid-Dynamic-Obstacles-8x8-v0`
  52. - `MiniGrid-Dynamic-Obstacles-16x16-v0`
  53. """
  54. def __init__(
  55. self,
  56. size=8,
  57. agent_start_pos=(1, 1),
  58. agent_start_dir=0,
  59. n_obstacles=4,
  60. max_steps: Optional[int] = None,
  61. **kwargs
  62. ):
  63. self.agent_start_pos = agent_start_pos
  64. self.agent_start_dir = agent_start_dir
  65. # Reduce obstacles if there are too many
  66. if n_obstacles <= size / 2 + 1:
  67. self.n_obstacles = int(n_obstacles)
  68. else:
  69. self.n_obstacles = int(size / 2)
  70. mission_space = MissionSpace(mission_func=self._gen_mission)
  71. if max_steps is None:
  72. max_steps = 4 * size**2
  73. super().__init__(
  74. mission_space=mission_space,
  75. grid_size=size,
  76. # Set this to True for maximum speed
  77. see_through_walls=True,
  78. max_steps=max_steps,
  79. **kwargs
  80. )
  81. # Allow only 3 actions permitted: left, right, forward
  82. self.action_space = Discrete(self.actions.forward + 1)
  83. self.reward_range = (-1, 1)
  84. @staticmethod
  85. def _gen_mission():
  86. return "get to the green goal square"
  87. def _gen_grid(self, width, height):
  88. # Create an empty grid
  89. self.grid = Grid(width, height)
  90. # Generate the surrounding walls
  91. self.grid.wall_rect(0, 0, width, height)
  92. # Place a goal square in the bottom-right corner
  93. self.grid.set(width - 2, height - 2, Goal())
  94. # Place the agent
  95. if self.agent_start_pos is not None:
  96. self.agent_pos = self.agent_start_pos
  97. self.agent_dir = self.agent_start_dir
  98. else:
  99. self.place_agent()
  100. # Place obstacles
  101. self.obstacles = []
  102. for i_obst in range(self.n_obstacles):
  103. self.obstacles.append(Ball())
  104. self.place_obj(self.obstacles[i_obst], max_tries=100)
  105. self.mission = "get to the green goal square"
  106. def step(self, action):
  107. # Invalid action
  108. if action >= self.action_space.n:
  109. action = 0
  110. # Check if there is an obstacle in front of the agent
  111. front_cell = self.grid.get(*self.front_pos)
  112. not_clear = front_cell and front_cell.type != "goal"
  113. # Update obstacle positions
  114. for i_obst in range(len(self.obstacles)):
  115. old_pos = self.obstacles[i_obst].cur_pos
  116. top = tuple(map(add, old_pos, (-1, -1)))
  117. try:
  118. self.place_obj(
  119. self.obstacles[i_obst], top=top, size=(3, 3), max_tries=100
  120. )
  121. self.grid.set(old_pos[0], old_pos[1], None)
  122. except Exception:
  123. pass
  124. # Update the agent's position/direction
  125. obs, reward, terminated, truncated, info = super().step(action)
  126. # If the agent tried to walk over an obstacle or wall
  127. if action == self.actions.forward and not_clear:
  128. reward = -1
  129. terminated = True
  130. return obs, reward, terminated, truncated, info
  131. return obs, reward, terminated, truncated, info