Import packages + read data
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import torch.nn as nn
import torch
import matplotlib.pyplot as plt
df = pd.read_csv('/opt/cyc/week_task/xiaoliangyuce/data_qingxi/0804/data_0803_300_new.csv')
df1 = df[df['uuid'] == 'B0F59QC63ZUS']            # keep a single item's series
df2 = df1[['sale_list_time', 'sale_list_rank']]
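Before splitting, it is worth confirming the series is time-ordered and the target has no gaps. A minimal sanity check, an addition to the original walkthrough that assumes sale_list_time sorts chronologically:

# Optional sanity check (not in the original post): ensure rows are in time
# order and the rank column has no missing values
df2 = df2.sort_values('sale_list_time').reset_index(drop=True)
print(len(df2), df2['sale_list_rank'].isna().sum())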
Dataset split + normalization + sliding-window construction
# Split the dataset: hold out the last 20 points for testing
train_data, test_data = df2['sale_list_rank'][:-20], df2['sale_list_rank'][-20:]
# Normalize: fit the scaler on the training set only, then reuse it on the
# test set (re-fitting on the test set would leak its min/max into evaluation)
scaler = MinMaxScaler()
train_data_scale = scaler.fit_transform(train_data.values.reshape(-1, 1))
test_data_scale = scaler.transform(test_data.values.reshape(-1, 1))
# Build sliding windows
def slid_window_data(data, window_size):
    X, Y = [], []
    for i in range(len(data) - window_size):
        X.append(data[i:i + window_size])
        Y.append(data[i + window_size])   # the next value after the window
    return np.array(X), np.array(Y)
# Shape the LSTM expects: (batch, seq_len, features)
"""
Example: [1, 2, 3, 4, 5] with window_size=3 yields two samples:
X = [[1,2,3], [2,3,4]]  -> shape (2, 3, 1)
Y = [4, 5]              -> shape (2, 1)
"""
X_train, Y_train = slid_window_data(data=train_data_scale, window_size=20)
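To confirm the shapes match the commented example, a quick check on the toy series (an illustrative addition):

demo = np.array([1, 2, 3, 4, 5], dtype=np.float32).reshape(-1, 1)
X_demo, Y_demo = slid_window_data(data=demo, window_size=3)
print(X_demo.shape, Y_demo.shape)   # (2, 3, 1) (2, 1)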
LSTM model
class LSTM_MODEL(nn.Module):
    def __init__(self, input_size=1, hidden_size=100, num_layers=1):
        super(LSTM_MODEL, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        #   batch: number of samples fed to the network at once (in NLP, how
        #          many sentences per step)
        #   seq_len: sequence length (in NLP, the sentence length, usually
        #            padded to a common length with pad_sequence)
        #   input_size: feature dimension, matching input_size defined above
        out, _ = self.lstm(x)   # out: (batch, seq_len, hidden_size)
        x = self.fc(out)        # (batch, seq_len, 1)
        x = x[:, -1, :]         # keep only the last time step: (batch, 1)
        return x
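A quick shape check before training catches dimension mistakes early; a small sketch, not part of the original code:

m = LSTM_MODEL()
dummy = torch.randn(2, 20, 1)   # (batch=2, seq_len=20, features=1)
print(m(dummy).shape)           # torch.Size([2, 1]) -- one prediction per sample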
Model training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).to(device)
model = LSTM_MODEL().to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
epochs = 100
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()   # clear gradients left over from the previous step
    output = model(X_train)
    loss = criterion(output, Y_train)
    loss.backward()
    optimizer.step()
    print(f"Epoch [{epoch+1}/{epochs}], loss: {loss.item():.4f}")
Predicting future values
# Build the seed input for recursive forecasting: the last window of the
# scaled training series; each prediction is appended and fed back as input
fur_len = 20
train_data_normalized = torch.FloatTensor(train_data_scale).view(-1)
test_input = train_data_normalized[-fur_len:].tolist()
model.eval()
with torch.no_grad():
    # no manual hidden-state reset is needed: nn.LSTM starts from zeros on
    # each forward pass when no initial state is passed in
    for i in range(fur_len):
        seq = torch.FloatTensor(test_input[-fur_len:])
        seq = seq.to(device).unsqueeze(0).unsqueeze(2)   # (1, window_size, 1)
        test_input.append(model(seq).item())
predictions_scaled = test_input[fur_len:]   # the 20 predicted (still scaled) values
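The loop above is a recursive rollout, so each prediction error feeds the next input and can compound over the horizon. For contrast, a one-step-ahead variant that slides the window over the true scaled series instead (a sketch not in the original post; it reuses test_data_scale from the split section):

# Teacher-forced one-step evaluation: each window contains real history only,
# isolating single-step error from the compounding error of the rollout above
full_scaled = np.concatenate([train_data_scale, test_data_scale]).reshape(-1)
n_train = len(train_data_scale)
one_step_preds = []
with torch.no_grad():
    for i in range(fur_len):
        window = full_scaled[n_train + i - 20 : n_train + i]   # 20 = window_size
        seq = torch.FloatTensor(window).to(device).unsqueeze(0).unsqueeze(2)
        one_step_preds.append(model(seq).item())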
Visualizing predictions vs. actuals
# Undo the scaling so predictions are comparable with the raw test values
actual_predictions = scaler.inverse_transform(np.array(predictions_scaled).reshape(-1, 1))
plt.plot(range(len(test_data)), test_data.values, 'ro-')
plt.plot(range(len(actual_predictions)), actual_predictions, 'bo-')
plt.legend(["true", "pred"])
plt.show()
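To put a number on the gap the plot shows, RMSE and MAE over the 20 held-out points (an added metric check, not in the original):

errors = actual_predictions.reshape(-1) - test_data.values
rmse = np.sqrt(np.mean(errors ** 2))
mae = np.mean(np.abs(errors))
print(f"RMSE: {rmse:.4f}, MAE: {mae:.4f}")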