|
@@ -2,7 +2,8 @@ from gym_minigrid.minigrid import *
|
|
from gym_minigrid.register import register
|
|
from gym_minigrid.register import register
|
|
|
|
|
|
class Room:
|
|
class Room:
|
|
- def __init__(self,
|
|
|
|
|
|
+ def __init__(
|
|
|
|
+ self,
|
|
top,
|
|
top,
|
|
size
|
|
size
|
|
):
|
|
):
|
|
@@ -11,19 +12,21 @@ class Room:
|
|
self.size = size
|
|
self.size = size
|
|
|
|
|
|
# List of door objects and door positions
|
|
# List of door objects and door positions
|
|
- self.doors = []
|
|
|
|
- self.doorPos = []
|
|
|
|
|
|
+ # Order of the doors is right, down, left, up
|
|
|
|
+ self.doors = [None] * 4
|
|
|
|
+ self.door_pos = [None] * 4
|
|
|
|
+
|
|
|
|
+ # List of rooms this is connected to
|
|
|
|
+ # Order of the neighbors is right, down, left, up
|
|
|
|
+ self.neighbors = [None] * 4
|
|
|
|
|
|
# Indicates if this room is locked
|
|
# Indicates if this room is locked
|
|
self.locked = False
|
|
self.locked = False
|
|
|
|
|
|
- # Set of rooms this is connected to
|
|
|
|
- self.neighbors = set()
|
|
|
|
-
|
|
|
|
# List of objects contained
|
|
# List of objects contained
|
|
self.objs = []
|
|
self.objs = []
|
|
|
|
|
|
- def randPos(self, env):
|
|
|
|
|
|
+ def rand_pos(self, env):
|
|
topX, topY = self.top
|
|
topX, topY = self.top
|
|
sizeX, sizeY = self.size
|
|
sizeX, sizeY = self.size
|
|
return env._randPos(
|
|
return env._randPos(
|
|
@@ -38,96 +41,170 @@ class RoomGrid(MiniGridEnv):
|
|
"""
|
|
"""
|
|
|
|
|
|
def __init__(
    self,
    room_size=6,
    num_cols=4,
    lockedRooms=False
):
    """
    Set up a square grid of rooms and initialize the parent environment

    room_size: side length of one room in cells, walls included
        (must be at least 4)
    num_cols: number of rooms per row; the same value is used for the
        number of rows, so the room layout is always square
    lockedRooms: stored on the instance as ``self.lockedRooms``
    """

    # room_size >= 4 already implies room_size > 0, so one check suffices
    assert room_size >= 4, "room_size must be at least 4"
    assert num_cols > 0, "num_cols must be positive"

    self.room_size = room_size
    self.num_cols = num_cols
    self.num_rows = num_cols

    # Keep the caller's choice instead of silently discarding it
    self.lockedRooms = lockedRooms

    # Adjacent rooms share a wall, so each extra room adds only
    # room_size - 1 cells; the final +1 is the closing outer wall
    grid_size = (room_size - 1) * num_cols + 1
    super().__init__(gridSize=grid_size, maxSteps=6*grid_size)

    self.reward_range = (0, 1)
|
|
|
|
|
|
def room_from_pos(self, x, y):
    """
    Get the room a given position maps to

    x, y: non-negative cell coordinates inside the grid

    Returns the entry of ``self.room_grid`` covering that cell. A cell
    lying on a wall shared by two rooms maps to the room to its right
    (for a vertical wall) or below it (for a horizontal wall); cells on
    the outermost right/bottom wall map to the last room.

    Raises AssertionError if the position is outside the grid.
    """

    assert x >= 0
    assert y >= 0

    # Rooms are placed at multiples of room_size - 1 (neighbors share
    # their boundary wall), so that is the pitch to divide by. Dividing
    # by room_size drifts to the wrong room away from the origin.
    stride = self.room_size - 1

    assert x <= stride * self.num_cols
    assert y <= stride * self.num_rows

    # The outermost wall would index one past the last room: clamp it
    i = min(x // stride, self.num_cols - 1)
    j = min(y // stride, self.num_rows - 1)

    return self.room_grid[j][i]
|
|
|
|
+
|
|
|
|
def get_room(self, i, j):
    """
    Get the room at column i, row j of the room grid

    Raises AssertionError if either index is out of range. Negative
    indices are rejected too, since they would otherwise wrap around
    to the opposite edge of the grid.
    """
    assert 0 <= i < self.num_cols
    assert 0 <= j < self.num_rows
    return self.room_grid[j][i]
|
|
|
|
|
|
def _genGrid(self, width, height):
    """
    Build the wall layout and the room bookkeeping for a fresh episode

    Creates the grid, one Room per (column, row) slot with its walls,
    then records for every room where a door to each neighbor would go
    and which room lies in that direction. No doors or objects are
    placed here. The agent start is fixed at the center of the middle
    room and the mission string is left empty.
    """

    self.grid = Grid(width, height)

    # Neighboring rooms share a wall, hence the pitch of room_size - 1
    pitch = self.room_size - 1
    half = self.room_size // 2

    # First pass: create the rooms and draw their walls
    self.room_grid = []
    for row_idx in range(self.num_rows):
        room_row = []
        for col_idx in range(self.num_cols):
            new_room = Room(
                (col_idx * pitch, row_idx * pitch),
                (self.room_size, self.room_size)
            )
            room_row.append(new_room)
            self.grid.wallRect(*new_room.top, *new_room.size)
        self.room_grid.append(room_row)

    # Second pass: door slots and neighbors, indexed right, down, left, up
    last_col = self.num_cols - 1
    last_row = self.num_rows - 1
    for row_idx, room_row in enumerate(self.room_grid):
        for col_idx, room in enumerate(room_row):
            left, top = room.top
            mid_x = left + half
            mid_y = top + half

            if col_idx < last_col:
                room.door_pos[0] = (left + pitch, mid_y)
                room.neighbors[0] = room_row[col_idx + 1]
            if row_idx < last_row:
                room.door_pos[1] = (mid_x, top + pitch)
                room.neighbors[1] = self.room_grid[row_idx + 1][col_idx]
            if col_idx > 0:
                room.door_pos[2] = (left, mid_y)
                room.neighbors[2] = room_row[col_idx - 1]
            if row_idx > 0:
                room.door_pos[3] = (mid_x, top)
                room.neighbors[3] = self.room_grid[row_idx - 1][col_idx]

    # The agent starts in the middle room, at its center, facing right
    self.startPos = (
        (self.num_cols // 2) * pitch + half,
        (self.num_rows // 2) * pitch + half
    )
    self.startDir = 0

    # No mission unless a subclass provides one
    self.mission = ''
|
|
|
|
|
|
|
|
def add_object(self, i, j, kind, color):
    """
    Create an object of the given kind and color inside room (i, j)

    kind must be one of 'key', 'ball' or 'box'. The object is placed
    within the room's bounds, recorded in the room's object list, and
    returned to the caller.
    """

    # TODO: we probably want to add an Object.make helper function
    makers = {
        'key': lambda: Key(color),
        'ball': lambda: Ball(color),
        'box': lambda: Box(color),
    }
    assert kind in list(makers)
    obj = makers[kind]()

    target = self.get_room(i, j)
    self.placeObj(obj, target.top, target.size)
    target.objs.append(obj)

    return obj
|
|
|
|
+
|
|
|
|
def add_door(self, i, j, k, color):
    """
    Add a door to a room, connecting it to a neighbor

    i, j: column and row of the room
    k: index of the wall to put the door on (the room's door lists
        are ordered right, down, left, up)
    color: color passed to the Door constructor

    The same door object is registered on both the room and its
    neighbor, under the neighbor's opposite direction index.

    Raises AssertionError if that slot already has a door, or if the
    room has no neighbor in direction k (a room on the grid edge).
    """

    room = self.get_room(i, j)
    assert room.doors[k] is None, "door already exists"

    # Edge rooms have no door slot or neighbor on their outward side;
    # fail with a clear message before touching the grid
    pos = room.door_pos[k]
    neighbor = room.neighbors[k]
    assert pos is not None and neighbor is not None, \
        "no neighbor in this direction"

    door = Door(color)
    self.grid.set(*pos, door)

    room.doors[k] = door
    # (k+2) % 4 is the opposite direction in the right/down/left/up order
    neighbor.doors[(k+2) % 4] = door
|
|
|
|
+
|
|
|
|
def connect_all(self):
    """
    Add randomly chosen doors until every room can be reached from the
    room containing the agent's starting position
    """

    origin = self.room_from_pos(*self.startPos)
    total_rooms = self.num_rows * self.num_cols

    def reachable_rooms():
        # Depth-first walk over rooms linked by already-placed doors
        seen = set()
        pending = [origin]
        while pending:
            current = pending.pop()
            if current not in seen:
                seen.add(current)
                pending.extend(
                    current.neighbors[side]
                    for side in range(4)
                    if current.doors[side]
                )
        return seen

    # Keep sampling (room, side) pairs until the door graph is connected
    while len(reachable_rooms()) != total_rooms:
        i = self._randInt(0, self.num_cols)
        j = self._randInt(0, self.num_rows)
        k = self._randInt(0, 4)
        candidate = self.get_room(i, j)

        # Skip sides with no door slot and sides that already have a door
        if not candidate.door_pos[k] or candidate.doors[k]:
            continue

        self.add_door(i, j, k, self._randElem(COLOR_NAMES))
|
|
|
|
+
|
|
def step(self, action):
    """
    Apply one action by delegating to the parent environment and pass
    its (observation, reward, done, info) result through unchanged
    """
    observation, rew, finished, extra = super().step(action)
    return observation, rew, finished, extra
|