from tqdm import tqdm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations, product
from joblib import Parallel, delayed
import statsmodels.api as sm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
3 알고리즘 거래전략 기말과제
20249132 김형환
# 기존 데이터를 R로 가공하여 2022~2024의 snp500구성종목 데이터만 남기고, long form으로 변환
= pd.read_csv("data/snp500_return.csv")
snp500_return 'date'] = pd.to_datetime(snp500_return['date'])
snp500_return[ snp500_return.head()
date | permno | ticker | comnam | return | |
---|---|---|---|---|---|
0 | 2022-01-03 | 10104 | ORCL | ORACLE CORP | 0.007912 |
1 | 2022-01-03 | 10107 | MSFT | MICROSOFT CORP | -0.004668 |
2 | 2022-01-03 | 10138 | TROW | T ROWE PRICE GROUP INC | -0.010476 |
3 | 2022-01-03 | 10145 | HON | HONEYWELL INTERNATIONAL INC | -0.008201 |
4 | 2022-01-03 | 10516 | ADM | ARCHER DANIELS MIDLAND CO | 0.004439 |
# 공적분 관계 검정을 위해 wide form 생성, 추후 전략 성능평가를 위해 24년 1년간은 테스트 목적으로 분할
= snp500_return[snp500_return["date"] < "2024-01-01"]
snp500_return_train = snp500_return[snp500_return["date"] >= "2024-01-01"]
snp500_return_test
= snp500_return_train.pivot(index='date', columns='permno', values='return').dropna(axis=1)
snp500_return_wide = snp500_return_test.pivot(index='date', columns='permno', values='return').dropna(axis=1)
snp500_return_wide_test
= (1 + snp500_return_wide).cumprod()
snp500_prices_wide = (1 + snp500_return_wide_test).cumprod()
snp500_prices_wide_test
print(snp500_return_wide.info())
print(snp500_return_wide_test.info())
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 501 entries, 2022-01-03 to 2023-12-29
Columns: 456 entries, 10104 to 93436
dtypes: float64(456)
memory usage: 1.7 MB
None
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 252 entries, 2024-01-02 to 2024-12-31
Columns: 456 entries, 10104 to 93436
dtypes: float64(456)
memory usage: 899.7 KB
None
# 공적분 검정 이전 계산량 효율화를 위한 사전 스크리닝
# 1. 단일 ADF검정 활용
= 0.05
alpha
="ADF Test")
tqdm.pandas(desc= snp500_prices_wide.progress_apply(lambda ts: adfuller(ts)[1])
pvals = pvals[pvals >= alpha].index.tolist()
non_sta = snp500_prices_wide[non_sta]
filtered_prices_wide
print(f"Number of total stocks : {len(pvals)}")
print(f"Number of non-stationary stocks(p<{alpha}) : {len(non_sta)}")
ADF Test: 100%|██████████| 456/456 [00:03<00:00, 130.26it/s]
Number of total stocks : 456
Number of non-stationary stocks(p<0.05) : 370
# 공적분 검정 이전 계산량 효율화를 위한 사전 스크리닝
# 2. 상관계수 활용
= 0.3
corr_threshold
= filtered_prices_wide.corr().abs()
corr = [
pairs
(i, j)for i, j in combinations(filtered_prices_wide.columns, 2)
if corr.loc[i, j] > corr_threshold]
print(f"Number of whole pairs : {len(list(combinations(filtered_prices_wide.columns, 2)))}")
print(f"Number of correlated(>{corr_threshold}) pairs : {len(pairs)}")
Number of whole pairs : 68265
Number of correlated(>0.3) pairs : 43997
# Engle-Granger 테스트를 통해 공적분 관계 검정정
= 0.01
alpha_coint = 50
min_obs
def test_pair(pair):
= pair
t1, t2 = filtered_prices_wide[[t1, t2]].dropna()
df if len(df) < min_obs:
return None
= df[t1]
y = sm.add_constant(df[t2])
x = sm.OLS(y, x).fit()
model = model.resid
resid try:
= adfuller(resid, autolag=None)[1]
pval if pval < alpha_coint:
return {"permno_1": t1, "permno_2": t2, "pvalue": pval}
except:
return None
return None
= Parallel(n_jobs=-1, verbose=5)(
results for pair in pairs
delayed(test_pair)(pair)
)
= [r for r in results if r is not None]
results = pd.DataFrame(results) coint_pairs
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done 40 tasks | elapsed: 4.4s
[Parallel(n_jobs=-1)]: Done 168 tasks | elapsed: 4.7s
[Parallel(n_jobs=-1)]: Done 2080 tasks | elapsed: 6.0s
[Parallel(n_jobs=-1)]: Done 7264 tasks | elapsed: 9.3s
[Parallel(n_jobs=-1)]: Done 13600 tasks | elapsed: 13.6s
[Parallel(n_jobs=-1)]: Done 21088 tasks | elapsed: 17.4s
[Parallel(n_jobs=-1)]: Done 29728 tasks | elapsed: 22.1s
[Parallel(n_jobs=-1)]: Done 39520 tasks | elapsed: 27.4s
[Parallel(n_jobs=-1)]: Done 43966 out of 43997 | elapsed: 30.4s remaining: 0.0s
[Parallel(n_jobs=-1)]: Done 43997 out of 43997 | elapsed: 30.5s finished
print(f"Number of cointegrated(significant level={alpha_coint:.0%}) pairs : {len(coint_pairs)}")
Number of cointegrated(significant level=1%) pairs : 2565
# 상위 5개 공적분 페어 추출
= snp500_return.drop_duplicates("permno").set_index("permno")["comnam"].to_dict()
comnam_map = coint_pairs.sort_values("pvalue").head(5).copy()
top5_pairs "comnam_1"], top5_pairs["comnam_2"] = top5_pairs["permno_1"].map(comnam_map), top5_pairs["permno_2"].map(comnam_map)
top5_pairs[
top5_pairs
permno_1 | permno_2 | pvalue | comnam_1 | comnam_2 | |
---|---|---|---|---|---|
320 | 11850 | 93246 | 9.981788e-07 | EXXON MOBIL CORP | GENERAC HOLDINGS INC |
2308 | 76605 | 77730 | 1.170548e-05 | AUTOZONE INC | TYSON FOODS INC |
376 | 13407 | 86580 | 1.595093e-05 | META PLATFORMS INC | NVIDIA CORP |
1419 | 24643 | 52708 | 1.624042e-05 | HOWMET AEROSPACE INC | LENNAR CORP |
2495 | 87137 | 90215 | 2.325270e-05 | DEVON ENERGY CORP NEW | SALESFORCE INC |
# Pair들에 대한 반감기, Historical Mean Crossing, 헤지비율, Hurst 지수, Pearson Corr 계산
= list(coint_pairs[["permno_1","permno_2","pvalue"]].itertuples(index=False))
pair_list = len(pair_list)
total_pairs
= []
records for permno_1, permno_2, pval in tqdm(pair_list, total=total_pairs, desc="Compute metrics"):
= pd.concat([filtered_prices_wide[permno_1], filtered_prices_wide[permno_2]], axis=1, join='inner').dropna()
pair if len(pair) < 252:
continue2= pair.iloc[:,0], pair.iloc[:,1]
y1, y2
# OLS로 hedge ratio β 추정
= sm.add_constant(y2.values)
X = sm.OLS(y1.values, X).fit()
res = res.params[1]
beta
= y1 - beta * y2
spread
# Hurst exponent
= np.array([2,5,10,20,50])
lags = np.array([np.std(spread.diff(l).dropna()) for l in lags])
tau = np.polyfit(np.log(lags), np.log(tau), 1)[0]
H
# Half-life
= spread.shift(1).dropna()
spread_lag = spread.diff().dropna().loc[spread_lag.index]
spread_ret = sm.OLS(spread_ret.values, sm.add_constant(spread_lag.values)).fit()
reg = reg.params[1]
b_coef = b_coef + 1
phi_ar = -np.log(2) / np.log(phi_ar)
half_life
# Historical mean crossings per year
= spread.mean()
m = spread - m
centered = np.sign(centered)
signs = np.sum(signs.diff().fillna(0) != 0)
crossings = (spread.index[-1] - spread.index[0]).days / 365.25
years = crossings / years
mean_cross_year
# Pearson Corr
= snp500_return_wide.loc[:, [permno_1, permno_2]].dropna(how="any")
ret_pair = ret_pair.corr().iloc[0,1] if len(ret_pair) > 1 else np.nan
pearson_corr
records.append({"permno_1": permno_1,
"permno_2": permno_2,
"pvalue": pval,
"hedge_ratio": beta,
"hurst": H,
"half_life": half_life,
"mean_cross_year": mean_cross_year,
"pearson_corr": pearson_corr
})
= pd.DataFrame(records)
metrics_df_1pct "comnam_1"] = metrics_df_1pct["permno_1"].map(comnam_map)
metrics_df_1pct["comnam_2"] = metrics_df_1pct["permno_2"].map(comnam_map) metrics_df_1pct[
Compute metrics: 100%|██████████| 2565/2565 [00:23<00:00, 111.44it/s]
# Hurst,Half-life,Mean-crossings,pearson_corr 필터링
= metrics_df_1pct[
filtered_pairs "hurst"] < 0.5) &
(metrics_df_1pct["half_life"] > 1) &
(metrics_df_1pct["half_life"] < 30) &
(metrics_df_1pct["mean_cross_year"] > 5) &
(metrics_df_1pct["pearson_corr"] > 0.5)
(metrics_df_1pct[
].copy()
# 점수 계산 및 최우수 페어 추출
"score"] = (
filtered_pairs[-filtered_pairs["pvalue"].rank() +
-filtered_pairs["half_life"].rank() +
"mean_cross_year"].rank() +
filtered_pairs["pearson_corr"].rank()
filtered_pairs[
)
= filtered_pairs.sort_values("score", ascending=False).iloc[0:5]
best5_pairs
print(f"Number of Filtered pairs : {len(filtered_pairs)}")
best5_pairs
Number of Filtered pairs : 322
permno_1 | permno_2 | pvalue | hedge_ratio | hurst | half_life | mean_cross_year | pearson_corr | comnam_1 | comnam_2 | score | |
---|---|---|---|---|---|---|---|---|---|---|---|
853 | 19286 | 78987 | 0.000158 | 0.680322 | 0.314507 | 7.191855 | 32.242759 | 0.605119 | OTIS WORLDWIDE CORP | MICROCHIP TECHNOLOGY INC | 557.5 |
340 | 12084 | 48486 | 0.001111 | 0.596882 | 0.360651 | 10.127506 | 29.220000 | 0.792955 | N X P SEMICONDUCTORS N V | LAM RESH CORP | 543.5 |
2107 | 58819 | 64936 | 0.000495 | 0.300429 | 0.402952 | 12.482854 | 29.220000 | 0.761218 | ALLIANT ENERGY CORP | DOMINION ENERGY INC | 519.5 |
1178 | 23931 | 24109 | 0.000785 | 0.848911 | 0.393015 | 12.980947 | 29.220000 | 0.886921 | X C E L ENERGY INC | AMERICAN ELECTRIC POWER CO INC | 509.5 |
848 | 19286 | 60871 | 0.001356 | 0.737086 | 0.314549 | 8.868845 | 29.723793 | 0.621187 | OTIS WORLDWIDE CORP | ANALOG DEVICES INC | 509.0 |
# 최적 쌍을 이용하여 전략 구성. 성과가 가장 좋은 진입청산기준을 선정해서 test데이터로 평가 실시
= best5_pairs.iloc[0]
pair = pair.permno_1, pair.permno_2
p1, p2 = pair.hedge_ratio
hedge_ratio = pair.comnam_1, pair.comnam_2
comnam1, comnam2
# 가격 데이터
= snp500_prices_wide[[p1, p2]].dropna()
price_is = [comnam1, comnam2]
price_is.columns
# 파라미터 후보
= [0.5 ,1.0, 1.5, 2.0, 3.0]
entry_z_list = [0.0, 0.1, 0.3, 0.5]
exit_z_list = 0.0003 # 거래비용 3bp
transaction_cost
= []
results
for entry_z, exit_z in product(entry_z_list, exit_z_list):
= price_is[comnam1] - hedge_ratio * price_is[comnam2]
spread = spread.mean(), spread.std()
mu, sigma = (spread - mu) / sigma
z
= z < -entry_z
long_entry = z > entry_z
short_entry = z.abs() < exit_z
exit_signal
# 포지션 설정
= np.where(long_entry, 1,
pos -1,
np.where(short_entry, 0, np.nan)))
np.where(exit_signal, = pd.Series(pos, index=z.index).ffill().fillna(0)
pos
# 수익률 계산
= price_is.pct_change().dropna()
ret = pos.shift() * (ret[comnam1] - hedge_ratio * ret[comnam2])
strat_ret
# 거래비용 반영 (포지션 변경 시 비용 발생)
= pos.diff().abs()
pos_chg = pos_chg.shift().fillna(0) * transaction_cost
cost = strat_ret - cost
strat_ret_net
= (1 + strat_ret_net).cumprod()
cum_ret = strat_ret_net.mean() / strat_ret_net.std() * np.sqrt(252)
sharpe = (cum_ret / cum_ret.cummax() - 1).min()
mdd
results.append({"entry_z": entry_z,
"exit_z": exit_z,
"final_return": cum_ret.iloc[-1],
"sharpe": sharpe,
"mdd": mdd
})
# 결과 정리
= pd.DataFrame(results).sort_values(by="sharpe", ascending=False).reset_index(drop=True)
opt_results display(opt_results)
entry_z | exit_z | final_return | sharpe | mdd | |
---|---|---|---|---|---|
0 | 1.5 | 0.5 | 2.015736 | 3.152895 | -0.044008 |
1 | 1.5 | 0.3 | 2.098776 | 3.035545 | -0.056847 |
2 | 0.5 | 0.5 | 2.709583 | 2.929092 | -0.088697 |
3 | 1.0 | 0.3 | 2.527457 | 2.925949 | -0.070058 |
4 | 1.5 | 0.1 | 2.165035 | 2.904456 | -0.068544 |
5 | 1.0 | 0.5 | 2.366084 | 2.890452 | -0.070058 |
6 | 0.5 | 0.3 | 2.716753 | 2.780591 | -0.088697 |
7 | 0.5 | 0.1 | 2.743286 | 2.626813 | -0.088697 |
8 | 1.0 | 0.0 | 2.926148 | 2.523193 | -0.093610 |
9 | 1.0 | 0.1 | 2.314327 | 2.489392 | -0.076772 |
10 | 0.5 | 0.0 | 2.780223 | 2.405471 | -0.093610 |
11 | 1.5 | 0.0 | 2.411925 | 2.112670 | -0.102202 |
12 | 2.0 | 0.5 | 1.262682 | 1.875213 | -0.038795 |
13 | 2.0 | 0.3 | 1.278799 | 1.811135 | -0.038795 |
14 | 3.0 | 0.5 | 1.099983 | 1.519723 | -0.031609 |
15 | 3.0 | 0.1 | 1.114023 | 1.294801 | -0.031609 |
16 | 3.0 | 0.3 | 1.114023 | 1.294801 | -0.031609 |
17 | 2.0 | 0.1 | 1.288337 | 1.231710 | -0.101385 |
18 | 2.0 | 0.0 | 1.496786 | 1.036650 | -0.139779 |
19 | 3.0 | 0.0 | 1.121741 | 0.578127 | -0.080870 |
# 최적 entry_z, exit_z
= opt_results.iloc[0]["entry_z"]
entry_z_opt = opt_results.iloc[0]["exit_z"]
exit_z_opt
= [0.5 ,1.0, 1.5, 2.0, 3.0]
entry_z_list = [0.0, 0.1, 0.3, 0.5]
exit_z_list
= price_is[comnam1] - hedge_ratio * price_is[comnam2]
spread = spread.mean(), spread.std()
mu, sigma = (spread - mu) / sigma
z
= plt.subplots(3, 1, figsize=(14, 10), sharex=True)
fig, axs
# (1) 가격 경로
0].plot(price_is[comnam1], label=comnam1)
axs[0].plot(price_is[comnam2], label=comnam2)
axs[0].set_ylabel("Price")
axs[0].legend()
axs[0].set_title("Price Path (2022~2023)")
axs[
# (2) 스프레드 + 기준선
1].plot(spread, label="Spread")
axs[1].axhline(mu, color="black", linestyle="--", label="Mean")
axs[
for ez in entry_z_list:
= 1.0 if ez == entry_z_opt else 0.1
alpha 1].axhline(mu + sigma * ez, color="red", linestyle="--", alpha=alpha, label=f"+{ez}" if alpha==1.0 else None)
axs[1].axhline(mu - sigma * ez, color="red", linestyle="--", alpha=alpha, label=f"-{ez}" if alpha==1.0 else None)
axs[
for ez in exit_z_list:
= 1.0 if ez == exit_z_opt else 0.1
alpha 1].axhline(mu + sigma * ez, color="green", linestyle="--", alpha=alpha, label=f"+{ez}" if alpha==1.0 else None)
axs[1].axhline(mu - sigma * ez, color="green", linestyle="--", alpha=alpha, label=f"-{ez}" if alpha==1.0 else None)
axs[
1].set_ylabel("Spread")
axs[1].set_title("Spread with All Entry/Exit Thresholds")
axs[
# (3) z-score + 기준선
2].plot(z, label="Z-score")
axs[2].axhline(0, color="black", linestyle="--")
axs[
for ez in entry_z_list:
= 1.0 if ez == entry_z_opt else 0.1
alpha 2].axhline(ez, color="red", linestyle="--", alpha=alpha)
axs[2].axhline(-ez, color="red", linestyle="--", alpha=alpha)
axs[
for ez in exit_z_list:
= 1.0 if ez == exit_z_opt else 0.1
alpha 2].axhline(ez, color="green", linestyle="--", alpha=alpha)
axs[2].axhline(-ez, color="green", linestyle="--", alpha=alpha)
axs[
2].set_ylabel("Z-score")
axs[2].set_title("Z-score with Entry/Exit Candidates")
axs[
plt.tight_layout() plt.show()
# OOS 가격 데이터
= snp500_prices_wide_test[[p1, p2]].dropna()
price_oos = [comnam1, comnam2]
price_oos.columns
# 최적 파라미터
= opt_results.iloc[0]["entry_z"]
entry_z = opt_results.iloc[0]["exit_z"]
exit_z
# 스프레드 및 z-score
= price_oos[comnam1] - hedge_ratio * price_oos[comnam2]
spread_oos = spread_oos.mean(), spread_oos.std()
mu, sigma = (spread_oos - mu) / sigma
z_oos
# 포지션
= z_oos < -entry_z
long_entry = z_oos > entry_z
short_entry = z_oos.abs() < exit_z
exit_signal
= np.where(long_entry, 1,
pos -1,
np.where(short_entry, 0, np.nan)))
np.where(exit_signal, = pd.Series(pos, index=z_oos.index).ffill().fillna(0)
pos
# 수익률 계산
= price_oos.pct_change().dropna()
ret = pos.shift() * (ret[comnam1] - hedge_ratio * ret[comnam2])
strat_ret
# 거래비용 차감
= pos.diff().abs().fillna(0)
pos_chg = pos_chg.shift().fillna(0) * transaction_cost
cost = strat_ret - cost
strat_ret_net
= (1 + strat_ret_net).cumprod()
cum_ret
# 성과지표
= strat_ret_net.mean() / strat_ret_net.std() * np.sqrt(252)
sharpe = (cum_ret / cum_ret.cummax() - 1).min()
mdd
# 진입/청산 시점
= ((pos.shift(1) == 0) & (pos == 1))
entry_longs = ((pos.shift(1) == 0) & (pos == -1))
entry_shorts = ((pos.shift(1) != 0) & (pos == 0))
exits
= entry_longs[entry_longs].index
entry_longs_idx = entry_shorts[entry_shorts].index
entry_shorts_idx = exits[exits].index
exits_idx
# 시각화
= plt.subplots(3, 1, figsize=(14, 10), sharex=True)
fig, axs
# (1) 가격
0].plot(price_oos[comnam1], label=comnam1)
axs[0].plot(price_oos[comnam2], label=comnam2)
axs[0].set_ylabel("Price")
axs[0].legend()
axs[0].set_title("Price Path (OOS)")
axs[
# (2) Spread + Entry/Exit 기준선
1].plot(spread_oos, label="Spread")
axs[1].axhline(mu + sigma * entry_z, color="red", linestyle="--", label="Entry Z")
axs[1].axhline(mu - sigma * entry_z, color="red", linestyle="--")
axs[1].axhline(mu + sigma * exit_z, color="green", linestyle="--", label="Exit Z")
axs[1].axhline(mu - sigma * exit_z, color="green", linestyle="--")
axs[1].axhline(mu, color="black", linestyle="--", label="Mean")
axs[
# 진입/청산 화살표
1].scatter(entry_longs_idx, spread_oos.loc[entry_longs_idx], marker='^', color='blue', label='Long Entry', zorder=5)
axs[1].scatter(entry_shorts_idx, spread_oos.loc[entry_shorts_idx], marker='v', color='red', label='Short Entry', zorder=5)
axs[1].scatter(exits_idx, spread_oos.loc[exits_idx], marker='o', color='black', label='Exit', zorder=5)
axs[
1].set_ylabel("Spread")
axs[1].legend()
axs[1].set_title("Spread (OOS)")
axs[
# (3) 누적 수익률
2].plot(cum_ret, label="Cumulative Return")
axs[2].axhline(1, color="black", linestyle="--")
axs[2].set_ylabel("Cumulative Return")
axs[2].legend()
axs[2].set_title("Strategy Cumulative Return (2024)")
axs[
plt.tight_layout() plt.show()
print("--- Pair tradind Perfomance in 2024 ---")
print(f" 1. Cummulative return : {cum_ret.iloc[-1]:.4f}")
print(f" 2. Sharpe ratio : {sharpe:.2f}")
print(f" 3. Max Drawdown : {mdd:.2%}")
--- Pair tradind Perfomance in 2024 ---
1. Cummulative return : 1.0609
2. Sharpe ratio : 0.53
3. Max Drawdown : -16.66%