12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352 |
- import math
- import gym
- from enum import IntEnum
- import numpy as np
- from gym import error, spaces, utils
- from gym.utils import seeding
- # Size in pixels of a cell in the full-scale human view
- CELL_PIXELS = 32
- # Map of color names to RGB values
- COLORS = {
- 'red' : np.array([255, 0, 0]),
- 'green' : np.array([0, 255, 0]),
- 'blue' : np.array([0, 0, 255]),
- 'purple': np.array([112, 39, 195]),
- 'yellow': np.array([255, 255, 0]),
- 'grey' : np.array([100, 100, 100])
- }
- COLOR_NAMES = sorted(list(COLORS.keys()))
- # Used to map colors to integers
- COLOR_TO_IDX = {
- 'red' : 0,
- 'green' : 1,
- 'blue' : 2,
- 'purple': 3,
- 'yellow': 4,
- 'grey' : 5
- }
- IDX_TO_COLOR = dict(zip(COLOR_TO_IDX.values(), COLOR_TO_IDX.keys()))
- # Map of object type to integers
- OBJECT_TO_IDX = {
- 'unseen' : 0,
- 'empty' : 1,
- 'wall' : 2,
- 'floor' : 3,
- 'door' : 4,
- 'key' : 5,
- 'ball' : 6,
- 'box' : 7,
- 'goal' : 8,
- 'lava' : 9,
- 'agent' : 10,
- }
- IDX_TO_OBJECT = dict(zip(OBJECT_TO_IDX.values(), OBJECT_TO_IDX.keys()))
- # Map of state names to integers
- STATE_TO_IDX = {
- 'open' : 0,
- 'closed': 1,
- 'locked': 2,
- }
- # Map of agent direction indices to vectors
- DIR_TO_VEC = [
- # Pointing right (positive X)
- np.array((1, 0)),
- # Down (positive Y)
- np.array((0, 1)),
- # Pointing left (negative X)
- np.array((-1, 0)),
- # Up (negative Y)
- np.array((0, -1)),
- ]
- class WorldObj:
- """
- Base class for grid world objects
- """
- def __init__(self, type, color):
- assert type in OBJECT_TO_IDX, type
- assert color in COLOR_TO_IDX, color
- self.type = type
- self.color = color
- self.contains = None
- # Initial position of the object
- self.init_pos = None
- # Current position of the object
- self.cur_pos = None
- def can_overlap(self):
- """Can the agent overlap with this?"""
- return False
- def can_pickup(self):
- """Can the agent pick this up?"""
- return False
- def can_contain(self):
- """Can this contain another object?"""
- return False
- def see_behind(self):
- """Can the agent see behind this object?"""
- return True
- def toggle(self, env, pos):
- """Method to trigger/toggle an action this object performs"""
- return False
- def render(self, r):
- """Draw this object with the given renderer"""
- raise NotImplementedError
- def _set_color(self, r):
- """Set the color of this object as the active drawing color"""
- c = COLORS[self.color]
- r.setLineColor(c[0], c[1], c[2])
- r.setColor(c[0], c[1], c[2])
- class Goal(WorldObj):
- def __init__(self):
- super().__init__('goal', 'green')
- def can_overlap(self):
- return True
- def render(self, r):
- self._set_color(r)
- r.drawPolygon([
- (0 , CELL_PIXELS),
- (CELL_PIXELS, CELL_PIXELS),
- (CELL_PIXELS, 0),
- (0 , 0)
- ])
- class Floor(WorldObj):
- """
- Colored floor tile the agent can walk over
- """
- def __init__(self, color='blue'):
- super().__init__('floor', color)
- def can_overlap(self):
- return True
- def render(self, r):
- # Give the floor a pale color
- c = COLORS[self.color]
- r.setLineColor(100, 100, 100, 0)
- r.setColor(*c/2)
- r.drawPolygon([
- (1 , CELL_PIXELS),
- (CELL_PIXELS, CELL_PIXELS),
- (CELL_PIXELS, 1),
- (1 , 1)
- ])
- class Lava(WorldObj):
- def __init__(self):
- super().__init__('lava', 'red')
- def can_overlap(self):
- return True
- def render(self, r):
- orange = 255, 128, 0
- r.setLineColor(*orange)
- r.setColor(*orange)
- r.drawPolygon([
- (0 , CELL_PIXELS),
- (CELL_PIXELS, CELL_PIXELS),
- (CELL_PIXELS, 0),
- (0 , 0)
- ])
- # drawing the waves
- r.setLineColor(0, 0, 0)
- r.drawPolyline([
- (.1 * CELL_PIXELS, .3 * CELL_PIXELS),
- (.3 * CELL_PIXELS, .4 * CELL_PIXELS),
- (.5 * CELL_PIXELS, .3 * CELL_PIXELS),
- (.7 * CELL_PIXELS, .4 * CELL_PIXELS),
- (.9 * CELL_PIXELS, .3 * CELL_PIXELS),
- ])
- r.drawPolyline([
- (.1 * CELL_PIXELS, .5 * CELL_PIXELS),
- (.3 * CELL_PIXELS, .6 * CELL_PIXELS),
- (.5 * CELL_PIXELS, .5 * CELL_PIXELS),
- (.7 * CELL_PIXELS, .6 * CELL_PIXELS),
- (.9 * CELL_PIXELS, .5 * CELL_PIXELS),
- ])
- r.drawPolyline([
- (.1 * CELL_PIXELS, .7 * CELL_PIXELS),
- (.3 * CELL_PIXELS, .8 * CELL_PIXELS),
- (.5 * CELL_PIXELS, .7 * CELL_PIXELS),
- (.7 * CELL_PIXELS, .8 * CELL_PIXELS),
- (.9 * CELL_PIXELS, .7 * CELL_PIXELS),
- ])
- class Wall(WorldObj):
- def __init__(self, color='grey'):
- super().__init__('wall', color)
- def see_behind(self):
- return False
- def render(self, r):
- self._set_color(r)
- r.drawPolygon([
- (0 , CELL_PIXELS),
- (CELL_PIXELS, CELL_PIXELS),
- (CELL_PIXELS, 0),
- (0 , 0)
- ])
- class Door(WorldObj):
- def __init__(self, color, is_open=False, is_locked=False):
- super().__init__('door', color)
- self.is_open = is_open
- self.is_locked = is_locked
- def can_overlap(self):
- """The agent can only walk over this cell when the door is open"""
- return self.is_open
- def see_behind(self):
- return self.is_open
- def toggle(self, env, pos):
- # If the player has the right key to open the door
- if self.is_locked:
- if isinstance(env.carrying, Key) and env.carrying.color == self.color:
- self.is_locked = False
- self.is_open = True
- return True
- return False
- self.is_open = not self.is_open
- return True
- def render(self, r):
- c = COLORS[self.color]
- r.setLineColor(c[0], c[1], c[2])
- r.setColor(c[0], c[1], c[2], 50 if self.is_locked else 0)
- if self.is_open:
- r.drawPolygon([
- (CELL_PIXELS-2, CELL_PIXELS),
- (CELL_PIXELS , CELL_PIXELS),
- (CELL_PIXELS , 0),
- (CELL_PIXELS-2, 0)
- ])
- return
- r.drawPolygon([
- (0 , CELL_PIXELS),
- (CELL_PIXELS, CELL_PIXELS),
- (CELL_PIXELS, 0),
- (0 , 0)
- ])
- r.drawPolygon([
- (2 , CELL_PIXELS-2),
- (CELL_PIXELS-2, CELL_PIXELS-2),
- (CELL_PIXELS-2, 2),
- (2 , 2)
- ])
- if self.is_locked:
- # Draw key slot
- r.drawLine(
- CELL_PIXELS * 0.55,
- CELL_PIXELS * 0.5,
- CELL_PIXELS * 0.75,
- CELL_PIXELS * 0.5
- )
- else:
- # Draw door handle
- r.drawCircle(CELL_PIXELS * 0.75, CELL_PIXELS * 0.5, 2)
- class Key(WorldObj):
- def __init__(self, color='blue'):
- super(Key, self).__init__('key', color)
- def can_pickup(self):
- return True
- def render(self, r):
- self._set_color(r)
- # Vertical quad
- r.drawPolygon([
- (16, 10),
- (20, 10),
- (20, 28),
- (16, 28)
- ])
- # Teeth
- r.drawPolygon([
- (12, 19),
- (16, 19),
- (16, 21),
- (12, 21)
- ])
- r.drawPolygon([
- (12, 26),
- (16, 26),
- (16, 28),
- (12, 28)
- ])
- r.drawCircle(18, 9, 6)
- r.setLineColor(0, 0, 0)
- r.setColor(0, 0, 0)
- r.drawCircle(18, 9, 2)
- class Ball(WorldObj):
- def __init__(self, color='blue'):
- super(Ball, self).__init__('ball', color)
- def can_pickup(self):
- return True
- def render(self, r):
- self._set_color(r)
- r.drawCircle(CELL_PIXELS * 0.5, CELL_PIXELS * 0.5, 10)
- class Box(WorldObj):
- def __init__(self, color, contains=None):
- super(Box, self).__init__('box', color)
- self.contains = contains
- def can_pickup(self):
- return True
- def render(self, r):
- c = COLORS[self.color]
- r.setLineColor(c[0], c[1], c[2])
- r.setColor(0, 0, 0)
- r.setLineWidth(2)
- r.drawPolygon([
- (4 , CELL_PIXELS-4),
- (CELL_PIXELS-4, CELL_PIXELS-4),
- (CELL_PIXELS-4, 4),
- (4 , 4)
- ])
- r.drawLine(
- 4,
- CELL_PIXELS / 2,
- CELL_PIXELS - 4,
- CELL_PIXELS / 2
- )
- r.setLineWidth(1)
- def toggle(self, env, pos):
- # Replace the box by its contents
- env.grid.set(*pos, self.contains)
- return True
- class Grid:
- """
- Represent a grid and operations on it
- """
- def __init__(self, width, height):
- assert width >= 3
- assert height >= 3
- self.width = width
- self.height = height
- self.grid = [None] * width * height
- def __contains__(self, key):
- if isinstance(key, WorldObj):
- for e in self.grid:
- if e is key:
- return True
- elif isinstance(key, tuple):
- for e in self.grid:
- if e is None:
- continue
- if (e.color, e.type) == key:
- return True
- if key[0] is None and key[1] == e.type:
- return True
- return False
- def __eq__(self, other):
- grid1 = self.encode()
- grid2 = other.encode()
- return np.array_equal(grid2, grid1)
- def __ne__(self, other):
- return not self == other
- def copy(self):
- from copy import deepcopy
- return deepcopy(self)
- def set(self, i, j, v):
- assert i >= 0 and i < self.width
- assert j >= 0 and j < self.height
- self.grid[j * self.width + i] = v
- def get(self, i, j):
- assert i >= 0 and i < self.width
- assert j >= 0 and j < self.height
- return self.grid[j * self.width + i]
- def horz_wall(self, x, y, length=None):
- if length is None:
- length = self.width - x
- for i in range(0, length):
- self.set(x + i, y, Wall())
- def vert_wall(self, x, y, length=None):
- if length is None:
- length = self.height - y
- for j in range(0, length):
- self.set(x, y + j, Wall())
- def wall_rect(self, x, y, w, h):
- self.horz_wall(x, y, w)
- self.horz_wall(x, y+h-1, w)
- self.vert_wall(x, y, h)
- self.vert_wall(x+w-1, y, h)
- def rotate_left(self):
- """
- Rotate the grid to the left (counter-clockwise)
- """
- grid = Grid(self.height, self.width)
- for i in range(self.width):
- for j in range(self.height):
- v = self.get(i, j)
- grid.set(j, grid.height - 1 - i, v)
- return grid
- def slice(self, topX, topY, width, height):
- """
- Get a subset of the grid
- """
- grid = Grid(width, height)
- for j in range(0, height):
- for i in range(0, width):
- x = topX + i
- y = topY + j
- if x >= 0 and x < self.width and \
- y >= 0 and y < self.height:
- v = self.get(x, y)
- else:
- v = Wall()
- grid.set(i, j, v)
- return grid
- def render(self, r, tile_size):
- """
- Render this grid at a given scale
- :param r: target renderer object
- :param tile_size: tile size in pixels
- """
- assert r.width == self.width * tile_size
- assert r.height == self.height * tile_size
- # Total grid size at native scale
- widthPx = self.width * CELL_PIXELS
- heightPx = self.height * CELL_PIXELS
- r.push()
- # Internally, we draw at the "large" full-grid resolution, but we
- # use the renderer to scale back to the desired size
- r.scale(tile_size / CELL_PIXELS, tile_size / CELL_PIXELS)
- # Draw the background of the in-world cells black
- r.fillRect(
- 0,
- 0,
- widthPx,
- heightPx,
- 0, 0, 0
- )
- # Draw grid lines
- r.setLineColor(100, 100, 100)
- for rowIdx in range(0, self.height):
- y = CELL_PIXELS * rowIdx
- r.drawLine(0, y, widthPx, y)
- for colIdx in range(0, self.width):
- x = CELL_PIXELS * colIdx
- r.drawLine(x, 0, x, heightPx)
- # Render the grid
- for j in range(0, self.height):
- for i in range(0, self.width):
- cell = self.get(i, j)
- if cell == None:
- continue
- r.push()
- r.translate(i * CELL_PIXELS, j * CELL_PIXELS)
- cell.render(r)
- r.pop()
- r.pop()
- def encode(self, vis_mask=None):
- """
- Produce a compact numpy encoding of the grid
- """
- if vis_mask is None:
- vis_mask = np.ones((self.width, self.height), dtype=bool)
- array = np.zeros((self.width, self.height, 3), dtype='uint8')
- for i in range(self.width):
- for j in range(self.height):
- if vis_mask[i, j]:
- v = self.get(i, j)
- if v is None:
- array[i, j, 0] = OBJECT_TO_IDX['empty']
- array[i, j, 1] = 0
- array[i, j, 2] = 0
- else:
- # State, 0: open, 1: closed, 2: locked
- state = 0
- if hasattr(v, 'is_open') and not v.is_open:
- state = 1
- if hasattr(v, 'is_locked') and v.is_locked:
- state = 2
- array[i, j, 0] = OBJECT_TO_IDX[v.type]
- array[i, j, 1] = COLOR_TO_IDX[v.color]
- array[i, j, 2] = state
- return array
- @staticmethod
- def decode(array):
- """
- Decode an array grid encoding back into a grid
- """
- width, height, channels = array.shape
- assert channels == 3
- grid = Grid(width, height)
- for i in range(width):
- for j in range(height):
- typeIdx, colorIdx, state = array[i, j]
- if typeIdx == OBJECT_TO_IDX['unseen'] or \
- typeIdx == OBJECT_TO_IDX['empty']:
- continue
- objType = IDX_TO_OBJECT[typeIdx]
- color = IDX_TO_COLOR[colorIdx]
- # State, 0: open, 1: closed, 2: locked
- is_open = state == 0
- is_locked = state == 2
- if objType == 'wall':
- v = Wall(color)
- elif objType == 'floor':
- v = Floor(color)
- elif objType == 'ball':
- v = Ball(color)
- elif objType == 'key':
- v = Key(color)
- elif objType == 'box':
- v = Box(color)
- elif objType == 'door':
- v = Door(color, is_open, is_locked)
- elif objType == 'goal':
- v = Goal()
- elif objType == 'lava':
- v = Lava()
- else:
- assert False, "unknown obj type in decode '%s'" % objType
- grid.set(i, j, v)
- return grid
- def process_vis(grid, agent_pos):
- mask = np.zeros(shape=(grid.width, grid.height), dtype=np.bool)
- mask[agent_pos[0], agent_pos[1]] = True
- for j in reversed(range(0, grid.height)):
- for i in range(0, grid.width-1):
- if not mask[i, j]:
- continue
- cell = grid.get(i, j)
- if cell and not cell.see_behind():
- continue
- mask[i+1, j] = True
- if j > 0:
- mask[i+1, j-1] = True
- mask[i, j-1] = True
- for i in reversed(range(1, grid.width)):
- if not mask[i, j]:
- continue
- cell = grid.get(i, j)
- if cell and not cell.see_behind():
- continue
- mask[i-1, j] = True
- if j > 0:
- mask[i-1, j-1] = True
- mask[i, j-1] = True
- for j in range(0, grid.height):
- for i in range(0, grid.width):
- if not mask[i, j]:
- grid.set(i, j, None)
- return mask
- class MiniGridEnv(gym.Env):
- """
- 2D grid world game environment
- """
- metadata = {
- 'render.modes': ['human', 'rgb_array', 'pixmap'],
- 'video.frames_per_second' : 10
- }
- # Enumeration of possible actions
- class Actions(IntEnum):
- # Turn left, turn right, move forward
- left = 0
- right = 1
- forward = 2
- # Pick up an object
- pickup = 3
- # Drop an object
- drop = 4
- # Toggle/activate an object
- toggle = 5
- # Done completing task
- done = 6
- def __init__(
- self,
- grid_size=None,
- width=None,
- height=None,
- max_steps=100,
- see_through_walls=False,
- seed=1337,
- agent_view_size=7
- ):
- # Can't set both grid_size and width/height
- if grid_size:
- assert width == None and height == None
- width = grid_size
- height = grid_size
- # Action enumeration for this environment
- self.actions = MiniGridEnv.Actions
- # Actions are discrete integer values
- self.action_space = spaces.Discrete(len(self.actions))
- # Number of cells (width and height) in the agent view
- self.agent_view_size = agent_view_size
- # Observations are dictionaries containing an
- # encoding of the grid and a textual 'mission' string
- self.observation_space = spaces.Box(
- low=0,
- high=255,
- shape=(self.agent_view_size, self.agent_view_size, 3),
- dtype='uint8'
- )
- self.observation_space = spaces.Dict({
- 'image': self.observation_space
- })
- # Range of possible rewards
- self.reward_range = (0, 1)
- # Renderer object used to render the whole grid (full-scale)
- self.grid_render = None
- # Renderer used to render observations (small-scale agent view)
- self.obs_render = None
- # Environment configuration
- self.width = width
- self.height = height
- self.max_steps = max_steps
- self.see_through_walls = see_through_walls
- # Current position and direction of the agent
- self.agent_pos = None
- self.agent_dir = None
- # Initialize the RNG
- self.seed(seed=seed)
- # Initialize the state
- self.reset()
- def reset(self):
- # Current position and direction of the agent
- self.agent_pos = None
- self.agent_dir = None
- # Generate a new random grid at the start of each episode
- # To keep the same grid for each episode, call env.seed() with
- # the same seed before calling env.reset()
- self._gen_grid(self.width, self.height)
- # These fields should be defined by _gen_grid
- assert self.agent_pos is not None
- assert self.agent_dir is not None
- # Check that the agent doesn't overlap with an object
- start_cell = self.grid.get(*self.agent_pos)
- assert start_cell is None or start_cell.can_overlap()
- # Item picked up, being carried, initially nothing
- self.carrying = None
- # Step count since episode start
- self.step_count = 0
- # Return first observation
- obs = self.gen_obs()
- return obs
- def seed(self, seed=1337):
- # Seed the random number generator
- self.np_random, _ = seeding.np_random(seed)
- return [seed]
- @property
- def steps_remaining(self):
- return self.max_steps - self.step_count
- def __str__(self):
- """
- Produce a pretty string of the environment's grid along with the agent.
- A grid cell is represented by 2-character string, the first one for
- the object and the second one for the color.
- """
- # Map of object types to short string
- OBJECT_TO_STR = {
- 'wall' : 'W',
- 'floor' : 'F',
- 'door' : 'D',
- 'key' : 'K',
- 'ball' : 'A',
- 'box' : 'B',
- 'goal' : 'G',
- 'lava' : 'V',
- }
- # Short string for opened door
- OPENDED_DOOR_IDS = '_'
- # Map agent's direction to short string
- AGENT_DIR_TO_STR = {
- 0: '>',
- 1: 'V',
- 2: '<',
- 3: '^'
- }
- str = ''
- for j in range(self.grid.height):
- for i in range(self.grid.width):
- if i == self.agent_pos[0] and j == self.agent_pos[1]:
- str += 2 * AGENT_DIR_TO_STR[self.agent_dir]
- continue
- c = self.grid.get(i, j)
- if c == None:
- str += ' '
- continue
- if c.type == 'door':
- if c.is_open:
- str += '__'
- elif c.is_locked:
- str += 'L' + c.color[0].upper()
- else:
- str += 'D' + c.color[0].upper()
- continue
- str += OBJECT_TO_STR[c.type] + c.color[0].upper()
- if j < self.grid.height - 1:
- str += '\n'
- return str
- def _gen_grid(self, width, height):
- assert False, "_gen_grid needs to be implemented by each environment"
- def _reward(self):
- """
- Compute the reward to be given upon success
- """
- return 1 - 0.9 * (self.step_count / self.max_steps)
- def _rand_int(self, low, high):
- """
- Generate random integer in [low,high[
- """
- return self.np_random.randint(low, high)
- def _rand_float(self, low, high):
- """
- Generate random float in [low,high[
- """
- return self.np_random.uniform(low, high)
- def _rand_bool(self):
- """
- Generate random boolean value
- """
- return (self.np_random.randint(0, 2) == 0)
- def _rand_elem(self, iterable):
- """
- Pick a random element in a list
- """
- lst = list(iterable)
- idx = self._rand_int(0, len(lst))
- return lst[idx]
- def _rand_subset(self, iterable, num_elems):
- """
- Sample a random subset of distinct elements of a list
- """
- lst = list(iterable)
- assert num_elems <= len(lst)
- out = []
- while len(out) < num_elems:
- elem = self._rand_elem(lst)
- lst.remove(elem)
- out.append(elem)
- return out
- def _rand_color(self):
- """
- Generate a random color name (string)
- """
- return self._rand_elem(COLOR_NAMES)
- def _rand_pos(self, xLow, xHigh, yLow, yHigh):
- """
- Generate a random (x,y) position tuple
- """
- return (
- self.np_random.randint(xLow, xHigh),
- self.np_random.randint(yLow, yHigh)
- )
- def place_obj(self,
- obj,
- top=None,
- size=None,
- reject_fn=None,
- max_tries=math.inf
- ):
- """
- Place an object at an empty position in the grid
- :param top: top-left position of the rectangle where to place
- :param size: size of the rectangle where to place
- :param reject_fn: function to filter out potential positions
- """
- if top is None:
- top = (0, 0)
- else:
- top = (max(top[0], 0), max(top[1], 0))
- if size is None:
- size = (self.grid.width, self.grid.height)
- num_tries = 0
- while True:
- # This is to handle with rare cases where rejection sampling
- # gets stuck in an infinite loop
- if num_tries > max_tries:
- raise RecursionError('rejection sampling failed in place_obj')
- num_tries += 1
- pos = np.array((
- self._rand_int(top[0], min(top[0] + size[0], self.grid.width)),
- self._rand_int(top[1], min(top[1] + size[1], self.grid.height))
- ))
- # Don't place the object on top of another object
- if self.grid.get(*pos) != None:
- continue
- # Don't place the object where the agent is
- if np.array_equal(pos, self.agent_pos):
- continue
- # Check if there is a filtering criterion
- if reject_fn and reject_fn(self, pos):
- continue
- break
- self.grid.set(*pos, obj)
- if obj is not None:
- obj.init_pos = pos
- obj.cur_pos = pos
- return pos
- def place_agent(
- self,
- top=None,
- size=None,
- rand_dir=True,
- max_tries=math.inf
- ):
- """
- Set the agent's starting point at an empty position in the grid
- """
- self.agent_pos = None
- pos = self.place_obj(None, top, size, max_tries=max_tries)
- self.agent_pos = pos
- if rand_dir:
- self.agent_dir = self._rand_int(0, 4)
- return pos
- @property
- def dir_vec(self):
- """
- Get the direction vector for the agent, pointing in the direction
- of forward movement.
- """
- assert self.agent_dir >= 0 and self.agent_dir < 4
- return DIR_TO_VEC[self.agent_dir]
- @property
- def right_vec(self):
- """
- Get the vector pointing to the right of the agent.
- """
- dx, dy = self.dir_vec
- return np.array((-dy, dx))
- @property
- def front_pos(self):
- """
- Get the position of the cell that is right in front of the agent
- """
- return self.agent_pos + self.dir_vec
- def get_view_coords(self, i, j):
- """
- Translate and rotate absolute grid coordinates (i, j) into the
- agent's partially observable view (sub-grid). Note that the resulting
- coordinates may be negative or outside of the agent's view size.
- """
- ax, ay = self.agent_pos
- dx, dy = self.dir_vec
- rx, ry = self.right_vec
- # Compute the absolute coordinates of the top-left view corner
- sz = self.agent_view_size
- hs = self.agent_view_size // 2
- tx = ax + (dx * (sz-1)) - (rx * hs)
- ty = ay + (dy * (sz-1)) - (ry * hs)
- lx = i - tx
- ly = j - ty
- # Project the coordinates of the object relative to the top-left
- # corner onto the agent's own coordinate system
- vx = (rx*lx + ry*ly)
- vy = -(dx*lx + dy*ly)
- return vx, vy
- def get_view_exts(self):
- """
- Get the extents of the square set of tiles visible to the agent
- Note: the bottom extent indices are not included in the set
- """
- # Facing right
- if self.agent_dir == 0:
- topX = self.agent_pos[0]
- topY = self.agent_pos[1] - self.agent_view_size // 2
- # Facing down
- elif self.agent_dir == 1:
- topX = self.agent_pos[0] - self.agent_view_size // 2
- topY = self.agent_pos[1]
- # Facing left
- elif self.agent_dir == 2:
- topX = self.agent_pos[0] - self.agent_view_size + 1
- topY = self.agent_pos[1] - self.agent_view_size // 2
- # Facing up
- elif self.agent_dir == 3:
- topX = self.agent_pos[0] - self.agent_view_size // 2
- topY = self.agent_pos[1] - self.agent_view_size + 1
- else:
- assert False, "invalid agent direction"
- botX = topX + self.agent_view_size
- botY = topY + self.agent_view_size
- return (topX, topY, botX, botY)
- def relative_coords(self, x, y):
- """
- Check if a grid position belongs to the agent's field of view, and returns the corresponding coordinates
- """
- vx, vy = self.get_view_coords(x, y)
- if vx < 0 or vy < 0 or vx >= self.agent_view_size or vy >= self.agent_view_size:
- return None
- return vx, vy
- def in_view(self, x, y):
- """
- check if a grid position is visible to the agent
- """
- return self.relative_coords(x, y) is not None
- def agent_sees(self, x, y):
- """
- Check if a non-empty grid position is visible to the agent
- """
- coordinates = self.relative_coords(x, y)
- if coordinates is None:
- return False
- vx, vy = coordinates
- obs = self.gen_obs()
- obs_grid = Grid.decode(obs['image'])
- obs_cell = obs_grid.get(vx, vy)
- world_cell = self.grid.get(x, y)
- return obs_cell is not None and obs_cell.type == world_cell.type
- def step(self, action):
- self.step_count += 1
- reward = 0
- done = False
- # Get the position in front of the agent
- fwd_pos = self.front_pos
- # Get the contents of the cell in front of the agent
- fwd_cell = self.grid.get(*fwd_pos)
- # Rotate left
- if action == self.actions.left:
- self.agent_dir -= 1
- if self.agent_dir < 0:
- self.agent_dir += 4
- # Rotate right
- elif action == self.actions.right:
- self.agent_dir = (self.agent_dir + 1) % 4
- # Move forward
- elif action == self.actions.forward:
- if fwd_cell == None or fwd_cell.can_overlap():
- self.agent_pos = fwd_pos
- if fwd_cell != None and fwd_cell.type == 'goal':
- done = True
- reward = self._reward()
- if fwd_cell != None and fwd_cell.type == 'lava':
- done = True
- # Pick up an object
- elif action == self.actions.pickup:
- if fwd_cell and fwd_cell.can_pickup():
- if self.carrying is None:
- self.carrying = fwd_cell
- self.carrying.cur_pos = np.array([-1, -1])
- self.grid.set(*fwd_pos, None)
- # Drop an object
- elif action == self.actions.drop:
- if not fwd_cell and self.carrying:
- self.grid.set(*fwd_pos, self.carrying)
- self.carrying.cur_pos = fwd_pos
- self.carrying = None
- # Toggle/activate an object
- elif action == self.actions.toggle:
- if fwd_cell:
- fwd_cell.toggle(self, fwd_pos)
- # Done action (not used by default)
- elif action == self.actions.done:
- pass
- else:
- assert False, "unknown action"
- if self.step_count >= self.max_steps:
- done = True
- obs = self.gen_obs()
- return obs, reward, done, {}
- def gen_obs_grid(self):
- """
- Generate the sub-grid observed by the agent.
- This method also outputs a visibility mask telling us which grid
- cells the agent can actually see.
- """
- topX, topY, botX, botY = self.get_view_exts()
- grid = self.grid.slice(topX, topY, self.agent_view_size, self.agent_view_size)
- for i in range(self.agent_dir + 1):
- grid = grid.rotate_left()
- # Process occluders and visibility
- # Note that this incurs some performance cost
- if not self.see_through_walls:
- vis_mask = grid.process_vis(agent_pos=(self.agent_view_size // 2 , self.agent_view_size - 1))
- else:
- vis_mask = np.ones(shape=(grid.width, grid.height), dtype=np.bool)
- # Make it so the agent sees what it's carrying
- # We do this by placing the carried object at the agent's position
- # in the agent's partially observable view
- agent_pos = grid.width // 2, grid.height - 1
- if self.carrying:
- grid.set(*agent_pos, self.carrying)
- else:
- grid.set(*agent_pos, None)
- return grid, vis_mask
- def gen_obs(self):
- """
- Generate the agent's view (partially observable, low-resolution encoding)
- """
- grid, vis_mask = self.gen_obs_grid()
- # Encode the partially observable view into a numpy array
- image = grid.encode(vis_mask)
- assert hasattr(self, 'mission'), "environments must define a textual mission string"
- # Observations are dictionaries containing:
- # - an image (partially observable view of the environment)
- # - the agent's direction/orientation (acting as a compass)
- # - a textual mission string (instructions for the agent)
- obs = {
- 'image': image,
- 'direction': self.agent_dir,
- 'mission': self.mission
- }
- return obs
- def get_obs_render(self, obs, tile_size=CELL_PIXELS//2, mode='pixmap'):
- """
- Render an agent observation for visualization
- """
- if self.obs_render == None:
- from gym_minigrid.rendering import Renderer
- self.obs_render = Renderer(
- self.agent_view_size * tile_size,
- self.agent_view_size * tile_size
- )
- r = self.obs_render
- r.beginFrame()
- grid = Grid.decode(obs)
- # Render the whole grid
- grid.render(r, tile_size)
- # Draw the agent
- ratio = tile_size / CELL_PIXELS
- r.push()
- r.scale(ratio, ratio)
- r.translate(
- CELL_PIXELS * (0.5 + self.agent_view_size // 2),
- CELL_PIXELS * (self.agent_view_size - 0.5)
- )
- r.rotate(3 * 90)
- r.setLineColor(255, 0, 0)
- r.setColor(255, 0, 0)
- r.drawPolygon([
- (-12, 10),
- ( 12, 0),
- (-12, -10)
- ])
- r.pop()
- r.endFrame()
- if mode == 'rgb_array':
- return r.getArray()
- elif mode == 'pixmap':
- return r.getPixmap()
- return r
- def render(self, mode='human', close=False, highlight=True, tile_size=CELL_PIXELS):
- """
- Render the whole-grid human view
- """
- if close:
- if self.grid_render:
- self.grid_render.close()
- return
- if self.grid_render is None or self.grid_render.window is None or (self.grid_render.width != self.width * tile_size):
- from gym_minigrid.rendering import Renderer
- self.grid_render = Renderer(
- self.width * tile_size,
- self.height * tile_size,
- True if mode == 'human' else False
- )
- r = self.grid_render
- if r.window:
- r.window.setText(self.mission)
- r.beginFrame()
- # Render the whole grid
- self.grid.render(r, tile_size)
- # Draw the agent
- ratio = tile_size / CELL_PIXELS
- r.push()
- r.scale(ratio, ratio)
- r.translate(
- CELL_PIXELS * (self.agent_pos[0] + 0.5),
- CELL_PIXELS * (self.agent_pos[1] + 0.5)
- )
- r.rotate(self.agent_dir * 90)
- r.setLineColor(255, 0, 0)
- r.setColor(255, 0, 0)
- r.drawPolygon([
- (-12, 10),
- ( 12, 0),
- (-12, -10)
- ])
- r.pop()
- # Compute which cells are visible to the agent
- _, vis_mask = self.gen_obs_grid()
- # Compute the absolute coordinates of the bottom-left corner
- # of the agent's view area
- f_vec = self.dir_vec
- r_vec = self.right_vec
- top_left = self.agent_pos + f_vec * (self.agent_view_size-1) - r_vec * (self.agent_view_size // 2)
- # For each cell in the visibility mask
- if highlight:
- for vis_j in range(0, self.agent_view_size):
- for vis_i in range(0, self.agent_view_size):
- # If this cell is not visible, don't highlight it
- if not vis_mask[vis_i, vis_j]:
- continue
- # Compute the world coordinates of this cell
- abs_i, abs_j = top_left - (f_vec * vis_j) + (r_vec * vis_i)
- # Highlight the cell
- r.fillRect(
- abs_i * tile_size,
- abs_j * tile_size,
- tile_size,
- tile_size,
- 255, 255, 255, 75
- )
- r.endFrame()
- if mode == 'rgb_array':
- return r.getArray()
- elif mode == 'pixmap':
- return r.getPixmap()
- return r
|