123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- from .minigrid import *
- def reject_next_to(env, pos):
- """
- Function to filter out object positions that are right next to
- the agent's starting point
- """
- sx, sy = env.start_pos
- x, y = pos
- d = abs(sx - x) + abs(sy - y)
- return d < 2
- class Room:
- def __init__(
- self,
- top,
- size
- ):
- # Top-left corner and size (tuples)
- self.top = top
- self.size = size
- # List of door objects and door positions
- # Order of the doors is right, down, left, up
- self.doors = [None] * 4
- self.door_pos = [None] * 4
- # List of rooms adjacent to this one
- # Order of the neighbors is right, down, left, up
- self.neighbors = [None] * 4
- # Indicates if this room is behind a locked door
- self.locked = False
- # List of objects contained
- self.objs = []
- def rand_pos(self, env):
- topX, topY = self.top
- sizeX, sizeY = self.size
- return env._randPos(
- topX + 1, topX + sizeX - 1,
- topY + 1, topY + sizeY - 1
- )
- def pos_inside(self, x, y):
- """
- Check if a position is within the bounds of this room
- """
- topX, topY = self.top
- sizeX, sizeY = self.size
- if x < topX or y < topY:
- return False
- if x >= topX + sizeX or y >= topY + sizeY:
- return False
- return True
- class RoomGrid(MiniGridEnv):
- """
- Environment with multiple rooms and random objects.
- This is meant to serve as a base class for other environments.
- """
- def __init__(
- self,
- room_size=7,
- num_rows=3,
- num_cols=3,
- max_steps=100,
- seed=0
- ):
- assert room_size > 0
- assert room_size >= 3
- assert num_rows > 0
- assert num_cols > 0
- self.room_size = room_size
- self.num_rows = num_rows
- self.num_cols = num_cols
- height = (room_size - 1) * num_rows + 1
- width = (room_size - 1) * num_cols + 1
- grid_size = max(width, height)
- # By default, this environment has no mission
- self.mission = ''
- super().__init__(
- grid_size=grid_size,
- max_steps=max_steps,
- see_through_walls=False,
- seed=seed
- )
- def room_from_pos(self, x, y):
- """Get the room a given position maps to"""
- assert x >= 0
- assert y >= 0
- i = x // (self.room_size-1)
- j = y // (self.room_size-1)
- assert i < self.num_cols
- assert j < self.num_rows
- return self.room_grid[j][i]
- def get_room(self, i, j):
- assert i < self.num_cols
- assert j < self.num_rows
- return self.room_grid[j][i]
- def _gen_grid(self, width, height):
- # Create the grid
- self.grid = Grid(width, height)
- self.room_grid = []
- # For each row of rooms
- for j in range(0, self.num_rows):
- row = []
- # For each column of rooms
- for i in range(0, self.num_cols):
- room = Room(
- (i * (self.room_size-1), j * (self.room_size-1)),
- (self.room_size, self.room_size)
- )
- row.append(room)
- # Generate the walls for this room
- self.grid.wall_rect(*room.top, *room.size)
- self.room_grid.append(row)
- # For each row of rooms
- for j in range(0, self.num_rows):
- # For each column of rooms
- for i in range(0, self.num_cols):
- room = self.room_grid[j][i]
- x_l, y_l = (room.top[0] + 1, room.top[1] + 1)
- x_m, y_m = (room.top[0] + room.size[0] - 1, room.top[1] + room.size[1] - 1)
- # Door positions, order is right, down, left, up
- if i < self.num_cols - 1:
- room.neighbors[0] = self.room_grid[j][i+1]
- room.door_pos[0] = (x_m, self._rand_int(y_l, y_m))
- if j < self.num_rows - 1:
- room.neighbors[1] = self.room_grid[j+1][i]
- room.door_pos[1] = (self._rand_int(x_l, x_m), y_m)
- if i > 0:
- room.neighbors[2] = self.room_grid[j][i-1]
- room.door_pos[2] = room.neighbors[2].door_pos[0]
- if j > 0:
- room.neighbors[3] = self.room_grid[j-1][i]
- room.door_pos[3] = room.neighbors[3].door_pos[1]
- # The agent starts in the middle, facing right
- self.start_pos = (
- (self.num_cols // 2) * (self.room_size-1) + (self.room_size // 2),
- (self.num_rows // 2) * (self.room_size-1) + (self.room_size // 2)
- )
- self.start_dir = 0
- def place_in_room(self, i, j, obj):
- """
- Add an existing object to room (i, j)
- """
- room = self.get_room(i, j)
- pos = self.place_obj(
- obj,
- room.top,
- room.size,
- reject_fn=reject_next_to,
- max_tries=1000
- )
- room.objs.append(obj)
- return obj, pos
- def add_object(self, i, j, kind=None, color=None):
- """
- Add a new object to room (i, j)
- """
- if kind == None:
- kind = self._rand_elem(['key', 'ball', 'box'])
- if color == None:
- color = self._rand_color()
- # TODO: we probably want to add an Object.make helper function
- assert kind in ['key', 'ball', 'box']
- if kind == 'key':
- obj = Key(color)
- elif kind == 'ball':
- obj = Ball(color)
- elif kind == 'box':
- obj = Box(color)
- return self.place_in_room(i, j, obj)
- def add_door(self, i, j, door_idx=None, color=None, locked=None):
- """
- Add a door to a room, connecting it to a neighbor
- """
- room = self.get_room(i, j)
- if door_idx == None:
- # Need to make sure that there is a neighbor along this wall
- # and that there is not already a door
- while True:
- door_idx = self._rand_int(0, 4)
- if room.neighbors[door_idx] and room.doors[door_idx] is None:
- break
- if color == None:
- color = self._rand_color()
- if locked is None:
- locked = self._rand_bool()
- assert room.doors[door_idx] is None, "door already exists"
- if locked:
- door = LockedDoor(color)
- room.locked = True
- else:
- door = Door(color)
- pos = room.door_pos[door_idx]
- self.grid.set(*pos, door)
- neighbor = room.neighbors[door_idx]
- room.doors[door_idx] = door
- neighbor.doors[(door_idx+2) % 4] = door
- return door, pos
- def remove_wall(self, i, j, wall_idx):
- """
- Remove a wall between two rooms
- """
- room = self.get_room(i, j)
- assert wall_idx >= 0 and wall_idx < 4
- assert room.doors[wall_idx] is None, "door exists on this wall"
- assert room.neighbors[wall_idx], "invalid wall"
- neighbor = room.neighbors[wall_idx]
- tx, ty = room.top
- w, h = room.size
- # Ordering of walls is right, down, left, up
- if wall_idx == 0:
- for i in range(1, h - 1):
- self.grid.set(tx + w - 1, ty + i, None)
- elif wall_idx == 1:
- for i in range(1, w - 1):
- self.grid.set(tx + i, ty + h - 1, None)
- elif wall_idx == 2:
- for i in range(1, h - 1):
- self.grid.set(tx, ty + i, None)
- elif wall_idx == 3:
- for i in range(1, w - 1):
- self.grid.set(tx + i, ty, None)
- else:
- assert False, "invalid wall index"
- # Mark the rooms as connected
- room.doors[wall_idx] = True
- neighbor.doors[(wall_idx+2) % 4] = True
- def place_agent(self, i=None, j=None, rand_dir=True):
- """
- Place the agent in a room
- """
- if i == None:
- i = self._rand_int(0, self.num_cols)
- if j == None:
- j = self._rand_int(0, self.num_rows)
- room = self.room_grid[j][i]
- # Find a position that is not right in front of an object
- while True:
- super().place_agent(room.top, room.size, rand_dir, max_tries=1000)
- pos = self.start_pos
- dir = DIR_TO_VEC[self.start_dir]
- front_pos = pos + dir
- front_cell = self.grid.get(*front_pos)
- if front_cell is None or front_cell.type is 'wall':
- break
- return self.start_pos
- def connect_all(self, door_colors=COLOR_NAMES, max_itrs=5000):
- """
- Make sure that all rooms are reachable by the agent from its
- starting position
- """
- start_room = self.room_from_pos(*self.start_pos)
- added_doors = []
- def find_reach():
- reach = set()
- stack = [start_room]
- while len(stack) > 0:
- room = stack.pop()
- if room in reach:
- continue
- reach.add(room)
- for i in range(0, 4):
- if room.doors[i]:
- stack.append(room.neighbors[i])
- return reach
- num_itrs = 0
- while True:
- # This is to handle rare situations where random sampling produces
- # a level that cannot be connected, producing in an infinite loop
- if num_itrs > max_itrs:
- raise RecursionError('connect_all failed')
- num_itrs += 1
- # If all rooms are reachable, stop
- reach = find_reach()
- if len(reach) == self.num_rows * self.num_cols:
- break
- # Pick a random room and door position
- i = self._rand_int(0, self.num_cols)
- j = self._rand_int(0, self.num_rows)
- k = self._rand_int(0, 4)
- room = self.get_room(i, j)
- # If there is already a door there, skip
- if not room.door_pos[k] or room.doors[k]:
- continue
- if room.locked or room.neighbors[k].locked:
- continue
- color = self._rand_elem(door_colors)
- door, _ = self.add_door(i, j, k, color, False)
- added_doors.append(door)
- return added_doors
- def add_distractors(self, i=None, j=None, num_distractors=10, all_unique=True):
- """
- Add random objects that can potentially distract/confuse the agent.
- """
- # Collect a list of existing objects
- objs = []
- for row in self.room_grid:
- for room in row:
- for obj in room.objs:
- objs.append((obj.type, obj.color))
- # List of distractors added
- dists = []
- while len(dists) < num_distractors:
- color = self._rand_elem(COLOR_NAMES)
- type = self._rand_elem(['key', 'ball', 'box'])
- obj = (type, color)
- if all_unique and obj in objs:
- continue
- # Add the object to a random room if no room specified
- room_i = i
- room_j = j
- if room_i == None:
- room_i = self._rand_int(0, self.num_cols)
- if room_j == None:
- room_j = self._rand_int(0, self.num_rows)
- dist, pos = self.add_object(room_i, room_j, *obj)
- objs.append(obj)
- dists.append(dist)
- return dists
|