123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
""" Dynamic Recurrent Neural Network.

TensorFlow implementation of a Recurrent Neural Network (LSTM) that performs
dynamic computation over sequences with variable length. This example uses
a toy dataset to classify sequences as linear or random. The generated
sequences have variable length.

Links:
    [Long Short Term Memory](http://deeplearning.cs.cmu.edu/pdfs/Hochreiter97_lstm.pdf)

Author: Aymeric Damien
Project: https://github.com/aymericdamien/TensorFlow-Examples/
"""
- from __future__ import print_function
- import tensorflow as tf
- import random
- # ====================
- # TOY DATA GENERATOR
- # ====================
class ToySequenceData(object):
    """ Generate zero-padded toy sequences of variable length.

    Two kinds of samples are produced, each with 50% probability:
    - Class 0: linear sequences (i.e. [0, 1, 2, 3,...]), label [1., 0.]
    - Class 1: random sequences (i.e. [1, 3, 10, 7,...]), label [0., 1.]
    Every value is divided by 'max_value' and wrapped in a one-element list,
    so a sample is a list of 'max_seq_len' single-feature timesteps.

    NOTICE:
    Each sequence is padded with [0.] up to 'max_seq_len' so that all samples
    share the same dimensions (an array with inconsistent dimensions cannot
    be fed). The true length of every sample is recorded in 'seqlen' so the
    dynamic calculation can ignore the padding.

    Attributes:
        data: list of padded sequences.
        labels: list of one-hot labels, aligned with 'data'.
        seqlen: list of actual (unpadded) sequence lengths.
        batch_id: index of the first sample of the next batch.
    """

    def __init__(self, n_samples=1000, max_seq_len=20, min_seq_len=3,
                 max_value=1000):
        """ Build 'n_samples' random samples.

        Args:
            n_samples: number of sequences to generate.
            max_seq_len: longest sequence length; also the padded length.
            min_seq_len: shortest sequence length.
            max_value: upper bound of the raw integer values, also used as
                the normalisation divisor.
        """
        self.data = []
        self.labels = []
        self.seqlen = []
        for _ in range(n_samples):
            # Random sequence length ('seq_len', not 'len', so the builtin
            # is not shadowed).
            seq_len = random.randint(min_seq_len, max_seq_len)
            # Monitor sequence length for TensorFlow dynamic calculation
            self.seqlen.append(seq_len)
            # Add a random or linear int sequence (50% prob)
            if random.random() < .5:
                # Linear sequence: consecutive integers from a random start
                # chosen so the run stays below 'max_value'.
                start = random.randint(0, max_value - seq_len)
                sequence = [[float(value) / max_value]
                            for value in range(start, start + seq_len)]
                label = [1., 0.]
            else:
                # Random sequence: independent integers in [0, max_value]
                sequence = [[float(random.randint(0, max_value)) / max_value]
                            for _ in range(seq_len)]
                label = [0., 1.]
            # Pad sequence for dimension consistency
            sequence += [[0.] for _ in range(max_seq_len - seq_len)]
            self.data.append(sequence)
            self.labels.append(label)
        self.batch_id = 0

    def next(self, batch_size):
        """ Return a batch of data. When dataset end is reached, start over.

        The last batch of a pass may hold fewer than 'batch_size' samples.

        Args:
            batch_size: maximum number of samples to return.

        Returns:
            A (batch_data, batch_labels, batch_seqlen) tuple of aligned lists.
        """
        if self.batch_id == len(self.data):
            self.batch_id = 0
        start = self.batch_id
        end = min(start + batch_size, len(self.data))
        self.batch_id = end
        return (self.data[start:end],
                self.labels[start:end],
                self.seqlen[start:end])
- # ==========
- # MODEL
- # ==========
# Training hyper-parameters
learning_rate = 0.01
training_steps = 10000
batch_size = 128
display_step = 200

# Network dimensions
seq_max_len = 20  # longest (padded) sequence length
n_hidden = 64  # number of features in the LSTM hidden layer
n_classes = 2  # linear sequence or not

# Toy datasets; both are padded to the same maximum length as the graph input
trainset = ToySequenceData(n_samples=1000, max_seq_len=seq_max_len)
testset = ToySequenceData(n_samples=500, max_seq_len=seq_max_len)

# Graph inputs: padded single-feature sequences and their one-hot labels
x = tf.placeholder("float", shape=[None, seq_max_len, 1])
y = tf.placeholder("float", shape=[None, n_classes])
# Actual (unpadded) length of every sequence in the batch
seqlen = tf.placeholder(tf.int32, shape=[None])

# Output-layer parameters, randomly initialised
weights = {
    'out': tf.Variable(tf.random_normal(shape=[n_hidden, n_classes])),
}
biases = {
    'out': tf.Variable(tf.random_normal(shape=[n_classes])),
}
def dynamicRNN(x, seqlen, weights, biases):
    """ Build the LSTM classifier graph and return its logits.

    Args:
        x: input tensor of shape (batch_size, n_steps, n_input), with
            n_steps equal to the module-level 'seq_max_len'.
        seqlen: tensor holding the actual length of every sequence.
        weights: dict whose 'out' entry is the output projection matrix.
        biases: dict whose 'out' entry is the output bias.

    Returns:
        The linear output layer applied to, for each sample, the LSTM output
        at that sample's last real (unpadded) timestep.
    """
    # The static RNN helper expects a list of 'n_steps' tensors of shape
    # (batch_size, n_input), so split the input along the time axis.
    steps = tf.unstack(x, seq_max_len, 1)

    # Single LSTM cell; passing 'sequence_length' makes the computation
    # dynamic with respect to each sample's length.
    cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
    step_outputs, _ = tf.contrib.rnn.static_rnn(
        cell, steps, dtype=tf.float32, sequence_length=seqlen)

    # 'step_outputs' holds one tensor per timestep. Stack them and move the
    # batch axis first: (n_steps, batch, n_hidden) -> (batch, n_steps, n_hidden).
    stacked = tf.stack(step_outputs)
    stacked = tf.transpose(stacked, [1, 0, 2])

    # For a sequence of length 10 we need its 10th output. Instead of
    # advanced indexing, flatten (batch, n_steps) into rows and gather row
    # 'sample * seq_max_len + (length - 1)' for every sample.
    n_samples = tf.shape(stacked)[0]
    row_offsets = tf.range(0, n_samples) * seq_max_len
    last_rows = row_offsets + (seqlen - 1)
    flat_outputs = tf.reshape(stacked, [-1, n_hidden])
    last_outputs = tf.gather(flat_outputs, last_rows)

    # Linear activation on the selected outputs
    logits = tf.matmul(last_outputs, weights['out']) + biases['out']
    return logits
# Logits for the current input placeholders
pred = dynamicRNN(x, seqlen, weights, biases)

# Loss (mean softmax cross-entropy) and the gradient-descent training op
cost = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
optimizer = tf.train.GradientDescentOptimizer(
    learning_rate=learning_rate).minimize(cost)

# Accuracy: fraction of samples whose predicted class matches the label
correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Op that assigns every variable its initial value
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)

    # Training loop
    for step in range(1, training_steps + 1):
        batch_x, batch_y, batch_seqlen = trainset.next(batch_size)
        train_feed = {x: batch_x, y: batch_y, seqlen: batch_seqlen}

        # One optimisation step (backprop) on the current batch
        sess.run(optimizer, feed_dict=train_feed)

        if step == 1 or step % display_step == 0:
            # Report accuracy and loss on the batch that was just used
            acc, loss = sess.run([accuracy, cost], feed_dict=train_feed)
            print("Step " + str(step*batch_size) + ", Minibatch Loss= " +
                  "{:.6f}".format(loss) + ", Training Accuracy= " +
                  "{:.5f}".format(acc))

    print("Optimization Finished!")

    # Evaluate on the whole held-out set in a single run
    test_data = testset.data
    test_label = testset.labels
    test_seqlen = testset.seqlen
    test_feed = {x: test_data, y: test_label, seqlen: test_seqlen}
    print("Testing Accuracy:", sess.run(accuracy, feed_dict=test_feed))
|