#!/usr/bin/env python3
"""Train a small classifier that predicts whether a red ball is visible.

Observations are sampled by resetting the ``BabyAI-GoToRedBall-v0``
environment; the label for each observation is whether ``('red', 'ball')``
is contained in the decoded grid. The script trains on freshly sampled
batches and periodically reports accuracy on a fixed test set that is
generated once at start-up.
"""

import time
import random
import numpy as np
import gym
from gym_minigrid.register import env_list
from gym_minigrid.minigrid import Grid, OBJECT_TO_IDX
# NOTE(review): presumably imported so the BabyAI environments get
# registered with gym -- confirm before removing.
import babyai
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision
import numpy as np
import cv2
import PIL

##############################################################################


def make_var(arr):
    """Convert a NumPy array into a torch tensor, on the GPU when available.

    The array is made contiguous first; its dtype is carried over unchanged
    by ``torch.from_numpy`` (no cast is applied here).

    Args:
        arr: NumPy array (or array-like accepted by ``np.ascontiguousarray``).

    Returns:
        A ``torch.Tensor`` holding the same data, moved to CUDA if
        ``torch.cuda.is_available()``.
    """
    tensor = torch.from_numpy(np.ascontiguousarray(arr))
    # Wrapping in autograd.Variable is unnecessary: tensors and Variables
    # were merged in PyTorch 0.4, which this file already requires.
    if torch.cuda.is_available():
        tensor = tensor.cuda()
    return tensor


def init_weights(m):
    """Initialise the parameters of module ``m`` based on its class name.

    Intended for use with ``nn.Module.apply``. Modules whose class name
    matches none of the patterns below are left untouched.

    * name starts with ``Conv``: orthogonal weights, zero bias
    * name contains ``Linear``: Xavier-uniform weights, zero bias
    * name contains ``BatchNorm``: weights ~ N(1.0, 0.02), zero bias
    """
    classname = m.__class__.__name__
    if classname.startswith('Conv'):
        nn.init.orthogonal_(m.weight.data)
        m.bias.data.fill_(0)
    elif 'Linear' in classname:
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0)
    elif 'BatchNorm' in classname:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)


class ImageBOWEmbedding(nn.Module):
    """Embed each of three integer-valued input channels and sum the results.

    A single embedding table with ``3 * max_value`` rows is shared by the
    three channels; channel ``c`` is shifted by ``c * max_value`` so that
    each channel indexes its own slice of the table.
    """

    def __init__(self, max_value, embedding_dim):
        super().__init__()
        self.max_value = max_value
        self.embedding_dim = embedding_dim
        self.embedding = nn.Embedding(3 * max_value, embedding_dim)

    def forward(self, inputs):
        """Return the summed per-channel embeddings, channels first.

        Args:
            inputs: 4-D tensor whose dimension 1 is the channel axis that the
                three offsets are broadcast against.

        Returns:
            Tensor with the embedding dimension moved to axis 1, i.e.
            ``(batch, embedding_dim, dim2, dim3)`` of the input.
        """
        offsets = torch.Tensor(
            [0, self.max_value, 2 * self.max_value]
        ).to(inputs.device)
        # Shift every channel into its own index range, then cast to the
        # integer type nn.Embedding expects.
        inputs = (inputs + offsets[None, :, None, None]).long()
        # Embedding appends a trailing feature axis; summing over axis 1
        # merges the channels, and permute moves the features to axis 1.
        return self.embedding(inputs).sum(1).permute(0, 3, 1, 2)


class Flatten(nn.Module):
    """
    Flatten layer, to flatten convolutional layer output
    """

    def forward(self, input):
        # Keep the batch axis and collapse everything else.
        return input.view(input.size(0), -1)


def num_params(model):
    """Return the total number of scalar parameters in ``model``."""
    return sum(p.numel() for p in model.parameters())


class Model(nn.Module):
    """Two-class classifier over grid observations.

    The network embeds the input, applies three 1x1 convolutions, then a
    single 7x7 convolution with two output channels, flattens, and applies
    a final ``Linear(2, 2)`` layer. Because the linear layer takes exactly
    two features, the 7x7 convolution must reduce the map to 1x1, i.e. the
    input grid must be 7x7.
    """

    def __init__(self):
        super().__init__()

        self.layers = nn.Sequential(
            ImageBOWEmbedding(765, embedding_dim=32),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=64, out_channels=2, kernel_size=1),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=2, out_channels=2, kernel_size=7),
            nn.LeakyReLU(),
            Flatten(),
            # Two output heads, one for each class
            nn.Linear(2, 2),
        )

        self.apply(init_weights)

    def forward(self, obs):
        """Return the two class logits for a batch of observations."""
        return self.layers(obs)

    def present_prob(self, obs):
        """Return the softmax probability of class 1 for one observation.

        Args:
            obs: a single unbatched observation as a NumPy array; a batch
                axis is added before the forward pass.

        Returns:
            The class-1 probability as a NumPy scalar.
        """
        obs = make_var(obs).unsqueeze(0)
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            logits = self(obs)
            probs = F.softmax(logits, dim=-1)
        probs = probs.cpu().squeeze().numpy()
        return probs[1]


env = gym.make('BabyAI-GoToRedBall-v0')


def sample_batch(batch_size=128):
    """Sample ``batch_size`` labelled observations by resetting ``env``.

    Each observation image is transposed with axes ``[2, 0, 1]`` (last axis
    first) and labelled by whether ``('red', 'ball')`` is contained in
    ``Grid.decode(obs)``.

    Returns:
        ``(imgs, labels)``: a float32 array of stacked images and an int64
        array of 0/1 labels.
    """
    imgs = []
    labels = []

    for _ in range(batch_size):
        obs = env.reset()['image']
        # The label must be computed on the original (untransposed) layout.
        ball_visible = ('red', 'ball') in Grid.decode(obs)

        imgs.append(np.copy(obs.transpose([2, 0, 1])))
        labels.append(ball_visible)

    imgs = np.stack(imgs).astype(np.float32)
    # np.long was removed from NumPy; int64 is the dtype that becomes a
    # torch LongTensor, which nn.CrossEntropyLoss expects for targets.
    labels = np.array(labels, dtype=np.int64)

    return imgs, labels


print('Generating test set')
test_imgs, test_labels = sample_batch(256)


def eval_model(model):
    """Return the accuracy (in percent) of ``model`` on the fixed test set.

    An observation is predicted as class 1 when ``model.present_prob``
    exceeds 0.5.
    """
    num_true = 0

    for img, label in zip(test_imgs, test_labels):
        p = model.present_prob(img)
        out_label = p > 0.5

        if np.equal(out_label, label):
            num_true += 1

    return 100 * (num_true / test_imgs.shape[0])


##############################################################################

batch_size = 128

model = Model()
# Mirror make_var(): only move to the GPU when one exists, so the script
# also runs on CPU-only machines.
if torch.cuda.is_available():
    model.cuda()
print('Num params:', num_params(model))

optimizer = optim.Adam(
    model.parameters(),
    lr=5e-4
)

criterion = nn.CrossEntropyLoss()

# Exponential moving average of the loss, seeded by the first batch.
running_loss = None

for batch_no in range(1, 10000):
    batch_imgs, labels = sample_batch(batch_size)
    batch_imgs = make_var(batch_imgs)
    labels = make_var(labels)

    pred = model(batch_imgs)
    loss = criterion(pred, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    loss = loss.item()
    running_loss = loss if running_loss is None else 0.99 * running_loss + 0.01 * loss

    print('batch #{}, frames={}, loss={:.5f}'.format(
        batch_no,
        batch_no * batch_size,
        running_loss
    ))

    if batch_no % 25 == 0:
        acc = eval_model(model)
        print('accuracy: {:.2f}%'.format(acc))