from .minigrid import *


def reject_next_to(env, pos):
    """
    Function to filter out object positions that are right next to
    the agent's starting point

    Returns True (meaning "reject") when the Manhattan distance between
    `pos` and `env.start_pos` is less than 2.
    """

    sx, sy = env.start_pos
    x, y = pos
    d = abs(sx - x) + abs(sy - y)
    return d < 2


class Room:
    """
    A single rectangular room of a RoomGrid, together with its doors,
    its neighboring rooms and the objects placed inside it.
    """

    def __init__(
        self,
        top,
        size
    ):
        # Top-left corner and size (tuples)
        self.top = top
        self.size = size

        # List of door objects and door positions
        # Order of the doors is right, down, left, up
        self.doors = [None] * 4
        self.door_pos = [None] * 4

        # List of rooms adjacent to this one
        # Order of the neighbors is right, down, left, up
        self.neighbors = [None] * 4

        # Indicates if this room is behind a locked door
        self.locked = False

        # List of objects contained
        self.objs = []

    def rand_pos(self, env):
        """
        Ask `env` for a random position, passing the bounds of this room
        with the outer wall cells excluded on the low side
        """

        topX, topY = self.top
        sizeX, sizeY = self.size
        return env._randPos(
            topX + 1, topX + sizeX - 1,
            topY + 1, topY + sizeY - 1
        )

    def pos_inside(self, x, y):
        """
        Check if a position is within the bounds of this room
        """

        topX, topY = self.top
        sizeX, sizeY = self.size

        if x < topX or y < topY:
            return False

        if x >= topX + sizeX or y >= topY + sizeY:
            return False

        return True


class RoomGrid(MiniGridEnv):
    """
    Environment with multiple rooms and random objects.
    This is meant to serve as a base class for other environments.
    """

    def __init__(
        self,
        room_size=7,
        num_rows=3,
        num_cols=3,
        max_steps=100,
        seed=0
    ):
        # A room needs at least one interior cell between its walls
        assert room_size >= 3
        assert num_rows > 0
        assert num_cols > 0
        self.room_size = room_size
        self.num_rows = num_rows
        self.num_cols = num_cols

        # Adjacent rooms share a wall, hence the (room_size - 1) stride
        height = (room_size - 1) * num_rows + 1
        width = (room_size - 1) * num_cols + 1
        grid_size = max(width, height)

        # By default, this environment has no mission
        self.mission = ''

        super().__init__(
            grid_size=grid_size,
            max_steps=max_steps,
            see_through_walls=False,
            seed=seed
        )

    def room_from_pos(self, x, y):
        """Get the room a given position maps to"""

        assert x >= 0
        assert y >= 0

        i = x // (self.room_size-1)
        j = y // (self.room_size-1)

        assert i < self.num_cols
        assert j < self.num_rows

        return self.room_grid[j][i]

    def get_room(self, i, j):
        """Get the room at column i, row j of the room grid"""

        assert i < self.num_cols
        assert j < self.num_rows
        return self.room_grid[j][i]

    def _gen_grid(self, width, height):
        """
        Create the grid, the walls of every room, the candidate door
        positions between adjacent rooms, and the agent's default
        starting position and direction
        """

        # Create the grid
        self.grid = Grid(width, height)

        self.room_grid = []

        # For each row of rooms
        for j in range(0, self.num_rows):
            row = []

            # For each column of rooms
            for i in range(0, self.num_cols):
                room = Room(
                    (i * (self.room_size-1), j * (self.room_size-1)),
                    (self.room_size, self.room_size)
                )
                row.append(room)

                # Generate the walls for this room
                self.grid.wall_rect(*room.top, *room.size)

            self.room_grid.append(row)

        # For each row of rooms
        for j in range(0, self.num_rows):
            # For each column of rooms
            for i in range(0, self.num_cols):
                room = self.room_grid[j][i]

                x_l, y_l = (room.top[0] + 1, room.top[1] + 1)
                x_m, y_m = (room.top[0] + room.size[0] - 1, room.top[1] + room.size[1] - 1)

                # Door positions, order is right, down, left, up
                if i < self.num_cols - 1:
                    room.neighbors[0] = self.room_grid[j][i+1]
                    room.door_pos[0] = (x_m, self._rand_int(y_l, y_m))
                if j < self.num_rows - 1:
                    room.neighbors[1] = self.room_grid[j+1][i]
                    room.door_pos[1] = (self._rand_int(x_l, x_m), y_m)
                # The left/up door positions reuse the ones already drawn
                # for the neighbor's right/down wall (the wall is shared)
                if i > 0:
                    room.neighbors[2] = self.room_grid[j][i-1]
                    room.door_pos[2] = room.neighbors[2].door_pos[0]
                if j > 0:
                    room.neighbors[3] = self.room_grid[j-1][i]
                    room.door_pos[3] = room.neighbors[3].door_pos[1]

        # The agent starts in the middle, facing right
        self.start_pos = (
            (self.num_cols // 2) * (self.room_size-1) + (self.room_size // 2),
            (self.num_rows // 2) * (self.room_size-1) + (self.room_size // 2)
        )
        self.start_dir = 0

    def place_in_room(self, i, j, obj):
        """
        Add an existing object to room (i, j)

        Returns the object and the position it was placed at.
        """

        room = self.get_room(i, j)

        pos = self.place_obj(
            obj,
            room.top,
            room.size,
            reject_fn=reject_next_to,
            max_tries=1000
        )

        room.objs.append(obj)

        return obj, pos

    def add_object(self, i, j, kind=None, color=None):
        """
        Add a new object to room (i, j)

        A missing `kind` or `color` is drawn at random. Returns the new
        object and the position it was placed at.
        """

        if kind is None:
            kind = self._rand_elem(['key', 'ball', 'box'])

        if color is None:
            color = self._rand_color()

        # TODO: we probably want to add an Object.make helper function
        assert kind in ['key', 'ball', 'box']
        if kind == 'key':
            obj = Key(color)
        elif kind == 'ball':
            obj = Ball(color)
        elif kind == 'box':
            obj = Box(color)

        return self.place_in_room(i, j, obj)

    def add_door(self, i, j, door_idx=None, color=None, locked=None):
        """
        Add a door to a room, connecting it to a neighbor

        A missing `door_idx`, `color` or `locked` is drawn at random.
        Returns the door object and its position. Raises ValueError when
        no `door_idx` is given and the room has no wall that both has a
        neighbor and is still free of doors.
        """

        room = self.get_room(i, j)

        if door_idx is None:
            # Without at least one usable wall the sampling loop below
            # could never terminate, so fail fast instead
            has_free_wall = any(
                room.neighbors[k] and room.doors[k] is None
                for k in range(4)
            )
            if not has_free_wall:
                raise ValueError(
                    "no free wall to add a door to in room (%s, %s)" % (i, j)
                )

            # Need to make sure that there is a neighbor along this wall
            # and that there is not already a door
            while True:
                door_idx = self._rand_int(0, 4)
                if room.neighbors[door_idx] and room.doors[door_idx] is None:
                    break

        if color is None:
            color = self._rand_color()

        if locked is None:
            locked = self._rand_bool()

        assert room.doors[door_idx] is None, "door already exists"

        if locked:
            door = LockedDoor(color)
            room.locked = True
        else:
            door = Door(color)

        pos = room.door_pos[door_idx]
        self.grid.set(*pos, door)

        # The same door object is registered on the opposite wall
        # of the neighboring room
        neighbor = room.neighbors[door_idx]
        room.doors[door_idx] = door
        neighbor.doors[(door_idx+2) % 4] = door

        return door, pos

    def remove_wall(self, i, j, wall_idx):
        """
        Remove a wall between two rooms

        Only the interior cells of the wall are cleared; the corner
        cells are left in place.
        """

        room = self.get_room(i, j)

        assert wall_idx >= 0 and wall_idx < 4
        assert room.doors[wall_idx] is None, "door exists on this wall"
        assert room.neighbors[wall_idx], "invalid wall"

        neighbor = room.neighbors[wall_idx]

        tx, ty = room.top
        w, h = room.size

        # Ordering of walls is right, down, left, up
        if wall_idx == 0:
            for k in range(1, h - 1):
                self.grid.set(tx + w - 1, ty + k, None)
        elif wall_idx == 1:
            for k in range(1, w - 1):
                self.grid.set(tx + k, ty + h - 1, None)
        elif wall_idx == 2:
            for k in range(1, h - 1):
                self.grid.set(tx, ty + k, None)
        elif wall_idx == 3:
            for k in range(1, w - 1):
                self.grid.set(tx + k, ty, None)
        else:
            assert False, "invalid wall index"

        # Mark the rooms as connected
        room.doors[wall_idx] = True
        neighbor.doors[(wall_idx+2) % 4] = True

    def place_agent(self, i=None, j=None, rand_dir=True):
        """
        Place the agent in a room

        A missing column `i` or row `j` is drawn at random. Returns the
        agent's starting position.
        """

        if i is None:
            i = self._rand_int(0, self.num_cols)
        if j is None:
            j = self._rand_int(0, self.num_rows)

        room = self.room_grid[j][i]

        # Find a position that is not right in front of an object
        while True:
            super().place_agent(room.top, room.size, rand_dir, max_tries=1000)
            dir_vec = DIR_TO_VEC[self.start_dir]
            front_pos = self.start_pos + dir_vec
            front_cell = self.grid.get(*front_pos)
            # Compare the type by value: identity checks against a string
            # literal depend on interning and are not reliable
            if front_cell is None or front_cell.type == 'wall':
                break

        return self.start_pos

    def connect_all(self, door_colors=COLOR_NAMES, max_itrs=5000):
        """
        Make sure that all rooms are reachable by the agent from its
        starting position

        Unlocked doors are added at random until every room is reachable.
        Returns the list of doors added. Raises RecursionError after more
        than `max_itrs` iterations.
        """

        start_room = self.room_from_pos(*self.start_pos)

        added_doors = []

        def find_reach():
            # Depth-first traversal over rooms linked by doors/removed walls
            reach = set()
            stack = [start_room]
            while stack:
                room = stack.pop()
                if room in reach:
                    continue
                reach.add(room)
                for k in range(0, 4):
                    if room.doors[k]:
                        stack.append(room.neighbors[k])
            return reach

        num_itrs = 0

        while True:
            # This is to handle rare situations where random sampling produces
            # a level that cannot be connected, resulting in an infinite loop
            if num_itrs > max_itrs:
                raise RecursionError('connect_all failed')
            num_itrs += 1

            # If all rooms are reachable, stop
            reach = find_reach()
            if len(reach) == self.num_rows * self.num_cols:
                break

            # Pick a random room and door position
            i = self._rand_int(0, self.num_cols)
            j = self._rand_int(0, self.num_rows)
            k = self._rand_int(0, 4)
            room = self.get_room(i, j)

            # If there is no door position on this wall, or there is
            # already a door there, skip
            if not room.door_pos[k] or room.doors[k]:
                continue

            # Never open up a room that sits behind a locked door
            if room.locked or room.neighbors[k].locked:
                continue

            color = self._rand_elem(door_colors)
            door, _ = self.add_door(i, j, k, color, False)
            added_doors.append(door)

        return added_doors

    def add_distractors(self, i=None, j=None, num_distractors=10, all_unique=True):
        """
        Add random objects that can potentially distract/confuse the agent.

        Objects go to room (i, j); a missing `i` or `j` is drawn at random
        for each object. Returns the list of objects added.
        """

        # Collect a list of existing objects
        objs = []
        for row in self.room_grid:
            for room in row:
                for obj in room.objs:
                    objs.append((obj.type, obj.color))

        # List of distractors added
        dists = []

        # NOTE: with all_unique set, this loop only ends if enough unused
        # (type, color) pairs remain to reach num_distractors
        while len(dists) < num_distractors:
            color = self._rand_elem(COLOR_NAMES)
            kind = self._rand_elem(['key', 'ball', 'box'])
            obj = (kind, color)

            if all_unique and obj in objs:
                continue

            # Add the object to a random room if no room specified
            room_i = i
            room_j = j
            if room_i is None:
                room_i = self._rand_int(0, self.num_cols)
            if room_j is None:
                room_j = self._rand_int(0, self.num_rows)

            dist, _ = self.add_object(room_i, room_j, *obj)

            objs.append(obj)
            dists.append(dist)

        return dists