使用seq2seq预测当天的股票价格

1.tensorflow中的seq2seq

在tensorflow中有两套seq2seq的接口。一套是tensorflow1.0版本之前的旧接口,在tf.contrib.legacy_seq2seq下;另一套为tensorflow1.0版本之后推出的新接口,在tf.contrib.seq2seq下。本文使用的是seq2seq的旧接口,旧接口的功能相对简单,是静态的展开模式,新接口功能更加强大,使用的是动态展开网络模型。想要了解新接口的使用情况,推荐这个网址https://blog.****.net/thriving_fcl/article/details/74165062

2.实力描述

使用seq2seq模式对某个股票前段时间的数据训练学习,拟合特征,从而达到第二天预测股票价格的效果。

3.导入股票数据

首先得准备一个股票数据,本人使用的是参考书中所提供的数据,想要数据集的可以在私信我。

添加股票载入函数loadstock,实例中将收盘价格载入内存做样本生成。

import random
import math
        
import tensorflow as tf 
import numpy as np
import matplotlib.pyplot as plt  


import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
def loadstock(window_size):
    names = ['date',
         'code',
         'name',
         'Close',
         'top_price',
         'low_price',
         'opening_price',
         'bef_price',
         'floor_price',
         'floor',
         'exchange',
         'Volume',
         'amount',
         '总市值',
         '流通市值']
    data = pd.read_csv('600000.csv', names=names, header=None,encoding = "gbk")
    #predictor_names = ["Close",'top_price',"low_price","opening_price"]
    predictor_names = ["Close"]
    training_features = np.asarray(data[predictor_names], dtype = "float32")
    kept_values = training_features[1000:]

    X = []
    Y = []
    for i in range(len(kept_values) - window_size * 2):#  x ;前window_size,y后window_size
        X.append(kept_values[i:i + window_size])
        Y.append(kept_values[i + window_size:i + window_size * 2])

    X = np.reshape(X,[-1,window_size,len(predictor_names)])
    Y = np.reshape(Y,[-1,window_size,len(predictor_names)])
    print(np.shape(X))

    return X, Y

3.生成样本

X_train = []
Y_train = []
X_test = []
Y_test = []


def generate_data(isTrain, batch_size):        
    # 用前40个样本预测后40个样本.
    
    seq_length = 40   
    seq_length_test = 80

    global Y_train
    global X_train
    global X_test
    global Y_test
    # First load, with memoization:
    if len(Y_train) == 0:       
        X, Y= loadstock( window_size=seq_length)
        #X, Y = normalizestock(X, Y)

        # Split 80-20:
        X_train = X[:int(len(X) * 0.8)]
        Y_train = Y[:int(len(Y) * 0.8)]


    if len(Y_test) == 0:
        X, Y  = loadstock( window_size=seq_length_test)
        #X, Y = normalizestock(X, Y)

        # Split 80-20:
        X_test = X[int(len(X) * 0.8):]
        Y_test = Y[int(len(Y) * 0.8):]

    if isTrain:
        return do_generate_x_y(X_train, Y_train, batch_size)
    else:
        return do_generate_x_y(X_test,  Y_test,  batch_size)


def do_generate_x_y(X, Y, batch_size):
    assert X.shape == Y.shape, (X.shape, Y.shape)
    idxes = np.random.randint(X.shape[0], size=batch_size)
    X_out = np.array(X[idxes]).transpose((1, 0, 2))
    Y_out = np.array(Y[idxes]).transpose((1, 0, 2))
    return X_out, Y_out

4.设置参数及网络结构

网络模型定义为2层循环网络,每层12个GRUcell,使用MultiRNNcell将多个RNN连到一块,一起传入basic_rnn_seq2seq中。生成结果每时刻有12个grucell输出,所以还需要通过循环在每个时刻下加一个全连接层,将其转为输出维度为1的节点。

sample_now, sample_f = generate_data(isTrain=True, batch_size=3)
print("training examples : ")
print(sample_now.shape)
print("(seq_length, batch_size, output_dim)")


seq_length = sample_now.shape[0]
batch_size = 100

output_dim = input_dim = sample_now.shape[-1]
hidden_dim = 12 #12个grucell
layers_num = 2#两层循环网络

# Optmizer:
learning_rate =0.04#学习率设为0.04
#nb_iters = 100
nb_iters = 100000
lambda_l2_reg = 0.003  # L2调节因子设为0.003

tf.reset_default_graph()



encoder_input = []
expected_output = []
decode_input =[]
for i in range(seq_length):
    encoder_input.append( tf.placeholder(tf.float32, shape=( None, input_dim)) )
    expected_output.append( tf.placeholder(tf.float32, shape=( None, output_dim)) )
    decode_input.append( tf.placeholder(tf.float32, shape=( None, input_dim)) )

    
tcells = []
for i in range(layers_num):
    tcells.append(tf.contrib.rnn.GRUCell(hidden_dim))
Mcell = tf.contrib.rnn.MultiRNNCell(tcells)

dec_outputs, dec_memory = tf.contrib.legacy_seq2seq.basic_rnn_seq2seq(encoder_input,decode_input,Mcell)

reshaped_outputs = []
for ii in dec_outputs :
    reshaped_outputs.append( tf.contrib.layers.fully_connected(ii,output_dim,activation_fn=None))


# 计算l2的loss值
output_loss = 0
for _y, _Y in zip(reshaped_outputs, expected_output):
    output_loss += tf.reduce_mean( tf.pow(_y - _Y, 2) )
   
# 求正则化损失
reg_loss = 0
for tf_var in tf.trainable_variables():
    if not ("fully_connected" in tf_var.name ):
        #print(tf_var.name)
        reg_loss += tf.reduce_mean(tf.nn.l2_loss(tf_var))

loss = output_loss + lambda_l2_reg * reg_loss
train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)   

4.训练样本

在session中将训练和测试单独封装成了两个函数,在train_batch函数里先取指定批次数据,通过循环增来将数据添加到encoder_input和expect_output列表中。

sess = tf.InteractiveSession()
        
def train_batch(batch_size):

    X, Y = generate_data(isTrain=True, batch_size=batch_size)
    feed_dict = {encoder_input[t]: X[t] for t in range(len(encoder_input))}
    feed_dict.update({expected_output[t]: Y[t] for t in range(len(expected_output))})

    c =np.concatenate(( [np.zeros_like(Y[0])],Y[:-1]),axis = 0)

    feed_dict.update({decode_input[t]: c[t] for t in range(len(c))})

    _, loss_t = sess.run([train_op, loss], feed_dict)
    return loss_t


def test_batch(batch_size):
    X, Y = generate_data(isTrain=True, batch_size=batch_size)
    feed_dict = {encoder_input[t]: X[t] for t in range(len(encoder_input))}
    feed_dict.update({expected_output[t]: Y[t] for t in range(len(expected_output))})
    c =np.concatenate(( [np.zeros_like(Y[0])],Y[:-1]),axis = 0)#来预测最后一个序列
    feed_dict.update({decode_input[t]: c[t] for t in range(len(c))})    
    output_lossv,reg_lossv,loss_t = sess.run([output_loss,reg_loss,loss], feed_dict)
    print("-----------------")    
    print(output_lossv,reg_lossv)
    return loss_t


# Training
train_losses = []
test_losses = []

sess.run(tf.global_variables_initializer())
for t in range(nb_iters + 1):
    train_loss = train_batch(batch_size)
    train_losses.append(train_loss)
    if t % 50 == 0:
        test_loss = test_batch(batch_size)
        test_losses.append(test_loss)
        print("Step {}/{}, train loss: {}, \tTEST loss: {}".format(t,nb_iters, train_loss, test_loss))
print("Fin. train loss: {}, \tTEST loss: {}".format(train_loss, test_loss))        
        

5.准备可视化数据

在可视化部分,取时间序列2倍样本,前一部分用于输入模型,产生最后一天的预测值,后一部分用于数据显示,对比真实值与预测值之间差距。

# 测试
nb_predictions = 5
print("visualize {} predictions data:".format(nb_predictions))

preout =[]
X, Y = generate_data(isTrain=False, batch_size=nb_predictions)
print(np.shape(X),np.shape(Y))
for tt in  range(seq_length):
    feed_dict = {encoder_input[t]: X[t+tt] for t in range(seq_length)}
    feed_dict.update({expected_output[t]: Y[t+tt] for t in range(len(expected_output))})
    c =np.concatenate(( [np.zeros_like(Y[0])],Y[tt:seq_length+tt-1]),axis = 0)  #从前15个的最后一个开始预测  

    feed_dict.update({decode_input[t]: c[t] for t in range(len(c))})
    outputs = np.array(sess.run([reshaped_outputs], feed_dict)[0])
    preout.append(outputs[-1])

print(np.shape(preout))#将每个未知预测值收集起来准备显示出来。
preout =np.reshape(preout,[seq_length,nb_predictions,output_dim])

6.图画显示数据

for j in range(nb_predictions):
    plt.figure(figsize=(12, 3))

    for k in range(output_dim):
        past = X[:, j, k]
        expected = Y[seq_length-1:, j, k]#对应预测值的打印

        pred = preout[:, j, k]

        label1 = "past" if k == 0 else "_nolegend_"
        label2 = "future" if k == 0 else "_nolegend_"
        label3 = "Pred" if k == 0 else "_nolegend_"
        plt.plot(range(len(past)), past, "o--b", label=label1)
        plt.plot(range(len(past), len(expected) + len(past)),
                 expected, "x--b", label=label2)
        plt.plot(range(len(past), len(pred) + len(past)),
                 pred, "o--y", label=label3)

    plt.legend(loc='best')
    plt.title("Predictions vs. future")
    plt.show()

7.结果显示

使用seq2seq预测当天的股票价格

                                                                               loss函数图

使用seq2seq预测当天的股票价格

                                                                        股票示例结果图

序列80—120之间的点蓝色的*代表真实值,灰色的点代表预测值,总体预测结果与真实值相差不大

参考书籍:  李金洪老师的 深度学习之tensorflow