利用深层LSTM实现sin函数预测
先来简短的说一波概念问题~
RNN的主要思想以及问题:
每次的输入需要依赖上层节点的状态,如果网络结构非常深的话则初始状态就可能会被遗忘,但是在应用中会发现对于当前状态的输出有时需要依赖初始状态,这么说可能不太明白,比如我要预测“随着时间的流逝北京慢慢进入的夏天,....,北京城内种植了大量的柳树、杨树,这导致天空中到处都飘舞着___”,我需要根据夏天这个关键字以及柳树等来预测这里应该出现的是“柳絮”,但夏天这个关键字距离预测点太远,如果利用RNN进行预测很可能会不准确。
LSTM的主要思想:
LSTM依靠“输入门”,‘输出门’,‘遗忘门’来有选择的让信息传输至下一节点,其实这些‘门’就是sigmoid和一个按位做乘法的操作。sigmoid用来将这些值映射到(0,1)之间,该值用于描述当前输入有多少信息量可以通过。具体这些门的介绍可以看如下这篇文章:LSTM详解
另外在TF中,LSTM的函数都已经封装好了,只需要调用就行,可以说是非常方便了,但是这也导致了对于LSTM的理解肥肠片面,有点类似于黑箱算法了。
实践:
下面是基于LSTM来预测sin函数的取值,这里需要先将sin函数的值离散化(因为LSTM只能预测离散时刻的取值),也就是通过有限个采样点模拟一个连续的曲线。这里设置的是每隔0.1就采一次数据
# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
#HIDDEN_SIZE:隐藏节点的个数也就是一个LSTM cell的输出维度,NUM_LAYERS:深层LSTM网络模型的层数
#TIMESTEPS:训练序列长度,TRAINING_STEP:训练次数,BATCH_SIZE:批大小
#TRAIN_EXAMPLE:训练样例数,TEST_EXAMPLE:测试样例数,SAMPLE_GAP:采样间隔
HIDDEN_SIZE = 30
NUM_LAYERS = 2
TIMESTEPS = 10
TRAINING_STEP = 10000
BATCH_SIZE = 32
TRAIN_EXAMPLE = 10000
TEST_EXAMPLE = 1000
SAMPLE_GAP = 0.01
#利用切片的方法生成数据
def generate_data(seq):
x = []
y = []
#len(seq) - TIMESTEPS是为了保证不溢出
for i in range(len(seq) - TIMESTEPS):
#从[i,i+TIMESTEPS)左闭右开为一个输入序列
#以i+TIMESTEPS时刻的sin值作为输出
#即用TIMESTEPS的信息预测第i+TIMESTEPS的值
x.append([seq[i:i + TIMESTEPS]])
y.append([seq[i + TIMESTEPS]])
return np.array(x,dtype=np.float32),np.array(y,dtype=np.float32)
#构建LSTM模型
def lstm_model(x,y,is_training):
#tf.nn.rnn_cell.BasicLSTMCell()该函数可以理解为是一个LSTM块
#一共构建HIDDEN_SIZE块,NUM_LAYERS层
#tf.nn.rnn_cell.MultiRNNCell()构建深层RNN模型的函数
cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE)
for _ in range(NUM_LAYERS)])
#tf.nn.dynamic_rnn()该函数是输出RNN模型中最后一层的各个节点的输出值以及状态值
#output的维度是[batch_size,time,HIDDEN_SIZE]
#我们这里只要取出最终的输出就行所以取output[:,-1,:]
output,_ = tf.nn.dynamic_rnn(cell,x,dtype=tf.float32)
output = output[:,-1,:]
#利用一个全连接层输出预测值
prediction = tf.contrib.layers.fully_connected(output,1,activation_fn=None)
#如果是测试阶段则只输出预测值就行了
if not is_training:
return prediction,None,None
#利用平方差计算损失
loss = tf.losses.mean_squared_error(labels=y,predictions=prediction)
#利用Adagrad优化器优化
train_op = tf.contrib.layers.optimize_loss(loss,tf.train.get_global_step(),
optimizer='Adagrad',learning_rate=0.1)
return prediction,loss,train_op
#训练阶段
def train(sess,train_x,train_y):
#用TF中集成好的Dataset框架读取数据
ds = tf.data.Dataset.from_tensor_slices((train_x,train_y))
ds = ds.repeat().shuffle(1000).batch(BATCH_SIZE)
#定义迭代器获取下一批数据
x,y = ds.make_one_shot_iterator().get_next()
with tf.variable_scope('model'):
prediction,loss,train_op = lstm_model(x,y,True)
sess.run(tf.global_variables_initializer())
print('Begin training the model...')
for i in range(TRAIN_EXAMPLE):
_,loss_value = sess.run([train_op,loss])
#打印出损失
if i % 100 == 0 and i != 0:
print('Step: %d ,loss: %4f' % (i, loss_value))
#测试阶段和训练很像,注意要复用训练阶段训练的参数(reuse=True)
def test(sess,test_x,test_y):
ds = tf.data.Dataset.from_tensor_slices((test_x,test_y))
ds = ds.batch(1)
x,y = ds.make_one_shot_iterator().get_next()
with tf.variable_scope('model',reuse=True):
prediction,_,_ = lstm_model(x,[0.0],False)
predictions = []
labels = []
print('Begin testing the model...')
for i in range(TEST_EXAMPLE):
p,l = sess.run([prediction,y])
predictions.append(p)
labels.append(l)
predictions = np.array(predictions).squeeze()
labels = np.array(labels).squeeze()
rmse = np.sqrt((predictions-labels)**2).mean(axis=0)
print('Mean square error is: %4f'% rmse)
#可视化测试结果
plt.figure()
plt.plot(predictions,label='prediction')
plt.plot(labels,label='label')
plt.legend()
plt.show()
#构造数据集
test_start = (TRAIN_EXAMPLE+TIMESTEPS) * SAMPLE_GAP
test_end = test_start + (TEST_EXAMPLE + TIMESTEPS) * SAMPLE_GAP
#np.linspace()是构造一个等差数列,参数为:起点,终点,序列长度
train_x,train_y = generate_data(np.sin(np.linspace(0,test_start,
TRAIN_EXAMPLE + TIMESTEPS,dtype=np.float32)))
test_x,test_y = generate_data(np.sin(np.linspace(test_start,test_end,
TEST_EXAMPLE + TIMESTEPS,dtype=np.float32)))
with tf.Session() as sess:
train(sess,train_x,train_y)
print('Train over.')
test(sess,test_x,test_y)