test_wrappers.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import math
  2. import gym
  3. import numpy as np
  4. import pytest
  5. from gym_minigrid.envs import EmptyEnv
  6. from gym_minigrid.minigrid import MiniGridEnv
  7. from gym_minigrid.wrappers import (
  8. ActionBonus,
  9. DictObservationSpaceWrapper,
  10. FlatObsWrapper,
  11. FullyObsWrapper,
  12. ImgObsWrapper,
  13. OneHotPartialObsWrapper,
  14. ReseedWrapper,
  15. RGBImgObsWrapper,
  16. RGBImgPartialObsWrapper,
  17. StateBonus,
  18. ViewSizeWrapper,
  19. )
  20. from tests.utils import all_testing_env_specs, assert_equals
  21. SEEDS = [100, 243, 500]
  22. NUM_STEPS = 100
  23. @pytest.mark.parametrize(
  24. "env_spec", all_testing_env_specs, ids=[spec.id for spec in all_testing_env_specs]
  25. )
  26. def test_reseed_wrapper(env_spec):
  27. """
  28. Test the ReseedWrapper with a list of SEEDS.
  29. """
  30. unwrapped_env = env_spec.make()
  31. env = env_spec.make()
  32. env = ReseedWrapper(env, seeds=SEEDS)
  33. env.action_space.seed(0)
  34. for seed in SEEDS:
  35. env.reset()
  36. unwrapped_env.reset(seed=seed)
  37. for time_step in range(NUM_STEPS):
  38. action = env.action_space.sample()
  39. obs, rew, done, info = env.step(action)
  40. (
  41. unwrapped_obs,
  42. unwrapped_rew,
  43. unwrapped_done,
  44. unwrapped_info,
  45. ) = unwrapped_env.step(action)
  46. assert_equals(obs, unwrapped_obs, f"[{time_step}] ")
  47. assert unwrapped_env.observation_space.contains(obs)
  48. assert (
  49. rew == unwrapped_rew
  50. ), f"[{time_step}] reward={rew}, unwrapped reward={unwrapped_rew}"
  51. assert (
  52. done == unwrapped_done
  53. ), f"[{time_step}] done={done}, unwrapped done={unwrapped_done}"
  54. assert_equals(info, unwrapped_info, f"[{time_step}] ")
  55. # Start the next seed
  56. if done:
  57. break
  58. env.close()
  59. unwrapped_env.close()
  60. @pytest.mark.parametrize("env_id", ["MiniGrid-Empty-16x16-v0"])
  61. def test_state_bonus_wrapper(env_id):
  62. env = gym.make(env_id)
  63. wrapped_env = StateBonus(gym.make(env_id))
  64. action_forward = MiniGridEnv.Actions.forward
  65. action_left = MiniGridEnv.Actions.left
  66. action_right = MiniGridEnv.Actions.right
  67. for _ in range(10):
  68. wrapped_env.reset()
  69. for _ in range(5):
  70. wrapped_env.step(action_forward)
  71. # Turn lef 3 times (check that actions don't influence bonus)
  72. for _ in range(3):
  73. _, wrapped_rew, _, _ = wrapped_env.step(action_left)
  74. env.reset()
  75. for _ in range(5):
  76. env.step(action_forward)
  77. # Turn right 3 times
  78. for _ in range(3):
  79. _, rew, _, _ = env.step(action_right)
  80. expected_bonus_reward = rew + 1 / math.sqrt(13)
  81. assert expected_bonus_reward == wrapped_rew
  82. @pytest.mark.parametrize("env_id", ["MiniGrid-Empty-16x16-v0"])
  83. def test_action_bonus_wrapper(env_id):
  84. env = gym.make(env_id)
  85. wrapped_env = ActionBonus(gym.make(env_id))
  86. action = MiniGridEnv.Actions.forward
  87. for _ in range(10):
  88. wrapped_env.reset()
  89. for _ in range(5):
  90. _, wrapped_rew, _, _ = wrapped_env.step(action)
  91. env.reset()
  92. for _ in range(5):
  93. _, rew, _, _ = env.step(action)
  94. expected_bonus_reward = rew + 1 / math.sqrt(10)
  95. assert expected_bonus_reward == wrapped_rew
  96. @pytest.mark.parametrize(
  97. "env_spec", all_testing_env_specs, ids=[spec.id for spec in all_testing_env_specs]
  98. )
  99. def test_dict_observation_space_wrapper(env_spec):
  100. env = env_spec.make()
  101. env = DictObservationSpaceWrapper(env)
  102. env.reset()
  103. mission = env.mission
  104. obs, _, _, _ = env.step(0)
  105. assert env.string_to_indices(mission) == [
  106. value for value in obs["mission"] if value != 0
  107. ]
  108. env.close()
  109. @pytest.mark.parametrize(
  110. "wrapper",
  111. [
  112. ReseedWrapper,
  113. ImgObsWrapper,
  114. FlatObsWrapper,
  115. ViewSizeWrapper,
  116. DictObservationSpaceWrapper,
  117. OneHotPartialObsWrapper,
  118. RGBImgPartialObsWrapper,
  119. FullyObsWrapper,
  120. ],
  121. )
  122. @pytest.mark.parametrize(
  123. "env_spec", all_testing_env_specs, ids=[spec.id for spec in all_testing_env_specs]
  124. )
  125. def test_main_wrappers(wrapper, env_spec):
  126. env = env_spec.make()
  127. env = wrapper(env)
  128. for _ in range(10):
  129. env.reset()
  130. env.step(0)
  131. env.close()
  132. @pytest.mark.parametrize(
  133. "wrapper",
  134. [
  135. OneHotPartialObsWrapper,
  136. RGBImgPartialObsWrapper,
  137. FullyObsWrapper,
  138. ],
  139. )
  140. @pytest.mark.parametrize(
  141. "env_spec", all_testing_env_specs, ids=[spec.id for spec in all_testing_env_specs]
  142. )
  143. def test_observation_space_wrappers(wrapper, env_spec):
  144. env = wrapper(env_spec.make(disable_env_checker=True))
  145. obs_space, wrapper_name = env.observation_space, wrapper.__name__
  146. assert isinstance(
  147. obs_space, gym.spaces.Dict
  148. ), f"Observation space for {wrapper_name} is not a Dict: {obs_space}."
  149. # This should not fail either
  150. ImgObsWrapper(env)
  151. env.reset()
  152. env.step(0)
  153. env.close()
  154. class EmptyEnvWithExtraObs(EmptyEnv):
  155. """
  156. Custom environment with an extra observation
  157. """
  158. def __init__(self) -> None:
  159. super().__init__(size=5)
  160. self.observation_space["size"] = gym.spaces.Box(
  161. low=0, high=np.iinfo(np.uint).max, shape=(2,), dtype=np.uint
  162. )
  163. def reset(self, **kwargs):
  164. obs = super().reset(**kwargs)
  165. obs["size"] = np.array([self.width, self.height])
  166. return obs
  167. def step(self, action):
  168. obs, reward, done, info = super().step(action)
  169. obs["size"] = np.array([self.width, self.height])
  170. return obs, reward, done, info
  171. @pytest.mark.parametrize(
  172. "wrapper",
  173. [
  174. OneHotPartialObsWrapper,
  175. RGBImgObsWrapper,
  176. RGBImgPartialObsWrapper,
  177. FullyObsWrapper,
  178. ],
  179. )
  180. def test_agent_sees_method(wrapper):
  181. env1 = wrapper(EmptyEnvWithExtraObs())
  182. env2 = wrapper(gym.make("MiniGrid-Empty-5x5-v0"))
  183. obs1 = env1.reset(seed=0)
  184. obs2 = env2.reset(seed=0)
  185. assert "size" in obs1
  186. assert obs1["size"].shape == (2,)
  187. assert (obs1["size"] == [5, 5]).all()
  188. for key in obs2:
  189. assert np.array_equal(obs1[key], obs2[key])
  190. obs1, reward1, done1, _ = env1.step(0)
  191. obs2, reward2, done2, _ = env2.step(0)
  192. assert "size" in obs1
  193. assert obs1["size"].shape == (2,)
  194. assert (obs1["size"] == [5, 5]).all()
  195. for key in obs2:
  196. assert np.array_equal(obs1[key], obs2[key])