Helpers for LSTM models
For predicting time series, such as the daily or hourly price history of financial assets like cryptocurrencies or stocks, LSTM (Long Short-Term Memory) models are among the more complex tools in the deep learning arsenal. To optimize the hyperparameters of such a model, we can rely on specialized tools such as Keras Tuner for LSTM, which we discuss in this article. In addition, we need an LSTM Sequence Length Helper.
How LSTM models consume Data
Did you know that we can also optimize how we present the data to the model? To clarify this, first a short explanation of how it works. An LSTM model expects its input structured in 3 dimensions: the number of samples to process (the ‘batch’), a sequence of ‘predictors’ of a certain length, and the number of features per time step (here 1, the price). The ‘target’ to predict is supplied separately, one for each sequence.
In predictive models, ‘predictors’ determine the ‘target’. For time series of financial assets these are mainly the sequence of past prices, which should predict the future price. A time series generator, which we build here, with a sequence length of for example 3 and 1 target composes windows of 3 consecutive values, each of which should predict the next, 4th value. For a very simple, strictly linear series like [1, 2, 3, 4, 5, 6, 7] with length=3, this gives the following picture:
X (input sequence)    y (target)
[1, 2, 3]             4
[2, 3, 4]             5
[3, 4, 5]             6
[4, 5, 6]             7
During training the model thus learns the pattern by which the input sequence should produce the desired output.
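The windowing described above can be sketched in a few lines of plain Python. The helper name make_windows is an assumption made for illustration and is not part of the class discussed later; the last lines show the (samples, time steps, features) layout an LSTM layer expects.

```python
def make_windows(series, length=3):
    """Split a sequence into (input window, next value) pairs."""
    X, y = [], []
    for i in range(length, len(series)):
        X.append(series[i - length:i])  # the `length` values before position i
        y.append(series[i])             # the value this window should predict
    return X, y


series = [1, 2, 3, 4, 5, 6, 7]
X, y = make_windows(series, length=3)
for window, target in zip(X, y):
    print(window, "->", target)

# Layout expected by an LSTM layer: (samples, time steps, features).
# With one feature per time step, every value is wrapped in its own list.
X_3d = [[[value] for value in window] for window in X]
shape = (len(X_3d), len(X_3d[0]), len(X_3d[0][0]))
print(shape)  # (4, 3, 1)
```

In practice the same reshaping is usually done with NumPy arrays, as in the class further on.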
An LSTM Sequence Length Helper
There is no clear rule for determining the best length of such ‘predictive’ sequences. There are guidelines: if there is a cyclical pattern of, for example, a day (24 hours), then we can only do justice to that pattern if the sequence includes at least 24 hourly data points. What we can do, however, is simulate the LSTM model's behavior with such knowledge in mind.
We create our own LSTM Sequence Length Helper as follows: We use a minimal model, which we feed with different lengths of predictive sequences of the same data and then compare the accuracy of the predictions to determine the best ‘sequence’ length for this dataset.
Fortunately, there are statistical tools to help us on our way. For example, there is the auto-correlation function (ACF) that can examine a time series for recurring patterns in the succession of values, such as price, over time. This gives us a starting point for an initial sequence length with which we can start the benchmark. This is also the approach of our LSTMSequenceLengthHelper Python class, which we describe below.
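To get a feeling for what the ACF reveals, here is a minimal plain-Python sketch on a synthetic series. The series (a pure 24-hour sine wave over 30 days) and the helper autocorrelation are assumptions made for illustration; the class itself relies on the acf function from statsmodels.

```python
import math


def autocorrelation(values, max_lag):
    """Sample autocorrelation for lags 0..max_lag (plain-Python sketch)."""
    n = len(values)
    mean = sum(values) / n
    centered = [v - mean for v in values]
    variance = sum(c * c for c in centered)
    return [
        sum(centered[t] * centered[t + lag] for t in range(n - lag)) / variance
        for lag in range(max_lag + 1)
    ]


# Hypothetical hourly series: a pure 24-hour cycle over 30 days
series = [math.sin(2 * math.pi * hour / 24) for hour in range(24 * 30)]
acf_vals = autocorrelation(series, max_lag=48)

# A local peak in the ACF marks a lag at which the pattern repeats
peaks = [
    lag for lag in range(1, 48)
    if acf_vals[lag - 1] < acf_vals[lag] > acf_vals[lag + 1]
]
print(peaks[0])  # 24: the daily cycle
```

Real price data is far noisier, so the peaks are much less pronounced than in this idealized case.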
LSTM Sequence Length Helper Class
As always, first the necessary libraries are loaded. These are the normal libraries for data handling, plotting, building the model and some statistical tooling but also two classifiers we will use for a special purpose.
# Copyright (c) 2025 Hans De Weme
# Licensed under the MIT License (https://opensource.org/licenses/MIT)
# Class: LSTMSequenceLengthHelper
# Purpose: Next to Keras Tuner that helps in finding the parameters for the Model,
# this class helps in finding the best way to structure the Data for LSTM model training in using
import numpy as np
import pandas as pd
import json
import os
import plotly.graph_objects as go
from statsmodels.tsa.stattools import acf
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import find_peaks
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import EarlyStopping
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from PyQt6.QtCore import QThread, pyqtSignal
Init, orchestration and utilities
The class is connected to the calling main window, and the global settings are read. The run method lays out the processing plan: analyzing the data ‘raw’ and after differencing, probing the ACF for a starting point for the benchmark, then presenting the results and saving them for later use.
class LSTMSequenceLengthHelper(QThread):
    # Signal to communicate progress (string message) back to the main thread
    progress_signal = pyqtSignal(str)

    def __init__(self, parent=None, data=None, set=None, asset=None):
        super().__init__()  # necessary for QObject, needed for pyqtSignal
        self.parent = parent
        self.settings = set  # note: the parameter name shadows the built-in set()
        self.asset = asset
        self.best_lag = None  # filled in by run()
        self.max_lag = 168  # 7 x 24 = 168 hours per week
        # alpha determines the width of the (1 - alpha) confidence interval around the autocorrelation values
        self.alpha = 0.05
        if data is None or asset is None:
            return  # nothing to analyze
        self.data = data.dropna()
        self.series = self.data['close'].copy()
        # key under which the result is stored in the global settings, for future LSTM training with this asset
        self.seqlen_param = asset.upper() + '_SL'
    # main method orchestrating the processing
    def run(self):
        self.do_message(f"Dataset contains {len(self.series)} points.")
        self.do_message('Analyzing First Without Differencing')
        acf_result = self.analyze(use_diff=False, use_log_return=False, plot=True)
        flat_str = ", ".join(f"{k}={v}" for k, v in acf_result.items())
        self.do_message(flat_str)
        self.do_message('Analyzing Second With Differencing')
        acf_result = self.analyze(use_diff=True, use_log_return=False, plot=True)
        flat_str = ", ".join(f"{k}={v}" for k, v in acf_result.items())
        self.do_message(flat_str)
        # benchmark the first insignificant lag of the differenced series and its multiples (x1 .. x4)
        start_lag = acf_result['first_insignificant_lag']
        benchmark = self.benchmark_lstm(lags_to_test=[start_lag * i for i in range(1, 5)])
        flat_str = ", ".join(f"{k}={v}" for k, v in benchmark.items())
        self.do_message(flat_str)
        classifiers = {
            'XGB Classifier': self.benchmark_classifier('xgb', lag=start_lag),
            'RF Classifier': self.benchmark_classifier('rf', lag=start_lag)
        }
        self.best_lag = self.leaderboard(benchmark, classifiers)
        self.store_result()
    # display a message in the GUI and print it on the terminal
    def do_message(self, the_message):
        self.progress_signal.emit(the_message)
        print(the_message)

    # save the best sequence length to settings.json for later use with this asset
    def store_result(self):
        if self.best_lag is None:
            return
        # int() makes sure the value is a plain Python int that json can serialize
        self.settings.update({self.seqlen_param: int(self.best_lag)})
        directory = self.settings['home']
        file = 'settings.json'
        file_path = os.path.join(directory, file)
        with open(file_path, 'w') as f:
            json.dump(self.settings, f, indent=4)
        mess = self.seqlen_param + ' ' + str(self.best_lag) + ' ' + 'added to settings.json'
        self.do_message(mess)
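The effect of store_result() can be tried in isolation with the standard library only. The function name store_sequence_length, the asset 'btc' and the value 8 are made up for this sketch, and unlike the class, which dumps its in-memory settings dictionary, the sketch first reads the existing file so that other keys are kept.

```python
import json
import os
import tempfile


def store_sequence_length(settings_path, asset, best_lag):
    """Add or update '<ASSET>_SL' in a JSON settings file, keeping other keys."""
    settings = {}
    if os.path.exists(settings_path):
        with open(settings_path) as f:
            settings = json.load(f)
    settings[f"{asset.upper()}_SL"] = int(best_lag)  # plain int for json
    with open(settings_path, "w") as f:
        json.dump(settings, f, indent=4)
    return settings


# Demo in a temporary directory, so no real settings file is touched
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "settings.json")
    with open(path, "w") as f:
        json.dump({"home": tmp}, f)
    stored = store_sequence_length(path, "btc", 8)
    with open(path) as f:
        reloaded = json.load(f)

print(reloaded["BTC_SL"])  # 8
```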
ACF Analyzing, finding a start for Benchmarking
The analyze() method controls the ACF run: it analyzes the results with and without differencing and plots them for insight into the dependencies between data ‘lags’.
    def _compute_acf(self, series):
        # autocorrelation values plus a (1 - alpha) confidence interval for each lag
        acf_vals, confint = acf(series, nlags=self.max_lag, alpha=self.alpha)
        return acf_vals, confint

    def _analyze_acf(self, acf_vals, confint):
        # first lag whose confidence interval contains zero (no longer significant)
        cutoff_lag = next((i for i, (val, ci) in enumerate(zip(acf_vals, confint)) if ci[0] < 0 and ci[1] > 0), self.max_lag)
        # all lags whose confidence interval excludes zero (lag 0 is always among them)
        sig_lags = np.where((confint[:, 0] > 0) | (confint[:, 1] < 0))[0]
        longest_sig_lag = max(sig_lags) if len(sig_lags) > 0 else 1
        # first clear ACF peak after lag 0; peaks must be at least 24 lags apart
        peaks, _ = find_peaks(acf_vals[1:], height=0.1, distance=24)
        seasonal_peak = (peaks[0] + 1) if len(peaks) > 0 else None
        return cutoff_lag, longest_sig_lag, seasonal_peak

    def _plot_acf(self, acf_vals, confint, label):
        lags = np.arange(len(acf_vals))
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=lags, y=acf_vals, mode='markers+lines', name=f'ACF ({label})', line=dict(color='blue'), marker=dict(size=5)))
        fig.add_trace(go.Scatter(x=lags, y=confint[:, 0], mode='lines', name='95% CI', line=dict(width=0)))
        fig.add_trace(go.Scatter(x=lags, y=confint[:, 1], fill='tonexty', mode='lines', line=dict(width=0), fillcolor='rgba(0, 0, 255, 0.2)', name='95% CI'))
        fig.add_trace(go.Scatter(x=[0, self.max_lag], y=[0, 0], mode='lines', line=dict(dash='dash', color='black'), showlegend=False))
        fig.update_layout(title=f"Autocorrelation Function (ACF) - {label}", xaxis_title='Lag', yaxis_title='Autocorrelation', template='plotly_white')
        fig.show()

    def analyze(self, use_diff=False, use_log_return=False, plot=True):
        series = self.series.copy()
        label = 'Raw'
        if use_log_return:
            label = 'Log Returns'
            series = np.log(series).diff().dropna()
        elif use_diff:
            label = 'Differenced'
            series = series.diff().dropna()
        acf_vals, confint = self._compute_acf(series)
        cutoff_lag, longest_sig_lag, seasonal_peak = self._analyze_acf(acf_vals, confint)
        if plot:
            self._plot_acf(acf_vals, confint, label)
        return {
            'first_insignificant_lag': cutoff_lag,
            'max_significant_lag': longest_sig_lag,
            'seasonal_peak_lag': seasonal_peak
        }
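The idea behind 'first_insignificant_lag' can be illustrated without any library. This sketch is a simplification under stated assumptions: it uses a constant band of ±1.96/sqrt(n) around zero, whereas statsmodels returns its own interval for every lag, so the numbers will differ from what the class reports. The ACF values (0.8 to the power of the lag) and n_obs=400 are made up.

```python
import math


def first_insignificant_lag(acf_vals, n_obs, z=1.96):
    """First lag whose autocorrelation lies inside the +/- z/sqrt(n) band."""
    band = z / math.sqrt(n_obs)
    for lag, value in enumerate(acf_vals):
        if lag > 0 and abs(value) < band:
            return lag
    return len(acf_vals) - 1  # nothing fell inside the band: use the last lag


# Made-up ACF that decays geometrically, as for a simple autoregressive series
acf_vals = [0.8 ** lag for lag in range(25)]
cutoff = first_insignificant_lag(acf_vals, n_obs=400)

# The sequence lengths to benchmark: the cutoff and its multiples (x1 .. x4)
candidates = [cutoff * i for i in range(1, 5)]
print(cutoff, candidates)  # 11 [11, 22, 33, 44]
```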
Benchmarking
Armed with the starting point suggested by the ACF, we benchmark different sequence lengths with a basic LSTM model.
    def prepare_univariate_sequences(self, df, sequence_length=24):
        # If it's a Series, convert to DataFrame
        if isinstance(df, pd.Series):
            df = df.to_frame(name='close')
        # note: the scaler is fitted on the full series, the later test part included
        scaler = MinMaxScaler()
        scaled_data = scaler.fit_transform(df[['close']].values)
        X, y = [], []
        for i in range(sequence_length, len(scaled_data)):
            X.append(scaled_data[i-sequence_length:i])
            y.append(scaled_data[i, 0])
        return np.array(X), np.array(y), scaler

    def benchmark_lstm(self, lags_to_test=(4, 8, 12, 16), epochs=10, verbose=0):
        df = self.series.dropna()
        sequence_length = max(lags_to_test)
        if len(df) < sequence_length + 1:
            self.do_message(f"❌ Not enough data for sequence length {sequence_length}")
            return {}
        # build the windows once at the longest length; shorter windows are cut from their tail,
        # so every sequence length is scored on exactly the same targets
        X, y, _ = self.prepare_univariate_sequences(df, sequence_length=sequence_length)
        results = {}
        for i, seq_len in enumerate(lags_to_test, start=1):
            self.do_message(f"Benchmark {i}/{len(lags_to_test)}: LSTM with Sequence Length = {seq_len}")
            X_seq = np.array([x[-seq_len:] for x in X])
            # chronological 80/20 split, no shuffling
            split_idx = int(len(X_seq) * 0.8)
            X_train, X_test = X_seq[:split_idx], X_seq[split_idx:]
            y_train, y_test = y[:split_idx], y[split_idx:]
            model = Sequential([
                LSTM(16, input_shape=(seq_len, 1)),
                Dense(1)
            ])
            model.compile(optimizer='adam', loss='mse')
            try:
                model.fit(X_train, y_train, epochs=epochs, verbose=verbose, validation_split=0.1, callbacks=[EarlyStopping(patience=3)])
            except Exception as e:
                self.do_message(f"❌ LSTM training failed: {e}")
                continue
            preds = model.predict(X_test).flatten()
            mse = mean_squared_error(y_test, preds)
            # compare the direction (up, down, flat) of consecutive real and predicted values
            real_direction = np.sign(np.diff(y_test))
            pred_direction = np.sign(np.diff(preds))
            directional_accuracy = accuracy_score(real_direction, pred_direction)
            results[seq_len] = {'mse': mse, 'directional_accuracy': directional_accuracy}
            self.do_message(f"[UNI] Lag {seq_len}: MSE = {mse:.5f}, DirAcc = {directional_accuracy:.3f}")
        return results
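The two metrics collected per sequence length measure different things, which a small plain-Python sketch can show. The numbers are invented for illustration; the class itself uses mean_squared_error and accuracy_score from scikit-learn.

```python
def mean_squared_error(actual, predicted):
    """Average squared distance between real and predicted values."""
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def directional_accuracy(actual, predicted):
    """Share of steps where prediction and reality move in the same direction."""
    def sign(x):
        return (x > 0) - (x < 0)

    real = [sign(b - a) for a, b in zip(actual, actual[1:])]
    pred = [sign(b - a) for a, b in zip(predicted, predicted[1:])]
    hits = sum(r == p for r, p in zip(real, pred))
    return hits / len(real)


# Invented values: the predictions stay close to the real series ...
actual = [10.0, 11.0, 10.5, 10.8, 11.2]
predicted = [10.1, 10.9, 11.0, 10.9, 11.3]

mse = mean_squared_error(actual, predicted)
da = directional_accuracy(actual, predicted)
print(round(mse, 3), da)  # 0.058 0.5 ... yet only half of the moves are right
```

A low MSE therefore does not guarantee that the model gets the direction of the next move right, which is why both numbers are kept.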
This MSE already suggests that a sequence length of 8 might be a very good candidate. However, we test things once more, now with 2 classifiers, Random Forest and gradient boosting (XGBoost), to verify the directional accuracy in particular.
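    def benchmark_classifier(self, classifier='xgb', lag=6):
        df = self.series.dropna()
        if isinstance(df, pd.Series):
            df = df.to_frame(name='close')
        # label: 1 if the next close is higher than the current one, else 0
        df['target'] = (df['close'].shift(-1) > df['close']).astype(int)
        # the last row has no next close, so its label is meaningless: drop it
        # (dropna() would not catch it, because astype(int) already turned it into 0)
        df = df.iloc[:-1]
        closes = df['close'].values
        targets = df['target'].values
        X, y = [], []
        for i in range(lag, len(df) + 1):
            X.append(closes[i-lag:i])  # window covering rows i-lag .. i-1
            y.append(targets[i-1])     # direction of the move right after the window
        X, y = np.array(X), np.array(y)
        # chronological 80/20 split, no shuffling
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        if classifier == 'rf':
            clf = RandomForestClassifier(n_estimators=100)
        else:
            # use_label_encoder is left out: it is deprecated in recent XGBoost versions
            clf = XGBClassifier(eval_metric='logloss')
        clf.fit(X_train, y_train)
        acc = clf.score(X_test, y_test)
        return acc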
    def leaderboard(self, lstm_results, classifier_results):
        leaderboard = []
        for lag, metrics in lstm_results.items():
            leaderboard.append({
                'model': f'LSTM (lag {lag})',
                'lag': lag,
                'mse': metrics['mse'],
                'directional_accuracy': metrics['directional_accuracy']
            })
        for model, acc in classifier_results.items():
            leaderboard.append({
                'model': model,
                'mse': None,
                'directional_accuracy': acc
            })
        leaderboard.sort(key=lambda x: x['directional_accuracy'], reverse=True)
        print("\nModel Leaderboard (sorted by directional accuracy):")
        for entry in leaderboard:
            print(f"{entry['model']:<20} | DirAcc: {entry['directional_accuracy']:.3f} | MSE: {entry['mse'] if entry['mse'] is not None else '-'}")
        # Decide best sequence length from LSTM entries only
        lstm_only = [entry for entry in leaderboard if 'LSTM' in entry['model']]
        if lstm_only:
            best_by_mse = min(lstm_only, key=lambda x: x['mse'])
            best_by_da = max(lstm_only, key=lambda x: x['directional_accuracy'])
            if best_by_mse['lag'] == best_by_da['lag']:
                self.do_message(f"\n✅ Best sequence length is {best_by_mse['lag']} (lowest MSE and highest DA)")
                return best_by_mse['lag']
            else:
                print(f"\n⚖️ Conflict: Lowest MSE at lag {best_by_mse['lag']} vs. Highest DA at lag {best_by_da['lag']}")
                combined = min(lstm_only, key=lambda x: x['mse'] * (1 - x['directional_accuracy']))
                self.do_message(f"📌 Suggesting lag {combined['lag']} as best tradeoff")
                return combined['lag']
        else:
            self.do_message("\n⚠️ No LSTM models found to evaluate sequence length.")
            return None
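How a classifier dataset for direction prediction can be built is easiest to see on a handful of prices. This is a sketch under the assumption that each label describes the move right after its window; the helper name make_direction_dataset and the prices are made up.

```python
def make_direction_dataset(closes, lag):
    """Pair each window of `lag` closes with 1 if the next close is higher, else 0."""
    X, y = [], []
    for end in range(lag, len(closes)):
        window = closes[end - lag:end]         # closes[end-lag] .. closes[end-1]
        label = int(closes[end] > window[-1])  # did the price rise right after the window?
        X.append(window)
        y.append(label)
    return X, y


closes = [100, 102, 101, 103, 106, 105]
X, y = make_direction_dataset(closes, lag=3)
for window, label in zip(X, y):
    print(window, "->", "up" if label else "down or flat")
```

The final price only ever appears as a label, never inside a window, because there is no later price to compare it with.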
Final suggestion
After this last benchmark we evaluate the results and reach a final suggestion, based mainly on the MSE we already saw above, which is saved for later use with this asset.
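The decision rule of the leaderboard can be replayed on a few numbers. The benchmark results below are invented and the function name pick_sequence_length is an assumption for this sketch; the scoring itself, MSE multiplied by (1 - directional accuracy) with the lowest score winning, is the one used in the class.

```python
def pick_sequence_length(results):
    """results maps sequence length -> {'mse': ..., 'directional_accuracy': ...}."""
    best_mse = min(results, key=lambda k: results[k]["mse"])
    best_da = max(results, key=lambda k: results[k]["directional_accuracy"])
    if best_mse == best_da:
        return best_mse  # both metrics agree
    # Conflict: combine both metrics, lower is better
    return min(
        results,
        key=lambda k: results[k]["mse"] * (1 - results[k]["directional_accuracy"]),
    )


# Invented benchmark results for three sequence lengths
results = {
    4: {"mse": 0.0020, "directional_accuracy": 0.50},
    8: {"mse": 0.0012, "directional_accuracy": 0.52},
    12: {"mse": 0.0015, "directional_accuracy": 0.55},
}
best = pick_sequence_length(results)
print(best)  # 8
```

In this example the lowest MSE (length 8) and the highest directional accuracy (length 12) disagree, and the combined score settles on 8.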
