baby_ai_bot.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. from __future__ import annotations
  2. import numpy as np
  3. from minigrid.core.world_object import WorldObj
  4. from minigrid.envs.babyai.core.verifier import (
  5. AfterInstr,
  6. AndInstr,
  7. BeforeInstr,
  8. GoToInstr,
  9. ObjDesc,
  10. OpenInstr,
  11. PickupInstr,
  12. PutNextInstr,
  13. )
  14. class DisappearedBoxError(Exception):
  15. """
  16. Error that's thrown when a box is opened.
  17. We make the assumption that the bot cannot accomplish the mission when it happens.
  18. """
  19. def __init__(self, value):
  20. self.value = value
  21. def __str__(self):
  22. return repr(self.value)
  23. def manhattan_distance(pos, target):
  24. return np.abs(target[0] - pos[0]) + np.abs(target[1] - pos[1])
  25. class Subgoal:
  26. """The base class for all possible Bot subgoals.
  27. Args:
  28. bot (BabyAIBot): The bot whose subgoal this is.
  29. datum (object): The first parameter of the subgoal, e.g. a location or an object description.
  30. reason (str): Why this subgoal was created. Subgoals created for different reasons require
  31. """
  32. def __init__(self, bot: BabyAIBot, datum=None, reason=None):
  33. self.bot = bot
  34. self.datum = datum
  35. self.reason = reason
  36. self.update_agent_attributes()
  37. self.actions = self.bot.mission.unwrapped.actions
  38. def __repr__(self):
  39. """Mainly for debugging purposes"""
  40. representation = "("
  41. representation += type(self).__name__
  42. if self.datum is not None:
  43. representation += f": {self.datum}"
  44. if self.reason is not None:
  45. representation += f", reason: {self.reason}"
  46. representation += ")"
  47. return representation
  48. def update_agent_attributes(self):
  49. """Should be called at each step before the replanning methods."""
  50. self.pos = self.bot.mission.unwrapped.agent_pos
  51. self.dir_vec = self.bot.mission.unwrapped.dir_vec
  52. self.right_vec = self.bot.mission.unwrapped.right_vec
  53. self.fwd_pos = self.pos + self.dir_vec
  54. self.fwd_cell = self.bot.mission.unwrapped.grid.get(*self.fwd_pos)
  55. self.carrying = self.bot.mission.unwrapped.carrying
  56. def replan_before_action(self):
  57. """Change the plan if needed and return a suggested action.
  58. This method is called at every iteration for the top-most subgoal
  59. from the stack. It is supposed to return a suggested action if
  60. it is clear how to proceed towards achieving the current subgoal.
  61. If the subgoal is already achieved, or if it is not clear how it
  62. can be achieved, or if is clear that a better plan exists,
  63. this method can replan by pushing new subgoals
  64. from the stack or popping the top one.
  65. Returns:
  66. action (object): A suggested action if known, `None` the stack has been
  67. altered and further replanning is required.
  68. """
  69. raise NotImplementedError()
  70. def replan_after_action(self, action_taken):
  71. """Change the plan when the taken action is known.
  72. The action actually taken by the agent can be different from the one
  73. suggested by `replan_before_action` is the bot can be used in
  74. advising mode. This method is supposed to adjust the plan in the view
  75. of the actual action taken.
  76. """
  77. pass
  78. def is_exploratory(self):
  79. """Whether the subgoal is exploratory or not.
  80. Exploratory subgoals can be removed from the stack by the bot, e.g.
  81. when no more exploration is required.
  82. """
  83. return False
  84. def _plan_undo_action(self, action_taken):
  85. """Plan how to undo the taken action."""
  86. if action_taken == self.actions.forward:
  87. # check if the 'forward' action was successful
  88. if not np.array_equal(self.bot.prev_agent_pos, self.pos):
  89. self.bot.stack.append(GoNextToSubgoal(self.bot, self.pos))
  90. elif action_taken == self.actions.left:
  91. old_fwd_pos = self.pos + self.right_vec
  92. self.bot.stack.append(GoNextToSubgoal(self.bot, old_fwd_pos))
  93. elif action_taken == self.actions.right:
  94. old_fwd_pos = self.pos - self.right_vec
  95. self.bot.stack.append(GoNextToSubgoal(self.bot, old_fwd_pos))
  96. elif (
  97. action_taken == self.actions.drop
  98. and self.bot.prev_carrying != self.carrying
  99. ):
  100. # get that thing back, if dropping was successful
  101. assert self.fwd_cell.type in ("key", "box", "ball")
  102. self.bot.stack.append(PickupSubgoal(self.bot))
  103. elif (
  104. action_taken == self.actions.pickup
  105. and self.bot.prev_carrying != self.carrying
  106. ):
  107. # drop that thing where you found it
  108. fwd_cell = self.bot.mission.unwrapped.grid.get(*self.fwd_pos)
  109. self.bot.stack.append(DropSubgoal(self.bot))
  110. elif action_taken == self.actions.toggle:
  111. # if you opened or closed a door, bring it back in the original state
  112. fwd_cell = self.bot.mission.unwrapped.grid.get(*self.fwd_pos)
  113. if (
  114. fwd_cell
  115. and fwd_cell.type == "door"
  116. and self.bot.fwd_door_was_open != fwd_cell.is_open
  117. ):
  118. self.bot.stack.append(
  119. CloseSubgoal(self.bot)
  120. if fwd_cell.is_open
  121. else OpenSubgoal(self.bot)
  122. )
  123. class CloseSubgoal(Subgoal):
  124. def replan_before_action(self):
  125. assert self.fwd_cell is not None, "Forward cell is empty"
  126. assert self.fwd_cell.type == "door", "Forward cell has to be a door"
  127. assert self.fwd_cell.is_open, "Forward door must be open"
  128. return self.actions.toggle
  129. def replan_after_action(self, action_taken):
  130. if action_taken is None or action_taken == self.actions.toggle:
  131. self.bot.stack.pop()
  132. elif action_taken in [
  133. self.actions.forward,
  134. self.actions.left,
  135. self.actions.right,
  136. ]:
  137. self._plan_undo_action(action_taken)
  138. class OpenSubgoal(Subgoal):
  139. """Subgoal for opening doors.
  140. Args:
  141. reason (str): `None`, `"Unlock"`, or `"UnlockAndKeepKey"`. If the reason is
  142. `"Unlock"`, the agent will plan dropping the key somewhere after it opens the
  143. door (see `replan_after_action`). When the agent faces the door, and the reason
  144. is `None`, this subgoals replaces itself with a similar one, but with with the
  145. reason `"Unlock"`. `reason="UnlockAndKeepKey` means that the agent should not
  146. schedule the dropping of the key when it faces a locked door, and should instead
  147. keep the key.
  148. """
  149. def replan_before_action(self):
  150. assert self.fwd_cell is not None, "Forward cell is empty"
  151. assert self.fwd_cell.type == "door", "Forward cell has to be a door"
  152. # If the door is locked, go find the key and then return
  153. # TODO: do we really need to be in front of the locked door
  154. # to realize that we need the key for it ?
  155. got_the_key = (
  156. self.carrying
  157. and self.carrying.type == "key"
  158. and self.carrying.color == self.fwd_cell.color
  159. )
  160. if self.fwd_cell.is_locked and not got_the_key:
  161. # Find the key
  162. key_desc = ObjDesc("key", self.fwd_cell.color)
  163. key_desc.find_matching_objs(self.bot.mission)
  164. # If we're already carrying something
  165. if self.carrying:
  166. self.bot.stack.pop()
  167. # Find a location to drop what we're already carrying
  168. drop_pos_cur = self.bot._find_drop_pos()
  169. # Take back the object being carried
  170. self.bot.stack.append(PickupSubgoal(self.bot))
  171. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos_cur))
  172. # Go back to the door and open it
  173. self.bot.stack.append(OpenSubgoal(self.bot))
  174. self.bot.stack.append(GoNextToSubgoal(self.bot, tuple(self.fwd_pos)))
  175. # Go to the key and pick it up
  176. self.bot.stack.append(PickupSubgoal(self.bot))
  177. self.bot.stack.append(GoNextToSubgoal(self.bot, key_desc))
  178. # Drop the object being carried
  179. self.bot.stack.append(DropSubgoal(self.bot))
  180. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos_cur))
  181. else:
  182. # This branch is will be used very rarely, given that
  183. # GoNextToSubGoal(..., reason='Open') should plan
  184. # going to the key before we get to stand right in front of a door.
  185. # But the agent can be spawned right in front of a open door,
  186. # for which we case we do need this code.
  187. self.bot.stack.pop()
  188. # Go back to the door and open it
  189. self.bot.stack.append(OpenSubgoal(self.bot))
  190. self.bot.stack.append(GoNextToSubgoal(self.bot, tuple(self.fwd_pos)))
  191. # Go to the key and pick it up
  192. self.bot.stack.append(PickupSubgoal(self.bot))
  193. self.bot.stack.append(GoNextToSubgoal(self.bot, key_desc))
  194. return
  195. if self.fwd_cell.is_open:
  196. self.bot.stack.append(CloseSubgoal(self.bot))
  197. return
  198. if self.fwd_cell.is_locked and self.reason is None:
  199. self.bot.stack.pop()
  200. self.bot.stack.append(OpenSubgoal(self.bot, reason="Unlock"))
  201. return
  202. return self.actions.toggle
  203. def replan_after_action(self, action_taken):
  204. if action_taken is None or action_taken == self.actions.toggle:
  205. self.bot.stack.pop()
  206. if self.reason == "Unlock":
  207. # The reason why this has to be planned after the action is taken
  208. # is because if the position for dropping is chosen in advance,
  209. # then by the time the key is dropped there, it might already
  210. # be occupied.
  211. drop_key_pos = self.bot._find_drop_pos()
  212. self.bot.stack.append(DropSubgoal(self.bot))
  213. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_key_pos))
  214. else:
  215. self._plan_undo_action(action_taken)
  216. class DropSubgoal(Subgoal):
  217. def replan_before_action(self):
  218. assert self.bot.mission.unwrapped.carrying
  219. assert not self.fwd_cell
  220. return self.actions.drop
  221. def replan_after_action(self, action_taken):
  222. if action_taken is None or action_taken == self.actions.drop:
  223. self.bot.stack.pop()
  224. elif action_taken in [
  225. self.actions.forward,
  226. self.actions.left,
  227. self.actions.right,
  228. ]:
  229. self._plan_undo_action(action_taken)
  230. class PickupSubgoal(Subgoal):
  231. def replan_before_action(self):
  232. assert not self.bot.mission.unwrapped.carrying
  233. return self.actions.pickup
  234. def replan_after_action(self, action_taken):
  235. if action_taken is None or action_taken == self.actions.pickup:
  236. self.bot.stack.pop()
  237. elif action_taken in [self.actions.left, self.actions.right]:
  238. self._plan_undo_action(action_taken)
  239. class GoNextToSubgoal(Subgoal):
  240. """The subgoal for going next to objects or positions.
  241. Args:
  242. datum (int, int): tuple or `ObjDesc` or object reference
  243. The position or the description of the object or
  244. the object to which we are going.
  245. reason (str): One of the following:
  246. - `None`: go the position (object) and face it
  247. - `"PutNext"`: go face an empty position next to the object specified by `datum`
  248. - `"Explore"`: going to a position, just like when the reason is `None`. The only
  249. difference is that with this reason the subgoal will be considered exploratory
  250. """
  251. def replan_before_action(self):
  252. target_obj = None
  253. if isinstance(self.datum, ObjDesc):
  254. target_obj, target_pos = self.bot._find_obj_pos(
  255. self.datum, self.reason == "PutNext"
  256. )
  257. if not target_pos:
  258. # No path found -> Explore the world
  259. self.bot.stack.append(ExploreSubgoal(self.bot))
  260. return
  261. elif isinstance(self.datum, WorldObj):
  262. target_obj = self.datum
  263. target_pos = target_obj.cur_pos
  264. else:
  265. target_pos = tuple(self.datum)
  266. # Suppore we are walking towards the door that we would like to open,
  267. # it is locked, and we don't have the key. What do we do? If we are carrying
  268. # something, it makes to just continue, as we still need to bring this object
  269. # close to the door. If we are not carrying anything though, then it makes
  270. # sense to change the plan and go straight for the required key.
  271. if (
  272. self.reason == "Open"
  273. and target_obj
  274. and target_obj.type == "door"
  275. and target_obj.is_locked
  276. ):
  277. key_desc = ObjDesc("key", target_obj.color)
  278. key_desc.find_matching_objs(self.bot.mission)
  279. if not self.carrying:
  280. # No we need to commit to going to this particular door
  281. self.bot.stack.pop()
  282. self.bot.stack.append(
  283. GoNextToSubgoal(self.bot, target_obj, reason="Open")
  284. )
  285. self.bot.stack.append(PickupSubgoal(self.bot))
  286. self.bot.stack.append(GoNextToSubgoal(self.bot, key_desc))
  287. return
  288. # The position we are on is the one we should go next to
  289. # -> Move away from it
  290. if manhattan_distance(target_pos, self.pos) == (
  291. 1 if self.reason == "PutNext" else 0
  292. ):
  293. def steppable(cell):
  294. return cell is None or (cell.type == "door" and cell.is_open)
  295. if steppable(self.fwd_cell):
  296. return self.actions.forward
  297. if steppable(
  298. self.bot.mission.unwrapped.grid.get(*(self.pos + self.right_vec))
  299. ):
  300. return self.actions.right
  301. if steppable(
  302. self.bot.mission.unwrapped.grid.get(*(self.pos - self.right_vec))
  303. ):
  304. return self.actions.left
  305. # Spin and hope for the best
  306. return self.actions.left
  307. # We are facing the target cell
  308. # -> subgoal completed
  309. if self.reason == "PutNext":
  310. if manhattan_distance(target_pos, self.fwd_pos) == 1:
  311. if self.fwd_cell is None:
  312. self.bot.stack.pop()
  313. return
  314. if self.fwd_cell.type == "door" and self.fwd_cell.is_open:
  315. # We can't drop an object in the cell where the door is.
  316. # Instead, we add a subgoal on the stack that will force
  317. # the bot to move the target object.
  318. self.bot.stack.append(
  319. GoNextToSubgoal(self.bot, self.fwd_pos + 2 * self.dir_vec)
  320. )
  321. return
  322. else:
  323. if np.array_equal(target_pos, self.fwd_pos):
  324. self.bot.stack.pop()
  325. return
  326. # We are still far from the target
  327. # -> try to find a non-blocker path
  328. path, _, _ = self.bot._shortest_path(
  329. lambda pos, cell: pos == target_pos,
  330. )
  331. # No non-blocker path found and
  332. # reexploration within the room is not allowed or there is nothing to explore
  333. # -> Look for blocker paths
  334. if not path:
  335. path, _, _ = self.bot._shortest_path(
  336. lambda pos, cell: pos == target_pos, try_with_blockers=True
  337. )
  338. # No path found
  339. # -> explore the world
  340. if not path:
  341. self.bot.stack.append(ExploreSubgoal(self.bot))
  342. return
  343. # So there is a path (blocker, or non-blockers)
  344. # -> try following it
  345. next_cell = np.asarray(path[0])
  346. # Choose the action in the case when the forward cell
  347. # is the one we should go next to
  348. if np.array_equal(next_cell, self.fwd_pos):
  349. if self.fwd_cell:
  350. if self.fwd_cell.type == "door":
  351. assert not self.fwd_cell.is_locked
  352. if not self.fwd_cell.is_open:
  353. self.bot.stack.append(OpenSubgoal(self.bot))
  354. return
  355. else:
  356. return self.actions.forward
  357. if self.carrying:
  358. drop_pos_cur = self.bot._find_drop_pos()
  359. drop_pos_block = self.bot._find_drop_pos(drop_pos_cur)
  360. # Take back the object being carried
  361. self.bot.stack.append(PickupSubgoal(self.bot))
  362. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos_cur))
  363. # Pick up the blocking object and drop it
  364. self.bot.stack.append(DropSubgoal(self.bot))
  365. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos_block))
  366. self.bot.stack.append(PickupSubgoal(self.bot))
  367. self.bot.stack.append(GoNextToSubgoal(self.bot, self.fwd_pos))
  368. # Drop the object being carried
  369. self.bot.stack.append(DropSubgoal(self.bot))
  370. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos_cur))
  371. return
  372. else:
  373. drop_pos = self.bot._find_drop_pos()
  374. self.bot.stack.append(DropSubgoal(self.bot))
  375. self.bot.stack.append(GoNextToSubgoal(self.bot, drop_pos))
  376. self.bot.stack.append(PickupSubgoal(self.bot))
  377. return
  378. else:
  379. return self.actions.forward
  380. # The forward cell is not the one we should go to
  381. # -> turn towards the direction we need to go
  382. if np.array_equal(next_cell - self.pos, self.right_vec):
  383. return self.actions.right
  384. elif np.array_equal(next_cell - self.pos, -self.right_vec):
  385. return self.actions.left
  386. # If we reach this point in the code, then the cell is behind us.
  387. # Instead of choosing left or right randomly,
  388. # let's do something that might be useful:
  389. # Because when we're GoingNextTo for the purpose of exploring,
  390. # things might change while on the way to the position we're going to, we should
  391. # pick this right or left wisely.
  392. # The simplest thing we should do is: pick the one
  393. # that doesn't lead you to face a non empty cell.
  394. # One better thing would be to go to the direction
  395. # where the closest wall/door is the furthest
  396. distance_right = self.bot._closest_wall_or_door_given_dir(
  397. self.pos, self.right_vec
  398. )
  399. distance_left = self.bot._closest_wall_or_door_given_dir(
  400. self.pos, -self.right_vec
  401. )
  402. if distance_left > distance_right:
  403. return self.actions.left
  404. return self.actions.right
  405. def replan_after_action(self, action_taken):
  406. if action_taken in [
  407. self.actions.pickup,
  408. self.actions.drop,
  409. self.actions.toggle,
  410. ]:
  411. self._plan_undo_action(action_taken)
  412. def is_exploratory(self):
  413. return self.reason == "Explore"
  414. class ExploreSubgoal(Subgoal):
  415. def replan_before_action(self):
  416. # Find the closest unseen position
  417. _, unseen_pos, with_blockers = self.bot._shortest_path(
  418. lambda pos, cell: not self.bot.vis_mask[pos], try_with_blockers=True
  419. )
  420. if unseen_pos:
  421. self.bot.stack.append(
  422. GoNextToSubgoal(self.bot, unseen_pos, reason="Explore")
  423. )
  424. return None
  425. # Find the closest unlocked unopened door
  426. def unopened_unlocked_door(pos, cell):
  427. return (
  428. cell and cell.type == "door" and not cell.is_locked and not cell.is_open
  429. )
  430. # Find the closest unopened door
  431. def unopened_door(pos, cell):
  432. return cell and cell.type == "door" and not cell.is_open
  433. # Try to find an unlocked door first.
  434. # We do this because otherwise, opening a locked door as
  435. # a subgoal may try to open the same door for exploration,
  436. # resulting in an infinite loop.
  437. _, door_pos, _ = self.bot._shortest_path(
  438. unopened_unlocked_door, try_with_blockers=True
  439. )
  440. if not door_pos:
  441. # Try to find a locker door if an unlocked one is not available.
  442. _, door_pos, _ = self.bot._shortest_path(
  443. unopened_door, try_with_blockers=True
  444. )
  445. # Open the door
  446. if door_pos:
  447. door_obj = self.bot.mission.unwrapped.grid.get(*door_pos)
  448. # If we are going to a locked door, there are two cases:
  449. # - we already have the key, then we should not drop it
  450. # - we don't have the key, in which case eventually we should drop it
  451. got_the_key = (
  452. self.carrying
  453. and self.carrying.type == "key"
  454. and self.carrying.color == door_obj.color
  455. )
  456. open_reason = "KeepKey" if door_obj.is_locked and got_the_key else None
  457. self.bot.stack.pop()
  458. self.bot.stack.append(OpenSubgoal(self.bot, reason=open_reason))
  459. self.bot.stack.append(GoNextToSubgoal(self.bot, door_obj, reason="Open"))
  460. return
  461. assert False, "0nothing left to explore"
  462. def is_exploratory(self):
  463. return True
  464. class BabyAIBot:
  465. """A bot that can solve all BabyAI levels*.
  466. The bot maintains a plan, represented as a stack of the so-called
  467. subgoals. The initial set of subgoals is generated from the instruction.
  468. The subgoals are then executed one after another, unless a change of
  469. plan is required (e.g. the location of the target object is not known
  470. or there other objects in the way). In this case, the bot changes the plan.
  471. The bot can also be used to advice a suboptimal agent, e.g. play the
  472. role of an oracle in algorithms like DAGGER. It changes the plan based on
  473. the actual action that the agent took.
  474. The main method of the bot (and the only one you are supposed to use) is `replan`.
  475. * The bot can solve all BabyAI levels from the original paper. It can also solve
  476. most of the bonus levels from the original BabyAI repository, but fails to solve the
  477. following:
  478. - "BabyAI-PutNextS5N2Carrying-v0",
  479. - "BabyAI-PutNextS6N3Carrying-v0",
  480. - "BabyAI-PutNextS7N4Carrying-v0",
  481. - "BabyAI-KeyInBox-v0".
  482. Args:
  483. mission: a freshly created BabyAI environment
  484. """
  485. def __init__(self, mission):
  486. # Mission to be solved
  487. self.mission = mission
  488. # Grid containing what has been mapped out
  489. # self.grid = Grid(mission.unwrapped.width, mission.unwrapped.height)
  490. # Visibility mask. True for explored/seen, false for unexplored.
  491. self.vis_mask = np.zeros(
  492. shape=(mission.unwrapped.width, mission.unwrapped.height), dtype=bool
  493. )
  494. # Stack of tasks/subtasks to complete (tuples)
  495. self.stack = []
  496. # Process/parse the instructions
  497. self._process_instr(mission.unwrapped.instrs)
  498. # How many BFS searches this bot has performed
  499. self.bfs_counter = 0
  500. # How many steps were made in total in all BFS searches
  501. # performed by this bot
  502. self.bfs_step_counter = 0
  503. def replan(self, action_taken=None):
  504. """Replan and suggest an action.
  505. Call this method once per every iteration of the environment.
  506. Args:
  507. action_taken: The last action that the agent took. Can be `None`, in which
  508. case the bot assumes that the action it suggested was taken (or that it is
  509. the first iteration).
  510. Returns:
  511. suggested_action: The action that the bot suggests. Can be `done` if the
  512. bot thinks that the mission has been accomplished.
  513. """
  514. self._process_obs()
  515. # Check that no box has been opened
  516. self._check_erroneous_box_opening(action_taken)
  517. # TODO: instead of updating all subgoals, just add a couple
  518. # properties to the `Subgoal` class.
  519. for subgoal in self.stack:
  520. subgoal.update_agent_attributes()
  521. if self.stack:
  522. self.stack[-1].replan_after_action(action_taken)
  523. # Clear the stack from the non-essential subgoals
  524. while self.stack and self.stack[-1].is_exploratory():
  525. self.stack.pop()
  526. suggested_action = None
  527. while self.stack:
  528. subgoal = self.stack[-1]
  529. suggested_action = subgoal.replan_before_action()
  530. # If is not clear what can be done for the current subgoal
  531. # (because it is completed, because there is blocker,
  532. # or because exploration is required), keep replanning
  533. if suggested_action is not None:
  534. break
  535. if not self.stack:
  536. suggested_action = self.mission.unwrapped.actions.done
  537. self._remember_current_state()
  538. return suggested_action
  539. def _find_obj_pos(self, obj_desc, adjacent=False):
  540. """Find the position of the closest visible object matching a given description."""
  541. assert len(obj_desc.obj_set) > 0
  542. best_distance_to_obj = 999
  543. best_pos = None
  544. best_obj = None
  545. for i in range(len(obj_desc.obj_set)):
  546. if obj_desc.obj_set[i].type == "wall":
  547. continue
  548. try:
  549. if obj_desc.obj_set[i] == self.mission.unwrapped.carrying:
  550. continue
  551. obj_pos = obj_desc.obj_poss[i]
  552. if self.vis_mask[obj_pos]:
  553. shortest_path_to_obj, _, with_blockers = self._shortest_path(
  554. lambda pos, cell: pos == obj_pos, try_with_blockers=True
  555. )
  556. assert shortest_path_to_obj is not None
  557. distance_to_obj = len(shortest_path_to_obj)
  558. if with_blockers:
  559. # The distance should take into account the steps necessary
  560. # to unblock the way. Instead of computing it exactly,
  561. # we can use a lower bound on this number of steps
  562. # which is 4 when the agent is not holding anything
  563. # (pick, turn, drop, turn back
  564. # and 7 if the agent is carrying something
  565. # (turn, drop, turn back, pick,
  566. # turn to other direction, drop, turn back)
  567. distance_to_obj = len(shortest_path_to_obj) + (
  568. 7 if self.mission.unwrapped.carrying else 4
  569. )
  570. # If we looking for a door and we are currently in that cell
  571. # that contains the door, it will take us at least 2
  572. # (3 if `adjacent == True`) steps to reach the goal.`
  573. if distance_to_obj == 0:
  574. distance_to_obj = 3 if adjacent else 2
  575. # If what we want is to face a location that is adjacent to an object,
  576. # and if we are already right next to this object,
  577. # then we should not prefer this object to those at distance 2
  578. if adjacent and distance_to_obj == 1:
  579. distance_to_obj = 3
  580. if distance_to_obj < best_distance_to_obj:
  581. best_distance_to_obj = distance_to_obj
  582. best_pos = obj_pos
  583. best_obj = obj_desc.obj_set[i]
  584. except IndexError:
  585. # Suppose we are tracking red keys, and we just used a red key to open a door,
  586. # then for the last i, accessing obj_desc.obj_poss[i] will raise an IndexError
  587. # -> Solution: Not care about that red key we used to open the door
  588. pass
  589. return best_obj, best_pos
  590. def _process_obs(self):
  591. """Parse the contents of an observation/image and update our state."""
  592. grid, vis_mask = self.mission.unwrapped.gen_obs_grid()
  593. view_size = self.mission.unwrapped.agent_view_size
  594. pos = self.mission.unwrapped.agent_pos
  595. f_vec = self.mission.unwrapped.dir_vec
  596. r_vec = self.mission.unwrapped.right_vec
  597. # Compute the absolute coordinates of the top-left corner
  598. # of the agent's view area
  599. top_left = pos + f_vec * (view_size - 1) - r_vec * (view_size // 2)
  600. # Mark everything in front of us as visible
  601. for vis_j in range(0, view_size):
  602. for vis_i in range(0, view_size):
  603. if not vis_mask[vis_i, vis_j]:
  604. continue
  605. # Compute the world coordinates of this cell
  606. abs_i, abs_j = top_left - (f_vec * vis_j) + (r_vec * vis_i)
  607. if abs_i < 0 or abs_i >= self.vis_mask.shape[0]:
  608. continue
  609. if abs_j < 0 or abs_j >= self.vis_mask.shape[1]:
  610. continue
  611. self.vis_mask[abs_i, abs_j] = True
  612. def _remember_current_state(self):
  613. self.prev_agent_pos = self.mission.unwrapped.agent_pos
  614. self.prev_carrying = self.mission.unwrapped.carrying
  615. fwd_cell = self.mission.unwrapped.grid.get(
  616. *self.mission.unwrapped.agent_pos + self.mission.unwrapped.dir_vec
  617. )
  618. if fwd_cell and fwd_cell.type == "door":
  619. self.fwd_door_was_open = fwd_cell.is_open
  620. self.prev_fwd_cell = fwd_cell
  621. def _closest_wall_or_door_given_dir(self, position, direction):
  622. distance = 1
  623. while True:
  624. position_to_try = position + distance * direction
  625. # If the current position is outside the field of view,
  626. # stop everything and return the previous one
  627. if not self.mission.unwrapped.in_view(*position_to_try):
  628. return distance - 1
  629. cell = self.mission.unwrapped.grid.get(*position_to_try)
  630. if cell and (cell.type.endswith("door") or cell.type == "wall"):
  631. return distance
  632. distance += 1
  633. def _breadth_first_search(self, initial_states, accept_fn, ignore_blockers):
  634. """Performs breadth first search.
  635. This is pretty much your textbook BFS. The state space is agent's locations,
  636. but the current direction is also added to the queue to slightly prioritize
  637. going straight over turning.
  638. """
  639. self.bfs_counter += 1
  640. queue = [(state, None) for state in initial_states]
  641. grid = self.mission.unwrapped.grid
  642. previous_pos = dict()
  643. while len(queue) > 0:
  644. state, prev_pos = queue[0]
  645. queue = queue[1:]
  646. i, j, di, dj = state
  647. if (i, j) in previous_pos:
  648. continue
  649. self.bfs_step_counter += 1
  650. cell = grid.get(i, j)
  651. previous_pos[(i, j)] = prev_pos
  652. # If we reached a position satisfying the acceptance condition
  653. if accept_fn((i, j), cell):
  654. path = []
  655. pos = (i, j)
  656. while pos:
  657. path.append(pos)
  658. pos = previous_pos[pos]
  659. return path, (i, j), previous_pos
  660. # If this cell was not visually observed, don't expand from it
  661. if not self.vis_mask[i, j]:
  662. continue
  663. if cell:
  664. if cell.type == "wall":
  665. continue
  666. # If this is a door
  667. elif cell.type == "door":
  668. # If the door is closed, don't visit neighbors
  669. if not cell.is_open:
  670. continue
  671. elif not ignore_blockers:
  672. continue
  673. # Location to which the bot can get without turning
  674. # are put in the queue first
  675. for k, l in [(di, dj), (dj, di), (-dj, -di), (-di, -dj)]:
  676. next_pos = (i + k, j + l)
  677. next_dir_vec = (k, l)
  678. next_state = (*next_pos, *next_dir_vec)
  679. queue.append((next_state, (i, j)))
  680. # Path not found
  681. return None, None, previous_pos
  682. def _shortest_path(self, accept_fn, try_with_blockers=False):
  683. """
  684. Finds the path to any of the locations that satisfy `accept_fn`.
  685. Prefers the paths that avoid blockers for as long as possible.
  686. """
  687. # Initial states to visit (BFS)
  688. initial_states = [
  689. (*self.mission.unwrapped.agent_pos, *self.mission.unwrapped.dir_vec)
  690. ]
  691. path = finish = None
  692. with_blockers = False
  693. path, finish, previous_pos = self._breadth_first_search(
  694. initial_states, accept_fn, ignore_blockers=False
  695. )
  696. if not path and try_with_blockers:
  697. with_blockers = True
  698. path, finish, _ = self._breadth_first_search(
  699. [(i, j, 1, 0) for i, j in previous_pos], accept_fn, ignore_blockers=True
  700. )
  701. if path:
  702. # `path` now contains the path to a cell that is reachable without
  703. # blockers. Now let's add the path to this cell
  704. pos = path[-1]
  705. extra_path = []
  706. while pos:
  707. extra_path.append(pos)
  708. pos = previous_pos[pos]
  709. path = path + extra_path[1:]
  710. if path:
  711. # And the starting position is not required
  712. path = path[::-1]
  713. path = path[1:]
  714. # Note, that with_blockers only makes sense if path is not None
  715. return path, finish, with_blockers
  716. def _find_drop_pos(self, except_pos=None):
  717. """
  718. Find a position where an object can be dropped, ideally without blocking anything.
  719. """
  720. grid = self.mission.unwrapped.grid
  721. def match_unblock(pos, cell):
  722. # Consider the region of 8 neighboring cells around the candidate cell.
  723. # If dropping the object in the candidate makes this region disconnected,
  724. # then probably it is better to drop elsewhere.
  725. i, j = pos
  726. agent_pos = tuple(self.mission.unwrapped.agent_pos)
  727. if np.array_equal(pos, agent_pos):
  728. return False
  729. if except_pos and np.array_equal(pos, except_pos):
  730. return False
  731. if not self.vis_mask[i, j] or grid.get(i, j):
  732. return False
  733. # We distinguish cells of three classes:
  734. # class 0: the empty ones, including open doors
  735. # class 1: those that are not interesting (just walls so far)
  736. # class 2: all the rest, including objects and cells that are current not visible,
  737. # and hence may contain objects, and also `except_pos` at it may soon contain
  738. # an object
  739. # We want to ensure that empty cells are connected, and that one can reach
  740. # any object cell from any other object cell.
  741. cell_class = []
  742. for k, l in [
  743. (-1, -1),
  744. (0, -1),
  745. (1, -1),
  746. (1, 0),
  747. (1, 1),
  748. (0, 1),
  749. (-1, 1),
  750. (-1, 0),
  751. ]:
  752. nb_pos = (i + k, j + l)
  753. cell = grid.get(*nb_pos)
  754. # completely blocked
  755. if self.vis_mask[nb_pos] and cell and cell.type == "wall":
  756. cell_class.append(1)
  757. # empty
  758. elif (
  759. self.vis_mask[nb_pos]
  760. and (
  761. not cell
  762. or (cell.type == "door" and cell.is_open)
  763. or nb_pos == agent_pos
  764. )
  765. and nb_pos != except_pos
  766. ):
  767. cell_class.append(0)
  768. # an object cell
  769. else:
  770. cell_class.append(2)
  771. # Now we need to check that empty cells are connected. To do that,
  772. # let's check how many times empty changes to non-empty
  773. changes = 0
  774. for i in range(8):
  775. if bool(cell_class[(i + 1) % 8]) != bool(cell_class[i]):
  776. changes += 1
  777. # Lastly, we need check that every object has an adjacent empty cell
  778. for i in range(8):
  779. next_i = (i + 1) % 8
  780. prev_i = (i + 7) % 8
  781. if (
  782. cell_class[i] == 2
  783. and cell_class[prev_i] != 0
  784. and cell_class[next_i] != 0
  785. ):
  786. return False
  787. return changes <= 2
  788. def match_empty(pos, cell):
  789. i, j = pos
  790. if np.array_equal(pos, self.mission.unwrapped.agent_pos):
  791. return False
  792. if except_pos and np.array_equal(pos, except_pos):
  793. return False
  794. if not self.vis_mask[pos] or grid.get(*pos):
  795. return False
  796. return True
  797. _, drop_pos, _ = self._shortest_path(match_unblock)
  798. if not drop_pos:
  799. _, drop_pos, _ = self._shortest_path(match_empty)
  800. if not drop_pos:
  801. _, drop_pos, _ = self._shortest_path(match_unblock, try_with_blockers=True)
  802. if not drop_pos:
  803. _, drop_pos, _ = self._shortest_path(match_empty, try_with_blockers=True)
  804. return drop_pos
  def _process_instr(self, instr):
      """
      Translate instructions into an internal form the agent can execute

      Subgoals are appended to ``self.stack``. Within each instruction they are
      appended in reverse order of execution (e.g. the drop before the pickup
      it follows), so the stack is presumably consumed from its end.

      Args:
          instr: the instruction to translate. Compound instructions
              (before / and / after) are expanded recursively.

      Raises:
          AssertionError: if ``instr`` is of an unknown instruction type.
      """
      if isinstance(instr, GoToInstr):
          self.stack.append(GoNextToSubgoal(self, instr.desc))
          return

      if isinstance(instr, OpenInstr):
          self.stack.append(OpenSubgoal(self))
          self.stack.append(GoNextToSubgoal(self, instr.desc, reason="Open"))
          return

      if isinstance(instr, PickupInstr):
          # We pick up and immediately drop so
          # that we may carry other objects
          self.stack.append(DropSubgoal(self))
          self.stack.append(PickupSubgoal(self))
          self.stack.append(GoNextToSubgoal(self, instr.desc))
          return

      if isinstance(instr, PutNextInstr):
          self.stack.append(DropSubgoal(self))
          self.stack.append(GoNextToSubgoal(self, instr.desc_fixed, reason="PutNext"))
          self.stack.append(PickupSubgoal(self))
          self.stack.append(GoNextToSubgoal(self, instr.desc_move))
          return

      if isinstance(instr, (BeforeInstr, AndInstr)):
          # instr_a goes on the stack last, i.e. above instr_b.
          self._process_instr(instr.instr_b)
          self._process_instr(instr.instr_a)
          return

      if isinstance(instr, AfterInstr):
          # Mirror image of the case above: instr_b ends up above instr_a.
          self._process_instr(instr.instr_a)
          self._process_instr(instr.instr_b)
          return

      # Raised explicitly rather than via `assert False`: assert statements are
      # removed under `python -O`, which would silently ignore the instruction.
      raise AssertionError("unknown instruction type")
  838. def _check_erroneous_box_opening(self, action):
  839. """
  840. When the agent opens a box, we raise an error and mark the task unsolvable.
  841. This is a tad conservative, because maybe the box is irrelevant to the mission.unwrapped.
  842. """
  843. if (
  844. action == self.mission.unwrapped.actions.toggle
  845. and self.prev_fwd_cell is not None
  846. and self.prev_fwd_cell.type == "box"
  847. ):
  848. raise DisappearedBoxError("A box was opened. I am not sure I can help now.")