利用深层LSTM实现sin函数预测

先来简短的说一波概念问题~

RNN的主要思想以及问题:

每次的输入需要依赖上层节点的状态,如果网络结构非常深的话则初始状态就可能会被遗忘,但是在应用中会发现对于当前状态的输出有时需要依赖初始状态,这么说可能不太明白,比如我要预测“随着时间的流逝北京慢慢进入的夏天,....,北京城内种植了大量的柳树、杨树,这导致天空中到处都飘舞着___”,我需要根据夏天这个关键字以及柳树等来预测这里应该出现的是“柳絮”,但夏天这个关键字距离预测点太远,如果利用RNN进行预测很可能会不准确。

LSTM的主要思想:

LSTM依靠“输入门”,‘输出门’,‘遗忘门’来有选择的让信息传输至下一节点,其实这些‘门’就是sigmoid和一个按位做乘法的操作。sigmoid用来将这些值映射到(0,1)之间,该值用于描述当前输入有多少信息量可以通过。具体这些门的介绍可以看如下这篇文章:LSTM详解

另外在TF中,LSTM的函数都已经封装好了,只需要调用就行,可以说是非常方便了,但是这也导致了对于LSTM的理解肥肠片面,有点类似于黑箱算法了。

实践:

下面是基于LSTM来预测sin函数的取值,这里需要先将sin函数的值离散化(因为LSTM只能预测离散时刻的取值),也就是通过有限个采样点模拟一个连续的曲线。这里设置的是每隔0.1就采一次数据

# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt

#HIDDEN_SIZE:隐藏节点的个数也就是一个LSTM cell的输出维度,NUM_LAYERS:深层LSTM网络模型的层数
#TIMESTEPS:训练序列长度,TRAINING_STEP:训练次数,BATCH_SIZE:批大小
#TRAIN_EXAMPLE:训练样例数,TEST_EXAMPLE:测试样例数,SAMPLE_GAP:采样间隔
HIDDEN_SIZE = 30
NUM_LAYERS = 2

TIMESTEPS = 10
TRAINING_STEP = 10000
BATCH_SIZE = 32

TRAIN_EXAMPLE = 10000
TEST_EXAMPLE = 1000
SAMPLE_GAP = 0.01

#利用切片的方法生成数据
def generate_data(seq):
    x = []
    y = []
    #len(seq) - TIMESTEPS是为了保证不溢出
    for i in range(len(seq) - TIMESTEPS):
        #从[i,i+TIMESTEPS)左闭右开为一个输入序列
        #以i+TIMESTEPS时刻的sin值作为输出
        #即用TIMESTEPS的信息预测第i+TIMESTEPS的值
        x.append([seq[i:i + TIMESTEPS]])
        y.append([seq[i + TIMESTEPS]])
    return np.array(x,dtype=np.float32),np.array(y,dtype=np.float32)

#构建LSTM模型
def lstm_model(x,y,is_training):
    #tf.nn.rnn_cell.BasicLSTMCell()该函数可以理解为是一个LSTM块
    #一共构建HIDDEN_SIZE块,NUM_LAYERS层
    #tf.nn.rnn_cell.MultiRNNCell()构建深层RNN模型的函数
    cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
                                       for _ in range(NUM_LAYERS)])
    #tf.nn.dynamic_rnn()该函数是输出RNN模型中最后一层的各个节点的输出值以及状态值
    #output的维度是[batch_size,time,HIDDEN_SIZE]
    #我们这里只要取出最终的输出就行所以取output[:,-1,:]
    output,_ = tf.nn.dynamic_rnn(cell,x,dtype=tf.float32)
    output = output[:,-1,:]
    #利用一个全连接层输出预测值
    prediction = tf.contrib.layers.fully_connected(output,1,activation_fn=None)
    #如果是测试阶段则只输出预测值就行了
    if not is_training:
        return prediction,None,None
    #利用平方差计算损失
    loss = tf.losses.mean_squared_error(labels=y,predictions=prediction)
    #利用Adagrad优化器优化
    train_op = tf.contrib.layers.optimize_loss(loss,tf.train.get_global_step(),
                                               optimizer='Adagrad',learning_rate=0.1)
    return prediction,loss,train_op

#训练阶段
def train(sess,train_x,train_y):
    #用TF中集成好的Dataset框架读取数据
    ds = tf.data.Dataset.from_tensor_slices((train_x,train_y))
    ds = ds.repeat().shuffle(1000).batch(BATCH_SIZE)
    #定义迭代器获取下一批数据
    x,y = ds.make_one_shot_iterator().get_next()

    with tf.variable_scope('model'):
        prediction,loss,train_op = lstm_model(x,y,True)
    sess.run(tf.global_variables_initializer())
    print('Begin training the model...')
    for i in range(TRAIN_EXAMPLE):
        _,loss_value = sess.run([train_op,loss])
        #打印出损失
        if i % 100 == 0 and i != 0:
            print('Step: %d ,loss: %4f' % (i, loss_value))

#测试阶段和训练很像,注意要复用训练阶段训练的参数(reuse=True)
def test(sess,test_x,test_y):
    ds = tf.data.Dataset.from_tensor_slices((test_x,test_y))
    ds = ds.batch(1)
    x,y = ds.make_one_shot_iterator().get_next()
    with tf.variable_scope('model',reuse=True):
        prediction,_,_ = lstm_model(x,[0.0],False)
    predictions = []
    labels = []
    print('Begin testing the model...')
    for i in range(TEST_EXAMPLE):
        p,l = sess.run([prediction,y])
        predictions.append(p)
        labels.append(l)
    predictions = np.array(predictions).squeeze()
    labels = np.array(labels).squeeze()
    rmse = np.sqrt((predictions-labels)**2).mean(axis=0)
    print('Mean square error is: %4f'% rmse)
    #可视化测试结果
    plt.figure()
    plt.plot(predictions,label='prediction')
    plt.plot(labels,label='label')
    plt.legend()
    plt.show()
#构造数据集
test_start = (TRAIN_EXAMPLE+TIMESTEPS) * SAMPLE_GAP
test_end = test_start + (TEST_EXAMPLE + TIMESTEPS) * SAMPLE_GAP
#np.linspace()是构造一个等差数列,参数为:起点,终点,序列长度
train_x,train_y = generate_data(np.sin(np.linspace(0,test_start,
                                                   TRAIN_EXAMPLE + TIMESTEPS,dtype=np.float32)))
test_x,test_y = generate_data(np.sin(np.linspace(test_start,test_end,
                                                 TEST_EXAMPLE + TIMESTEPS,dtype=np.float32)))
with tf.Session() as sess:
    train(sess,train_x,train_y)
    print('Train over.')
    test(sess,test_x,test_y)

结果:

利用深层LSTM实现sin函数预测利用深层LSTM实现sin函数预测