PyTorch Lightning: Training an LSTM on a Time-Series Task, Clean Code
Published on Aug. 22, 2023, 12:10 p.m.
Import dependencies
# Re-loads all imports every time the cell is run.
%load_ext autoreload
%autoreload 2
from time import time
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:,.5f}'.format
from IPython.display import display
# Sklearn tools
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# Neural Networks
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.loggers.csv_logs import CSVLogger
# Plotting
%matplotlib inline
import matplotlib.pyplot as plt
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
/kaggle/input/electric-power-consumption-data-set/household_power_consumption.txt
Prediction task
We are going to predict hourly levels of global active power one step (i.e. one hour) ahead.
TimeseriesDataset
class TimeseriesDataset(Dataset):
    '''
    Custom Dataset subclass.
    Serves as input to a DataLoader to transform X
    into sequence data using a rolling window.
    A DataLoader using this dataset will output batches
    of `(batch_size, seq_len, n_features)` shape.
    Suitable as an input to RNNs.
    '''
    def __init__(self, X: np.ndarray, y: np.ndarray, seq_len: int = 1):
        self.X = torch.tensor(X).float()
        self.y = torch.tensor(y).float()
        self.seq_len = seq_len

    def __len__(self):
        # Number of full rolling windows that fit in X
        return len(self.X) - (self.seq_len - 1)

    def __getitem__(self, index):
        # Window of `seq_len` rows, paired with the target aligned to the window's last step
        return (self.X[index:index + self.seq_len], self.y[index + self.seq_len - 1])
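As a quick sanity check of the windowing, the snippet below (illustrative only; the random arrays stand in for real data) confirms the shapes a DataLoader will see:

# Illustrative check: 100 rows, 7 features, windows of length 24
X_demo = np.random.rand(100, 7)
y_demo = np.random.rand(100, 1)
ds = TimeseriesDataset(X_demo, y_demo, seq_len=24)

print(len(ds))             # 77 windows: 100 - (24 - 1)
x0, y0 = ds[0]
print(x0.shape, y0.shape)  # torch.Size([24, 7]) torch.Size([1])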
DataModule
class PowerConsumptionDataModule(pl.LightningDataModule):
    '''
    PyTorch Lightning DataModule subclass:
    https://pytorch-lightning.readthedocs.io/en/latest/datamodules.html

    Serves the purpose of aggregating all data loading
    and processing work in one place.
    '''
    def __init__(self, seq_len=1, batch_size=128, num_workers=0):
        super().__init__()
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.X_train = None
        self.y_train = None
        self.X_val = None
        self.y_val = None
        self.X_test = None
        self.y_test = None
        self.columns = None
        self.preprocessing = None

    def prepare_data(self):
        pass
    def setup(self, stage=None):
        '''
        Data is resampled to hourly intervals.
        Both missing values and '?' entries are converted to np.nan.
        'Date' and 'Time' columns are merged into a 'dt' index.
        '''
        if stage == 'fit' and self.X_train is not None:
            return
        if stage == 'test' and self.X_test is not None:
            return
        if stage is None and self.X_train is not None and self.X_test is not None:
            return

        path = '/kaggle/input/electric-power-consumption-data-set/household_power_consumption.txt'

        df = pd.read_csv(
            path,
            sep=';',
            parse_dates={'dt': ['Date', 'Time']},
            infer_datetime_format=True,
            low_memory=False,
            na_values=['nan', '?'],
            index_col='dt'
        )

        df_resample = df.resample('h').mean()

        X = df_resample.dropna().copy()
        y = X['Global_active_power'].shift(-1).ffill()  # next hour's value as the target
        self.columns = X.columns

        # Chronological 60/20/20 split: no shuffling for time series
        X_cv, X_test, y_cv, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_cv, y_cv, test_size=0.25, shuffle=False
        )

        # Fit the scaler on the training split only, to avoid leakage
        preprocessing = StandardScaler()
        preprocessing.fit(X_train)

        if stage == 'fit' or stage is None:
            self.X_train = preprocessing.transform(X_train)
            self.y_train = y_train.values.reshape((-1, 1))
            self.X_val = preprocessing.transform(X_val)
            self.y_val = y_val.values.reshape((-1, 1))

        if stage == 'test' or stage is None:
            self.X_test = preprocessing.transform(X_test)
            self.y_test = y_test.values.reshape((-1, 1))
    def train_dataloader(self):
        train_dataset = TimeseriesDataset(self.X_train,
                                          self.y_train,
                                          seq_len=self.seq_len)
        train_loader = DataLoader(train_dataset,
                                  batch_size=self.batch_size,
                                  shuffle=False,
                                  num_workers=self.num_workers)
        return train_loader

    def val_dataloader(self):
        val_dataset = TimeseriesDataset(self.X_val,
                                        self.y_val,
                                        seq_len=self.seq_len)
        val_loader = DataLoader(val_dataset,
                                batch_size=self.batch_size,
                                shuffle=False,
                                num_workers=self.num_workers)
        return val_loader

    def test_dataloader(self):
        test_dataset = TimeseriesDataset(self.X_test,
                                         self.y_test,
                                         seq_len=self.seq_len)
        test_loader = DataLoader(test_dataset,
                                 batch_size=self.batch_size,
                                 shuffle=False,
                                 num_workers=self.num_workers)
        return test_loader
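The DataModule can also be exercised on its own, which is handy for debugging shapes before training (a sketch; it assumes the Kaggle dataset path above is present):

dm_check = PowerConsumptionDataModule(seq_len=24, batch_size=70)
dm_check.setup('fit')

xb, yb = next(iter(dm_check.train_dataloader()))
print(xb.shape, yb.shape)  # torch.Size([70, 24, 7]) torch.Size([70, 1])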
Model
Implement the LSTM regressor as a pytorch-lightning module
class LSTMRegressor(pl.LightningModule):
    '''
    Standard PyTorch Lightning module:
    https://pytorch-lightning.readthedocs.io/en/latest/lightning_module.html
    '''
    def __init__(self,
                 n_features,
                 hidden_size,
                 seq_len,
                 batch_size,
                 num_layers,
                 dropout,
                 learning_rate,
                 criterion):
        super().__init__()
        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.criterion = criterion
        self.learning_rate = learning_rate

        self.lstm = nn.LSTM(input_size=n_features,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            dropout=dropout,
                            batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # lstm_out shape: (batch_size, seq_len, hidden_size)
        lstm_out, _ = self.lstm(x)
        # Regress on the hidden state of the last time step only
        y_pred = self.linear(lstm_out[:, -1])
        return y_pred
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

    # Note: pl.TrainResult / pl.EvalResult were removed from PyTorch Lightning;
    # the current API is to call self.log() and return the loss.
    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('val_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('test_loss', loss)
        return loss
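A quick shape check of the forward pass (illustrative; the random tensor stands in for a real batch of scaled features):

model_check = LSTMRegressor(n_features=7, hidden_size=100, seq_len=24,
                            batch_size=70, num_layers=2, dropout=0.2,
                            learning_rate=1e-3, criterion=nn.MSELoss())
dummy = torch.rand(70, 24, 7)    # (batch_size, seq_len, n_features)
print(model_check(dummy).shape)  # torch.Size([70, 1])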
Parameters
'''
All parameters are aggregated in one place.
This is useful for reporting experiment params to experiment tracking software.
'''
p = dict(
    seq_len=24,
    batch_size=70,
    criterion=nn.MSELoss(),
    max_epochs=10,
    n_features=7,
    hidden_size=100,
    num_layers=2,
    dropout=0.2,
    learning_rate=0.001,
)
Train loop
seed_everything(1)
csv_logger = CSVLogger('./', name='lstm', version='0')

trainer = Trainer(
    max_epochs=p['max_epochs'],
    logger=csv_logger,
    accelerator='gpu',
    devices=1,
    log_every_n_steps=1,
)
model = LSTMRegressor(
    n_features=p['n_features'],
    hidden_size=p['hidden_size'],
    seq_len=p['seq_len'],
    batch_size=p['batch_size'],
    criterion=p['criterion'],
    num_layers=p['num_layers'],
    dropout=p['dropout'],
    learning_rate=p['learning_rate']
)

dm = PowerConsumptionDataModule(
    seq_len=p['seq_len'],
    batch_size=p['batch_size']
)

trainer.fit(model, dm)
trainer.test(model, datamodule=dm)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
CUDA_VISIBLE_DEVICES: [0]
| Name | Type | Params
--------------------------------------
0 | criterion | MSELoss | 0
1 | lstm | LSTM | 124 K
2 | linear | Linear | 101
Saving latest checkpoint..
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': tensor(0.2447, device='cuda:0')}
--------------------------------------------------------------------------------
[{'test_loss': 0.24472205340862274}]
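The 124 K LSTM figure in the summary above can be reproduced by hand: each LSTM layer carries four gate blocks of input-to-hidden and hidden-to-hidden weights plus two bias vectors. A back-of-the-envelope check:

# Per layer: weight_ih (4H x in_features), weight_hh (4H x H), bias_ih (4H), bias_hh (4H)
H, F = 100, 7
layer1 = 4 * H * F + 4 * H * H + 8 * H  # 43,600 (input size is the 7 features)
layer2 = 4 * H * H + 4 * H * H + 8 * H  # 80,800 (input is layer 1's hidden state)
print(layer1 + layer2)  # 124,400 -> reported as "124 K"
print(H * 1 + 1)        # 101 parameters in the Linear head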
Plot report
metrics = pd.read_csv('./lstm/0/metrics.csv')
train_loss = metrics[['train_loss', 'step', 'epoch']].dropna(subset=['train_loss'])
val_loss = metrics[['val_loss', 'epoch']].dropna(subset=['val_loss'])
test_loss = metrics['test_loss'].iloc[-1]
fig, axes = plt.subplots(1, 2, figsize=(16, 5), dpi=100)
axes[0].set_title('Train loss per batch')
axes[0].plot(train_loss['step'], train_loss['train_loss'])
axes[1].set_title('Validation loss per epoch')
axes[1].plot(val_loss['epoch'], val_loss['val_loss'], color='orange')
plt.show()
print('MSE:')
print(f"Train loss: {train_loss['train_loss'].iloc[-1]:.3f}")
print(f"Val loss: {val_loss['val_loss'].iloc[-1]:.3f}")
print(f'Test loss: {test_loss:.3f}')
MSE:
Train loss: 0.312
Val loss: 0.264
Test loss: 0.245