Day 19: Circular Sample
On Day 18 we started to discuss simulating returns to quantify a strategy's probability of success out-of-sample. The reason: we were unsure how much merit to give the 12-by-12's performance relative to the 200-day SMA. We discussed various simulation techniques, but settled on historical sampling that accounts for the autocorrelation of returns. We then showed the autocorrelation plots, with up to 10 lags, for the strategy and the three benchmarks: buy-and-hold, 60-40, and the 200-day. Let's show that again and discuss.
As shown in the graph above, the correlation coefficients are relatively modest, ranging between -10% and 10%, and few of them are statistically significant. The 12-by-12 strategy has two significant lags (1 and 2), buy-and-hold has three (1, 3, and 7), the 60-40 has one (7), and the 200-day (surprisingly!) has none. We suspect these results are partly explained by the frequency used, namely weekly, but won't delve into that here.
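For those who would rather extract the significant lags programmatically than eyeball the plot, here is a minimal sketch using the same statsmodels acf call as the plotting code below. It assumes returns is a pandas Series of weekly log returns; a lag counts as significant when its confidence interval excludes zero.

# Minimal sketch: list the lags whose autocorrelation is significant at the
# given alpha. Assumes `returns` is a pandas Series of weekly log returns.
from statsmodels.tsa.stattools import acf

def significant_lags(returns, nlags=10, alpha=0.05):
    acf_values, confint = acf(returns.dropna(), nlags=nlags, alpha=alpha)
    # Skip lag 0 (always 1.0); a lag is significant when its confidence
    # interval does not straddle zero
    return [lag for lag in range(1, nlags + 1)
            if confint[lag, 0] > 0 or confint[lag, 1] < 0]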
Given these results, which block size should we use? This is where the fine art of backtesting comes into play: there is no deterministic way to answer the question. Our intuition, however, suggests using buy-and-hold as the guide and its significant 3- and 7-week lags as the block sizes. The reason: the strategy will follow the patterns of the underlying, so we should do our best to simulate the underlying's distribution. Let's dive in.
Since this will involve several iterations of the code, we'll first look at the 200-day SMA vs. buy-and-hold to ground the analysis. After that, we'll run another iteration with the 12-by-12 strategy. For those interested in the steps involved, we outline the algorithm in the appendix below the code. Let's move to the results.
Our plan is to forecast the likelihood that a strategy will outperform buy-and-hold over a five-year period. We sample three-week blocks of returns with replacement and append them to create a five-year time series, and repeat this 1,000 times. We then plot a histogram of the outperformance of the strategy's cumulative return vs. buy-and-hold's. As one can see, at about 13 percentage points of underperformance (the blue dashed line), the 200-day's mean sits below the benchmark-equivalent cut-off, where strategy and benchmark achieve the same result (the red dashed line).
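Schematically, each simulated path is scored as the strategy's cumulative log return minus buy-and-hold's. A minimal sketch (sim_path is a hypothetical 1-D array of simulated weekly log returns; the 40-week SMA mirrors the 200-day rule in the full code below):

# Score one simulated path: cumulative log return of the 200-day (40-week)
# SMA strategy minus buy-and-hold. `sim_path` is assumed to be a 1-D array
# of simulated weekly log returns.
import numpy as np
import pandas as pd

def score_path(sim_path):
    df = pd.DataFrame({'ret': sim_path})
    df['price'] = 100 * np.exp(df['ret'].cumsum())   # rebuild a price series
    df['signal'] = np.where(df['price'] > df['price'].rolling(40).mean(), 1, 0)
    df['strat_ret'] = df['signal'].shift(1) * df['ret']
    return df['strat_ret'].sum() - df['ret'].sum()   # outperformance in log terms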
We also graph the empirical cumulative distribution function, if that is more to the reader's taste. The point where the two red lines cross is where the strategy begins to outperform buy-and-hold.
The crossover point is just below 27%. Hence, the 200-day outperforms buy-and-hold only about a quarter of the time. That doesn't mean there is precisely a 25% chance the 200-day will outperform in the future, just that we shouldn't assume we even have a 50/50 chance of outperforming based on historical simulations. The Sharpe ratio fares better, however, with the strategy beating buy-and-hold over 30% of the time. Tomorrow we'll run the circular block algorithm on the 12-by-12 strategy. Stay tuned.
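To put numbers on those two claims, one can summarize the simulation output directly. A minimal sketch, assuming the df_sim dataframe built in the code below:

# Share of simulations where the 200-day beats buy-and-hold on cumulative
# return and on Sharpe ratio. Assumes `df_sim` from the code below.
p_ret = (df_sim['outperf'] > 0).mean()
p_sharpe = (df_sim['strat_sharpe'] > df_sim['ret_sharpe']).mean()
print(f"Return win rate: {p_ret:.0%}, Sharpe win rate: {p_sharpe:.0%}")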
Code below.
# Load libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import statsmodels.api as sm
import matplotlib.pyplot as plt
import yfinance as yf
from matplotlib.ticker import FuncFormatter
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.stattools import acf
from tqdm import tqdm

plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (14,8)

# Function to get data
def get_spy_weekly_data() -> pd.DataFrame:
    df = yf.download('SPY', start='2000-01-01', end='2024-10-01')
    df.columns = ['open', 'high', 'low', 'close', 'adj close', 'volume']
    df.index.name = 'date'

    # Create training set and downsample to weekly ending Friday
    df_train = df.loc[:'2019-01-01', 'adj close'].copy()
    df_w = pd.DataFrame(df_train.resample('W-FRI').last())
    df_w.columns = ['price']

    return df_w

# Get data
df_w = get_spy_weekly_data()

# Create momentum dictionary
periods = [3, 6, 9, 12]
momo_dict = {}
for back in periods:
    for forward in periods:
        df_out = df_w.copy()
        df_out['ret_back'] = np.log(df_out['price']/df_out['price'].shift(back))
        df_out['ret_for'] = np.log(df_out['price'].shift(-forward)/df_out['price'])
        df_out = df_out.dropna()

        mod = sm.OLS(df_out['ret_for'], sm.add_constant(df_out['ret_back'])).fit()
        momo_dict[f"{back} - {forward}"] = {'data': df_out,
                                            'params': mod.params,
                                            'pvalues': mod.pvalues}

# Prepare model
model_name = '12 - 12'
mod_look_forward = 12
train_pd = 5
test_pd = 1
tot_pd = train_pd + test_pd

# Create trading dataframe
df_trade = momo_dict[model_name]['data'].copy()

# Run model with train/forecast steps
trade_pred = []
for i in range(tot_pd, len(df_trade)+1, test_pd):
    train_df = df_trade.iloc[i-tot_pd:i-test_pd, 1:]
    test_df = df_trade.iloc[i-test_pd+mod_look_forward-1:i-test_pd+mod_look_forward, 1:]

    # Ensure 'ret_back' is 2D by selecting it as a DataFrame, not a Series
    X_train = sm.add_constant(train_df[['ret_back']])
    if test_df.shape[0] > 1:
        X_test = sm.add_constant(test_df[['ret_back']])
    else:
        X_test = sm.add_constant(test_df[['ret_back']], has_constant='add')

    # Fit the model
    mod_run = sm.OLS(train_df['ret_for'], X_train).fit()

    # Predict using the test data
    mod_pred = mod_run.predict(X_test).values
    trade_pred.extend(mod_pred)

# Test for length
assert len(trade_pred) + mod_look_forward + train_pd - 1 == len(df_trade)

# Add predictions to dataframe
df_trade['pred'] = np.concatenate((np.repeat(np.nan, mod_look_forward + train_pd - 1), np.array(trade_pred)))

# Generate returns
df_trade['ret'] = np.log(df_trade['price']/df_trade['price'].shift(1))

# Generate signals (use isna(): comparing to np.nan with == is always False)
df_trade['signal'] = np.where(df_trade['pred'].isna(), np.nan, np.where(df_trade['pred'] > 0, 1, 0))
df_trade['signal_sh'] = np.where(df_trade['pred'].isna(), np.nan, np.where(df_trade['pred'] >= 0, 1, -1))

# Generate strategy returns
df_trade['strat_ret'] = df_trade['signal'].shift(1) * df_trade['ret']
df_trade['strat_ret_sh'] = df_trade['signal_sh'].shift(1) * df_trade['ret']

# Load data for 60-40
data = yf.download(['SPY', 'IEF'], start='2000-01-01', end='2024-10-01')
df = data.loc["2003-01-01":, 'Adj Close']
df.columns.name = None
tickers = ['ief', 'spy']
df.index.name = 'date'
df.columns = tickers

df[['ief_chg', 'spy_chg']] = df[['ief','spy']].apply(lambda x: np.log(x/x.shift(1)))

df_bw = pd.DataFrame(df.resample('W-FRI').last())
df_bw[['ief_chg', 'spy_chg']] = df_bw[['ief','spy']].apply(lambda x: np.log(x/x.shift(1)))

# Align dates for comparisons
end_date_bench = df_trade.index[-1].strftime("%Y-%m-%d")
bench_returns = df_bw[['ief_chg', 'spy_chg']].copy()
bench_returns = bench_returns.loc[:end_date_bench]
bench_returns = bench_returns.dropna()

strat_returns_start = bench_returns.index[0].strftime("%Y-%m-%d")
strat_returns = df_trade['strat_ret'].copy()
strat_returns = strat_returns.loc[strat_returns_start:]

# Create function to calculate portfolio performance
def calculate_portfolio_performance(weights: list, returns: pd.DataFrame,
                                    rebalance=False, frequency='month') -> pd.Series:
    # Initialize the portfolio value to 0.0
    portfolio_value = 0.0
    portfolio_values = []

    # Initialize the current weights
    current_weights = np.array(weights)

    # Create a dictionary to map frequency to the appropriate offset property
    frequency_map = {
        'week': 'week',
        'month': 'month',
        'quarter': 'quarter'
    }

    if rebalance:
        # Iterate over each row in the returns DataFrame
        for date, daily_returns in returns.iterrows():
            # Apply the current weights to the period's returns
            portfolio_value = np.dot(current_weights, daily_returns)
            portfolio_values.append(portfolio_value)

            # Rebalance at the selected frequency (week, month, quarter)
            offset = pd.DateOffset(days=1)
            next_date = date + offset  # type: ignore

            # Dynamically get the attribute based on frequency
            current_period = getattr(date, frequency_map[frequency])
            next_period = getattr(next_date, frequency_map[frequency])

            # If current period does not equal next period, time to rebalance
            if current_period != next_period:
                current_weights = np.array(weights)
            else:
                # Update weights based on the previous period's returns
                current_weights = current_weights * (1 + daily_returns)
                current_weights /= np.sum(current_weights)
    else:
        # No rebalancing, just apply the initial weights
        for date, daily_returns in returns.iterrows():
            portfolio_value = np.dot(current_weights, daily_returns)
            portfolio_values.append(portfolio_value)

            # Update weights based on the previous period's returns
            current_weights = current_weights * (1 + daily_returns)
            current_weights /= np.sum(current_weights)

    daily_returns = pd.Series(portfolio_values, index=returns.index)

    return daily_returns

# Create 60-40 portfolio
weights = [0.4, 0.6]
bench_60_40_rebal = calculate_portfolio_performance(weights, bench_returns,
                                                    rebalance=False, frequency='quarter')
bench_60_40_rebal.index = bench_60_40_rebal.index.tz_localize(None)  # type: ignore

# Create 200-day strategy
df_200 = df_w.copy()
df_200.columns = ['price']
df_200 = df_200.resample('W-FRI').last()

# 40 weeks = 200 days
df_200['sma_200'] = df_200['price'].rolling(40).mean()
df_200['ret'] = np.log(df_200['price']/df_200['price'].shift(1))
df_200['signal'] = np.where(df_200['price'] > df_200['sma_200'], 1, 0)
df_200['strat_ret'] = df_200['signal'].shift(1)*df_200['ret']

df_200_bench = df_200.loc[df_trade['strat_ret'].index[0]:df_trade['strat_ret'].index[-1]]

# Plot ACF
return_data = [strat_returns, bench_returns['spy_chg'], bench_60_40_rebal, df_200_bench['strat_ret']]
names = ['Strategy', 'Buy-and-Hold', '60-40', '200-day']

fig, axs = plt.subplots(2, 2, sharey=True)
for idx, (data, name) in enumerate(zip(return_data, names)):
    # Calculate the autocorrelation
    acf_values, confint = acf(data.dropna(), nlags=10, alpha=0.05)  # type: ignore
    lower_bound = confint[1:, 0] - acf_values[1:]
    upper_bound = confint[1:, 1] - acf_values[1:]

    # Find the correct subplot (axs[0, 0], axs[0, 1], etc.)
    ax = axs[idx // 2, idx % 2]
    ax.stem(range(1, len(acf_values)), acf_values[1:])

    # Set x-ticks to match lag numbers
    ax.set_xticks(range(1, len(acf_values)))
    ax.set_xticklabels(range(1, len(acf_values)))

    # Add shaded area for confidence interval
    ax.fill_between(range(1, len(acf_values)), upper_bound, lower_bound, color='gray', alpha=0.2)

    ax.set_title(name)
    if idx > 1:
        ax.set_xlabel("Lags")
    if idx % 2 == 0:
        ax.set_ylabel("Autocorrelation")

plt.tight_layout()
plt.show()

# Circular block
data = df_w.apply(lambda x: np.log(x/x.shift(1))).dropna().copy()
data = data.loc[:'2019-01-01']

block_size = 3
data_idx = len(data) - 1
# data_len = len(data)
data_len = int(52*5)
iter = data_len // block_size
adder = data_len % block_size

block_data = []

# Create simulated returns
np.random.seed(42)
for _ in tqdm(range(1000), desc='Simulation'):
    dat = []
    for _ in range(iter):
        start = np.random.choice(data_idx, 1)[0]
        end = start+block_size
        out = data.iloc[start:end]
        if end > data_idx + 1:
            new_end = end - data_idx - 1
            new_out = data.iloc[:new_end]
            out = pd.concat([out, new_out])
        dat.extend(out.values)

    if adder:
        new_start = np.random.choice(data_idx, 1)[0]
        new_end = new_start+adder
        out_add = data.iloc[new_start:new_end]
        if new_end > data_idx + 1:
            s_end = new_end - data_idx - 1
            s_end = s_end if s_end < adder else adder - 1
            new_out = data.iloc[:s_end]
            out_add = pd.concat([out_add, new_out])
        dat.extend(out_add.values)

    block_data.append(dat)

# Create trading strategy for each simulation
lst_sim = []
for dt in tqdm(block_data, desc='Simulations'):
    price_sim = np.exp(np.array(dt)).cumprod()*100
    dataf = pd.DataFrame(np.c_[price_sim, np.array(dt)], columns=['price', 'ret'])
    dataf['sma_200'] = dataf['price'].rolling(40).mean()
    dataf['signal'] = np.where(dataf['price'] > dataf['sma_200'], 1, 0)
    dataf['strat_ret'] = dataf['signal'].shift(1) * dataf['ret']
    cumul_ret = dataf[['strat_ret', 'ret']].cumsum().iloc[-1].values
    sharpe = dataf[['strat_ret', 'ret']].apply(lambda x: x.mean()/x.std()*np.sqrt(52)).values
    lst_sim.append([cumul_ret, sharpe])

# Wrangle data and calculate outperformance
flat_data = [np.concatenate(pair) for pair in lst_sim]
df_sim = pd.DataFrame(flat_data, columns=['strat_ret', 'ret', 'strat_sharpe', 'ret_sharpe'])
df_sim['outperf'] = df_sim['strat_ret'] - df_sim['ret']

# Plot the histogram of outperformance
plt.figure()
plt.hist(df_sim['outperf'], bins=30, alpha=0.7, edgecolor='black')
plt.axvline(0, color='red', linestyle='--', label='Benchmark Equivalent')
plt.axvline(df_sim['outperf'].mean(), color='blue', linestyle='--', label='Avg. Outperformance')
plt.xlabel('Outperformance (Strategy - Benchmark)')
plt.ylabel('Frequency')
plt.title('Distribution of Strategy Outperformance Over Simulations')
plt.legend()
plt.show()

# Plot cumulative distribution function
# Sort the outperformance values
sorted_outperf = np.sort(df_sim['outperf'])

# Calculate cumulative probabilities
cumul_prob = np.arange(1, len(sorted_outperf) + 1) / len(sorted_outperf)
crossover = cumul_prob[(sorted_outperf > -0.001) & (sorted_outperf < 0.001)].mean().round(2)

# Plot the CDF
plt.figure()
plt.plot(sorted_outperf, cumul_prob, label='CDF of Outperformance')
plt.axvline(0, color='red', linestyle='--', linewidth=0.8, label='Benchmark Level')
plt.axhline(crossover, color='red', linestyle='--', linewidth=0.8, label='Crossover')
plt.xlabel('Outperformance (Strategy - Benchmark)')
plt.ylabel('Cumulative Probability')
plt.title('Cumulative Distribution Function of Strategy Outperformance Over Simulations')
plt.legend()
plt.show()
Appendix
Algorithm: Circular Block Resampling with Remainder Adjustment
Given a time series dataset \(D\) of length \(N\), and a specified block size \(B\):
Preprocess Data: Calculate the log returns of each series in \(D\), removing any missing values. Optionally, truncate \(D\) to end at a specified date \(t_{0}\).
Initialize Parameters: Define the number of complete blocks as \(k = \lfloor N/B \rfloor\) (integer division) and the remainder as \(r = N \bmod B\). For example, with \(N = 260\) (five years of weekly returns) and \(B = 3\), \(k = 86\) and \(r = 2\).
Set Random Seed: Fix a random seed to ensure reproducibility.
Generate Resampled Sequences:
- For each of the \(S\) desired resampled sequences:
  - Initialize an empty sequence \(S_{i}\).
  - Full Block Sampling: for each of the \(k\) full blocks:
    - Randomly select a starting index \(s\) within the bounds of \(D\) (from \(0\) to \(N-1\)).
    - Define the end of the block as \(e = s + B\).
    - Extract the sequence from \(s\) to \(e\), wrapping around to the start of \(D\) if \(e > N\).
    - Append this block to \(S_{i}\).
  - Remainder Sampling: if \(r > 0\):
    - Randomly select a starting index \(s\) for a smaller block of size \(r\).
    - Define the end as \(e = s + r\).
    - Extract the sequence from \(s\) to \(e\), wrapping around if \(e > N\).
    - Append this remainder block to \(S_{i}\).
  - Store \(S_{i}\) in the final collection of resampled sequences.
Output: The result is a set of \(S\) resampled sequences, each containing \(N\) values with block structures retained from the original data.
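For illustration, here is a compact restatement of the algorithm in Python. This is a sketch, not the code used above: it collapses the full-block and remainder steps into one loop and handles the wrap-around with modulo indexing. The returns argument is assumed to be a 1-D NumPy array of log returns.

# Sketch: one circular block resample of `length` values. Assumes `returns`
# is a 1-D array of weekly log returns.
import numpy as np

def circular_block_sample(returns, length, block_size, rng):
    n = len(returns)
    out = []
    remaining = length
    while remaining > 0:
        size = min(block_size, remaining)      # last block has size r if r > 0
        start = rng.integers(n)                # random start anywhere in the data
        idx = (start + np.arange(size)) % n    # circular (wrap-around) indexing
        out.append(returns[idx])
        remaining -= size
    return np.concatenate(out)

# Example: 1,000 five-year weekly sequences built from three-week blocks
rng = np.random.default_rng(42)
sims = [circular_block_sample(returns, 52 * 5, 3, rng) for _ in range(1000)]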