前一段时间股市有一些大的波动,其实很早就有人说过:股市有风险,投资需谨慎。不过,如果能有一个比较合适的模型进行预测就好了。今天我来介绍一下时间序列预测分析。
本人也是深度循环网络的小白,所以也算是与大家共勉了哈。有问题还希望大家多多指正。闲话少说,上代码
首先,将 NumPy 数组中的数据读入,作为 data
test_input_array.py
# coding: utf-8
from __future__ import print_function
import numpy as np
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader
x = np.array(range(1000))
noise = np.random.uniform(-0.2, 0.2, 1000)
y = np.sin(np.pi * x / 100) + x / 200. + noise
plt.plot(x, y)
plt.savefig('timeseries_y.jpg')
data = {
tf.contrib.timeseries.TrainEvalFeatures.TIMES: x,
tf.contrib.timeseries.TrainEvalFeatures.VALUES: y,
}
reader = NumpyReader(data)
with tf.Session() as sess:
full_data = reader.read_full()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
print(sess.run(full_data))
coord.request_stop()
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
reader, batch_size=2, window_size=10)
with tf.Session() as sess:
batch_data = train_input_fn.create_batch()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
one_batch = sess.run(batch_data[0])
coord.request_stop()
print('one_batch_data:', one_batch)
读入CSV文件中的数据,适用于数据较大的情况
test_input_csv.py
# coding: utf-8
from __future__ import print_function
import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
csv_file_name = './data/period_trend.csv'
reader = tf.contrib.timeseries.CSVReader(csv_file_name)
with tf.Session() as sess:
data = reader.read_full()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
print(sess.run(data))
coord.request_stop()
train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(reader, batch_size=4, window_size=16)
with tf.Session() as sess:
data = train_input_fn.create_batch()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
batch1 = sess.run(data[0])
batch2 = sess.run(data[0])
coord.request_stop()
print('batch1:', batch1)
print('batch2:', batch2)
采用AR自回归模型进行预测
train_array.py
# coding: utf-8
from __future__ import print_function
import numpy as np
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader
def main(_):
    """Train an ARRegressor on a synthetic series and plot its output.

    Generates a noisy sine-plus-trend series of 1000 steps, saves a plot of
    it to ``timeseries_y.jpg``, trains the autoregressive model for 6000
    steps, evaluates it over the whole series, predicts 250 further steps and
    saves the combined plot to ``predict_result.jpg``.

    Args:
        _: Unused positional argument supplied by ``tf.app.run``.
    """
    time_steps = np.array(range(1000))
    jitter = np.random.uniform(-0.2, 0.2, 1000)
    series = np.sin(np.pi * time_steps / 100) + time_steps / 200. + jitter
    plt.plot(time_steps, series)
    plt.savefig('timeseries_y.jpg')

    data = {
        tf.contrib.timeseries.TrainEvalFeatures.TIMES: time_steps,
        tf.contrib.timeseries.TrainEvalFeatures.VALUES: series,
    }
    reader = NumpyReader(data)

    # Each training window (40) covers the model's input (30) + output (10).
    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=16, window_size=40)

    ar = tf.contrib.timeseries.ARRegressor(
        periodicities=200,
        input_window_size=30,
        output_window_size=10,
        num_features=1,
        loss=tf.contrib.timeseries.ARModel.NORMAL_LIKELIHOOD_LOSS)
    ar.train(input_fn=train_input_fn, steps=6000)

    # keys of evaluation: ['covariance', 'loss', 'mean', 'observed',
    # 'start_tuple', 'times', 'global_step']
    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = ar.evaluate(input_fn=evaluation_input_fn, steps=1)

    # Continue from the end of the evaluated series for 250 more steps.
    continuation_input_fn = tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=250)
    (predictions,) = tuple(ar.predict(input_fn=continuation_input_fn))

    plt.figure(figsize=(15, 5))
    curves = (
        ('origin', data['times'], data['values']),
        ('evaluation', evaluation['times'], evaluation['mean']),
        ('prediction', predictions['times'], predictions['mean']),
    )
    for curve_label, curve_times, curve_values in curves:
        plt.plot(curve_times.reshape(-1), curve_values.reshape(-1),
                 label=curve_label)
    plt.xlabel('time_step')
    plt.ylabel('values')
    plt.legend(loc=4)
    plt.savefig('predict_result.jpg')


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
以下是AR模型的概念;
AR模型描述在同一样本期间内的n个变量(内生变量)可以作为它们过去值的线性函数。
例1.$Y_t = \alpha + \beta X_{t-1} + u_t,\quad t = 1, 2, \dots, n$
本例中Y的现期值与X的一期滞后值相联系,比较一般的情况是:
$Y_t = \alpha + \beta_0 X_t + \beta_1 X_{t-1} + \dots + \beta_s X_{t-s} + u_t,\quad t = 1, 2, \dots, n$
即Y的现期值不仅依赖于X的现期值,而且依赖于X的若干期滞后值。这类模型称为分布滞后模型,因为X变量的影响分布于若干周期。
例2.$Y_t = \alpha + \beta Y_{t-1} + u_t,\quad t = 1, 2, \dots, n$
本例中Y的现期值与它自身的一期滞后值相联系,即依赖于它的过去值。一般情况可能是:
$Y_t = f(Y_{t-1}, Y_{t-2}, \dots, X_{2t}, X_{3t}, \dots)$
即Y的现期值依赖于它自身若干期滞后值,还依赖于其它解释变量。
train_lstm.py
采用LSTM网络训练模型,然后进行推理预测
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from os import path
import numpy as np
import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
from tensorflow.contrib.timeseries.python.timeseries import estimators as ts_estimators
from tensorflow.contrib.timeseries.python.timeseries import model as ts_model
from tensorflow.contrib.timeseries.python.timeseries import NumpyReader
import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt
class _LSTMModel(ts_model.SequentialTimeSeriesModel):
    """A time series model-building example using an RNNCell."""

    def __init__(self, num_units, num_features, dtype=np.float32):
        """Initialize/configure the model object.

        Note that we do not start graph building here. Rather, this object is
        a configurable factory for TensorFlow graphs which are run by an
        Estimator.

        Args:
            num_units: The number of units in the model's LSTMCell.
            num_features: The dimensionality of the time series (features per
                timestep).
            dtype: The floating point data type to use.
        """
        super(_LSTMModel, self).__init__(
            # Pre-register the metrics we'll be outputting (just a mean here).
            train_output_names=["mean"],
            predict_output_names=["mean"],
            num_features=num_features,
            dtype=dtype)
        self._num_units = num_units
        # Filled in by initialize_graph()
        self._lstm_cell = None
        self._lstm_cell_run = None
        self._predict_from_lstm_output = None

    def initialize_graph(self, input_statistics):
        """Save templates for components, which can then be used repeatedly.

        This method is called every time a new graph is created. It's safe to
        start adding ops to the current default graph here, but the graph
        should be constructed from scratch.

        Args:
            input_statistics: A math_utils.InputStatistics object.
        """
        super(_LSTMModel, self).initialize_graph(
            input_statistics=input_statistics)
        self._lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=self._num_units)
        # Create templates so we don't have to worry about variable reuse.
        self._lstm_cell_run = tf.make_template(
            name_="lstm_cell",
            func_=self._lstm_cell,
            create_scope_now_=True)
        # Transforms LSTM output into mean predictions.
        self._predict_from_lstm_output = tf.make_template(
            name_="predict_from_lstm_output",
            func_=lambda inputs: tf.layers.dense(
                inputs=inputs, units=self.num_features),
            create_scope_now_=True)

    def get_start_state(self):
        """Return initial state for the time series model.

        Returns:
            A tuple of (time, previous observation or prediction, LSTM state),
            all zero-initialised.
        """
        return (
            # Keeps track of the time associated with this state for error
            # checking.
            tf.zeros([], dtype=tf.int64),
            # The previous observation or prediction.
            tf.zeros([self.num_features], dtype=self.dtype),
            # The state of the RNNCell (batch dimension removed since this
            # parent class will broadcast).
            [tf.squeeze(state_element, axis=0)
             for state_element
             in self._lstm_cell.zero_state(batch_size=1, dtype=self.dtype)])

    def _transform(self, data):
        """Normalize data based on input statistics to encourage stable training.

        Centers on the overall feature mean and divides by the standard
        deviation (square root of the variance), so the transformed scale does
        not depend on the units of the input.

        Args:
            data: Values on the input scale.

        Returns:
            The values on the model's normalized scale.
        """
        mean, variance = self._input_statistics.overall_feature_moments
        return (data - mean) / tf.sqrt(variance)

    def _de_transform(self, data):
        """Transform data back to the input scale (inverse of `_transform`).

        Args:
            data: Values on the model's normalized scale.

        Returns:
            The values on the input scale.
        """
        mean, variance = self._input_statistics.overall_feature_moments
        return data * tf.sqrt(variance) + mean

    def _filtering_step(self, current_times, current_values, state,
                        predictions):
        """Update model state based on observations.

        Note that we don't do much here aside from computing a loss. In this
        case it's easier to update the RNN state in _prediction_step, since
        that covers running the RNN both on observations (from this method)
        and our own predictions. This distinction can be important for
        probabilistic models, where repeatedly predicting without filtering
        should lead to low-confidence predictions.

        Args:
            current_times: A [batch size] integer Tensor.
            current_values: A [batch size, self.num_features] floating point
                Tensor with new observations.
            state: The model's state tuple.
            predictions: The output of the previous `_prediction_step`.

        Returns:
            A tuple of new state and a predictions dictionary updated to
            include a loss (note that we could also return other measures of
            goodness of fit, although only "loss" will be optimized).
        """
        state_from_time, prediction, lstm_state = state
        # The state must belong to the same time step as the observation.
        with tf.control_dependencies(
                [tf.assert_equal(current_times, state_from_time)]):
            transformed_values = self._transform(current_values)
            # Use mean squared error across features for the loss.
            predictions["loss"] = tf.reduce_mean(
                (prediction - transformed_values) ** 2, axis=-1)
            # Keep track of the new observation in model state. It won't be
            # run through the LSTM until the next _prediction_step.
            new_state_tuple = (current_times, transformed_values, lstm_state)
        return (new_state_tuple, predictions)

    def _prediction_step(self, current_times, state):
        """Advance the RNN state using a previous observation or prediction.

        Args:
            current_times: The times this prediction is made for; stored in
                the returned state.
            state: The model's state tuple.

        Returns:
            A tuple of the new state and a dictionary holding the prediction,
            mapped back to the input scale, under "mean".
        """
        _, previous_observation_or_prediction, lstm_state = state
        lstm_output, new_lstm_state = self._lstm_cell_run(
            inputs=previous_observation_or_prediction, state=lstm_state)
        next_prediction = self._predict_from_lstm_output(lstm_output)
        # The state keeps the prediction on the normalized scale; only the
        # reported "mean" is de-transformed.
        new_state_tuple = (current_times, next_prediction, new_lstm_state)
        return new_state_tuple, {"mean": self._de_transform(next_prediction)}

    def _imputation_step(self, current_times, state):
        """Advance model state across a gap."""
        # Does not do anything special if we're jumping across a gap. More
        # advanced models, especially probabilistic ones, would want a special
        # case that depends on the gap size.
        return state

    def _exogenous_input_step(
            self, current_times, current_exogenous_regressors, state):
        """Update model state based on exogenous regressors.

        Raises:
            NotImplementedError: Always; this example has no exogenous inputs.
        """
        raise NotImplementedError(
            "Exogenous inputs are not implemented for this example.")
if __name__ == '__main__':
    # Train the LSTM model on a synthetic periodic series, evaluate it over
    # the whole series, forecast 600 further steps and save the plot.
    tf.logging.set_verbosity(tf.logging.INFO)

    # 600 steps: three sinusoids plus uniform noise in [-0.2, 0.2).
    time_index = np.array(range(600))
    jitter = np.random.uniform(-0.2, 0.2, 600)
    series = (np.sin(np.pi * time_index / 50)
              + np.cos(np.pi * time_index / 50)
              + np.sin(np.pi * time_index / 25)
              + jitter)

    reader = NumpyReader({
        tf.contrib.timeseries.TrainEvalFeatures.TIMES: time_index,
        tf.contrib.timeseries.TrainEvalFeatures.VALUES: series,
    })
    train_input_fn = tf.contrib.timeseries.RandomWindowInputFn(
        reader, batch_size=4, window_size=100)

    lstm_model = _LSTMModel(num_features=1, num_units=128)
    estimator = ts_estimators.TimeSeriesRegressor(
        model=lstm_model,
        optimizer=tf.train.AdamOptimizer(0.005))
    estimator.train(input_fn=train_input_fn, steps=2000)

    evaluation_input_fn = tf.contrib.timeseries.WholeDatasetInputFn(reader)
    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)

    # Predict starting after the evaluation
    continuation_input_fn = tf.contrib.timeseries.predict_continuation_input_fn(
        evaluation, steps=600)
    (predictions,) = tuple(estimator.predict(input_fn=continuation_input_fn))

    # Index 0 selects the first entry along the leading axis of the
    # evaluation arrays.
    observed_times = evaluation["times"][0]
    observed = evaluation["observed"][0, :, :]
    evaluated_times = evaluation["times"][0]
    evaluated = evaluation["mean"][0]
    predicted_times = predictions['times']
    predicted = predictions["mean"]

    plt.figure(figsize=(15, 5))
    # Vertical marker at the last observed time step.
    plt.axvline(599, linestyle="dotted", linewidth=4, color='r')
    observed_lines = plt.plot(
        observed_times, observed, label="observation", color="k")
    evaluated_lines = plt.plot(
        evaluated_times, evaluated, label="evaluation", color="g")
    predicted_lines = plt.plot(
        predicted_times, predicted, label="prediction", color="r")
    legend_handles = [
        observed_lines[0], evaluated_lines[0], predicted_lines[0]]
    plt.legend(handles=legend_handles, loc="upper left")
    plt.savefig('predict_result.jpg')
最终效果如下:
完整的代码已经开源在本人的GitHub上,可以自己进行下载哈,同时在GitHub上的账号上有基于多重元素的LSTM时间序列预测以及读取文件中的data数据,欢迎star,哈哈
GitHub地址:https://github.com/qianyuqianxun-DeepLearning/LSTM-process