Update Historical Data to prepare a Complete and Up-to-Date Dataset
To make the best use of our machine learning models we want the most complete datasets possible, and historical datasets for training models are in great demand. Binance lets us download large amounts of historical crypto data once and store it for repeated use later on. Binance has the complete history of every major cryptocurrency, from Ethereum (ETH) or Ripple (XRP) to more recent assets like Dogecoin (DOGE) or Bonk (BONK). How best to do this is discussed in the post Get Historical Crypto Data from Binance. Before we can use this stored data, however, we need to supplement it with the most recent data for the crypto asset in question and save the result locally as a comma-separated (.csv) file in a standard format, ready for further processing.
We use a standard format for storing time series data for financial assets. This makes it possible to use the same software for technical analysis (TA) or training machine learning models for both crypto data and stock data.
Binance makes it easy to download the latest 500 data points for a given crypto asset. We work with hourly data, so 500 hours divided by 24 hours per day covers about 20.83 days. After the crypto asset has been chosen, the Python class below first checks that the historical data downloaded earlier is no more than 20 days old. If it is older, we first need to update the historical data with the Python script mentioned above.
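As a quick check of the arithmetic above, here is a minimal sketch that computes how much time 500 candles cover. The helper name coverage and the constant CANDLES_PER_CALL are made up for this illustration and are not part of the MakeTotal class.

```python
from datetime import timedelta

# Assumption: one request returns 500 candles (the count quoted above).
CANDLES_PER_CALL = 500

def coverage(interval: timedelta, candles: int = CANDLES_PER_CALL) -> timedelta:
    """Return the time span covered by `candles` consecutive candles."""
    return interval * candles

hourly = coverage(timedelta(hours=1))
print(hourly.days, "full days")                      # 20 full days
print(round(hourly / timedelta(days=1), 4), "days")  # 20.8333 days
```

Rounding down to 20 full days leaves a small safety margin, which is presumably why 20 is used as the threshold rather than 20.83.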
The MakeTotal Class
The code uses the Python requests library to connect directly to a Binance API, and several other standard Python libraries for directory operations and data manipulation. It also uses the graphical Python add-on PyQt6, but you don't have to build a GUI around it: without any editing, the code can also be used as a standalone script, see below.
Things start with importing utility libraries and the PyQt6 libraries for communicating with the Main Window. The class is initialized with the global settings and the financial asset to process. An initial check is then done to see if for this asset the needed historical data is present and recent enough. If this is the case the work-directory is cleaned up before processing starts.
Recently (May 2025) I turned this class into a worker thread that can also be used from the command line.
Initializing
# Copyright (c) 2024, 2025 Hans De Weme
# Licensed under the MIT License (https://opensource.org/licenses/MIT).
# Class MakeTotal
# Purpose: collecting the most recent data (500 data points, e.g. hourly data) for a crypto asset from Binance
# and merging this with previously collected historical data for the same crypto asset into a .csv file for further processing
# NB 500 hours = 20.8 days => 20 days is used as threshold in the check for recent data available
# Note: Starting 01-01-2025 Binance changed its timestamp from milliseconds to microseconds, so we added code to detect this and convert the CSVs
#
import requests
import pandas as pd
import os
import re
import shutil
from datetime import datetime
from pathlib import Path
from PyQt6.QtCore import QThread, pyqtSignal # for use in PyQt GUI Application
from PyQt6.QtWidgets import QMessageBox
import warnings
warnings.filterwarnings("ignore")
# class init arguments:
# asset - crypto asset for which current data is collected from Binance and merged with previously stored historical data
# settings - json object used as Python dictionary
class MakeTotal(QThread):
    total_successful = pyqtSignal(str) # Signal to indicate asset dumped successfully in Spot directory
    progress_signal = pyqtSignal(str) # Signal to communicate progress (string message) back to the main thread

    def __init__(self, asset, settings):
        super().__init__() # necessary for QObject, needed for pyqtSignal
        self.markt = str(asset.upper())+'USDT'
        self.settings = settings
        self.suc = False
        pad = self.settings['spot']
        pad = Path(pad)
        dir = str(pad.resolve())
        self.current_dir = os.getcwd()
        self.do_message("Start with market-chosen: "+self.markt)
        self.COLUMNS = ['open_time', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore']
        self.MARKT = self.markt
        self.WORK = dir
        self.MONTHS = dir+"\\monthly\\klines\\"+self.MARKT+"\\1h"
        self.DAYS = dir+"\\daily\\klines\\"+self.MARKT+"\\1h"
        dir_path = Path(self.MONTHS)
        if dir_path.is_dir():
            exist = True
        else:
            exist = False
        if exist:
            dir_path = Path(self.DAYS)
            if dir_path.is_dir():
                exist = True
            else:
                exist = False
        if exist == False:
            QMessageBox.information(None, '* * * NO UP-TO-DATE HISTORICAL DATA * * *', f"First get historical data for : '{self.markt.upper()}'!", )
            return
        else:
            self.suc = True
            print(dir)
            print(self.MONTHS)
            print(self.DAYS)
        if self.check_recent_spotmarket_files(self.DAYS, self.markt) == False:
            QMessageBox.information(None, '* * * NO UP-TO-DATE HISTORICAL DATA * * *', f"First get historical data for : '{self.markt.upper()}'!", )
            self.suc = False
            return

    def run(self):
        self.init()
        # now collect current data from Binance and store in temporary csv file
        self.suc = self.collect_data()
        # merge current data with historical data
        if self.suc == True:
            self.merge_data()

    # display a message in the GUI and print it on the terminal
    def do_message(self, the_message):
        self.progress_signal.emit(the_message)
        print(the_message)

    def init(self):
        self.normalized = 0 # counter for .csv datafiles with the timestamp converted
        # clear current dir first: remove csv files
        files_in_directory = os.listdir(self.current_dir)
        file_list = [(os.path.join(self.current_dir, file)) for file in files_in_directory if file.endswith('.csv')]
        if(file_list):
            for file in file_list:
                os.remove(file)
            self.do_message('* * * old files deleted from: '+self.current_dir)
        else:
            self.do_message('* * * no old files found in: '+self.current_dir)
Processing
As we have seen, operations start by checking whether the data in the historical archive is recent enough, i.e. less than 20 days old, so that we can fill the gap with the current data consisting of 500 hourly data points. If so, processing continues with collecting the most recent data. After this, all historical data from the archive is merged with the latest data into one complete and up-to-date dataset, saved as a .csv file for further processing.
Detecting and Converting Timestamps in Microseconds
Starting 01-01-2025 Binance changed its timestamps from milliseconds to microseconds, so we added code to detect this and convert the CSV files. For this purpose we use a dedicated method.
    # Opens a CSV file, detects if timestamps are in microseconds, converts to milliseconds if necessary, and overwrites the CSV.
    def normalize_csv_timestamp(self, file_path, time_column='open_time'):
        # Columns expected (define inside function)
        columns = ['open_time', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore']
        # Peek first line manually
        with open(file_path, 'r') as f:
            first_line = f.readline()
        # Load correctly
        if 'open_time' in first_line:
            df = pd.read_csv(file_path)
        else:
            df = pd.read_csv(file_path, header=None, names=columns)
        # Now safe to access open_time
        sample_ts = df[time_column].iloc[0]
        # Ensure sample is numeric
        if isinstance(sample_ts, str):
            try:
                sample_ts = int(sample_ts)
            except ValueError:
                raise ValueError(f"Cannot interpret {sample_ts} as a numeric timestamp.")
        # Detection logic
        if sample_ts > 10**14: # > 100 trillion => microseconds
            # print(f"[{file_path}] Detected microseconds, converting to milliseconds...")
            # Convert open_time and close_time (if they exist)
            for col in [time_column, 'close_time']:
                if col in df.columns:
                    df[col] = (df[col] // 1000) # integer division
            df.to_csv(file_path, index=False) # Save back
            # print(f"[{file_path}] Normalized and saved.")
            self.normalized += 1
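To see why the 10**14 cut-off in the method above separates the two units, here is a small stand-alone sketch using only the standard library. The function name to_milliseconds and the two sample values are made up for this illustration. A millisecond timestamp for a current date is around 1.7 * 10**12, while the same moment in microseconds is around 1.7 * 10**15, so the threshold sits comfortably between them.

```python
from datetime import datetime, timezone

MICROSECOND_THRESHOLD = 10**14  # same cut-off as in normalize_csv_timestamp

def to_milliseconds(ts: int) -> int:
    """Return ts in milliseconds, converting from microseconds if needed."""
    return ts // 1000 if ts > MICROSECOND_THRESHOLD else ts

ms_2024 = 1704067200000     # 2024-01-01 00:00:00 UTC in milliseconds
us_2025 = 1735689600000000  # 2025-01-01 00:00:00 UTC in microseconds

for raw in (ms_2024, us_2025):
    ms = to_milliseconds(raw)
    print(raw, "->", datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())
```

Both values end up as millisecond timestamps, so files from before and after the change can be merged without mixing units.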
We call this method in the two loops that copy the monthly and daily .csv data files.
    # check if historic data available and up-to-date (less than 20 days old)
    def check_recent_spotmarket_files(self, directory, spotmarket):
        current_date = datetime.now()
        # Define a pattern to extract the date from the filename: 'XRPUSDT-1h-2024-09-04.csv' -> Extract '2024-09-04'
        pattern = re.compile(rf'{spotmarket}-\d+[a-zA-Z]+-(\d{{4}}-\d{{2}}-\d{{2}})\.csv')
        valid_dates = []
        for file in Path(directory).iterdir():
            if file.is_file():
                match = pattern.search(file.name)
                if match: # Extract date from filename
                    file_date_str = match.group(1) # '2024-09-04'
                    file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
                    valid_dates.append(file_date) # Add the extracted date to the list
        if not valid_dates: # Check if we found any valid files
            self.do_message(f"\nNo data files found for the spotmarket: {spotmarket}")
            return False
        most_recent_date = max(valid_dates) # Find the most recent date
        if (current_date - most_recent_date).days < 20: # Check if the most recent date is within 20 calendar days from the current date
            self.do_message(f"\nThe most recent file is from {most_recent_date.date()}, which is within 20 days of today.")
            return True
        else:
            self.do_message(f"\nThe most recent file is from {most_recent_date.date()}, which is more than 20 days old.")
            return False

    def collect_data(self):
        # get most recent hourly data from Binance and save in the current dir
        tick_interval = '1h'
        url = 'https://api.binance.com/api/v3/klines?symbol='+self.MARKT+'&interval='+tick_interval
        try:
            data = requests.get(url, timeout=30).json()
        except (requests.RequestException, ValueError): # network problem or a response that is not valid JSON
            print('\n* * * Failed to obtain latest data from Binance')
            return False
        if not isinstance(data, list): # Binance reports errors as a JSON object instead of a list of klines
            print('\n* * * Binance returned an error: '+str(data))
            return False
        df = pd.DataFrame(data)
        NOW = datetime.now().strftime('%Y-%m-%d')
        self.current_data = self.MARKT+'-'+NOW+'.csv'
        df.to_csv(self.current_data, header=self.COLUMNS, index=False)
        self.do_message('* * * current data collected from Binance and saved in: '+self.current_data)
        print(df)
        return True

    def merge_data(self):
        # be sure to clean the workdir
        files_in_directory = os.listdir(self.WORK)
        file_list = [(os.path.join(self.WORK, file)) for file in files_in_directory if file.endswith('.csv')]
        if(file_list):
            for file in file_list:
                os.remove(file)
            self.do_message('* * * old files deleted from: '+self.WORK)
        else:
            self.do_message('* * * no old files found in: '+self.WORK)
        # 1. move most recent data to the work dir
        shutil.move(self.current_data, os.path.join(self.WORK, self.current_data))
        # 2. copy previously stored monthly data to the work dir
        tel=0
        for file_name in os.listdir(self.MONTHS):
            if file_name.endswith('.csv'):
                tel+=1
                source_file = os.path.join(self.MONTHS, file_name)
                self.normalize_csv_timestamp(source_file, 'open_time') # check timestamp, if necessary convert microseconds to milliseconds
                destination_file = os.path.join(self.WORK, file_name)
                shutil.copy(source_file, destination_file)
        self.do_message('* * * number of monthly files copied:'+str(tel))
        # 3. copy previously stored daily data to the work dir
        tel=0
        for file_name in os.listdir(self.DAYS):
            if file_name.endswith('.csv'):
                tel+=1
                source_file = os.path.join(self.DAYS, file_name)
                self.normalize_csv_timestamp(source_file, 'open_time') # check timestamp, if necessary convert microseconds to milliseconds
                destination_file = os.path.join(self.WORK, file_name)
                shutil.copy(source_file, destination_file)
        self.do_message('* * * number of daily files copied:'+str(tel))
        self.do_message('* * * number of files with timestamp converted from microseconds to milliseconds:'+str(self.normalized))
        # concat all csv files (1+2+3) in work dir to one total csv
        files_in_directory = os.listdir(self.WORK)
        file_list = [(os.path.join(self.WORK, file)) for file in files_in_directory if file.endswith('.csv')]
        self.do_message('* * * total number of files to process: '+ str(len(file_list)))
        # first give every csv file the same header; files without a header must not lose their first data row
        for file in file_list:
            with open(file, 'r') as f:
                has_header = 'open_time' in f.readline()
            csv = pd.read_csv(file, header=0 if has_header else None, names=self.COLUMNS)
            csv.to_csv(file, index=False)
        # then append all files together
        df_append = pd.concat([pd.read_csv(file) for file in file_list], ignore_index=True)
        df_append = df_append.drop_duplicates()
        self.do_message('* * * total number of hourly datapoints resulting: '+ str(len(df_append)))
        # save the resulting csv in current work dir
        df_append.to_csv(os.path.join(self.current_dir, self.MARKT+'-total.csv'), header=self.COLUMNS, index=False)
        # clean workdir
        for file in file_list:
            os.remove(file)
        self.do_message('* * * temporary files deleted from: '+self.WORK)
        # done, signal success to Main Window
        tekst = "* * * "+self.MARKT+"-total.csv is ready for use in: "+self.current_dir
        self.total_successful.emit(tekst)
        print(tekst)
        print('* * * done!')
Bonus: Run It Stand-alone
Add this special main block to the end of the script and you can run it conveniently from the command line.
# to run this script stand-alone from the Command Line
if __name__ == "__main__":
    import sys
    import json
    from PyQt6.QtWidgets import QApplication
    # Required for any PyQt signal/slot logic to function
    app = QApplication(sys.argv)
    settings_path = "settings.json"
    with open(settings_path, 'r') as f:
        settings = json.load(f)
    # Create and use analysis class
    asset = 'XRP' # Replace this symbol with asset listed on Binance
    mt = MakeTotal(asset, settings)
    if mt.suc == False:
        sys.exit()
    else:
        mt.finished.connect(app.quit) # Clean exit when the thread finishes
        mt.start()
        sys.exit(app.exec()) # Starts Qt event loop (and allows signals/threads to function)
MakeTotal Class Overview
The MakeTotal class combines the most recent cryptocurrency market data (500 data points, e.g., hourly data) from Binance with previously stored historical data (accumulated daily and monthly files, obtained with the Bdumper class) into a single CSV file. This consolidated data is prepared for further processing, such as technical analysis or model training. The class is designed to work within a PyQt6 GUI application and emits a signal when the operation completes successfully.

Purpose
The MakeTotal class:
- Collects the latest 500 hourly data points for a specified cryptocurrency asset from Binance.
- Validates whether historical data is up-to-date (within 20 days).
- Merges the latest and historical data into a single, clean CSV file.
- Ensures the work directory is free of temporary or outdated files.
Prerequisites
Dependencies:
- Libraries: requests, pandas and PyQt6 (install them with pip if not already available), plus the standard-library modules os, re, shutil, datetime, pathlib and warnings.
- Settings File: A JSON object or Python dictionary that must include a ‘spot’ key specifying the base directory for data storage. Example settings:
{
    "spot": "../data"
}
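As a minimal sketch of how such a settings file could be read and checked before it is handed to the class: the function name load_settings is hypothetical and not part of MakeTotal, and the demonstration writes a throwaway settings.json to a temporary directory.

```python
import json
import tempfile
from pathlib import Path

def load_settings(path):
    """Read the settings file and make sure the 'spot' key is present."""
    with open(path, "r", encoding="utf-8") as f:
        settings = json.load(f)
    if "spot" not in settings:
        raise KeyError("settings file must contain a 'spot' key")
    return settings

# Demonstration with a throwaway settings file
with tempfile.TemporaryDirectory() as tmp:
    settings_file = Path(tmp) / "settings.json"
    settings_file.write_text(json.dumps({"spot": "../data"}), encoding="utf-8")
    settings = load_settings(settings_file)
    print(settings["spot"])
```

Failing early on a missing key gives a clearer message than the KeyError that would otherwise surface inside the constructor.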
Features
- Historical Data Validation: Ensures existing data is less than 20 days old before proceeding.
- Signal Emission: Emits a PyQt signal (total_successful) upon completion.
- Automatic Data Management:
- Deletes old temporary files.
- Consolidates and cleans all relevant data into a single CSV file.
Initialization
Constructor
MakeTotal(asset: str, settings: dict)
- Parameters:
- asset (str): The cryptocurrency asset to process (e.g., “BTC”).
- settings (dict): A JSON object with a ‘spot’ key defining the base directory for storing data.
- Initialization Process:
- Sets up paths for daily, monthly, and working directories.
- Validates whether historical data for the asset is up-to-date.
- If valid, sets the suc flag to True so that data collection and merging can be started with start(); otherwise, displays an error message.
Workflow
First Step: Validation
- The check_recent_spotmarket_files method ensures that the most recent historical data is no older than 20 days.
- If no up-to-date data is found, the process halts with a user notification.
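The validation step can be tried out without a data directory. The sketch below applies the same filename pattern and 20-day rule to a plain list of names. The functions most_recent_file_date and is_recent are made up for this illustration, and the current date is passed in explicitly so the result is reproducible.

```python
import re
from datetime import datetime

def most_recent_file_date(filenames, market):
    """Return the latest date found in names like 'XRPUSDT-1h-2024-09-04.csv', or None."""
    pattern = re.compile(rf"{re.escape(market)}-\d+[a-zA-Z]+-(\d{{4}}-\d{{2}}-\d{{2}})\.csv")
    dates = [datetime.strptime(m.group(1), "%Y-%m-%d")
             for m in map(pattern.search, filenames) if m]
    return max(dates, default=None)

def is_recent(filenames, market, now, max_age_days=20):
    """True if the newest matching file is less than max_age_days old."""
    latest = most_recent_file_date(filenames, market)
    return latest is not None and (now - latest).days < max_age_days

names = ["XRPUSDT-1h-2024-09-03.csv", "XRPUSDT-1h-2024-09-04.csv", "notes.txt"]
print(is_recent(names, "XRPUSDT", now=datetime(2024, 9, 20)))  # True  (16 days old)
print(is_recent(names, "XRPUSDT", now=datetime(2024, 10, 1)))  # False (27 days old)
```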
Second Step: Data Collection
- Downloads the latest 500 hourly data points from Binance.
- Saves the data to a CSV file in the working directory.
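The class relies on the endpoint returning 500 candles when no count is given. As a sketch of an alternative, the request URL can spell this out. To my knowledge the klines endpoint accepts an optional limit parameter, but treat that as an assumption to verify against the Binance documentation. The helper build_klines_url is hypothetical and only builds the URL; it does not contact Binance.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.binance.com/api/v3/klines"  # endpoint used by the class

def build_klines_url(symbol, interval="1h", limit=500):
    """Build the request URL; `limit` is spelled out instead of relying on the default."""
    query = urlencode({"symbol": symbol.upper(), "interval": interval, "limit": limit})
    return f"{BASE_URL}?{query}"

print(build_klines_url("xrpusdt"))
```

Making the count explicit keeps the 20-day threshold and the download size visibly tied together in the code.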
Third Step: Historical Data Integration
- Copies historical data (daily and monthly) into the working directory.
- Combines the new and historical data into a single CSV file.
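The idea behind the merge can be shown with the standard library alone. Note that this sketch is a variation and not what the class does: MakeTotal uses pandas and drops rows that are identical in every column, whereas the sketch keeps one row per open_time and also sorts the result chronologically. The function merge_rows and the shortened three-column rows are made up for the illustration.

```python
import csv
import io

def merge_rows(*csv_texts):
    """Merge headerless kline CSV texts, keeping one row per open_time, sorted by time."""
    rows = {}
    for text in csv_texts:
        for row in csv.reader(io.StringIO(text)):
            if row:                      # skip blank lines
                rows[int(row[0])] = row  # later sources overwrite earlier ones
    return [rows[key] for key in sorted(rows)]

historical = "1000,1.0,1.2\n2000,1.2,1.3\n"
latest = "2000,1.2,1.3\n3000,1.3,1.1\n"
merged = merge_rows(historical, latest)
print(len(merged))  # 3
```

Keying on open_time means that the overlap between the archive and the latest 500 candles collapses to a single row per hour.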
Fourth Step: Cleanup
- Deletes temporary files from the working directory.
- Outputs the consolidated CSV file, ready for further use.
- Emits the total_successful signal upon successful completion.
Methods
- check_recent_spotmarket_files
- Purpose: Validates whether historical data files are available and up-to-date (within 20 days).
- Parameters: directory: The directory containing historical files. spotmarket: The market symbol of the cryptocurrency asset (e.g., “BTCUSDT”).
- Returns: True if data is valid; False otherwise.
- collect_data
- Purpose: Fetches the most recent hourly Kline data for the specified market from Binance and saves it as a temporary CSV file.
- Returns: True if the data was downloaded and saved; False otherwise.
- merge_data()
- Purpose: Combines the most recent data from Binance with previously stored historical data to create a consolidated dataset.
- Steps:
- Deletes old files in the working directory.
- Moves the latest data file to the working directory.
- Copies historical monthly and daily files to the working directory.
- Merges all files into a single consolidated CSV.
- Removes duplicate records.
- Saves the final CSV file ([MARKET]-total.csv) to the current working directory.
Directory Structure
Input Data
- Historical Data:
- Monthly: <base_directory>/monthly/klines/<asset>USDT/1h/
- Daily: <base_directory>/daily/klines/<asset>USDT/1h/
- Latest Data:
- Fetched directly from Binance API.
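The class builds these two directories with hard-coded backslashes, which ties it to Windows. A portable variant could use pathlib, as in the sketch below. The function archive_dirs is hypothetical, and the market name is the asset symbol followed by USDT, as in the class.

```python
from pathlib import Path

def archive_dirs(base, market, interval="1h"):
    """Return the (monthly, daily) archive directories for a market."""
    base = Path(base)
    monthly = base / "monthly" / "klines" / market / interval
    daily = base / "daily" / "klines" / market / interval
    return monthly, daily

monthly, daily = archive_dirs("../data", "XRPUSDT")
print(monthly.as_posix())  # ../data/monthly/klines/XRPUSDT/1h
print(daily.as_posix())    # ../data/daily/klines/XRPUSDT/1h
```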
Output Data
- Consolidated CSV file saved in the current working directory: <asset>USDT-total.csv
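Once the consolidated file exists, it can be read back for further processing. The sketch below does this with the csv module and converts open_time from milliseconds to a UTC datetime. The function read_candles is made up for this illustration, and the two rows with their prices are invented sample data written to a temporary file, not real market data.

```python
import csv
import tempfile
from datetime import datetime, timezone
from pathlib import Path

def read_candles(path):
    """Yield (open_time as UTC datetime, close price) for every row of the file."""
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            opened = datetime.fromtimestamp(int(row["open_time"]) / 1000, tz=timezone.utc)
            yield opened, float(row["close"])

# Demonstration with a two-row stand-in for the real file (invented values)
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "XRPUSDT-total.csv"
    sample.write_text("open_time,open,high,low,close\n"
                      "1704067200000,0.61,0.62,0.60,0.615\n"
                      "1704070800000,0.615,0.63,0.61,0.62\n")
    candles = list(read_candles(sample))
    print(candles[0][0].isoformat(), candles[0][1])
```

In practice the same file loads directly into pandas with read_csv; the standard-library version is shown only to keep the example free of dependencies.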
Error Handling
- Historical Data Missing: If no up-to-date historical data is available, a user notification is displayed.
- Temporary Files Cleanup: Ensures no residual files remain in the working directory after processing.
Example Usage
1. PyQt Integration
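from PyQt6.QtWidgets import QApplication
import sys
if __name__ == "__main__":
    app = QApplication(sys.argv) # QApplication rather than QCoreApplication, because the class uses QMessageBox
    settings = {"spot": "../data"}
    total_processor = MakeTotal("BTC", settings)
    total_processor.total_successful.connect(lambda message: print(f"Data processing complete: {message}"))
    total_processor.finished.connect(app.quit) # leave the event loop when the thread is done
    if total_processor.suc:
        total_processor.start() # run() is executed in the worker thread
        sys.exit(app.exec())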
2. Command-Line Execution
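if __name__ == "__main__":
    app = QApplication(sys.argv) # needs: import sys and from PyQt6.QtWidgets import QApplication
    total_processor = MakeTotal("BTC", {"spot": "../data"})
    if total_processor.suc:
        total_processor.run() # call run() directly to process synchronously, without starting a thread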
Logs
The class provides detailed logs for:
- Validation of historical data availability.
- Data collection progress from Binance.
- Cleanup and merging steps.
- Final consolidated file creation.
Notes
- Requires an active internet connection for Binance API access.
- Relies on valid directory paths and properly named files for historical data.
Future Enhancements
- Add error handling for network failures during API calls.
- Include support for additional time intervals and data types.