In the previous PyNL article we’ve coded up a bunch of different risk metrics and plots.
Risk metrics are not much use without backtests though, so in this article we will work on a fancier kind of backtester: an event-based backtester.
We will design it in a way where switching from backtesting to trading live is super straightforward and quick.
Table of Contents
For-Loop Backtester
Event-Driven Backtester
Events
Data Handler
Portfolio & Risk Manager
Execution Engine
Strategy
Performance and Monitoring
Running a Backtest
Final Remarks
For-Loop Backtester
This is the type of backtester we’ve used in all articles so far.
The general structure is:
for datapoint in data:
    do_something_with_datapoint()
    buy_or_sell()
You always hear people talking about how amazing event-based backtesters are but for-loop backtesters actually have a couple of advantages as well.
Quick to code:
Building an event-based backtester takes a while, and coding up each backtest on top of it takes a while as well.
With a for-loop backtest you can test ideas much more quickly and throw them out if they aren’t good enough, without wasting time on an event-based backtest (which you should still run if the for-loop backtest comes back positive, as event-based backtests are MUCH more realistic).
Run faster:
Running a for-loop backtest is often a lot quicker than running an event-based one, especially if you are able to vectorize it. This can become important if you are trying to optimize a strategy via grid search, for example.
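To make the speed argument concrete, here is a minimal sketch that runs the same toy rule once as a plain for-loop and once vectorized with NumPy. The moving-average rule, the function names and the random price series are all assumptions made up for this illustration; they are not a strategy from this series.

```python
import numpy as np

def loop_backtest(prices, window):
    """For-loop version: hold one unit over the next bar whenever
    the current close is above its moving average."""
    pnl = 0.0
    for i in range(window - 1, len(prices) - 1):
        moving_avg = prices[i - window + 1 : i + 1].mean()
        if prices[i] > moving_avg:
            pnl += prices[i + 1] - prices[i]
    return pnl

def vectorized_backtest(prices, window):
    """Same rule expressed with array operations instead of a Python loop."""
    kernel = np.ones(window) / window
    # moving_avg[j] is the mean of prices[j : j + window]; drop the last
    # value because there is no next bar to trade on.
    moving_avg = np.convolve(prices, kernel, mode="valid")[:-1]
    closes = prices[window - 1 : -1]
    next_closes = prices[window:]
    in_market = closes > moving_avg
    return float(np.sum(in_market * (next_closes - closes)))

# Synthetic random-walk prices, purely for illustration
rng = np.random.default_rng(0)
prices = 100.0 + rng.normal(0.0, 1.0, 500).cumsum()

loop_pnl = loop_backtest(prices, window=5)
vec_pnl = vectorized_backtest(prices, window=5)
```

Both functions should agree up to floating-point noise. The vectorized one avoids the Python-level loop, which is what makes a grid search over many parameter combinations bearable.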
Event-Driven Backtester
With event-driven backtests we have the following general structure:
while data_available:
    event = get_latest_event()
    if event.type == market_data:
        calculate_signal(event)
    elif event.type == signal:
        handle_signal(event)
    elif event.type == order:
        handle_order(event)
    elif event.type == fill:
        handle_fill(event)
Different parts of the infrastructure are responsible for those different functions.
Here are the different components of the backtester itself:
Event and Event Queue:
We have market_data, signals, orders and fills as events. Those are handled and generated by the other components of the backtester and are put into a FIFO event queue.
Data Handler:
The data handler generates market_data events, which tell the backtester that we are now looking at a new datapoint and want to run any signal calculations on it.
Portfolio & Risk Manager:
This is the heart of the event-based backtester. It takes signal events from the queue and returns order events. The goal of this component is to take you from your current portfolio to your desired portfolio. What the desired portfolio looks like is also decided by this component, as this is where portfolio optimizations, optimal leverage calculations etc. happen.
Execution Engine:
The execution engine takes order events from the portfolio & risk manager and tries to execute them as efficiently as possible. How you want to execute an order will depend on things like how quickly it needs to be executed, what your fees are etc. Once orders are executed, the execution engine returns fill events that go back to the portfolio & risk manager so that it knows what our current portfolio is.
Strategy:
This is self-explanatory. The strategy takes market_data events, grabs the latest data from the data handler and generates signal events based on the strategy logic.
Performance & Monitoring:
This component returns performance reports with performance metrics, plots etc. (which we’ve developed in the last article).
For live systems we also want to monitor live vs. theoretical backtest results and hardware.
Now that we know the different components here is a more detailed general structure for a backtester:
events = Queue()
while True:
    if data_available():
        data_handler.update_data()
    else:
        break
    while True:
        if events.is_empty():
            break
        event = events.get()
        if event.type == "MARKET":
            strategy.calculate_signal(event)
            portfolio.update_info(event)
        elif event.type == "SIGNAL":
            portfolio.handle_signal(event)
        elif event.type == "ORDER":
            executer.execute_order(event)
        elif event.type == "FILL":
            portfolio.update_cur_portfolio(event)
performance.results(portfolio)
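Before building the real components, it can help to watch the queue mechanics in isolation. The sketch below is a toy version of the loop above: events are plain strings and a single hypothetical `handle()` function stands in for the strategy, portfolio and execution engine. Only `queue.Queue` and `queue.Empty` are real library names.

```python
from queue import Empty, Queue

events = Queue()   # FIFO queue shared by all components
processed = []     # order in which events were handled

# Each event type triggers the next one in the chain; a FILL ends the chain.
FOLLOW_UP = {"MARKET": "SIGNAL", "SIGNAL": "ORDER", "ORDER": "FILL"}

def handle(event_type):
    """Stand-in for the real handlers: record the event and enqueue its follow-up."""
    processed.append(event_type)
    if event_type in FOLLOW_UP:
        events.put(FOLLOW_UP[event_type])

for _ in range(2):            # outer loop: two new bars arrive
    events.put("MARKET")
    while True:               # inner loop: drain the queue before the next bar
        try:
            event = events.get(block=False)
        except Empty:
            break
        handle(event)
```

Each bar is fully worked through (market, signal, order, fill) before the next bar is loaded, which is exactly the property the inner loop is there to guarantee.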
An event-based backtester has the following pros:
Eliminates look-ahead bias (mostly):
Since we are handling backtests very close to how we handle live data without indexing ([i], [i-1], [i+1], etc.) it’s much harder to accidentally introduce look-ahead bias in your backtest.
Can backtest entire portfolios:
We can very easily tell the backtester what coins and what strategies we want to consider for the backtest. This gives us a much more realistic idea of what our portfolio level pnl will be in live trading.
Proper portfolio management and risk management:
You most likely wouldn’t run a strategy the way you backtest it in a simple for-loop backtest. This once again makes the backtest more realistic and representative of live results.
Plug & Play:
The backtester will be object-oriented.
For example if we want to switch out the execution logic in our backtest we simply use a different executer class.
Going from backtest to live should also (almost) be as easy as switching out a backtest executer class with a live executer class.
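Here is a rough sketch of what that plug & play idea could look like. All class names, the dictionary-based orders and the `send_order` callable are hypothetical placeholders for this illustration; the actual execution classes of the backtester are built further down.

```python
from abc import ABC, abstractmethod

class Executor(ABC):
    """Common interface, so the rest of the system never cares which executor it talks to."""
    @abstractmethod
    def execute_order(self, order):
        ...

class BacktestExecutor(Executor):
    """Pretends every order is filled immediately."""
    def execute_order(self, order):
        return {"status": "simulated", **order}

class LiveExecutor(Executor):
    """Forwards orders to an exchange through a user-supplied callable."""
    def __init__(self, send_order):
        # send_order is a hypothetical function wrapping an exchange client
        self.send_order = send_order

    def execute_order(self, order):
        return self.send_order(order)

def run(executor, orders):
    """The calling code is identical for backtest and live trading."""
    return [executor.execute_order(order) for order in orders]

orders = [{"symbol": "BTCUSDT", "quantity": 1}]
backtest_fills = run(BacktestExecutor(), orders)
live_fills = run(LiveExecutor(lambda order: {"status": "sent", **order}), orders)
```

Only the object passed to `run()` changes between the two calls, which is the whole point of coding against an interface.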
Events
First let’s code up all the different event classes in an event.py file.
We start with an interface that all events inherit from:
class Event(object):
    """
    Interface for all events
    """
    pass
Market Event
class MarketEvent(Event):
    """
    New market update
    """
    def __init__(self):
        self.type = "MARKET"
This event doesn’t include any information besides telling the backtester that new data is available.
Signal Event
class SignalEvent(Event):
    """
    Signal generated by strategy
    """
    def __init__(self, symbol, timestamp, signal_type, strength):
        """
        Parameters:
        symbol - Ticker symbol
        timestamp - Timestamp at which the signal was generated
        signal_type - "LONG", "SHORT" or "EXIT"
        strength - Strength of the signal
        """
        self.type = "SIGNAL"
        self.symbol = symbol
        self.timestamp = timestamp
        self.signal_type = signal_type
        self.strength = strength
strength here is a normalized signal strength since our portfolio & risk management system shouldn’t distinguish between different strategies.
It’s basically the suggested leverage from the strategy to be considered by the risk manager.
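As a rough illustration of reading strength as suggested leverage, a risk manager could translate it into a position size like this. The function name, the `max_leverage` cap and the assumption that strength is non-negative are all mine; the naive portfolio built later in this article does not do this.

```python
def target_quantity(strength, equity, price, max_leverage=1.0):
    """Turn a normalized signal strength into a position size in units.

    strength is treated as the leverage suggested by the strategy (assumed
    to be non-negative) and is capped at max_leverage by the risk manager.
    """
    leverage = min(strength, max_leverage)
    return leverage * equity / price

# Half leverage on 100k of equity at a price of 25k
half_position = target_quantity(0.5, 100_000.0, 25_000.0)
# A strategy asking for 3x is capped at 2x by the risk manager
capped_position = target_quantity(3.0, 100_000.0, 25_000.0, max_leverage=2.0)
```

Because every strategy speaks in the same normalized units, the risk manager can apply one cap to all of them without knowing anything about the strategy logic.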
Order Event
class OrderEvent(Event):
    """
    Order to be sent to execution system
    """
    def __init__(self, symbol, order_type, quantity, price, direction):
        """
        Parameters:
        symbol - Ticker symbol
        order_type - "MKT" or "LMT"
        quantity - Non-negative integer for quantity
        price - Quoting price (only relevant if order_type == "LMT")
        direction - "LONG" or "SHORT"
        """
        self.type = "ORDER"
        self.symbol = symbol
        self.order_type = order_type
        self.quantity = quantity
        self.price = price
        self.direction = direction
I’m keeping the order event relatively simple here without IOC and stuff like that.
You can keep adding improvements like that over time but in the beginning it’s best to start with something as simple as possible.
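If you do want something like IOC later on, one possible route is to subclass the order event instead of touching the original. The sketch below is hypothetical: `TimeInForceOrderEvent` and the `time_in_force` values are my own naming, and the `OrderEvent` here is a trimmed copy so the snippet runs on its own.

```python
class OrderEvent:
    """Trimmed-down copy of the order event so this sketch is self-contained."""
    def __init__(self, symbol, order_type, quantity, price, direction):
        self.type = "ORDER"
        self.symbol = symbol
        self.order_type = order_type
        self.quantity = quantity
        self.price = price
        self.direction = direction

# Good-till-cancelled, immediate-or-cancel, fill-or-kill (assumed set of flags)
VALID_TIME_IN_FORCE = {"GTC", "IOC", "FOK"}

class TimeInForceOrderEvent(OrderEvent):
    """Hypothetical extension: an order that also carries a time-in-force flag."""
    def __init__(self, symbol, order_type, quantity, price, direction, time_in_force="GTC"):
        if time_in_force not in VALID_TIME_IN_FORCE:
            raise ValueError(f"Unknown time_in_force: {time_in_force}")
        super().__init__(symbol, order_type, quantity, price, direction)
        self.time_in_force = time_in_force

order = TimeInForceOrderEvent("BTCUSDT", "LMT", 1, 29_500.0, "LONG", time_in_force="IOC")
```

Since the subclass keeps type set to "ORDER", the event loop would route it exactly like a plain order, and only an execution engine that knows about the extra attribute needs to look at it.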
Fill Event
class FillEvent(Event):
    """
    Filled Order returned by exchange
    """
    def __init__(self, exchange, symbol, timestamp, quantity, direction, price, fees):
        """
        Parameters:
        exchange - Exchange where the order was filled
        symbol - Ticker symbol
        timestamp - Timestamp at which the order was executed
        quantity - Filled quantity
        direction - "LONG" or "SHORT"
        price - Price at which the order was filled
        fees - Fees paid/received
        """
        self.type = "FILL"
        self.exchange = exchange
        self.symbol = symbol
        self.timestamp = timestamp
        self.quantity = quantity
        self.direction = direction
        self.price = price
        self.fees = fees
When running a strategy live you will get those variables from the exchange. When backtesting you obtain them from a simulation instead, so you need to make certain assumptions about the fill price, fees etc.
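One example of such an assumption is a fixed slippage on top of the bar’s close. The helper below is only a sketch: the function name and the 5 bps default are made up for illustration, and the simple execution engine later in this article assumes fills at the close without any slippage.

```python
def simulate_fill_price(close, direction, slippage_bps=5.0):
    """Assume buys fill slightly above and sells slightly below the close.

    slippage_bps is a made-up constant in basis points (1 bp = 0.01%).
    """
    sign = 1 if direction == "LONG" else -1
    return close * (1 + sign * slippage_bps / 10_000)

buy_price = simulate_fill_price(100.0, "LONG")    # a bit worse than the close for a buyer
sell_price = simulate_fill_price(100.0, "SHORT")  # a bit worse than the close for a seller
```

The more pessimistic these assumptions are, the less likely it is that live results will disappoint relative to the backtest.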
Data Handler
Our data handler will live in the datahandler.py file. First we make some imports:
import pandas as pd
from abc import ABCMeta, abstractmethod
from .event import MarketEvent
and create the following abstract class:
class DataHandler(metaclass=ABCMeta):
    """
    Abstract interface for all data handlers (Live and Historical)
    """
    @abstractmethod
    def get_latest_data_points(self, symbol, N=1):
        """
        Returns the latest N data points for symbol from latest_symbol_data,
        or fewer if fewer data points are available
        """
        raise NotImplementedError("get_latest_data_points() not implemented!")

    @abstractmethod
    def update_data_points(self):
        """
        Pushes the latest data point to latest_symbol_data for all symbols in the symbol list
        """
        raise NotImplementedError("update_data_points() not implemented!")
I will be implementing a data handler that reads OHLCV data from CSV files.
class HistoricalOHLCVDataHandler(DataHandler):
    """
    Reads CSV files for each requested symbol
    """
    def __init__(self, events, csv_dir, symbol_list):
        """
        Parameters:
        events - Event Queue
        csv_dir - Directory path to the CSV files
        symbol_list - List of symbol ticker strings
        """
        self.events = events
        self.csv_dir = csv_dir
        self.symbol_list = symbol_list
        self.symbol_data = {}
        self.latest_symbol_data = {}
        self.continue_backtest = True
        self.open_csv_files()
symbol_data here will contain the pandas dataframes for each symbol while latest_symbol_data always contains the latest datapoint for each symbol.
Now let’s implement the open_csv_files() function which will load in all the symbols in symbol_list and save the data into pandas dataframes.
    def open_csv_files(self):
        """
        Opens CSV files and converts them into pandas DataFrames
        """
        comb_index = None
        for cur_symbol in self.symbol_list:
            df = pd.read_csv(f"{self.csv_dir}/{cur_symbol}-OHLCV.csv").set_index("timestamp")
            df.index = pd.to_datetime(df.index)
            df = df.sort_index()
            # Drop duplicated timestamps, keeping the most recent row
            df = df.loc[~df.index.duplicated(keep="last")]
            self.symbol_data[cur_symbol] = df
            # Index.union returns a new index, so the result has to be reassigned
            if comb_index is None:
                comb_index = df.index
            else:
                comb_index = comb_index.union(df.index)
            self.latest_symbol_data[cur_symbol] = []
        for cur_symbol in self.symbol_list:
            # Align every symbol on the combined index and forward-fill the gaps
            self.symbol_data[cur_symbol] = self.symbol_data[cur_symbol].reindex(
                index=comb_index, method="ffill"
            )
            self.symbol_data[cur_symbol] = self.symbol_data[cur_symbol].iterrows()
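To see what the combined index and the forward fill are for, here is a small standalone sketch with two made-up symbols where one of them is missing a bar. The data is invented; `Index.union` and `DataFrame.reindex(..., method="ffill")` are the same pandas calls used in the loader.

```python
import pandas as pd

# Two toy close series; bnb has no bar on 2023-08-02
btc = pd.DataFrame(
    {"close": [100.0, 101.0, 102.0]},
    index=pd.to_datetime(["2023-08-01", "2023-08-02", "2023-08-03"]),
)
bnb = pd.DataFrame(
    {"close": [10.0, 12.0]},
    index=pd.to_datetime(["2023-08-01", "2023-08-03"]),
)

# union() returns a NEW index containing every timestamp seen in either frame
comb_index = btc.index.union(bnb.index)

# Reindexing on the combined index fills the missing bar with the last known values
bnb_aligned = bnb.reindex(index=comb_index, method="ffill")

# Calling union() without keeping the result leaves the original index untouched
partial = btc.index[:1]
partial.union(bnb.index)
```

After the reindex both symbols have a row for every timestamp, so the backtester can step through all symbols in lockstep. The last two lines show why the return value of `union()` has to be kept: `partial` still holds a single timestamp.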
And next the get_new_bar() function which gives us the latest data point (here a bar) as a tuple:
    def get_new_bar(self, symbol):
        """
        Returns the latest bar from the data feed as a tuple
        (symbol, timestamp, open, high, low, close, volume)
        """
        for bar in self.symbol_data[symbol]:
            # bar is an (index, row) pair coming from DataFrame.iterrows()
            yield (symbol, bar[0].timestamp(), bar[1]["open"], bar[1]["high"],
                   bar[1]["low"], bar[1]["close"], bar[1]["volume"])
We use yield here instead of return to create a generator for the latest data point.
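The reason this pattern hands out one bar at a time is that the underlying row iterator keeps its position between calls. Here is a stripped-down sketch of that behaviour, using a plain list iterator in place of `iterrows()` and a module-level function in place of the method:

```python
# Stand-in for the iterrows() iterator stored in symbol_data
rows = iter([("t0", 100.0), ("t1", 101.0)])

def get_new_bar():
    """Yield rows from the shared iterator, which remembers where it stopped."""
    for row in rows:
        yield row

# Every call creates a fresh generator, but all of them pull from the same iterator
first = next(get_new_bar())
second = next(get_new_bar())

# Once the data runs out, next() raises StopIteration
try:
    next(get_new_bar())
    exhausted = False
except StopIteration:
    exhausted = True
```

Catching that `StopIteration` is a natural way for a data handler to notice that the backtest has reached the end of the data.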
Using those 2 helper functions we can now code up the 2 abstract functions that need to be implemented in every data handler.
    def get_latest_data_points(self, symbol, N=1):
        try:
            bars_list = self.latest_symbol_data[symbol]
        except KeyError:
            print(f"{symbol} data not available!")
        else:
            return bars_list[-N:]

    def update_data_points(self):
        for cur_symbol in self.symbol_list:
            try:
                bar = next(self.get_new_bar(cur_symbol))
            except StopIteration:
                # No data left for this symbol: stop after this iteration
                self.continue_backtest = False
            else:
                if bar is not None:
                    self.latest_symbol_data[cur_symbol].append(bar)
        self.events.put(MarketEvent())
Portfolio & Risk Manager
For this component we create a new python file portfolio.py with the following imports:
import pandas as pd
from abc import ABCMeta, abstractmethod
from .event import OrderEvent
and the following abstract class:
class Portfolio(metaclass=ABCMeta):
    """
    Handles positions and fills creating new Orders
    """
    @abstractmethod
    def update_signal(self, event):
        """
        Generates new Order from SignalEvent
        """
        raise NotImplementedError("update_signal() not implemented!")

    @abstractmethod
    def update_fill(self, event):
        """
        Updates current Portfolio based on FillEvent
        """
        raise NotImplementedError("update_fill() not implemented!")
I’ll be implementing a very naive portfolio and risk manager that blindly generates orders for all signals that it gets with size equal to the signal strength.
class NaivePortfolio(Portfolio):
    """
    Naive Portfolio Manager that blindly sends orders sized by the signal strength
    without any risk management or position sizing
    """
    def __init__(self, bars, events, start_date, initial_capital=100000.0):
        """
        Parameters:
        bars - DataHandler object with current market data
        events - Event Queue object
        start_date - Start date of the portfolio
        initial_capital - Starting capital in USD
        """
        self.bars = bars
        self.events = events
        self.start_date = start_date
        self.initial_capital = initial_capital
        self.symbol_list = self.bars.symbol_list
        self.all_positions = self.construct_all_positions()
        self.current_positions = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        self.all_holdings = self.construct_all_holdings()
        self.current_holdings = self.construct_current_holdings()
all_positions and all_holdings contain all the historical portfolio positions and holdings (cash, total portfolio value, etc.)
    def construct_all_positions(self):
        """
        Constructs the positions list using the start_date
        to determine when the time index will begin
        """
        d = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        d["timestamp"] = self.start_date
        return [d]

    def construct_all_holdings(self):
        """
        Constructs the holdings list using the start_date
        to determine when the time index will begin
        """
        d = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        d["timestamp"] = self.start_date
        d["cash"] = self.initial_capital
        d["fees"] = 0.0
        d["total"] = self.initial_capital
        return [d]

    def construct_current_holdings(self):
        """
        Constructs the dictionary which will hold the instantaneous
        value of the portfolio across all symbols
        """
        d = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        d["cash"] = self.initial_capital
        d["fees"] = 0.0
        d["total"] = self.initial_capital
        return d
Now we will implement a function that updates our positions and holdings if a new data point appears:
    def update_market(self):
        """
        Adds a new record to the positions and holdings matrices for the current market data bar.
        Uses MarketEvent from events queue.
        """
        bars = {}
        for cur_symbol in self.symbol_list:
            bars[cur_symbol] = self.bars.get_latest_data_points(cur_symbol, N=1)
        # Update positions (bar tuple index 1 is the timestamp)
        dp = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        dp["timestamp"] = bars[self.symbol_list[0]][0][1]
        for cur_symbol in self.symbol_list:
            dp[cur_symbol] = self.current_positions[cur_symbol]
        self.all_positions.append(dp)
        # Update holdings (bar tuple index 5 is the close)
        dh = {cur_symbol: 0 for cur_symbol in self.symbol_list}
        dh["timestamp"] = bars[self.symbol_list[0]][0][1]
        dh["cash"] = self.current_holdings["cash"]
        dh["fees"] = self.current_holdings["fees"]
        dh["total"] = self.current_holdings["cash"]
        for cur_symbol in self.symbol_list:
            market_value = self.current_positions[cur_symbol] * bars[cur_symbol][0][5]
            dh[cur_symbol] = market_value
            dh["total"] += market_value
        self.all_holdings.append(dh)
Next we need 2 functions that update our current positions and current holdings if we get back a fill from the execution engine:
    def update_positions_from_fill(self, fill):
        """
        Updates the positions according to a FillEvent
        Parameters:
        fill - FillEvent object
        """
        fill_dir = 0
        if fill.direction == "LONG":
            fill_dir = 1
        if fill.direction == "SHORT":
            fill_dir = -1
        self.current_positions[fill.symbol] += fill_dir*fill.quantity

    def update_holdings_from_fill(self, fill):
        """
        Updates the holdings according to a FillEvent
        Parameters:
        fill - FillEvent object
        """
        fill_dir = 0
        if fill.direction == "LONG":
            fill_dir = 1
        if fill.direction == "SHORT":
            fill_dir = -1
        # Fills are assumed to happen at the latest close (bar tuple index 5)
        fill_price = self.bars.get_latest_data_points(fill.symbol)[0][5]
        cost = fill_dir * fill_price * fill.quantity
        self.current_holdings[fill.symbol] += cost
        self.current_holdings["fees"] += fill.fees
        self.current_holdings["cash"] -= cost + fill.fees
        self.current_holdings["total"] -= cost + fill.fees
To avoid calling both functions by hand each time we get a fill, we create another function that simply runs both of them (this is one of the 2 abstract methods):
    def update_fill(self, event):
        if event.type == "FILL":
            self.update_positions_from_fill(event)
            self.update_holdings_from_fill(event)
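A quick worked example of the bookkeeping may help. The standalone function below mirrors the position, cash and fee updates with plain dictionaries; the function name and the numbers (2 units at a close of 30,000 with a fee of 18, i.e. 3 bps of the notional) are made up for illustration.

```python
def apply_fill(holdings, positions, symbol, direction, quantity, price, fees):
    """Apply one fill to plain dictionaries, following the same arithmetic as the portfolio."""
    fill_dir = {"LONG": 1, "SHORT": -1}[direction]
    cost = fill_dir * price * quantity       # positive for buys, negative for sells
    positions[symbol] += fill_dir * quantity
    holdings[symbol] += cost                 # market value held in the symbol
    holdings["fees"] += fees                 # cumulative fees paid
    holdings["cash"] -= cost + fees          # buying costs cash, selling frees it up
    return holdings, positions

holdings = {"BTCUSDT": 0.0, "cash": 100_000.0, "fees": 0.0}
positions = {"BTCUSDT": 0}

# Buy 2 units at 30,000: notional 60,000, fee 0.0003 * 60,000 = 18
apply_fill(holdings, positions, "BTCUSDT", "LONG", 2, 30_000.0, 18.0)
```

After the fill we hold 2 units worth 60,000 and have 39,982 in cash left, so the only money that actually left the portfolio is the 18 in fees.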
Next the naive order creation:
    def generate_naive_order(self, signal):
        """
        Creates OrderEvent with quantity equal to the signal strength
        Parameters:
        signal - SignalEvent object
        """
        order = None
        symbol = signal.symbol
        signal_type = signal.signal_type
        strength = signal.strength
        mkt_quantity = strength
        cur_quantity = self.current_positions[symbol]
        order_type = "MKT"
        # Only open a position if we are flat; EXIT closes whatever we hold
        if signal_type == "LONG" and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, None, "LONG")
        if signal_type == "SHORT" and cur_quantity == 0:
            order = OrderEvent(symbol, order_type, mkt_quantity, None, "SHORT")
        if signal_type == "EXIT" and cur_quantity > 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), None, "SHORT")
        if signal_type == "EXIT" and cur_quantity < 0:
            order = OrderEvent(symbol, order_type, abs(cur_quantity), None, "LONG")
        return order
and the last abstract function that takes the order created by generate_naive_order() and puts it in the queue:
    def update_signal(self, event):
        if event.type == "SIGNAL":
            order_event = self.generate_naive_order(event)
            self.events.put(order_event)
Execution Engine
Next is the (very simple) execution engine in execution.py:
import datetime
from abc import ABCMeta, abstractmethod
from .event import FillEvent
class ExecutionHandler(metaclass=ABCMeta):
    """
    Abstract interface for Execution Engines responsible for
    executing trades
    """
    @abstractmethod
    def execute_order(self, event):
        """
        Takes an Order event and executes it producing a Fill event
        Parameters:
        event - Event object with order information
        """
        raise NotImplementedError("execute_order() is not implemented!")
Our execution engine will just convert orders into the respective fills without returning a fill price as our portfolio & risk manager already assumes we get filled at the close.
class SimulatedExecutionHandler(ExecutionHandler):
    """
    Converts all Order objects into their equivalent Fill objects
    without any latency, slippage, etc.
    """
    def __init__(self, bars, events):
        """
        Parameters:
        bars - DataHandler object that provides bar information
        events - Queue of Event objects
        """
        self.bars = bars
        self.events = events

    def execute_order(self, event):
        """
        Parameters:
        event - Event object with order information
        """
        if event.type == "ORDER":
            # Fee of 3 bps on the notional, valued at the latest close (bar tuple index 5)
            close = self.bars.get_latest_data_points(event.symbol)[0][5]
            fees = 0.0003 * event.quantity * close
            # Argument order follows FillEvent: quantity, direction, price, fees.
            # The price is None because the portfolio assumes a fill at the close.
            fill_event = FillEvent("TestExchange", event.symbol, datetime.datetime.now().timestamp(),
                                   event.quantity, event.direction, None, fees)
            self.events.put(fill_event)
I’ve hard coded 3bps as a fee here.
Strategy
Now for our strategies in strategy.py:
from abc import ABCMeta, abstractmethod
from .event import SignalEvent
class Strategy(metaclass=ABCMeta):
    """
    Abstract interface for all strategies
    """
    @abstractmethod
    def calculate_signals(self, event):
        """
        Calculates list of signals
        """
        raise NotImplementedError("calculate_signals() not implemented!")
The strategy that we will implement is a simple buy and hold.
It will send a buy signal for each instrument and then never sell.
class BuyAndHoldStrategy(Strategy):
    """
    Goes LONG all of the symbols as soon as a bar is received.
    """
    def __init__(self, bars, events):
        """
        Parameters:
        bars - DataHandler object that provides bar information
        events - Event Queue object
        """
        self.bars = bars
        self.symbol_list = self.bars.symbol_list
        self.events = events
        self.bought = self.calculate_initial_bought()
bought is a dictionary that tells us if we already bought an instrument or not.
calculate_initial_bought() simply sets every value to False.
    def calculate_initial_bought(self):
        """
        Creates dictionary with symbols as keys with all set to False
        """
        bought = {}
        for cur_symbol in self.symbol_list:
            bought[cur_symbol] = False
        return bought
We now implement the abstract method that actually calculates the signal:
    def calculate_signals(self, event):
        if event.type == "MARKET":
            for cur_symbol in self.symbol_list:
                bars = self.bars.get_latest_data_points(cur_symbol, N=1)
                if bars is not None and bars != []:
                    if not self.bought[cur_symbol]:
                        # bars[0] is (symbol, timestamp, open, high, low, close, volume)
                        signal = SignalEvent(bars[0][0], bars[0][1], "LONG", 1.0)
                        self.events.put(signal)
                        self.bought[cur_symbol] = True
Performance and Monitoring
This is the last component of our simple event-based backtester! It will be in performance.py.
import pandas as pd
from abc import ABCMeta, abstractmethod
from . import metrics_and_plots as mp
metrics_and_plots contains our risk metrics and plots that we’ve coded in the previous PyNL article.
class Performance(metaclass=ABCMeta):
    """
    Handles performance reports and monitoring
    """
    @abstractmethod
    def report(self, event):
        """
        Returns a performance report
        """
        raise NotImplementedError("report() not implemented!")
Our implementation will return every performance metric and plot that we’ve coded up:
class SimplePerformanceReport(Performance):
    """
    Returns basic performance statistics and plots
    """
    def __init__(self, portfolio):
        """
        Parameters:
        portfolio - Portfolio and Risk Manager containing the portfolio to be analyzed
        """
        self.portfolio = portfolio
        self.summary = pd.DataFrame()

    def report(self, event=None):
        """
        Implements the abstract method: builds the equity curve and prints the statistics
        """
        self.create_equity_curve_dataframe()
        self.summary_statistics()

    def create_equity_curve_dataframe(self):
        """
        Creates a pandas DataFrame from the all_holdings list of dictionaries
        """
        summary = pd.DataFrame(self.portfolio.all_holdings).set_index("timestamp")
        summary["returns"] = summary["total"].pct_change()
        summary["equity"] = (1.0+summary["returns"]).cumprod()
        self.summary = summary

    def summary_statistics(self):
        """
        Outputs a bunch of performance statistics for the equity curve
        """
        pd.set_option('display.max_columns', None)
        print(self.summary)
        alpha = 0.05
        # Backtest length in years (timestamps are in seconds)
        T = (self.summary.index[-1] - self.summary.index[0])/(60*60*24*365)
        # Skip the first row, whose return (and therefore equity) is NaN
        equity = self.summary["equity"].values[1:]
        sharpe_ratio = mp.metrics.sharpe_ratio(equity, T)
        sortino_ratio = mp.metrics.sortino_ratio(equity, T)
        calmar_ratio = mp.metrics.calmar_ratio(equity, T)
        max_drawdown = mp.metrics.max_drawdown(equity)
        omega_ratio = mp.metrics.omega_ratio(equity)
        VaR = mp.metrics.VaR(equity, alpha)
        CVaR = mp.metrics.CVaR(equity, alpha)
        print(f"Sharpe Ratio: {sharpe_ratio}")
        print(f"Sortino Ratio: {sortino_ratio}")
        print(f"Calmar Ratio: {calmar_ratio}")
        print(f"Maximum Drawdown: {max_drawdown}")
        print(f"Omega Ratio: {omega_ratio}")
        print(f"Value at Risk ({alpha*100}%): {VaR*100}%")
        print(f"Conditional Value at Risk ({alpha*100}%): {CVaR*100}%")
        mp.plots.plot_drawdown(equity)
        mp.plots.plot_return_cdf(equity, show=True)
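To see what the equity curve dataframe looks like, here is a tiny standalone sketch with three made-up holdings records one day apart. The numbers are invented; the pandas calls are the same ones used in create_equity_curve_dataframe().

```python
import pandas as pd

# Toy stand-in for portfolio.all_holdings (timestamps in seconds, one day apart)
all_holdings = [
    {"timestamp": 0, "total": 100.0},
    {"timestamp": 86_400, "total": 110.0},
    {"timestamp": 172_800, "total": 99.0},
]

summary = pd.DataFrame(all_holdings).set_index("timestamp")
summary["returns"] = summary["total"].pct_change()        # first row has no previous value -> NaN
summary["equity"] = (1.0 + summary["returns"]).cumprod()  # equity relative to the starting capital

# Length of the backtest in years, computed from the second-based timestamps
years = (summary.index[-1] - summary.index[0]) / (60 * 60 * 24 * 365)
```

The first row of returns is NaN because there is nothing to compare it to, which is why the first equity value gets skipped before it is handed to the metrics. In this toy example the equity goes to about 1.1 and then back to about 0.99 of the starting capital.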
I’ve made a little change to plot_drawdown and plot_return_cdf:
    if show:
        plt.show()
Adding this at the end allows me to do multiple plots in the report without having to import matplotlib and doing plt.figure() etc.
And with that our event-based backtester is finally complete! Let’s use it to do an actual backtest now!
Running a Backtest
I’ve already done most of the work in the pseudo-code in the Event-Driven Backtester section of the article.
Now we just need to convert the pseudo-code to real code:
import infrastructure
from queue import Empty, Queue

symbols = ["BNBUSDT", "BTCUSDT"]
event_queue = Queue()
datahandler = infrastructure.datahandler.HistoricalOHLCVDataHandler(event_queue, "C:/QuantData/OHLCV/Binance", symbols)
portfolio = infrastructure.portfolio.NaivePortfolio(datahandler, event_queue, 1690934400)
executionengine = infrastructure.execution.SimulatedExecutionHandler(datahandler, event_queue)
buyandhold = infrastructure.strategy.BuyAndHoldStrategy(datahandler, event_queue)
performance = infrastructure.performance.SimplePerformanceReport(portfolio)

# Outer loop: load one new bar per iteration until the data runs out
while True:
    if datahandler.continue_backtest:
        datahandler.update_data_points()
    else:
        break
    # Inner loop: process all events before loading the next bar
    while True:
        try:
            event = event_queue.get(False)
        except Empty:
            break
        else:
            if event is not None:
                if event.type == "MARKET":
                    buyandhold.calculate_signals(event)
                    portfolio.update_market()
                elif event.type == "SIGNAL":
                    portfolio.update_signal(event)
                elif event.type == "ORDER":
                    executionengine.execute_order(event)
                elif event.type == "FILL":
                    portfolio.update_fill(event)

performance.create_equity_curve_dataframe()
performance.summary_statistics()
We get back the following report:
Final Remarks
Just 2 articles later and PyNL is already starting to become a pretty useful library!
You could already use this event-based backtester to run simple backtests.
With this backtester the sky is the limit: you can introduce volatility forecasting to the risk manager, add portfolio optimization techniques, implement execution algorithms like VWAP and TWAP, etc.
Implement whatever you need to backtest your strategies efficiently!