roomgrid.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. from .minigrid import *
  2. def reject_next_to(env, pos):
  3. """
  4. Function to filter out object positions that are right next to
  5. the agent's starting point
  6. """
  7. sx, sy = env.start_pos
  8. x, y = pos
  9. d = abs(sx - x) + abs(sy - y)
  10. return d < 2
  11. class Room:
  12. def __init__(
  13. self,
  14. top,
  15. size
  16. ):
  17. # Top-left corner and size (tuples)
  18. self.top = top
  19. self.size = size
  20. # List of door objects and door positions
  21. # Order of the doors is right, down, left, up
  22. self.doors = [None] * 4
  23. self.door_pos = [None] * 4
  24. # List of rooms adjacent to this one
  25. # Order of the neighbors is right, down, left, up
  26. self.neighbors = [None] * 4
  27. # Indicates if this room is behind a locked door
  28. self.locked = False
  29. # List of objects contained
  30. self.objs = []
  31. def rand_pos(self, env):
  32. topX, topY = self.top
  33. sizeX, sizeY = self.size
  34. return env._randPos(
  35. topX + 1, topX + sizeX - 1,
  36. topY + 1, topY + sizeY - 1
  37. )
  38. def pos_inside(self, x, y):
  39. """
  40. Check if a position is within the bounds of this room
  41. """
  42. topX, topY = self.top
  43. sizeX, sizeY = self.size
  44. if x < topX or y < topY:
  45. return False
  46. if x >= topX + sizeX or y >= topY + sizeY:
  47. return False
  48. return True
  49. class RoomGrid(MiniGridEnv):
  50. """
  51. Environment with multiple rooms and random objects.
  52. This is meant to serve as a base class for other environments.
  53. """
  54. def __init__(
  55. self,
  56. room_size=7,
  57. num_rows=3,
  58. num_cols=3,
  59. max_steps=100,
  60. seed=0
  61. ):
  62. assert room_size > 0
  63. assert room_size >= 3
  64. assert num_rows > 0
  65. assert num_cols > 0
  66. self.room_size = room_size
  67. self.num_rows = num_rows
  68. self.num_cols = num_cols
  69. height = (room_size - 1) * num_rows + 1
  70. width = (room_size - 1) * num_cols + 1
  71. grid_size = max(width, height)
  72. # By default, this environment has no mission
  73. self.mission = ''
  74. super().__init__(
  75. grid_size=grid_size,
  76. max_steps=max_steps,
  77. see_through_walls=False,
  78. seed=seed
  79. )
  80. def room_from_pos(self, x, y):
  81. """Get the room a given position maps to"""
  82. assert x >= 0
  83. assert y >= 0
  84. i = x // (self.room_size-1)
  85. j = y // (self.room_size-1)
  86. assert i < self.num_cols
  87. assert j < self.num_rows
  88. return self.room_grid[j][i]
  89. def get_room(self, i, j):
  90. assert i < self.num_cols
  91. assert j < self.num_rows
  92. return self.room_grid[j][i]
  93. def _gen_grid(self, width, height):
  94. # Create the grid
  95. self.grid = Grid(width, height)
  96. self.room_grid = []
  97. # For each row of rooms
  98. for j in range(0, self.num_rows):
  99. row = []
  100. # For each column of rooms
  101. for i in range(0, self.num_cols):
  102. room = Room(
  103. (i * (self.room_size-1), j * (self.room_size-1)),
  104. (self.room_size, self.room_size)
  105. )
  106. row.append(room)
  107. # Generate the walls for this room
  108. self.grid.wall_rect(*room.top, *room.size)
  109. self.room_grid.append(row)
  110. # For each row of rooms
  111. for j in range(0, self.num_rows):
  112. # For each column of rooms
  113. for i in range(0, self.num_cols):
  114. room = self.room_grid[j][i]
  115. x_l, y_l = (room.top[0] + 1, room.top[1] + 1)
  116. x_m, y_m = (room.top[0] + room.size[0] - 1, room.top[1] + room.size[1] - 1)
  117. # Door positions, order is right, down, left, up
  118. if i < self.num_cols - 1:
  119. room.neighbors[0] = self.room_grid[j][i+1]
  120. room.door_pos[0] = (x_m, self._rand_int(y_l, y_m))
  121. if j < self.num_rows - 1:
  122. room.neighbors[1] = self.room_grid[j+1][i]
  123. room.door_pos[1] = (self._rand_int(x_l, x_m), y_m)
  124. if i > 0:
  125. room.neighbors[2] = self.room_grid[j][i-1]
  126. room.door_pos[2] = room.neighbors[2].door_pos[0]
  127. if j > 0:
  128. room.neighbors[3] = self.room_grid[j-1][i]
  129. room.door_pos[3] = room.neighbors[3].door_pos[1]
  130. # The agent starts in the middle, facing right
  131. self.start_pos = (
  132. (self.num_cols // 2) * (self.room_size-1) + (self.room_size // 2),
  133. (self.num_rows // 2) * (self.room_size-1) + (self.room_size // 2)
  134. )
  135. self.start_dir = 0
  136. def place_in_room(self, i, j, obj):
  137. """
  138. Add an existing object to room (i, j)
  139. """
  140. room = self.get_room(i, j)
  141. pos = self.place_obj(
  142. obj,
  143. room.top,
  144. room.size,
  145. reject_fn=reject_next_to,
  146. max_tries=1000
  147. )
  148. room.objs.append(obj)
  149. return obj, pos
  150. def add_object(self, i, j, kind=None, color=None):
  151. """
  152. Add a new object to room (i, j)
  153. """
  154. if kind == None:
  155. kind = self._rand_elem(['key', 'ball', 'box'])
  156. if color == None:
  157. color = self._rand_color()
  158. # TODO: we probably want to add an Object.make helper function
  159. assert kind in ['key', 'ball', 'box']
  160. if kind == 'key':
  161. obj = Key(color)
  162. elif kind == 'ball':
  163. obj = Ball(color)
  164. elif kind == 'box':
  165. obj = Box(color)
  166. return self.place_in_room(i, j, obj)
  167. def add_door(self, i, j, door_idx=None, color=None, locked=None):
  168. """
  169. Add a door to a room, connecting it to a neighbor
  170. """
  171. room = self.get_room(i, j)
  172. if door_idx == None:
  173. # Need to make sure that there is a neighbor along this wall
  174. # and that there is not already a door
  175. while True:
  176. door_idx = self._rand_int(0, 4)
  177. if room.neighbors[door_idx] and room.doors[door_idx] is None:
  178. break
  179. if color == None:
  180. color = self._rand_color()
  181. if locked is None:
  182. locked = self._rand_bool()
  183. assert room.doors[door_idx] is None, "door already exists"
  184. if locked:
  185. door = LockedDoor(color)
  186. room.locked = True
  187. else:
  188. door = Door(color)
  189. pos = room.door_pos[door_idx]
  190. self.grid.set(*pos, door)
  191. neighbor = room.neighbors[door_idx]
  192. room.doors[door_idx] = door
  193. neighbor.doors[(door_idx+2) % 4] = door
  194. return door, pos
  195. def remove_wall(self, i, j, wall_idx):
  196. """
  197. Remove a wall between two rooms
  198. """
  199. room = self.get_room(i, j)
  200. assert wall_idx >= 0 and wall_idx < 4
  201. assert room.doors[wall_idx] is None, "door exists on this wall"
  202. assert room.neighbors[wall_idx], "invalid wall"
  203. neighbor = room.neighbors[wall_idx]
  204. tx, ty = room.top
  205. w, h = room.size
  206. # Ordering of walls is right, down, left, up
  207. if wall_idx == 0:
  208. for i in range(1, h - 1):
  209. self.grid.set(tx + w - 1, ty + i, None)
  210. elif wall_idx == 1:
  211. for i in range(1, w - 1):
  212. self.grid.set(tx + i, ty + h - 1, None)
  213. elif wall_idx == 2:
  214. for i in range(1, h - 1):
  215. self.grid.set(tx, ty + i, None)
  216. elif wall_idx == 3:
  217. for i in range(1, w - 1):
  218. self.grid.set(tx + i, ty, None)
  219. else:
  220. assert False, "invalid wall index"
  221. # Mark the rooms as connected
  222. room.doors[wall_idx] = True
  223. neighbor.doors[(wall_idx+2) % 4] = True
  224. def place_agent(self, i=None, j=None, rand_dir=True):
  225. """
  226. Place the agent in a room
  227. """
  228. if i == None:
  229. i = self._rand_int(0, self.num_cols)
  230. if j == None:
  231. j = self._rand_int(0, self.num_rows)
  232. room = self.room_grid[j][i]
  233. # Find a position that is not right in front of an object
  234. while True:
  235. super().place_agent(room.top, room.size, rand_dir, max_tries=1000)
  236. pos = self.start_pos
  237. dir = DIR_TO_VEC[self.start_dir]
  238. front_pos = pos + dir
  239. front_cell = self.grid.get(*front_pos)
  240. if front_cell is None or front_cell.type is 'wall':
  241. break
  242. return self.start_pos
  243. def connect_all(self, door_colors=COLOR_NAMES, max_itrs=5000):
  244. """
  245. Make sure that all rooms are reachable by the agent from its
  246. starting position
  247. """
  248. start_room = self.room_from_pos(*self.start_pos)
  249. added_doors = []
  250. def find_reach():
  251. reach = set()
  252. stack = [start_room]
  253. while len(stack) > 0:
  254. room = stack.pop()
  255. if room in reach:
  256. continue
  257. reach.add(room)
  258. for i in range(0, 4):
  259. if room.doors[i]:
  260. stack.append(room.neighbors[i])
  261. return reach
  262. num_itrs = 0
  263. while True:
  264. # This is to handle rare situations where random sampling produces
  265. # a level that cannot be connected, producing in an infinite loop
  266. if num_itrs > max_itrs:
  267. raise RecursionError('connect_all failed')
  268. num_itrs += 1
  269. # If all rooms are reachable, stop
  270. reach = find_reach()
  271. if len(reach) == self.num_rows * self.num_cols:
  272. break
  273. # Pick a random room and door position
  274. i = self._rand_int(0, self.num_cols)
  275. j = self._rand_int(0, self.num_rows)
  276. k = self._rand_int(0, 4)
  277. room = self.get_room(i, j)
  278. # If there is already a door there, skip
  279. if not room.door_pos[k] or room.doors[k]:
  280. continue
  281. if room.locked or room.neighbors[k].locked:
  282. continue
  283. color = self._rand_elem(door_colors)
  284. door, _ = self.add_door(i, j, k, color, False)
  285. added_doors.append(door)
  286. return added_doors
  287. def add_distractors(self, i=None, j=None, num_distractors=10, all_unique=True):
  288. """
  289. Add random objects that can potentially distract/confuse the agent.
  290. """
  291. # Collect a list of existing objects
  292. objs = []
  293. for row in self.room_grid:
  294. for room in row:
  295. for obj in room.objs:
  296. objs.append((obj.type, obj.color))
  297. # List of distractors added
  298. dists = []
  299. while len(dists) < num_distractors:
  300. color = self._rand_elem(COLOR_NAMES)
  301. type = self._rand_elem(['key', 'ball', 'box'])
  302. obj = (type, color)
  303. if all_unique and obj in objs:
  304. continue
  305. # Add the object to a random room if no room specified
  306. room_i = i
  307. room_j = j
  308. if room_i == None:
  309. room_i = self._rand_int(0, self.num_cols)
  310. if room_j == None:
  311. room_j = self._rand_int(0, self.num_rows)
  312. dist, pos = self.add_object(room_i, room_j, *obj)
  313. objs.append(obj)
  314. dists.append(dist)
  315. return dists