Automate Your Edge: A DIY Crypto Signal Bot with Python and Windows Tools

Abstract

This guide provides a complete walkthrough for building and automating a personal crypto trading signal bot using Python, with Windows Task Scheduler as the execution platform. Tailored for traders who rely on technical indicators like RSI, Moving Averages, and Bollinger Bands, the bot systematically scans the market for buy and sell signals using real-time data from the Binance API. The guide not only covers the bot’s logic and signal strategy in detail, but also demonstrates how to wrap the script in a batch file and configure automated execution on a local Windows machine. Additional extensions such as automated log emailing illustrate how simple components can be orchestrated into a practical, self-running signal system—without the need for expensive cloud services or external infrastructure.

This guide has also been published on LinkedIn

A Signal Bot

Suppose you follow the crypto market and would like to receive signals of opportunities to buy at favorable prices or to sell at the right time. You could of course use services like CoinGecko or CoinMarketCap to send you alerts when certain price-levels are reached for the assets you’re monitoring. If that is all you need, you’re good to go. However you might want more context than just prices. Perhaps a combination of technical indicators like RSI, Moving Averages and others. Especially if you rely on your own preferred set of momentum indicators, it would be useful to have a personal bot that automatically provides exactly the insights information you need.

What is a Bot?

Let’s start with a bit of background. A bot (short for robot) is a software application we design to perform automated tasks –  often over the internet. Bots can execute repetitive tasks much faster and more accurately than humans and they can operate independently, without human intervention.

Building and Running a Bot

In this guide, you’ll learn how to build a Python-based trading signal-bot and run it on your own Windows machine using Task Scheduler. To achieve this, you’ll need three things:

  • The Python script, the core engine that generates signals using live data via the Binance API.
  • A small batch file that runs the Python script.
  • A scheduling tool, to trigger the batch file at regular intervals, in this case, Windows Task Scheduler.

Building the Signal Generating App

We start with the core application that collects, processes, and evaluates real-time market data. Then determines whether an entry, exit, or no signal at all should be generated.

 The SignalBot Class

The script defines a class responsible for managing the signal logic. It begins by specifying a hardcoded list of cryptocurrencies to monitor. Within the init() method, it also sets default values for various operational switches. Such as whether to clean up temporary files, export results to Excel, or the detail frequency of the downloaded candlestick (kline) data. Additionally, it defines thresholds for how many technical indicators must align before issuing a buy or sell signal.

Upon initialization, the bot reads a configuration file from a specified path. This file allows the user to override the default settings. The method read_settings(config_path) is responsible for parsing this configuration and updating the bot’s parameters accordingly.

Here’s an example setup:

Config File

Finally we call the method init_log(). It tests whether a log for today (‘marktcheck_’) already exist. If not -when it’s the first time the bot is run today – it creates a new one. 

When the init is done, we call the run() method, that orchestrates the execution. In this case it hands this task to the check_markt() method, that controls the flow of actions per asset in the watch-list of currencies.

Initiation and Management

Here’s the code for the  initiation and management of the operations of the Signal Bot class (plus some utilities used throughout the class).

# Copyright (c) 2024, 2025 Hans De Weme
# Licensed under the MIT License (https://opensource.org/licenses/MIT).
# Class SignalBot
# Purpose: generating buy/sell signals from current price data downloaded from a Binance API on crypto assets from a watch-list
# for every asset signals are formed based on a combination of technical indicators:
#  - Moving Averages  and Exponentally Weighted Moving Avarage
#  - Bollinger Band, Schaff Trend Cycle (STC) indicator and RSI 
# potential signals are calculated and combined with the entry / exit prices 
# Gaussian Peaks & Valleys are calculated 
# Based on the 'strategic' threshold (e.g. the minimum number of combined positive/negative signals) buy/entry or exit/sell signal are generated 
#
import requests
import pandas as pd
import pandas_ta as ta
from   ta.volatility import BollingerBands
from   ta.trend import STCIndicator
from   scipy.ndimage import gaussian_filter, filters
import numpy as np
import time
import os
from   datetime import datetime
import json
import warnings
warnings.filterwarnings("ignore")

class SignalBot:
    def __init__(self, config_path='bot_config.json'):
        # init globals just to be sure, will be overwriten from the configuration file
        self.WATCH_LIST = ['BONK', 'PEPE', 'WIF']
        self.CLEAN = False
        self.EXCEL = False
        # frequence for the klines downloaded from the Binance API	
        # 1m/3m/5m/15m/30m/1h/2h/4h/6h/8h/12h/1d/3d/1w/1M  
        # 8/25/42/125/250/500/etc. number of hours in 500 rows when this frequency is used        
        self.FREQ = '5m' 
        # threshold min #indicators for entry/exit signals
        self.REC_ENTRY_SIG = 2
        self.REC_EXIT_SIG = 2
        self.read_settings(config_path)
        self.log = self.init_log()
        
    def run(self):
        self.check_markt(self.log)
        self.write_log(self.log, "\n\n")
        self.write_log(self.log, self.time_stamp()+"\tWatchlist ready for this run. End of Processing\n")
        # clean
        if self.CLEAN:
            self.cleanup()
            
    # Check selected markets for trading signals
    def check_markt(self, log):
        for i in range(0, len(self.WATCH_LIST)):
            MARKT = self.WATCH_LIST[i]+'USDT'
            mrkt = f"{MARKT:<10}"
            suc, data = self.download_data(MARKT, self.FREQ)
            if suc == False:
                self.write_log(log, self.time_stamp()+'\t'+mrkt+' '+'no data obtained for this asset from Binance'+"\n")
            else:
                data = self.calc_sma_ema(data)
                data = self.calculate_tis(data)
                data = self.calculate_signals(data)
                data = self.execute_strategy(data)
                lst_pindx, lst_vindx = self.get_gaussian_peaksvallies(data)
                sign, gaus = self.get_last_signals(data, lst_pindx, lst_vindx)
                self.write_log(log, self.time_stamp()+'\t'+mrkt+' '+sign+"\n")
                self.write_log(log, self.time_stamp()+'\t'+mrkt+' '+gaus+"\n")
                if self.EXCEL == True:
                    data.to_excel(MARKT+'_worked.xlsx', engine='openpyxl')
            time.sleep(3)
        
    # Read settings for globals from configuration file
    def read_settings(self, config_path):
        with open(config_path, 'r') as config_file:
            config_data = json.load(config_file)
            self.WATCH_LIST = config_data['list']
            self.CLEAN = config_data['clean']
            self.FREQ  = config_data['freq'] 
            self.EXCEL = config_data['excel']   
            self.REC_ENTRY_SIG = int(config_data['entry'])
            self.REC_EXIT_SIG = int(config_data['exit'])
            
    def init_log(self):
        # check if there is already logfile for today to be processed else create a new one
        current_dir  = os.getcwd()
        files_in_cwd = os.listdir(current_dir)
        file_list    = [(os.path.join(current_dir, file)) for file in files_in_cwd if file.startswith('marktcheck_'+self.today())] 
        if(file_list):
            log  = file_list[0]
            print('logfile found '+log)
            self.write_log(log, "\n")
        else:
            log = 'marktcheck_'+self.today()+'.txt'
            print('logfile created '+log)
            self.write_log(log, " * * * NEW MARKET CHECK * * *\n")
            self.write_log(log, "\n")
            self.write_log(log, self.time_stamp()+"\tprocessing started\n")
            self.write_log(log, "\n\n")
        return log

    # write logfile line
    def write_log(self, logfile, logregel):
        with open(logfile,'at') as f:
            f.write(logregel)
            f.close()
        
    # create timestamp strings
    def time_stamp(self):
        now  = datetime.now() 
        d = now.strftime("%d")
        m = now.strftime("%m")
        y = now.strftime("%Y")
        h = now.strftime("%H")
        n = now.strftime("%M")
        now = y+'-'+m+'-'+d+' '+h+':'+n
        return now

    def today(self):
        now  = datetime.now() 
        d = now.strftime("%d")
        m = now.strftime("%m")
        y = now.strftime("%Y")
        now = y+'-'+m+'-'+d
        return now

    # clean current dir: remove csv files
    def cleanup(self):
        current_dir = os.getcwd()
        files_in_directory = os.listdir(current_dir)
        file_list = [(os.path.join(current_dir, file)) for file in files_in_directory if file.endswith('.csv')]
        if(file_list):
            for file in file_list:
                os.remove(file)
            print('* * * old csv files removed from '+current_dir)
        else:
            print('* * * no old csv files found in '+current_dir)
        file_list = [(os.path.join(current_dir, file)) for file in files_in_directory if file.endswith('.xlsx')]
        if(file_list):
            for file in file_list:
                os.remove(file)
            print('* * * old xlsx files removed from '+current_dir)
        else:
            print('* * * no old xlsx files found in '+current_dir)
Python

Actual operations

The check_markt() method directs the core execution. For each asset in the watch list, the bot begins by downloading the most recent 500 candlestick (kline) entries from the Binance API, using the configured frequency. Binance supports a range of intervals—from 1 minute to 1 month—but for a signal bot focused on short-term trends, only high-frequency intervals like 1, 3, or 5 minutes are truly effective.

For example, using a 5-minute frequency, the 500 most recent klines cover approximately the last 42 hours of trading data—enough to support meaningful signal calculations across all relevant indicators.

For each asset, the code performs the following operations:

  • Download price data from the Binance API and load it into an indexed Pandas DataFrame
  • Calculate Simple and Exponential Moving Averages
  • Compute the Bollinger Bands, Schaff Trend Cycle (STC) indicator, and RSI (z-scores)
  • Identify potential buy/sell signals from each indicator
  • Combine those signals with corresponding entry and exit price levels
  • Apply entry and exit thresholds to filter valid signals
  • Calculate and apply Gaussian Peaks & Valleys as an additional smoothed indicator
  • Output the most recent buy and sell signals based on all processed data

Here’s the code for the heart of the Signal Bot class.

    # Download Binance historical data and store in dataframe - last 500 datapoints
    def download_data(self, market, interval):
        # columns of the klines downloaded from the Binance API
        self.COLMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore']
        print(f'Downloading data for {market}. Interval {interval}.')
        tick_interval = self.FREQ
        url = f'https://api.binance.com/api/v3/klines?symbol={market}&interval={tick_interval}'
        max_retries = 3
        data = None

        for attempt in range(1, max_retries + 1):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                data = response.json()
                break  # Success, exit retry loop
            except requests.exceptions.HTTPError as errh:
                print(f"[Attempt {attempt}] HTTP Error: {errh}")
            except requests.exceptions.ConnectionError as errc:
                print(f"[Attempt {attempt}] Connection Error: {errc}")
            except requests.exceptions.Timeout as errt:
                print(f"[Attempt {attempt}] Timeout Error: {errt}")
            except requests.exceptions.RequestException as err:
                print(f"[Attempt {attempt}] Request Exception: {err}")
            except json.decoder.JSONDecodeError as errj:
                print(f"[Attempt {attempt}] JSON Decode Error: {errj}")
            if attempt < max_retries:
                time.sleep(2)  # Brief pause before retrying
            else:
                print(f"❌ Failed to fetch data for {market} after {max_retries} attempts.")
                return False, None
        # If data was successfully retrieved
        if data:
            try:
                df = pd.DataFrame(data, columns=self.COLMNS)
                df['dt'] = pd.to_datetime(df['timestamp'], unit='ms')
                df.set_index('dt', inplace=True)
                df['close'] = df['close'].astype(float)
                if self.EXCEL == True: 
                    nu = self.time_stamp().replace(":", "-")
                    filename = f"{market}-{nu}.xlsx"
                    df.to_excel(filename, engine='openpyxl')
                return True, df
            except Exception as e:
                print(f"❌ Error processing data for {market}: {e}")
                return False, None
        return False, None
    
    # MOVING AVARAGES + EXPONENTIALLY WEIGHTED MOVING AVARAGE
    def calc_sma_ema(self, df):
        print('* * * calculating moving averages over complete dataset * * *')   
        df['7_day_MA']   = df['close'].rolling(window=7).mean()
        df['20_day_MA']  = df['close'].rolling(window=20).mean()
        df['50_day_MA']  = df['close'].rolling(window=50).mean()
        df['7_day_EM']   = df['close'].ewm(span=7,  adjust=False).mean()
        df['20_day_EM']  = df['close'].ewm(span=20, adjust=False).mean()
        df['50_day_EM']  = df['close'].ewm(span=50, adjust=False).mean()
        sma7  = df['7_day_MA'].to_numpy()
        sma20 = df['20_day_MA'].to_numpy() 
        sma50 = df['50_day_MA'].to_numpy()
        ema7  = df['7_day_EM'].to_numpy()
        ema20 = df['20_day_EM'].to_numpy()
        ema50 = df['50_day_EM'].to_numpy()
    # check for crossovers SMA en EMA buy/sell signals
        sma = []
        ema = []
        for i in range(len(df)):
            if   sma7[i] > sma20[i] and sma20[i] > sma50[i]:
                sma.append(1)
            elif sma7[i] < sma20[i] and sma20[i] < sma50[i]:
                sma.append(-1)
            else:
                sma.append(np.nan)
        for i in range(len(df)):
            if   ema7[i] > ema20[i] and ema20[i] > ema50[i]:
                ema.append(1)
            elif ema7[i] < ema20[i] and ema20[i] < ema50[i]:
                ema.append(-1)
            else:
                ema.append(np.nan)
        df['SMA'] = sma
        df['EMA'] = ema
        return df

    # Calculate Technical Indicators
    def calculate_tis(self, df):
        print('* * * calculating Bollinger Band, Schaff Trend Cycle (STC) indicator and RSI over full dataset * * *')   
        # The Schaff Trend Cycle (STC) indicator combines cycle analysis and moving averages to identify trends and potential reversals.
        # It oscillates between 0 and 100, with values above 50 indicating a bullish trend and values below 50 indicating a bearish trend
        RSI_WINDOW  = 14
        STD_DEV = 2
        SMA_PERIOD  = 28 
        indicator_bb = BollingerBands(close=df['close'], window=SMA_PERIOD, window_dev=STD_DEV)
        # Add Bollinger Bands features
        df['BB_mid']  = indicator_bb.bollinger_mavg()
        df['BB_high'] = indicator_bb.bollinger_hband()
        df['BB_low']  = indicator_bb.bollinger_lband()  
        df['RSI'] = ta.rsi(df['close'], window=RSI_WINDOW)
        # Calculate z-score of RSI
        df['RSI_mean'] = df['RSI'].rolling(window=50).mean()
        df['RSI_std']  = df['RSI'].rolling(window=50).std()
        df['RSI_z'] = (df['RSI'] - df['RSI_mean']) / df['RSI_std']        
        # The Schaff Trend Cycle (STC) indicator to identify market trends and potential buy or sell signals.
        # window_fast is around 23 periods to captures the shorter-term price trends
        # window_slow is around 50 periods, is 'smoother' trend, less sensitive to price changes
        # cycle bepaalt de sensitiviteit voor markt trends en cycli, default = 10: hogere waarde bij valatiele markt, lager bij zijwaardse beweging
        stc_window_slow = 50
        stc_window_fast = 23
        stc_cycle = 10
        indicator_stc = STCIndicator(close=df['close'], window_slow=stc_window_slow, window_fast=stc_window_fast, cycle=stc_cycle, smooth1=3, smooth2=3)
        # Add features
        df['STC'] = indicator_stc.stc()
        return df

    #  calculate trading signals
    def calculate_signals(self, df):
        print('* * * calculating potential signals * * *')  
        entry_z_thresh = self.rsi_z_up
        df['RSI_entry_ind'] = np.where(np.logical_and(df['RSI_z'] > entry_z_thresh, df['RSI_z'].shift() <= entry_z_thresh), 1, 0)
        exit_z_thresh = self.rsi_z_down
        df['RSI_exit_ind'] = np.where(np.logical_and(df['RSI_z'] < exit_z_thresh, df['RSI_z'].shift() >= exit_z_thresh), 1, 0)
        # use z-score instead of hardcoded thresholds
        #Calculate upper / lower boundary for BB
        close_prices = df['close'].to_numpy()
        max_close    = np.amax(close_prices)
        min_close    = np.amin(close_prices)
        diff_close   = max_close - min_close
        df['BB_low_adj']    = df["BB_low"] + (diff_close * 0.09)
        df['BB_entry_ind']  = np.where((df["close"] <= df["BB_low_adj"]), 1, 0)
        df['BB_high_adj']   = df["BB_high"] - (diff_close * 0.07)
        df['BB_exit_ind']   = np.where((df["close"] >= df["BB_high_adj"]), 1, 0)
        df['STC_entry_ind'] = np.where(np.logical_and(df['STC'] > 73, df['STC'].shift() <= 73), 1, 0)
        df['STC_exit_ind']  = np.where(np.logical_and(df['STC'] < 97, df['STC'].shift() >= 97), 1, 0)
        return df

    def execute_strategy(self, df):
        print('* * * combining potential signals and entry / exit prices * * *')  
        close_prices = df['close'].to_numpy()
        rsi_entry    = df['RSI_entry_ind'].to_numpy()
        rsi_exit     = df['RSI_exit_ind'].to_numpy()  
        bb_entry     = df['BB_entry_ind'].to_numpy()
        bb_exit      = df['BB_exit_ind'].to_numpy()
        stc_entry    = df['STC_entry_ind'].to_numpy()
        stc_exit     = df['STC_exit_ind'].to_numpy()  
        ema          = df['EMA'].to_numpy()
        sma          = df['SMA'].to_numpy()
        required_entry_signals = self.REC_ENTRY_SIG
        required_exit_signals  = self.REC_EXIT_SIG
        entry_prices = []
        exit_prices  = []
        entry_strengths = []
        exit_strengths = []
        
        for i in range(len(close_prices)):
            current_price = close_prices[i]
            num_entry_signals = 0
            num_exit_signals = 0

            # ✅ Evaluate only current row instead of a lookback window
            if rsi_entry[i] == 1:
                num_entry_signals += 1
            if bb_entry[i] == 1:
                num_entry_signals += 1
            if stc_entry[i] == 1:
                num_entry_signals += 1

            if rsi_exit[i] == 1:
                num_exit_signals += 1
            if bb_exit[i] == 1:
                num_exit_signals += 1
            if stc_exit[i] == 1:
                num_exit_signals += 1

            entry_strengths.append(num_entry_signals)
            exit_strengths.append(num_exit_signals)
            # ✅ Require SMA/EMA confirmation only when signal appears on this bar
            if num_entry_signals >= required_entry_signals and ema[i] == 1 and sma[i] == 1: 
                entry_prices.append(current_price)
                exit_prices.append(np.nan)
            elif num_exit_signals >= required_exit_signals and ema[i] == -1 and sma[i] == -1: 
                entry_prices.append(np.nan)
                exit_prices.append(current_price)
            else:
                entry_prices.append(np.nan)
                exit_prices.append(np.nan)

        df['Entry'] = entry_prices
        df['Exit']  = exit_prices
        df['Entry_Strength'] = entry_strengths
        df['Exit_Strength'] = exit_strengths
        return df

    # Gaussian filters
    def std_filtered_gaussian(self, data, sigma, n_poles=1):
        # Apply the n-pole Gaussian filter. The sigma is modified using the std_dev.
        for _ in range(n_poles):
            data = gaussian_filter(data, sigma=sigma)
        return data

    def get_gaussian_peaksvallies(self, df):
        print('* * * calculating Gaussian Peaks & Valleys * * *')
        # Calculate the slope of the filtered data
        filtered_close = self.std_filtered_gaussian(df['close'], sigma=1, n_poles=1)
        slope = filters.convolve1d(filtered_close, [1, 0, -1])
        # Find peaks and valleys
        peak_indices   = np.where((slope[:-1] > 0) & (slope[1:] < 0))[0]
        valley_indices = np.where((slope[:-1] < 0) & (slope[1:] > 0))[0]
        lst_pindx = peak_indices[-1]
        lst_vindx = valley_indices[-1]
        return lst_pindx, lst_vindx

    def get_last_signals(self, df, lst_pindx, lst_vindx):
        print('* * * selecting most recent buy- and sell-signals * * *')  
        sma7  = df['7_day_MA'].to_numpy()
        sma20 = df['20_day_MA'].to_numpy() 
        sma50 = df['50_day_MA'].to_numpy()
        ema7  = df['7_day_EM'].to_numpy()
        ema20 = df['20_day_EM'].to_numpy()
        ema50 = df['50_day_EM'].to_numpy()
        ema   = df['EMA'].to_numpy()
        sma   = df['SMA'].to_numpy()
        rsi   = df['RSI'].to_numpy()
        dtm   = df.index.to_numpy()
        entry_prices = df['Entry']
        exit_prices  = df['Exit']
        # we search for the latest occurence, so do an inverse of all arrays, start at the end and look back 
        ema   = ema[::-1]
        sma   = sma[::-1]
        sma7  = sma7[::-1]
        sma20 = sma20[::-1]
        sma50 = sma50[::-1]
        ema7  = ema7[::-1]
        ema20 = ema20[::-1]
        ema50 = ema50[::-1]
        rsi   = rsi[::-1]
        dtm   = dtm[::-1]
        piek  = (len(df)-1) - lst_pindx
        dal   = (len(df)-1) - lst_vindx
        entry_prices = entry_prices[::-1]
        exit_prices  = exit_prices[::-1]
        buy_txt  = 'No recent buy-signal'
        sell_txt = 'No recent sell-signal'
        b_pdk    = '0000-00-00 00:00'
        s_pdk    = '0000-00-00 00:00'
        for i in range(len(entry_prices)):
            if np.isnan(entry_prices[i]) == False:  # latest entry/buy price found 
                # formatting {value:{align}{width}.{precision}{type}}
                pprice = f"{ entry_prices[i]:<11.8f}"
                prsi   = f"{rsi[i]:<2.0f}"
                pdk    = np.datetime_as_string(dtm[i], unit='s')
                pdkd   = pdk[0:10]
                pdku   = pdk[11:16]
                pdk    = pdkd+' '+pdku
                if pdk != '0000-00-00 00:00':
                    ps7    = f"{sma7[i]:<11.8f}" 
                    ps20   = f"{sma20[i]:<11.8f}" 
                    ps50   = f"{sma50[i]:<11.8f}"           
                    em7    = f"{ema7[i]:<11.8f}" 
                    em20   = f"{ema20[i]:<11.8f}" 
                    em50   = f"{ema50[i]:<11.8f}"           
                    buy_txt = 'Most recent buy signal on:     '+ pdk + "   price: " +pprice+ " RSI = "+prsi+ " SMA7 = "+ps7+ " SMA20 = "+ps20+ " SMA50 = "+ps50+ " EMA7 = "+em7+ " EMA20 = "+em20+ " EMA50 = "+em50
                    strength = df['Entry_Strength'].iloc[::-1][i]
                    buy_txt += f"  Strength = {strength}/3"
                    b_pdk = pdk
                    break # stop searching at 1th hit
        for i in range(len(exit_prices)):
            if np.isnan(exit_prices[i]) == False:  # latest exit/sell price found
                pprice = f"{exit_prices[i]:<11.8f}"
                prsi   = f"{rsi[i]:<2.0f}"
                pdk    = np.datetime_as_string(dtm[i], unit='s')
                pdkd   = pdk[0:10]
                pdku   = pdk[11:16]
                pdk    = pdkd+' '+pdku
                if pdk != '0000-00-00 00:00':
                    ps7    = f"{sma7[i]:<11.8f}" 
                    ps20   = f"{sma20[i]:<11.8f}" 
                    ps50   = f"{sma50[i]:<11.8f}"
                    em7    = f"{ema7[i]:<11.8f}" 
                    em20   = f"{ema20[i]:<11.8f}" 
                    em50   = f"{ema50[i]:<11.8f}"                      
                    sell_txt = 'Most recent sell signal on:    '+ pdk + "   price: " +pprice+ " RSI = "+prsi+ " SMA7 = "+ps7+ " SMA20 = "+ps20+ " SMA50 = "+ps50+ " EMA7 = "+em7+ " EMA20 = "+em20+ " EMA50 = "+em50
                    strength = df['Exit_Strength'].iloc[::-1][i]
                    sell_txt += f"  Strength = {strength}/3"
                    s_pdk = pdk
                    break # stop searching at 1th hit
        datetime_format = '%Y-%m-%d %H:%M'
        # Compare timestamps only if both signals are present
        if b_pdk != '0000-00-00 00:00' and s_pdk != '0000-00-00 00:00':
            datetime_b = datetime.strptime(b_pdk, datetime_format)
            datetime_s = datetime.strptime(s_pdk, datetime_format)
            if datetime_b > datetime_s:
                sign = buy_txt
                pdk = np.datetime_as_string(dtm[dal], unit='s')
                gaus = 'Latest Gaussian Valley on:     ' + pdk[:10] + ' ' + pdk[11:16]
            else:
                sign = sell_txt
                pdk = np.datetime_as_string(dtm[piek], unit='s')
                gaus = 'Latest Gaussian Peak on  :     ' + pdk[:10] + ' ' + pdk[11:16]
        elif b_pdk != '0000-00-00 00:00':
            sign = buy_txt
            pdk = np.datetime_as_string(dtm[dal], unit='s')
            gaus = 'Latest Gaussian Valley on:     ' + pdk[:10] + ' ' + pdk[11:16]
        elif s_pdk != '0000-00-00 00:00':
            sign = sell_txt
            pdk = np.datetime_as_string(dtm[piek], unit='s')
            gaus = 'Latest Gaussian Peak on  :     ' + pdk[:10] + ' ' + pdk[11:16]
        else:
            sign = 'No recent buy or sell signal'
            gaus = 'No peak or valley information available'
        print(sign)
        return sign, gaus
Python

This script is mend to run standalone, all the initialization and orchestration we do inside the class itself, so the method to run it standalone can remain extremely simple.

if __name__ == "__main__":
    sb = SignalBot()
    sb.run()
Python

Do It Yourself Using Windows

To run our signal generating script as a bot, we need a platform that provides scheduling capabilities—a tool that manages tasks and launches programs at predefined times. While options like cloud services (e.g., AWS) or Unix servers are available, a straightforward and cost-free do-it-yourself alternative is to use your local Windows machine.

Batch file to run the bot

All Windows versions include the built-in Task Scheduler utility. To automate your bot with it, you’ll need to create a scheduled task that runs a batch file. This batch file serves as a wrapper that launches your Python script with the required parameters.

We started by preparing the main script—the signal bot in this case—which performs the core trading signal logic. For the batch file to run it we need the full path to the Python interpreter. On most systems, it’s located at a path similar to:  ‘C:\Users\YourName\AppData\local\Programs\Python\Python3nn\pythonw.exe’.

With this, you can create a simple .cmd or .bat file containing the command to launch your signal bot. This batch file will then be referenced in the Task Scheduler when setting up the automated task.

Batch File Wrapper

Configuring the Scheduler

Now that we have both the main bot file that runs the task we want to automate and the batch file ready to start the bot we can create a Scheduled Task in just a few steps:

  • Open Task Scheduler (search “Task Scheduler” in the Start menu).
  • Click “Create Task” (not “Basis Task”) for full control;
  • Under the General Tab, name the task (e.g. “Run Signal Bot every 15 minutes”).
  • Choose “Run whether user is logged on or not” and check “Run with highest privileges”.
  • Under the Triggers Tab click New:
    • set “Begin the task  to “Daily”.set “Start “ time to 00:00.check “Repeat task every and select 15 minutes from the dropdown. Set the “Duration to 7 hours (e.g. until  07:00).
    • Ensure “Enabled” is checked, then click OK.
  • Under the “Action Tab”  click “New”:
    • Choose “Start a program”.
    • Browse to your batch file (e.g. runsignalbot.cmd).
  • click OK  and Finish.
Configure Windows Task Scheduler

To test, right-click the new task and select ‘Run’. Observe if the script executes correctly and check the Task Scheduler history or log files for any errors.

This setup allows you to automate and monitor trading signal generation without relying on external platforms or manual effort. You can extend this setup with additional automation. The next step is emailing the results.

Bonus: Basic Script to Mail the Results

Once the signal-bot script has been executed and running for the desired period of time, we want to know the results. Wouldn’t it be convenient to have the results emailed? This little Python script will do just that. It too can be encapsulated into a small batch file that can be scheduled as a task to be run once after the signal-bot has finished.

# Copyright (c) 2024, 2025 Hans De Weme
# Licensed under the MIT License (https://opensource.org/licenses/MIT).
# Script SendLog 
# Purpose: emailing the latest version of a logfile generated by SignalBot ('marktcheck_') 
#                 for the current date to an email adress read from a config file (bot_config.json) 

import win32com.client as win32
import os
import json
from   datetime import datetime

def today():
    now  = datetime.now() 
    d = now.strftime("%d")
    m = now.strftime("%m")
    y = now.strftime("%Y")
    now = y+'-'+m+'-'+d
    return now

# sendlog via email
# note: Outlook needs to be present, current user account is used as sender 
def send_log(logfile, mail_to):
    try:
        outlook = win32.Dispatch('outlook.application')
    except Exception as e:
        print("❌ Failed to access Outlook COM interface:", e)
        return    
    mail = outlook.CreateItem(0)
    mail.To = mail_to
    mail.Subject = 'Logfile Signalbot'
    mail.Body = 'See Attachment for Logfile.'
    mail.HTMLBody = f"""
    <h2>SignalBot Logfile</h2>
    <p>Run completed at: {datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
    <p>See attached logfile: <strong>{os.path.basename(logfile)}</strong></p>
    """        
    attachment  = logfile
    mail.Attachments.Add(attachment)
    mail.Send()

if __name__ == "__main__":
    # check if there is a logfile for today to be processed 
    current_dir  = os.getcwd()
    files_in_cwd = os.listdir(current_dir)
    file_list    = [(os.path.join(current_dir, file)) for file in files_in_cwd if file.startswith('marktcheck_'+today())] 
    if(file_list):
        config_path='bot_config.json'
        with open(config_path, 'r') as config_file:
            config_data = json.load(config_file)
            mail_to = config_data['mail']
        log_path  = file_list[0]
        send_log(log_path, mail_to)
        print('logfile send: '+log_path)
    else:
        print('no logfile found to send for today')
Python

Here is an example batch file wrapper to have the Windows Task Schedular trigger sending it.

Batch File Wrapper

This is of course a very basic solution, but it actually works! And it illustrates the possibilities of scheduling and using the power of automating other applications.  You can easily elaborate on this functionality and/or the platform it uses. Another extension could be triggering downstream scripts for trade execution.

Related Stories

1 Comment

Leave A Reply

Please enter your comment!
Please enter your name here