#!/usr/bin/env python
# coding: utf-8
# %%
# ## Introduction
#
# This example demonstrates how to implement an autoregressive language model
# using a miniature version of the GPT model.
# The model consists of a single Transformer block with causal masking
# in its attention layer.
#
#
# **References:**
#
# - [GPT](https://www.semanticscholar.org/paper/Improving-Language-Understanding-by-Generative-Radford/cd18800a0fe0b668a1cc19f2ec95b5003d0a5035)
# - [GPT-2](https://www.semanticscholar.org/paper/Language-Models-are-Unsupervised-Multitask-Learners-Radford-Wu/9405cc0d6169988371b2755e573cc28650d14dfe)
# - [GPT-3](https://arxiv.org/abs/2005.14165)

# ## Setup
import argparse
import os
import random
import string
import time

# Set these before importing TensorFlow; TF_CPP_MIN_LOG_LEVEL and
# CUDA_VISIBLE_DEVICES are read when TensorFlow initializes.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Disable warning, info, and debug messages.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization

# ## Prepare the data for word-level language modelling
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, default=256, help="Batch size")
    args = parser.parse_args()
    return args
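
# Example invocation (the script filename here is just a placeholder):
#   python minigpt_wikitext.py --batch-size 128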


# ## Implement a Transformer block as a layer
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Mask the upper half of the dot-product matrix in self-attention.
    This prevents the flow of information from future tokens to the current token.
    1's in the lower triangle, counting from the lower-right corner.
    """
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(mask, mult)
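

# Illustrative sketch (assumed toy values; never called during training): with
# n_dest == n_src the mask is lower-triangular, so token i can only attend to
# tokens 0..i.
def _demo_causal_mask():
    mask = causal_attention_mask(1, 4, 4, tf.bool)
    # Expected: shape (1, 4, 4), True on and below the diagonal, False above it.
    print(mask)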


class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim)]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        # Build a (batch, seq_len, seq_len) causal mask so each position attends
        # only to itself and earlier positions.
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        attention_output = self.att(inputs, inputs, attention_mask=causal_mask)
        attention_output = self.dropout1(attention_output)
        out1 = self.layernorm1(inputs + attention_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        return self.layernorm2(out1 + ffn_output)
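

# Minimal shape check (assumed toy dimensions; never called during training):
# the block maps a (batch, seq_len, embed_dim) tensor to a tensor of the same shape.
def _demo_transformer_block():
    block = TransformerBlock(embed_dim=64, num_heads=2, ff_dim=64)
    dummy = tf.random.uniform((2, 10, 64))
    print(block(dummy).shape)  # Expected: (2, 10, 64)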


# ## Implement an embedding layer
#
# Create two separate embedding layers: one for tokens and one for token index
# (positions).
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions
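

# Quick shape check (assumed toy values; never called during training): token and
# position embeddings are summed, so a (batch, seq_len) batch of token ids becomes
# a (batch, seq_len, embed_dim) tensor.
def _demo_token_position_embedding():
    emb = TokenAndPositionEmbedding(maxlen=80, vocab_size=20000, embed_dim=32)
    tokens = tf.constant([[2, 15, 7, 0]])
    print(emb(tokens).shape)  # Expected: (1, 4, 32)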


def create_model():
    inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)
    transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim)
    x = transformer_block(x)
    outputs = layers.Dense(vocab_size)(x)

    model = keras.Model(inputs=inputs, outputs=[outputs, x])

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    model.compile(
        "adam", loss=[loss_fn, None],
    )  # No loss is attached to the transformer block's output embeddings (second output)
    return model
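

# Sketch of how the two outputs could be inspected (the hyperparameters are read from
# the globals assigned in main(), so this assumes main() has already set them):
#   model = create_model()
#   logits, features = model.predict(tf.zeros((1, maxlen), dtype=tf.int32))
#   # logits: (1, maxlen, vocab_size) next-token scores
#   # features: (1, maxlen, embed_dim) transformer block activations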


def main():
    args = parse_args()
    global g_args
    g_args = args
    batch_size = args.batch_size
    print(f"Batch size: {batch_size}")

    # ## Implement the miniature GPT model
    global vocab_size
    vocab_size = 20000  # Only consider the top 20k words
    global maxlen
    maxlen = 80  # Max sequence size
    global embed_dim
    embed_dim = 256  # Embedding size for each token
    global num_heads
    num_heads = 2  # Number of attention heads
    global feed_forward_dim
    feed_forward_dim = 256  # Hidden layer size in the feed-forward network inside the transformer block

    # The dataset (WikiText-2) stores its raw text in plain-text files under a
    # single directory; build a list of all files.
    filenames = []
    directories = [
        "/workspace/python/source_code/Data/wikitext-2"
    ]
    for d in directories:
        for f in os.listdir(d):
            filenames.append(os.path.join(d, f))
    # print(f"{len(filenames)} files")

    # Create a dataset from the text files, one line per element
    random.shuffle(filenames)
    text_ds = tf.data.TextLineDataset(filenames)
    text_ds = text_ds.shuffle(buffer_size=256)
    text_ds = text_ds.batch(batch_size)

    def custom_standardization(input_string):
        """Lowercase, remove HTML line-break tags, and pad punctuation with spaces."""
        lowercased = tf.strings.lower(input_string)
        stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")
        return tf.strings.regex_replace(stripped_html, f"([{string.punctuation}])", r" \1")
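
    # Illustrative check (assumed sample string; not executed during training):
    #   custom_standardization(tf.constant("Hello, world!<br />It works."))
    #   # -> b'hello , world ! it works .'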

    # Create a vectorization layer and adapt it to the text
    vectorize_layer = TextVectorization(
        standardize=custom_standardization,
        max_tokens=vocab_size - 1,
        output_mode="int",
        output_sequence_length=maxlen + 1,
    )
    vectorize_layer.adapt(text_ds)
    vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices

    def prepare_lm_inputs_labels(text):
        """
        Shift word sequences by 1 position so that the target for position (i) is
        the word at position (i+1). The model will use all words up to position (i)
        to predict the next word.
        """
        text = tf.expand_dims(text, -1)
        tokenized_sentences = vectorize_layer(text)
        x = tokenized_sentences[:, :-1]
        y = tokenized_sentences[:, 1:]
        return x, y
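
    # For example (toy token ids, for illustration only): if a line vectorizes to
    # [5, 9, 3, 0, 0], then x = [5, 9, 3, 0] and y = [9, 3, 0, 0], i.e. y is x
    # shifted left by one token.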
    text_ds = text_ds.map(prepare_lm_inputs_labels)
    text_ds = text_ds.prefetch(tf.data.experimental.AUTOTUNE)

    # ## Implement a Keras callback to report epoch time and throughput
    # %%
    class PrintLR(tf.keras.callbacks.Callback):
        """Reports wall-clock time per epoch and approximate training units processed per second."""

        def __init__(self, total_images=0):
            super(PrintLR, self).__init__()
            self.total_images = total_images

        def on_epoch_begin(self, epoch, logs=None):
            self.epoch_start_time = time.time()

        def on_epoch_end(self, epoch, logs=None):
            epoch_time = time.time() - self.epoch_start_time
            print('Epoch time : {}'.format(epoch_time))
            images_per_sec = round(self.total_images / epoch_time, 2)
            print('Units/sec: {}'.format(images_per_sec))
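
    # Each epoch the callback above prints something like (illustrative values only):
    #   Epoch time : 42.7
    #   Units/sec: 1050.82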

    model = create_model()
    model.fit(text_ds, verbose=1, epochs=3, callbacks=[PrintLR(total_images=44880)])


if __name__ == "__main__":
    main()