3  알고리즘 거래전략 기말과제

20249132 김형환

from tqdm import tqdm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations, product
from joblib import Parallel, delayed
import statsmodels.api as sm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# 기존 데이터를 R로 가공하여 2022~2024의 snp500구성종목 데이터만 남기고, long form으로 변환
snp500_return = pd.read_csv("data/snp500_return.csv")
snp500_return['date'] = pd.to_datetime(snp500_return['date'])
snp500_return.head()
date permno ticker comnam return
0 2022-01-03 10104 ORCL ORACLE CORP 0.007912
1 2022-01-03 10107 MSFT MICROSOFT CORP -0.004668
2 2022-01-03 10138 TROW T ROWE PRICE GROUP INC -0.010476
3 2022-01-03 10145 HON HONEYWELL INTERNATIONAL INC -0.008201
4 2022-01-03 10516 ADM ARCHER DANIELS MIDLAND CO 0.004439
# 공적분 관계 검정을 위해 wide form 생성, 추후 전략 성능평가를 위해 24년 1년간은 테스트 목적으로 분할
snp500_return_train = snp500_return[snp500_return["date"] < "2024-01-01"]
snp500_return_test  = snp500_return[snp500_return["date"] >= "2024-01-01"]

snp500_return_wide = snp500_return_train.pivot(index='date', columns='permno', values='return').dropna(axis=1)
snp500_return_wide_test = snp500_return_test.pivot(index='date', columns='permno', values='return').dropna(axis=1)

snp500_prices_wide = (1 + snp500_return_wide).cumprod()
snp500_prices_wide_test = (1 + snp500_return_wide_test).cumprod()

print(snp500_return_wide.info())
print(snp500_return_wide_test.info())
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 501 entries, 2022-01-03 to 2023-12-29
Columns: 456 entries, 10104 to 93436
dtypes: float64(456)
memory usage: 1.7 MB
None
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 252 entries, 2024-01-02 to 2024-12-31
Columns: 456 entries, 10104 to 93436
dtypes: float64(456)
memory usage: 899.7 KB
None
# 공적분 검정 이전 계산량 효율화를 위한 사전 스크리닝
# 1. 단일 ADF검정 활용
alpha = 0.05

tqdm.pandas(desc="ADF Test")
pvals = snp500_prices_wide.progress_apply(lambda ts: adfuller(ts)[1])
non_sta = pvals[pvals >= alpha].index.tolist()
filtered_prices_wide = snp500_prices_wide[non_sta]

print(f"Number of total stocks : {len(pvals)}")
print(f"Number of non-stationary stocks(p<{alpha}) : {len(non_sta)}")
ADF Test: 100%|██████████| 456/456 [00:03<00:00, 130.26it/s]
Number of total stocks : 456
Number of non-stationary stocks(p<0.05) : 370
# 공적분 검정 이전 계산량 효율화를 위한 사전 스크리닝
# 2. 상관계수 활용
corr_threshold = 0.3

corr = filtered_prices_wide.corr().abs()
pairs = [
    (i, j)
    for i, j in combinations(filtered_prices_wide.columns, 2)
    if corr.loc[i, j] > corr_threshold]

print(f"Number of whole pairs : {len(list(combinations(filtered_prices_wide.columns, 2)))}")
print(f"Number of correlated(>{corr_threshold}) pairs : {len(pairs)}")
Number of whole pairs : 68265
Number of correlated(>0.3) pairs : 43997
# Engle-Granger 테스트를 통해 공적분 관계 검정정

alpha_coint = 0.01
min_obs = 50

def test_pair(pair):
    t1, t2 = pair
    df = filtered_prices_wide[[t1, t2]].dropna()
    if len(df) < min_obs:
        return None
    y = df[t1]
    x = sm.add_constant(df[t2])
    model = sm.OLS(y, x).fit()
    resid = model.resid
    try:
        pval = adfuller(resid, autolag=None)[1]
        if pval < alpha_coint:
            return {"permno_1": t1, "permno_2": t2, "pvalue": pval}
    except:
        return None
    return None

results = Parallel(n_jobs=-1, verbose=5)(
    delayed(test_pair)(pair) for pair in pairs
)

results = [r for r in results if r is not None]
coint_pairs = pd.DataFrame(results)
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  40 tasks      | elapsed:    4.4s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:    4.7s
[Parallel(n_jobs=-1)]: Done 2080 tasks      | elapsed:    6.0s
[Parallel(n_jobs=-1)]: Done 7264 tasks      | elapsed:    9.3s
[Parallel(n_jobs=-1)]: Done 13600 tasks      | elapsed:   13.6s
[Parallel(n_jobs=-1)]: Done 21088 tasks      | elapsed:   17.4s
[Parallel(n_jobs=-1)]: Done 29728 tasks      | elapsed:   22.1s
[Parallel(n_jobs=-1)]: Done 39520 tasks      | elapsed:   27.4s
[Parallel(n_jobs=-1)]: Done 43966 out of 43997 | elapsed:   30.4s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done 43997 out of 43997 | elapsed:   30.5s finished
print(f"Number of cointegrated(significant level={alpha_coint:.0%}) pairs : {len(coint_pairs)}")
Number of cointegrated(significant level=1%) pairs : 2565
# 상위 5개 공적분 페어 추출
comnam_map = snp500_return.drop_duplicates("permno").set_index("permno")["comnam"].to_dict()
top5_pairs = coint_pairs.sort_values("pvalue").head(5).copy()
top5_pairs["comnam_1"], top5_pairs["comnam_2"] = top5_pairs["permno_1"].map(comnam_map), top5_pairs["permno_2"].map(comnam_map)

top5_pairs
permno_1 permno_2 pvalue comnam_1 comnam_2
320 11850 93246 9.981788e-07 EXXON MOBIL CORP GENERAC HOLDINGS INC
2308 76605 77730 1.170548e-05 AUTOZONE INC TYSON FOODS INC
376 13407 86580 1.595093e-05 META PLATFORMS INC NVIDIA CORP
1419 24643 52708 1.624042e-05 HOWMET AEROSPACE INC LENNAR CORP
2495 87137 90215 2.325270e-05 DEVON ENERGY CORP NEW SALESFORCE INC
# Pair들에 대한 반감기, Historical Mean Crossing, 헤지비율, Hurst 지수, Pearson Corr 계산
pair_list   = list(coint_pairs[["permno_1","permno_2","pvalue"]].itertuples(index=False))
total_pairs = len(pair_list)

records = []
for permno_1, permno_2, pval in tqdm(pair_list, total=total_pairs, desc="Compute metrics"):
    
    pair = pd.concat([filtered_prices_wide[permno_1], filtered_prices_wide[permno_2]], axis=1, join='inner').dropna()
    if len(pair) < 252:
        continue2
    y1, y2 = pair.iloc[:,0], pair.iloc[:,1]

    # OLS로 hedge ratio β 추정
    X   = sm.add_constant(y2.values)
    res = sm.OLS(y1.values, X).fit()
    beta = res.params[1]

    spread = y1 - beta * y2

    # Hurst exponent
    lags = np.array([2,5,10,20,50])
    tau  = np.array([np.std(spread.diff(l).dropna()) for l in lags])
    H    = np.polyfit(np.log(lags), np.log(tau), 1)[0]

    # Half-life
    spread_lag    = spread.shift(1).dropna()
    spread_ret    = spread.diff().dropna().loc[spread_lag.index]
    reg           = sm.OLS(spread_ret.values, sm.add_constant(spread_lag.values)).fit()
    b_coef        = reg.params[1]
    phi_ar        = b_coef + 1
    half_life     = -np.log(2) / np.log(phi_ar)

    # Historical mean crossings per year
    m               = spread.mean()
    centered        = spread - m
    signs           = np.sign(centered)
    crossings       = np.sum(signs.diff().fillna(0) != 0)
    years           = (spread.index[-1] - spread.index[0]).days / 365.25
    mean_cross_year = crossings / years

    # Pearson Corr
    ret_pair = snp500_return_wide.loc[:, [permno_1, permno_2]].dropna(how="any")
    pearson_corr = ret_pair.corr().iloc[0,1] if len(ret_pair) > 1 else np.nan

    records.append({
        "permno_1":        permno_1,
        "permno_2":        permno_2,
        "pvalue":          pval,
        "hedge_ratio":     beta,
        "hurst":           H,
        "half_life":       half_life,
        "mean_cross_year": mean_cross_year,
        "pearson_corr":    pearson_corr
    })

metrics_df_1pct = pd.DataFrame(records)
metrics_df_1pct["comnam_1"] = metrics_df_1pct["permno_1"].map(comnam_map)
metrics_df_1pct["comnam_2"] = metrics_df_1pct["permno_2"].map(comnam_map)
Compute metrics: 100%|██████████| 2565/2565 [00:23<00:00, 111.44it/s]
# Hurst,Half-life,Mean-crossings,pearson_corr 필터링
filtered_pairs = metrics_df_1pct[
    (metrics_df_1pct["hurst"]           < 0.5)   &
    (metrics_df_1pct["half_life"]       > 1)     &
    (metrics_df_1pct["half_life"]       < 30)    &
    (metrics_df_1pct["mean_cross_year"] > 5)    &
    (metrics_df_1pct["pearson_corr"] > 0.5)
].copy()

# 점수 계산 및 최우수 페어 추출
filtered_pairs["score"] = (
    -filtered_pairs["pvalue"].rank() +
    -filtered_pairs["half_life"].rank() +
    filtered_pairs["mean_cross_year"].rank() +
    filtered_pairs["pearson_corr"].rank()
)

best5_pairs = filtered_pairs.sort_values("score", ascending=False).iloc[0:5]

print(f"Number of Filtered pairs : {len(filtered_pairs)}")
best5_pairs
Number of Filtered pairs : 322
permno_1 permno_2 pvalue hedge_ratio hurst half_life mean_cross_year pearson_corr comnam_1 comnam_2 score
853 19286 78987 0.000158 0.680322 0.314507 7.191855 32.242759 0.605119 OTIS WORLDWIDE CORP MICROCHIP TECHNOLOGY INC 557.5
340 12084 48486 0.001111 0.596882 0.360651 10.127506 29.220000 0.792955 N X P SEMICONDUCTORS N V LAM RESH CORP 543.5
2107 58819 64936 0.000495 0.300429 0.402952 12.482854 29.220000 0.761218 ALLIANT ENERGY CORP DOMINION ENERGY INC 519.5
1178 23931 24109 0.000785 0.848911 0.393015 12.980947 29.220000 0.886921 X C E L ENERGY INC AMERICAN ELECTRIC POWER CO INC 509.5
848 19286 60871 0.001356 0.737086 0.314549 8.868845 29.723793 0.621187 OTIS WORLDWIDE CORP ANALOG DEVICES INC 509.0
# 최적 쌍을 이용하여 전략 구성. 성과가 가장 좋은 진입청산기준을 선정해서 test데이터로 평가 실시
pair = best5_pairs.iloc[0]
p1, p2 = pair.permno_1, pair.permno_2
hedge_ratio = pair.hedge_ratio
comnam1, comnam2 = pair.comnam_1, pair.comnam_2

# 가격 데이터
price_is = snp500_prices_wide[[p1, p2]].dropna()
price_is.columns = [comnam1, comnam2]

# 파라미터 후보
entry_z_list = [0.5 ,1.0, 1.5, 2.0, 3.0]
exit_z_list = [0.0, 0.1, 0.3, 0.5]
transaction_cost = 0.0003  # 거래비용 3bp

results = []

for entry_z, exit_z in product(entry_z_list, exit_z_list):
    spread = price_is[comnam1] - hedge_ratio * price_is[comnam2]
    mu, sigma = spread.mean(), spread.std()
    z = (spread - mu) / sigma

    long_entry  = z < -entry_z
    short_entry = z >  entry_z
    exit_signal = z.abs() < exit_z

    # 포지션 설정
    pos = np.where(long_entry, 1,
          np.where(short_entry, -1,
          np.where(exit_signal, 0, np.nan)))
    pos = pd.Series(pos, index=z.index).ffill().fillna(0)

    # 수익률 계산
    ret = price_is.pct_change().dropna()
    strat_ret = pos.shift() * (ret[comnam1] - hedge_ratio * ret[comnam2])

    # 거래비용 반영 (포지션 변경 시 비용 발생)
    pos_chg = pos.diff().abs()
    cost = pos_chg.shift().fillna(0) * transaction_cost
    strat_ret_net = strat_ret - cost

    cum_ret = (1 + strat_ret_net).cumprod()
    sharpe = strat_ret_net.mean() / strat_ret_net.std() * np.sqrt(252)
    mdd = (cum_ret / cum_ret.cummax() - 1).min()

    results.append({
        "entry_z": entry_z,
        "exit_z": exit_z,
        "final_return": cum_ret.iloc[-1],
        "sharpe": sharpe,
        "mdd": mdd
    })

# 결과 정리
opt_results = pd.DataFrame(results).sort_values(by="sharpe", ascending=False).reset_index(drop=True)
display(opt_results)
entry_z exit_z final_return sharpe mdd
0 1.5 0.5 2.015736 3.152895 -0.044008
1 1.5 0.3 2.098776 3.035545 -0.056847
2 0.5 0.5 2.709583 2.929092 -0.088697
3 1.0 0.3 2.527457 2.925949 -0.070058
4 1.5 0.1 2.165035 2.904456 -0.068544
5 1.0 0.5 2.366084 2.890452 -0.070058
6 0.5 0.3 2.716753 2.780591 -0.088697
7 0.5 0.1 2.743286 2.626813 -0.088697
8 1.0 0.0 2.926148 2.523193 -0.093610
9 1.0 0.1 2.314327 2.489392 -0.076772
10 0.5 0.0 2.780223 2.405471 -0.093610
11 1.5 0.0 2.411925 2.112670 -0.102202
12 2.0 0.5 1.262682 1.875213 -0.038795
13 2.0 0.3 1.278799 1.811135 -0.038795
14 3.0 0.5 1.099983 1.519723 -0.031609
15 3.0 0.1 1.114023 1.294801 -0.031609
16 3.0 0.3 1.114023 1.294801 -0.031609
17 2.0 0.1 1.288337 1.231710 -0.101385
18 2.0 0.0 1.496786 1.036650 -0.139779
19 3.0 0.0 1.121741 0.578127 -0.080870
# 최적 entry_z, exit_z
entry_z_opt = opt_results.iloc[0]["entry_z"]
exit_z_opt  = opt_results.iloc[0]["exit_z"]

entry_z_list = [0.5 ,1.0, 1.5, 2.0, 3.0]
exit_z_list = [0.0, 0.1, 0.3, 0.5]

spread = price_is[comnam1] - hedge_ratio * price_is[comnam2]
mu, sigma = spread.mean(), spread.std()
z = (spread - mu) / sigma

fig, axs = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# (1) 가격 경로
axs[0].plot(price_is[comnam1], label=comnam1)
axs[0].plot(price_is[comnam2], label=comnam2)
axs[0].set_ylabel("Price")
axs[0].legend()
axs[0].set_title("Price Path (2022~2023)")

# (2) 스프레드 + 기준선
axs[1].plot(spread, label="Spread")
axs[1].axhline(mu, color="black", linestyle="--", label="Mean")

for ez in entry_z_list:
    alpha = 1.0 if ez == entry_z_opt else 0.1
    axs[1].axhline(mu + sigma * ez, color="red", linestyle="--", alpha=alpha, label=f"+{ez}" if alpha==1.0 else None)
    axs[1].axhline(mu - sigma * ez, color="red", linestyle="--", alpha=alpha, label=f"-{ez}" if alpha==1.0 else None)

for ez in exit_z_list:
    alpha = 1.0 if ez == exit_z_opt else 0.1
    axs[1].axhline(mu + sigma * ez, color="green", linestyle="--", alpha=alpha, label=f"+{ez}" if alpha==1.0 else None)
    axs[1].axhline(mu - sigma * ez, color="green", linestyle="--", alpha=alpha, label=f"-{ez}" if alpha==1.0 else None)

axs[1].set_ylabel("Spread")
axs[1].set_title("Spread with All Entry/Exit Thresholds")

# (3) z-score + 기준선
axs[2].plot(z, label="Z-score")
axs[2].axhline(0, color="black", linestyle="--")

for ez in entry_z_list:
    alpha = 1.0 if ez == entry_z_opt else 0.1
    axs[2].axhline(ez, color="red", linestyle="--", alpha=alpha)
    axs[2].axhline(-ez, color="red", linestyle="--", alpha=alpha)

for ez in exit_z_list:
    alpha = 1.0 if ez == exit_z_opt else 0.1
    axs[2].axhline(ez, color="green", linestyle="--", alpha=alpha)
    axs[2].axhline(-ez, color="green", linestyle="--", alpha=alpha)

axs[2].set_ylabel("Z-score")
axs[2].set_title("Z-score with Entry/Exit Candidates")

plt.tight_layout()
plt.show()

# OOS 가격 데이터
price_oos = snp500_prices_wide_test[[p1, p2]].dropna()
price_oos.columns = [comnam1, comnam2]

# 최적 파라미터
entry_z = opt_results.iloc[0]["entry_z"]
exit_z  = opt_results.iloc[0]["exit_z"]

# 스프레드 및 z-score
spread_oos = price_oos[comnam1] - hedge_ratio * price_oos[comnam2]
mu, sigma = spread_oos.mean(), spread_oos.std()
z_oos = (spread_oos - mu) / sigma

# 포지션
long_entry  = z_oos < -entry_z
short_entry = z_oos >  entry_z
exit_signal = z_oos.abs() < exit_z

pos = np.where(long_entry, 1,
      np.where(short_entry, -1,
      np.where(exit_signal, 0, np.nan)))
pos = pd.Series(pos, index=z_oos.index).ffill().fillna(0)

# 수익률 계산
ret = price_oos.pct_change().dropna()
strat_ret = pos.shift() * (ret[comnam1] - hedge_ratio * ret[comnam2])

# 거래비용 차감
pos_chg = pos.diff().abs().fillna(0)
cost = pos_chg.shift().fillna(0) * transaction_cost
strat_ret_net = strat_ret - cost

cum_ret = (1 + strat_ret_net).cumprod()

# 성과지표
sharpe = strat_ret_net.mean() / strat_ret_net.std() * np.sqrt(252)
mdd = (cum_ret / cum_ret.cummax() - 1).min()

# 진입/청산 시점
entry_longs  = ((pos.shift(1) == 0) & (pos == 1))
entry_shorts = ((pos.shift(1) == 0) & (pos == -1))
exits        = ((pos.shift(1) != 0) & (pos == 0))

entry_longs_idx = entry_longs[entry_longs].index
entry_shorts_idx = entry_shorts[entry_shorts].index
exits_idx = exits[exits].index

# 시각화
fig, axs = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# (1) 가격
axs[0].plot(price_oos[comnam1], label=comnam1)
axs[0].plot(price_oos[comnam2], label=comnam2)
axs[0].set_ylabel("Price")
axs[0].legend()
axs[0].set_title("Price Path (OOS)")

# (2) Spread + Entry/Exit 기준선
axs[1].plot(spread_oos, label="Spread")
axs[1].axhline(mu + sigma * entry_z, color="red", linestyle="--", label="Entry Z")
axs[1].axhline(mu - sigma * entry_z, color="red", linestyle="--")
axs[1].axhline(mu + sigma * exit_z, color="green", linestyle="--", label="Exit Z")
axs[1].axhline(mu - sigma * exit_z, color="green", linestyle="--")
axs[1].axhline(mu, color="black", linestyle="--", label="Mean")

# 진입/청산 화살표
axs[1].scatter(entry_longs_idx, spread_oos.loc[entry_longs_idx], marker='^', color='blue', label='Long Entry', zorder=5)
axs[1].scatter(entry_shorts_idx, spread_oos.loc[entry_shorts_idx], marker='v', color='red', label='Short Entry', zorder=5)
axs[1].scatter(exits_idx, spread_oos.loc[exits_idx], marker='o', color='black', label='Exit', zorder=5)

axs[1].set_ylabel("Spread")
axs[1].legend()
axs[1].set_title("Spread (OOS)")

# (3) 누적 수익률
axs[2].plot(cum_ret, label="Cumulative Return")
axs[2].axhline(1, color="black", linestyle="--")
axs[2].set_ylabel("Cumulative Return")
axs[2].legend()
axs[2].set_title("Strategy Cumulative Return (2024)")

plt.tight_layout()
plt.show()

print("--- Pair tradind Perfomance in 2024 ---")
print(f" 1. Cummulative return : {cum_ret.iloc[-1]:.4f}")
print(f" 2. Sharpe ratio       : {sharpe:.2f}")
print(f" 3. Max Drawdown       : {mdd:.2%}")
--- Pair tradind Perfomance in 2024 ---
 1. Cummulative return : 1.0609
 2. Sharpe ratio       : 0.53
 3. Max Drawdown       : -16.66%