#!/usr/bin/env python3
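"""
Train a small CNN to predict whether a red ball is visible in the agent's
partial observation of a BabyAI GoToRedBall environment.
"""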
import numpy as np
import gym
from gym_minigrid.minigrid import Grid
from gym_minigrid.wrappers import OneHotPartialObsWrapper
import babyai  # imported for its side effect: registering the BabyAI-* envs
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
##############################################################################
def make_var(arr):
    """Convert a numpy array into a torch tensor, moved to the GPU if available."""
    arr = np.ascontiguousarray(arr)
    arr = torch.from_numpy(arr)
    if torch.cuda.is_available():
        arr = arr.cuda()
    return arr
def init_weights(m):
    """Layer-type-specific weight initialization, applied via Module.apply."""
    classname = m.__class__.__name__
    if classname.startswith('Conv'):
        nn.init.orthogonal_(m.weight.data)
        m.bias.data.fill_(0)
    elif classname.find('Linear') != -1:
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0)
    elif classname.find('BatchNorm') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)
class Flatten(nn.Module):
    """
    Flatten layer, to flatten convolutional layer output
    """
    def forward(self, input):
        return input.view(input.size(0), -1)
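# Note (added): recent PyTorch versions ship an equivalent nn.Flatten module
# that could replace the helper above.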
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            # 1x1 convolutions mix the 20 one-hot channels within each cell
            nn.Conv2d(in_channels=20, out_channels=32, kernel_size=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=32, out_channels=2, kernel_size=1),
            nn.LeakyReLU(),
            # a 7x7 convolution collapses the 7x7 partial view to a single cell
            nn.Conv2d(in_channels=2, out_channels=2, kernel_size=7),
            nn.LeakyReLU(),
            Flatten(),
            # final linear layer produces one logit per class (absent/present)
            nn.Linear(2, 2)
        )
        self.apply(init_weights)

    def forward(self, obs):
        # Scale the observation values down before the conv layers
        obs = obs / 16
        out = self.layers(obs)
        return out

    def present_prob(self, obs):
        """Probability that a red ball is visible in a single observation."""
        obs = make_var(obs).unsqueeze(0)
        logits = self(obs)
        probs = F.softmax(logits, dim=-1)
        probs = probs.detach().cpu().squeeze().numpy()
        return probs[1]
seed = 0
env = gym.make('BabyAI-GoToRedBall-v0')
env = OneHotPartialObsWrapper(env)
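# The one-hot wrapper expands each cell of the 7x7 partial view into a binary
# vector. In the gym_minigrid version this script was written against, that is
# 11 object types + 6 colors + 3 door states = 20 channels, matching
# in_channels=20 in the model's first conv layer.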
def sample_batch(batch_size=128):
    global seed
    imgs = []
    labels = []
    for i in range(batch_size):
        seed += 1
        # Reset twice with the same seed: once unwrapped to decode the raw
        # grid encoding for the label, once wrapped for the one-hot image.
        env.seed(seed)
        unwrapped_obs = env.unwrapped.reset()['image']
        env.seed(seed)
        obs = env.reset()['image']
        ball_visible = ('red', 'ball') in Grid.decode(unwrapped_obs)
        # HWC -> CHW for PyTorch convolutions
        obs = obs.transpose([2, 0, 1])
        imgs.append(np.copy(obs))
        labels.append(ball_visible)
    imgs = np.stack(imgs).astype(np.float32)
    labels = np.array(labels, dtype=np.int64)
    return imgs, labels
print('Generating test set')
test_imgs, test_labels = sample_batch(256)
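# Because sample_batch advances the global seed, later training batches use
# environment seeds the test set has not seen, so the two sets do not overlap.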
def eval_model(model):
    """Return the model's accuracy (in percent) on the held-out test set."""
    num_true = 0
    for idx in range(test_imgs.shape[0]):
        img = test_imgs[idx]
        label = test_labels[idx]
        p = model.present_prob(img)
        out_label = p > 0.5
        if out_label == label:
            num_true += 1
    acc = 100 * (num_true / test_imgs.shape[0])
    return acc
##############################################################################
batch_size = 128
model = Model()
if torch.cuda.is_available():
    model.cuda()
optimizer = optim.Adam(
    model.parameters(),
    lr=5e-4
)
criterion = nn.CrossEntropyLoss()
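# Note (added): nn.CrossEntropyLoss applies log-softmax internally, which is
# why Model.forward returns raw logits and present_prob applies softmax
# explicitly at inference time.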
running_loss = None
for batch_no in range(1, 10000):
    batch_imgs, labels = sample_batch(batch_size)
    batch_imgs = make_var(batch_imgs)
    labels = make_var(labels)

    pred = model(batch_imgs)
    loss = criterion(pred, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Exponential moving average of the loss for smoother logging
    loss = loss.item()
    running_loss = loss if running_loss is None else 0.99 * running_loss + 0.01 * loss
    print('batch #{}, frames={}, loss={:.5f}'.format(
        batch_no,
        batch_no * batch_size,
        running_loss
    ))

    if batch_no % 25 == 0:
        acc = eval_model(model)
        print('accuracy: {:.2f}%'.format(acc))
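# Optional addition, not in the original script: persist the trained weights
# so the detector can be reloaded without retraining. The filename is
# illustrative.
torch.save(model.state_dict(), 'red_ball_detector.pt')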