Files
cointrader/scripts/ls_ratio_backtest.py
21in7 52d05f2ddd feat: 전략 리서치 스크립트 및 테스트 일괄 추가
- FR/OI 백테스트, LS ratio 백테스트 스크립트
- 펀딩/OI 분석, 거래 LS 분석 스크립트
- evaluate_oos 테스트 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 09:03:06 +09:00

486 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
L/S Ratio 단독 백테스트 — Phase 1: Pure Edge Test
6개 조합 (3 임계값 × 2 방향) 스윕, 3단계 필터 판정.
데이터: 프로덕션 수집 L/S ratio + Binance kline (같은 기간).
Usage: python scripts/ls_ratio_backtest.py
"""
import asyncio
import aiohttp
import pandas as pd
import numpy as np
from datetime import timezone
from pathlib import Path
BASE = "https://fapi.binance.com"
DATA_DIR = Path("data")
SYMBOL = "XRPUSDT"
FEE_RATE = 0.0004 # 0.04% per side
HOLD_BARS = 4 # 4 candles = 1 hour
async def fetch_klines(session, symbol, start_ms, end_ms):
"""Binance kline 데이터 가져오기"""
all_klines = []
current = start_ms
while current < end_ms:
params = {
"symbol": symbol, "interval": "15m",
"startTime": current, "endTime": end_ms, "limit": 1500,
}
async with session.get(f"{BASE}/fapi/v1/klines", params=params) as resp:
data = await resp.json()
if not data:
break
all_klines.extend(data)
current = data[-1][0] + 1
return all_klines
def load_ls_ratio(symbol):
"""프로덕션 수집 L/S ratio 로드"""
path = DATA_DIR / symbol.lower() / "ls_ratio_15m.parquet"
if not path.exists():
raise FileNotFoundError(f"{path} not found. Sync from production first.")
df = pd.read_parquet(path)
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
return df.sort_values("timestamp").reset_index(drop=True)
def build_dataset(klines_raw, ls_df):
"""Kline + L/S ratio 조인"""
df = pd.DataFrame(klines_raw, columns=[
"open_time", "open", "high", "low", "close", "volume",
"close_time", "quote_vol", "trades", "taker_buy_vol",
"taker_buy_quote_vol", "ignore",
])
df["timestamp"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
for c in ["open", "high", "low", "close", "volume"]:
df[c] = df[c].astype(float)
# L/S ratio 조인 (가장 가까운 타임스탬프)
df = df.sort_values("timestamp").reset_index(drop=True)
merged = pd.merge_asof(
df, ls_df, on="timestamp", direction="nearest",
tolerance=pd.Timedelta(minutes=20),
)
return merged
def run_backtest(df, percentile, direction, hold_bars=HOLD_BARS):
"""
단일 조합 백테스트 실행.
- percentile: L/S ratio 임계값 (0~100)
- direction: "LONG" or "SHORT"
- hold_bars: 보유 캔들 수
LONG 진입: ratio >= threshold (Momentum)
SHORT 진입: ratio <= threshold (Momentum)
"""
threshold = df["top_acct_ls_ratio"].quantile(percentile / 100)
trades = []
i = 0
while i < len(df) - hold_bars:
ratio = df.iloc[i]["top_acct_ls_ratio"]
if pd.isna(ratio):
i += 1
continue
# 시그널 체크
if direction == "LONG" and ratio >= threshold:
entry_price = df.iloc[i + 1]["open"] # 다음 캔들 시가 진입
exit_price = df.iloc[i + 1 + hold_bars - 1]["close"] # hold_bars 후 종가
gross_return = (exit_price / entry_price) - 1
fee = FEE_RATE * 2 # 진입 + 청산
net_return = gross_return - fee
trades.append({
"entry_time": df.iloc[i + 1]["timestamp"],
"exit_time": df.iloc[i + 1 + hold_bars - 1]["timestamp"],
"entry_price": entry_price,
"exit_price": exit_price,
"entry_ls_ratio": ratio,
"gross_return_bps": gross_return * 10000,
"net_return_bps": net_return * 10000,
"fee_bps": fee * 10000,
})
i += 1 + hold_bars # 포지션 종료 후 다음 캔들부터
elif direction == "SHORT" and ratio <= threshold:
entry_price = df.iloc[i + 1]["open"]
exit_price = df.iloc[i + 1 + hold_bars - 1]["close"]
gross_return = (entry_price / exit_price) - 1 # SHORT: 반대
fee = FEE_RATE * 2
net_return = gross_return - fee
trades.append({
"entry_time": df.iloc[i + 1]["timestamp"],
"exit_time": df.iloc[i + 1 + hold_bars - 1]["timestamp"],
"entry_price": entry_price,
"exit_price": exit_price,
"entry_ls_ratio": ratio,
"gross_return_bps": gross_return * 10000,
"net_return_bps": net_return * 10000,
"fee_bps": fee * 10000,
})
i += 1 + hold_bars
else:
i += 1
if not trades:
return None
df_trades = pd.DataFrame(trades)
# PF 계산: Σ(net profit) / Σ(|net loss|)
wins = df_trades[df_trades["net_return_bps"] > 0]["net_return_bps"]
losses = df_trades[df_trades["net_return_bps"] <= 0]["net_return_bps"]
gross_profit = wins.sum() if len(wins) > 0 else 0
gross_loss = abs(losses.sum()) if len(losses) > 0 else 0
pf = gross_profit / gross_loss if gross_loss > 0 else float("inf") if gross_profit > 0 else 0
# Max Drawdown (cumulative bps)
cum_pnl = df_trades["net_return_bps"].cumsum()
running_max = cum_pnl.cummax()
drawdown = cum_pnl - running_max
max_dd = drawdown.min()
return {
"trades": len(df_trades),
"wins": len(wins),
"losses": len(losses),
"win_rate": len(wins) / len(df_trades) * 100,
"pf": pf,
"total_pnl_bps": df_trades["net_return_bps"].sum(),
"avg_pnl_bps": df_trades["net_return_bps"].mean(),
"max_dd_bps": max_dd,
"threshold": threshold,
"df_trades": df_trades,
}
def confidence_emoji(n_trades):
if n_trades < 20:
return "🔴"
elif n_trades < 50:
return "🟡"
elif n_trades < 100:
return "🟢"
else:
return "🟢"
def confidence_label(n_trades):
if n_trades < 20:
return "폐기(과적합)"
elif n_trades < 50:
return "낮음(참고만)"
elif n_trades < 100:
return "보통(검토)"
else:
return "높음(우선)"
async def main():
print("=" * 80)
print(" L/S Ratio 단독 백테스트 — Phase 1: Pure Edge Test")
print("=" * 80)
# 1. 데이터 로드
print("\n[1] 데이터 로드")
ls_df = load_ls_ratio(SYMBOL)
print(f" L/S ratio: {len(ls_df)} rows ({ls_df['timestamp'].min()} ~ {ls_df['timestamp'].max()})")
start_ms = int(ls_df["timestamp"].min().timestamp() * 1000)
end_ms = int(ls_df["timestamp"].max().timestamp() * 1000)
async with aiohttp.ClientSession() as session:
klines = await fetch_klines(session, SYMBOL, start_ms, end_ms)
print(f" Klines: {len(klines)} rows")
df = build_dataset(klines, ls_df)
valid = df.dropna(subset=["top_acct_ls_ratio"])
print(f" 조인 결과: {len(df)} rows (L/S 매칭: {len(valid)})")
print(f" top_acct_ls_ratio: mean={valid['top_acct_ls_ratio'].mean():.4f}, "
f"std={valid['top_acct_ls_ratio'].std():.4f}")
# 백분위수 표시
for p in [25, 50, 75]:
v = valid["top_acct_ls_ratio"].quantile(p / 100)
print(f" P{p}: {v:.4f}")
# 2. 6개 조합 백테스트
print("\n[2] 6개 조합 백테스트 실행")
print("-" * 80)
combinations = [
(75, "LONG", "모멘텀 강함: ratio ≥ P75 → LONG"),
(75, "SHORT", "역모멘텀: ratio ≥ P75 → SHORT"),
(50, "LONG", "모멘텀 중간: ratio ≥ P50 → LONG"),
(50, "SHORT", "역모멘텀 중간: ratio ≤ P50 → SHORT"),
(25, "LONG", "역모멘텀 약: ratio ≤ P25 → LONG"),
(25, "SHORT", "모멘텀 강함: ratio ≤ P25 → SHORT"),
]
results = []
for pct, direction, desc in combinations:
# LONG: ratio >= threshold, SHORT: ratio <= threshold
# 25th percentile LONG = ratio가 낮을 때 LONG (Contrarian)
# 실제 로직:
# 75 LONG = ratio >= P75 (상위 25% 롱비율 높을 때 롱) = Momentum
# 75 SHORT = ratio >= P75 (상위 25% 롱비율 높을 때 숏) = Contrarian
# 25 SHORT = ratio <= P25 (하위 25% 롱비율 낮을 때 숏) = Momentum
# 25 LONG = ratio <= P25 (하위 25% 롱비율 낮을 때 롱) = Contrarian
# 방향 보정: 25th에서 LONG은 "ratio <= P25일 때 LONG" (Contrarian)
if pct == 25 and direction == "LONG":
# 특수 케이스: 낮은 ratio에서 LONG (Contrarian)
result = run_backtest_contrarian(df, pct, "LONG")
elif pct == 25 and direction == "SHORT":
# ratio <= P25일 때 SHORT (Momentum)
result = run_backtest(df, pct, "SHORT")
elif pct == 75 and direction == "SHORT":
# ratio >= P75일 때 SHORT (Contrarian)
result = run_backtest_contrarian(df, pct, "SHORT")
else:
result = run_backtest(df, pct, direction)
if result:
result["percentile"] = pct
result["direction"] = direction
result["description"] = desc
results.append(result)
else:
results.append({
"percentile": pct, "direction": direction,
"description": desc, "trades": 0, "pf": 0,
"win_rate": 0, "total_pnl_bps": 0, "max_dd_bps": 0,
"threshold": 0, "wins": 0, "losses": 0, "avg_pnl_bps": 0,
})
# 3. 결과 테이블
print("\n[3] 결과 테이블")
print("=" * 80)
print(f"{'조합':<35} {'거래수':>6} {'승률':>7} {'PF':>7} {'PnL(bps)':>10} {'MaxDD':>10} {'신뢰도':<15}")
print("-" * 80)
for r in results:
emoji = confidence_emoji(r["trades"])
label = confidence_label(r["trades"])
pf_str = f"{r['pf']:.2f}" if r["pf"] != float("inf") else "INF"
print(f"{r['description']:<35} {r['trades']:>6} {r['win_rate']:>6.1f}% {pf_str:>7} "
f"{r['total_pnl_bps']:>+10.1f} {r['max_dd_bps']:>10.1f} {emoji} {label}")
# 4. 필터 1: PF 판정
print("\n[4] 필터 1: PF 판정")
print("-" * 80)
strong = [r for r in results if r["pf"] > 1.5 and r["trades"] > 0]
weak = [r for r in results if 0.5 <= r["pf"] <= 1.5 and r["trades"] > 0]
failed = [r for r in results if r["pf"] < 0.5 and r["trades"] > 0]
print(f" PF > 1.5 (명확한 edge): {len(strong)}개 조합")
for r in strong:
print(f"{r['description']} (PF={r['pf']:.2f}, trades={r['trades']})")
print(f" 0.5 ≤ PF ≤ 1.5 (보류): {len(weak)}개 조합")
for r in weak:
print(f" ~ {r['description']} (PF={r['pf']:.2f}, trades={r['trades']})")
print(f" PF < 0.5 (실패): {len(failed)}개 조합")
for r in failed:
print(f"{r['description']} (PF={r['pf']:.2f}, trades={r['trades']})")
# 5. 필터 2: 거래수 신뢰도
print("\n[5] 필터 2: 거래수 신뢰도 (필터 1 통과 조합)")
print("-" * 80)
filter2_passed = [r for r in strong if r["trades"] >= 20]
filter2_ref = [r for r in strong if r["trades"] < 20]
if filter2_passed:
for r in filter2_passed:
print(f"{r['description']}{r['trades']}건 ({confidence_label(r['trades'])})")
else:
print(" ⚠️ PF > 1.5 조합 중 거래수 20건 이상인 것 없음")
if filter2_ref:
for r in filter2_ref:
print(f" 🔴 {r['description']}{r['trades']}건 (폐기: 과적합)")
# 6. 필터 3: 대칭성 판정
print("\n[6] 필터 3: 대칭성 판정")
print("-" * 80)
# 같은 percentile에서 LONG/SHORT 양쪽 확인
for pct in [75, 50, 25]:
long_r = next((r for r in results if r["percentile"] == pct and r["direction"] == "LONG"), None)
short_r = next((r for r in results if r["percentile"] == pct and r["direction"] == "SHORT"), None)
if not long_r or not short_r:
continue
l_pf = long_r["pf"]
s_pf = short_r["pf"]
if l_pf > 1.5 and s_pf > 1.5:
verdict = "Case 1: 양방향 생존 → ✓ Phase 2 후보"
elif (l_pf > 1.5 and s_pf < 0.5) or (s_pf > 1.5 and l_pf < 0.5):
verdict = "Case 2: 한쪽만 성공 → ✗ 시장 베타/우연 (폐기)"
elif l_pf > 1.5 or s_pf > 1.5:
verdict = "Case 3: 부분적 edge → ~ 낮은 신뢰도"
else:
verdict = "양쪽 모두 약함 → 해당 없음"
print(f" P{pct}: LONG PF={l_pf:.2f}, SHORT PF={s_pf:.2f}")
print(f"{verdict}")
# 7. 최종 판정
print("\n" + "=" * 80)
print(" [최종 판정]")
print("=" * 80)
# Phase 2 후보 찾기
phase2_candidates = []
for pct in [75, 50, 25]:
long_r = next((r for r in results if r["percentile"] == pct and r["direction"] == "LONG"), None)
short_r = next((r for r in results if r["percentile"] == pct and r["direction"] == "SHORT"), None)
if not long_r or not short_r:
continue
# Case 1: 양방향 PF > 1.5
if long_r["pf"] > 1.5 and short_r["pf"] > 1.5:
if long_r["trades"] >= 20 and short_r["trades"] >= 20:
phase2_candidates.append(("Case1", pct, long_r, short_r))
# Case 3: 한쪽만 PF > 1.5
elif long_r["pf"] > 1.5 and long_r["trades"] >= 20:
phase2_candidates.append(("Case3-LONG", pct, long_r, short_r))
elif short_r["pf"] > 1.5 and short_r["trades"] >= 20:
phase2_candidates.append(("Case3-SHORT", pct, long_r, short_r))
if phase2_candidates:
print("\n 🟢 Phase 2 진행 후보 발견!")
for case, pct, lr, sr in phase2_candidates:
print(f" [{case}] P{pct}: LONG PF={lr['pf']:.2f}({lr['trades']}건), "
f"SHORT PF={sr['pf']:.2f}({sr['trades']}건)")
print("\n → Phase 2 (Bot Simulation) 진행 권장")
print(" → 단, 8일 데이터이므로 4월 15일 재검증 필수")
else:
# 모든 조합 중 최고 PF
best = max(results, key=lambda r: r["pf"] if r["trades"] > 0 else 0)
if best["pf"] > 1.0:
print(f"\n 🟡 필터 미통과이나 PF > 1.0 조합 존재")
print(f" Best: {best['description']} (PF={best['pf']:.2f}, {best['trades']}건)")
print(f"\n → 데이터 부족. 4월 15일까지 수집 후 재검증")
else:
print(f"\n 🔴 PF > 1.0 조합 없음")
print(f" Best: {best['description']} (PF={best['pf']:.2f}, {best['trades']}건)")
print(f"\n → L/S ratio 단독 시그널로는 edge 없음")
print(f" → 다른 데이터 소스 탐색 권장")
# 8. 추가: 전 구간 상세 (best 조합)
best = max(results, key=lambda r: r["pf"] if r["trades"] > 10 else 0)
if "df_trades" in best and best["trades"] > 0:
print(f"\n[참고] Best 조합 상세: {best['description']}")
print("-" * 60)
tdf = best["df_trades"]
print(f" 거래 기간: {tdf['entry_time'].min()} ~ {tdf['exit_time'].max()}")
print(f" 평균 진입 L/S ratio: {tdf['entry_ls_ratio'].mean():.4f}")
print(f" 수익 거래 평균: {tdf[tdf['net_return_bps']>0]['net_return_bps'].mean():.1f} bps")
if len(tdf[tdf['net_return_bps'] <= 0]) > 0:
print(f" 손실 거래 평균: {tdf[tdf['net_return_bps']<=0]['net_return_bps'].mean():.1f} bps")
print(f" 최대 연승: ", end="")
streaks = []
streak = 0
for _, row in tdf.iterrows():
if row["net_return_bps"] > 0:
streak += 1
else:
if streak > 0:
streaks.append(streak)
streak = 0
if streak > 0:
streaks.append(streak)
print(f"{max(streaks) if streaks else 0}연승")
print("\n" + "=" * 80)
print(" 분석 완료. 결과를 바탕으로 의사결정하세요.")
print("=" * 80)
def run_backtest_contrarian(df, percentile, direction, hold_bars=HOLD_BARS):
"""
Contrarian 방향 백테스트.
- P25 + LONG: ratio <= P25일 때 LONG (낮은 ratio에서 롱)
- P75 + SHORT: ratio >= P75일 때 SHORT (높은 ratio에서 숏)
"""
threshold = df["top_acct_ls_ratio"].quantile(percentile / 100)
trades = []
i = 0
while i < len(df) - hold_bars:
ratio = df.iloc[i]["top_acct_ls_ratio"]
if pd.isna(ratio):
i += 1
continue
trigger = False
if direction == "LONG" and ratio <= threshold:
trigger = True
elif direction == "SHORT" and ratio >= threshold:
trigger = True
if trigger:
entry_price = df.iloc[i + 1]["open"]
exit_price = df.iloc[i + 1 + hold_bars - 1]["close"]
if direction == "LONG":
gross_return = (exit_price / entry_price) - 1
else:
gross_return = (entry_price / exit_price) - 1
fee = FEE_RATE * 2
net_return = gross_return - fee
trades.append({
"entry_time": df.iloc[i + 1]["timestamp"],
"exit_time": df.iloc[i + 1 + hold_bars - 1]["timestamp"],
"entry_price": entry_price,
"exit_price": exit_price,
"entry_ls_ratio": ratio,
"gross_return_bps": gross_return * 10000,
"net_return_bps": net_return * 10000,
"fee_bps": fee * 10000,
})
i += 1 + hold_bars
else:
i += 1
if not trades:
return None
df_trades = pd.DataFrame(trades)
wins = df_trades[df_trades["net_return_bps"] > 0]["net_return_bps"]
losses = df_trades[df_trades["net_return_bps"] <= 0]["net_return_bps"]
gross_profit = wins.sum() if len(wins) > 0 else 0
gross_loss = abs(losses.sum()) if len(losses) > 0 else 0
pf = gross_profit / gross_loss if gross_loss > 0 else float("inf") if gross_profit > 0 else 0
cum_pnl = df_trades["net_return_bps"].cumsum()
running_max = cum_pnl.cummax()
max_dd = (cum_pnl - running_max).min()
return {
"trades": len(df_trades),
"wins": len(wins),
"losses": len(losses),
"win_rate": len(wins) / len(df_trades) * 100,
"pf": pf,
"total_pnl_bps": df_trades["net_return_bps"].sum(),
"avg_pnl_bps": df_trades["net_return_bps"].mean(),
"max_dd_bps": max_dd,
"threshold": threshold,
"df_trades": df_trades,
}
if __name__ == "__main__":
asyncio.run(main())