Files
cointrader/scripts/fr_oi_backtest.py
21in7 52d05f2ddd feat: 전략 리서치 스크립트 및 테스트 일괄 추가
- FR/OI 백테스트, LS ratio 백테스트 스크립트
- 펀딩/OI 분석, 거래 LS 분석 스크립트
- evaluate_oos 테스트 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 09:03:06 +09:00

365 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FR × OI 변화율 백테스트 — Phase 1: 12개 조합
신호: FR × OI변화율(1h) = funding_rate × oi_pct_change_4
- SHORT: 피처 >= threshold (롱 스퀴즈 전조)
- LONG: 피처 <= threshold (숏 스퀴즈 전조)
- 보유: 1h(4캔들) / 4h(16캔들)
Usage: python scripts/fr_oi_backtest.py
"""
import asyncio
import aiohttp
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Base URL prepended to every REST path requested by the fetchers below.
BASE = "https://fapi.binance.com"
# Trading pair passed as the `symbol` query parameter by main().
SYMBOL = "XRPUSDT"
# Directory from which main() reads the cached `combined_15m.parquet` klines.
DATA_DIR = Path("data/xrpusdt")
# Fee rate; run_backtest() deducts twice this amount from every trade
# (presumably once for the entry fill and once for the exit fill).
FEE_RATE = 0.0004
async def fetch_oi_history(session, symbol, start_ms, end_ms):
    """Download 15-minute open-interest history between two epoch-ms bounds.

    Pages through the `/futures/data/openInterestHist` endpoint, 500 rows per
    request, advancing the start cursor past the last timestamp received.
    Paging stops when the response is empty, is not a list, or no longer
    advances the cursor.

    Args:
        session: HTTP client session exposing an async `get(url, params=...)`
            context manager (main() passes an `aiohttp.ClientSession`).
        symbol: Value sent as the `symbol` query parameter.
        start_ms: Inclusive start of the window, epoch milliseconds.
        end_ms: End of the window, epoch milliseconds.

    Returns:
        A DataFrame with columns `timestamp` (UTC datetime) and `oi_value`
        (float, taken from `sumOpenInterestValue`), de-duplicated and sorted
        by timestamp. A bare empty DataFrame is returned when nothing was
        downloaded.
    """
    endpoint = f"{BASE}/futures/data/openInterestHist"
    rows = []
    cursor = start_ms
    request_count = 0

    while cursor < end_ms:
        query = {
            "symbol": symbol,
            "period": "15m",
            "startTime": cursor,
            "endTime": end_ms,
            "limit": 500,
        }
        async with session.get(endpoint, params=query) as resp:
            page = await resp.json()

        # Anything other than a non-empty list (e.g. an error payload) ends paging.
        if not (isinstance(page, list) and page):
            break

        rows.extend(page)
        newest_ts = int(page[-1]["timestamp"])
        if newest_ts <= cursor:
            break
        cursor = newest_ts + 1

        # Longer pause on every 50th request, short pause otherwise
        # (presumably to stay under the API rate limit).
        request_count += 1
        pause = 5 if request_count % 50 == 0 else 0.1
        await asyncio.sleep(pause)

    if not rows:
        return pd.DataFrame()

    frame = pd.DataFrame(rows)
    frame["timestamp"] = pd.to_datetime(frame["timestamp"].astype(int), unit="ms", utc=True)
    frame["oi_value"] = frame["sumOpenInterestValue"].astype(float)
    deduped = frame[["timestamp", "oi_value"]].drop_duplicates("timestamp")
    return deduped.sort_values("timestamp")
async def fetch_funding_rate(session, symbol, start_ms, end_ms):
    """Download funding-rate history between two epoch-ms bounds.

    Pages through the `/fapi/v1/fundingRate` endpoint, 1000 rows per request,
    advancing the start cursor past the last `fundingTime` received. Paging
    stops when the response is empty, is not a list, or no longer advances
    the cursor.

    Args:
        session: HTTP client session exposing an async `get(url, params=...)`
            context manager (main() passes an `aiohttp.ClientSession`).
        symbol: Value sent as the `symbol` query parameter.
        start_ms: Inclusive start of the window, epoch milliseconds.
        end_ms: End of the window, epoch milliseconds.

    Returns:
        A DataFrame with columns `timestamp` (UTC datetime, from `fundingTime`)
        and `funding_rate` (float, from `fundingRate`), de-duplicated and
        sorted by timestamp. A bare empty DataFrame is returned when nothing
        was downloaded.
    """
    endpoint = f"{BASE}/fapi/v1/fundingRate"
    records = []
    cursor = start_ms

    while cursor < end_ms:
        query = {
            "symbol": symbol,
            "startTime": cursor,
            "endTime": end_ms,
            "limit": 1000,
        }
        async with session.get(endpoint, params=query) as resp:
            batch = await resp.json()

        # Anything other than a non-empty list (e.g. an error payload) ends paging.
        if not (isinstance(batch, list) and batch):
            break

        records.extend(batch)
        latest_ts = int(batch[-1]["fundingTime"])
        if latest_ts <= cursor:
            break
        cursor = latest_ts + 1
        await asyncio.sleep(0.1)

    if not records:
        return pd.DataFrame()

    frame = pd.DataFrame(records)
    frame["timestamp"] = pd.to_datetime(frame["fundingTime"].astype(int), unit="ms", utc=True)
    frame["funding_rate"] = frame["fundingRate"].astype(float)
    deduped = frame[["timestamp", "funding_rate"]].drop_duplicates("timestamp")
    return deduped.sort_values("timestamp")
def run_backtest(df, feature_col, percentile, direction, hold_bars, fee_rate=None):
    """Backtest a threshold-triggered, fixed-holding-period strategy.

    A signal fires on bar ``i`` when the feature crosses the threshold
    (``>=`` for SHORT, ``<=`` for LONG). The position is opened at the open of
    bar ``i + 1`` and closed at the close of bar ``i + hold_bars``; scanning
    resumes on the bar after the exit, so positions never overlap.

    Args:
        df: DataFrame with a default 0..n-1 row order and the columns
            ``timestamp``, ``open``, ``close`` and ``feature_col``.
        feature_col: Name of the signal column.
        percentile: Percentile (0-100) of the whole ``feature_col`` sample
            used as the trigger threshold.
        direction: ``"SHORT"`` or ``"LONG"``; any other value never triggers.
        hold_bars: Number of bars the position is held.
        fee_rate: Fee rate charged per fill; twice this amount is deducted
            from every trade. Defaults to the module-level ``FEE_RATE``.

    Returns:
        ``None`` when no trade was taken, otherwise a dict with ``trades``,
        ``wins``, ``losses``, ``win_rate`` (percent), ``pf`` (profit factor,
        ``inf`` when there are profits but no losses), ``total_pnl_bps``,
        ``avg_pnl_bps``, ``max_dd_bps`` (<= 0), ``threshold`` and
        ``df_trades`` (one row per trade).
    """
    if fee_rate is None:
        fee_rate = FEE_RATE
    round_trip_fee = fee_rate * 2

    # NOTE: the threshold is taken over the full sample, i.e. it is in-sample.
    threshold = df[feature_col].quantile(percentile / 100)

    # Pull the columns out once; per-row `df.iloc[i][col]` builds a Series on
    # every access and dominates the runtime of the scan.
    feature = df[feature_col].to_numpy()
    opens = df["open"].to_numpy()
    closes = df["close"].to_numpy()
    times = df["timestamp"]

    trades = []
    # Same bound as before: it guarantees i + hold_bars is a valid row, so no
    # separate out-of-range check on the exit index is needed.
    signal_limit = len(df) - hold_bars - 1
    i = 0
    while i < signal_limit:
        val = feature[i]
        if pd.isna(val):
            i += 1
            continue

        if direction == "SHORT":
            trigger = val >= threshold
        elif direction == "LONG":
            trigger = val <= threshold
        else:
            trigger = False
        if not trigger:
            i += 1
            continue

        entry_idx = i + 1
        exit_idx = i + hold_bars
        entry_price = opens[entry_idx]
        exit_price = closes[exit_idx]

        if direction == "LONG":
            gross_return = (exit_price / entry_price) - 1
        else:
            # Return on entry notional for a short on a linear contract.
            # (entry / exit - 1 overstates gains and understates losses.)
            gross_return = (entry_price - exit_price) / entry_price
        net_return = gross_return - round_trip_fee

        trades.append({
            "entry_time": times.iloc[entry_idx],
            "exit_time": times.iloc[exit_idx],
            "entry_price": entry_price,
            "exit_price": exit_price,
            "feature_val": val,
            "gross_return_bps": gross_return * 10000,
            "net_return_bps": net_return * 10000,
        })
        i = exit_idx + 1  # resume scanning after the position is closed

    if not trades:
        return None

    tdf = pd.DataFrame(trades)
    pnl = tdf["net_return_bps"]
    wins = pnl[pnl > 0]
    losses = pnl[pnl <= 0]

    gross_profit = wins.sum()
    gross_loss = abs(losses.sum())
    if gross_loss > 0:
        pf = gross_profit / gross_loss
    elif gross_profit > 0:
        pf = float("inf")
    else:
        pf = 0

    # Drawdown is measured from the running equity peak, where the peak can
    # never be below the starting equity of 0; otherwise an opening losing
    # streak would be reported as zero drawdown.
    cum_pnl = pnl.cumsum()
    running_peak = cum_pnl.cummax().clip(lower=0)
    max_dd = (cum_pnl - running_peak).min()

    return {
        "trades": len(tdf),
        "wins": len(wins),
        "losses": len(losses),
        "win_rate": len(wins) / len(tdf) * 100,
        "pf": pf,
        "total_pnl_bps": pnl.sum(),
        "avg_pnl_bps": pnl.mean(),
        "max_dd_bps": max_dd,
        "threshold": threshold,
        "df_trades": tdf,
    }
def confidence(n):
    """Map a trade count to an ``(emoji, label)`` confidence marker.

    Counts below 20 get the red marker, counts below 50 the yellow one, and
    everything else the green one.
    """
    tiers = (
        (20, ("🔴", "폐기")),
        (50, ("🟡", "참고")),
    )
    for upper_bound, marker in tiers:
        if n < upper_bound:
            return marker
    return "🟢", "검토"
async def main():
    """Run the FR x OI-change backtest end to end and print the report.

    Loads the cached 15m klines from ``DATA_DIR``, downloads the last 29 days
    of open interest and funding rates, merges both onto the klines, builds
    the ``fr_x_oi_1h`` feature (funding rate times the 4-bar OI percent
    change), backtests the 12 hold/direction/percentile combinations and
    prints the result table plus the symmetry, holding-period and final
    verdict sections. Nothing is returned; all output goes to stdout.
    """
    rule = "=" * 80

    def select(hold_label, direction, min_trades):
        # Results for one hold/direction pair with at least `min_trades` trades.
        return [
            r for r in results
            if r["hold_label"] == hold_label
            and r["direction"] == direction
            and r["trades"] >= min_trades
        ]

    def top_pf(rows):
        # Row with the highest profit factor, or None for an empty list.
        return max(rows, key=lambda x: x["pf"]) if rows else None

    print(rule)
    print(" FR × OI 변화율 백테스트 — Phase 1: 12개 조합")
    print(rule)

    # --- Data collection ---
    print("\n[1] 데이터 수집")
    klines = pd.read_parquet(DATA_DIR / "combined_15m.parquet")

    now_utc = datetime.now(timezone.utc)
    window_start = now_utc - timedelta(days=29)
    # Truncate to the whole minute before converting to epoch milliseconds.
    window_start_ms = int(window_start.replace(microsecond=0, second=0).timestamp()) * 1000
    now_ms = int(now_utc.replace(microsecond=0, second=0).timestamp()) * 1000

    async with aiohttp.ClientSession() as http:
        print(" OI 수집...")
        oi_df = await fetch_oi_history(http, SYMBOL, window_start_ms, now_ms)
        print(f" OI: {len(oi_df)} rows")
        print(" FR 수집...")
        fr_df = await fetch_funding_rate(http, SYMBOL, window_start_ms, now_ms)
        print(f" FR: {len(fr_df)} rows")

    # --- Merge ---
    print("\n[2] 데이터 병합")
    merged = klines.loc[window_start:].copy().reset_index()
    print(f" Kline (29일): {len(merged)} rows")

    # Attach the nearest OI sample (either side, within 20 minutes) to each bar.
    merged = pd.merge_asof(
        merged.sort_values("timestamp"),
        oi_df.sort_values("timestamp"),
        on="timestamp",
        direction="nearest",
        tolerance=pd.Timedelta(minutes=20),
    )
    merged["oi_pct_change_4"] = merged["oi_value"].pct_change(4)

    # Attach the most recent funding rate at or before each bar (forward fill).
    merged = pd.merge_asof(
        merged.sort_values("timestamp"),
        fr_df.rename(columns={"funding_rate": "fr_api"}).sort_values("timestamp"),
        on="timestamp",
        direction="backward",
    )

    # Core feature: funding rate x 1h (4-bar) OI percent change.
    merged["fr_x_oi_1h"] = merged["fr_api"] * merged["oi_pct_change_4"]

    valid = merged.dropna(subset=["fr_x_oi_1h"])
    signal = valid["fr_x_oi_1h"]
    print(f" 유효 데이터: {len(valid)} rows")
    print(f" fr_x_oi_1h: mean={signal.mean():.8f}, std={signal.std():.8f}")
    for p in (25, 50, 75):
        v = signal.quantile(p / 100)
        print(f" P{p}: {v:.8f}")

    # --- Backtest of the 12 combinations ---
    print("\n[3] 12개 조합 백테스트")
    print(rule)

    squeeze_name = {"SHORT": "롱스퀴즈", "LONG": "숏스퀴즈"}
    combos = [
        {
            "hold_label": hold_label,
            "hold_bars": hold_bars,
            "direction": direction,
            "percentile": pct,
            "desc": f"{direction} {hold_label} P{pct} ({squeeze_name[direction]})",
        }
        for hold_label, hold_bars in (("1h", 4), ("4h", 16))
        for direction in ("SHORT", "LONG")
        for pct in (75, 50, 25)
    ]

    # Placeholder statistics for combinations that produced no trade.
    no_trade_stats = {
        "trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
        "pf": 0, "total_pnl_bps": 0, "avg_pnl_bps": 0, "max_dd_bps": 0, "threshold": 0,
    }
    bt_frame = valid.reset_index(drop=True)
    results = []
    for combo in combos:
        stats = run_backtest(
            bt_frame, "fr_x_oi_1h",
            combo["percentile"], combo["direction"], combo["hold_bars"],
        )
        if stats:
            stats.update(combo)
        else:
            stats = {**combo, **no_trade_stats}
        results.append(stats)

    # --- Result table ---
    print(f"\n{'ID':>3} {'조합':<28} {'거래수':>6} {'승률':>7} {'PF':>7} {'PnL(bps)':>10} {'MaxDD':>10} {'신뢰도'}")
    print("-" * 90)
    for row_id, r in enumerate(results, 1):
        emoji, label = confidence(r["trades"])
        pf_str = "INF" if r["pf"] == float("inf") else f"{r['pf']:.2f}"
        print(f"{row_id:>3} {r['desc']:<28} {r['trades']:>6} {r['win_rate']:>6.1f}% {pf_str:>7} "
              f"{r['total_pnl_bps']:>+10.1f} {r['max_dd_bps']:>10.1f} {emoji} {label}")

    # --- Symmetry check ---
    print("\n" + rule)
    print(" [대칭성 검증]")
    print(rule)
    for hold_label in ("1h", "4h"):
        best_short = top_pf(select(hold_label, "SHORT", 1))
        best_long = top_pf(select(hold_label, "LONG", 1))

        print(f"\n [{hold_label} 보유]")
        if best_short:
            print(f" Best SHORT: {best_short['desc']} — PF={best_short['pf']:.2f}, {best_short['trades']}")
        if best_long:
            print(f" Best LONG: {best_long['desc']} — PF={best_long['pf']:.2f}, {best_long['trades']}")
        if not (best_short and best_long):
            continue

        s_pf = best_short["pf"]
        l_pf = best_long["pf"]
        if s_pf > 1.5 and l_pf > 1.5:
            verdict = " → Case 1: 양방향 생존 ✓ Phase 2 후보"
        elif (s_pf > 1.5 and l_pf < 0.5) or (l_pf > 1.5 and s_pf < 0.5):
            verdict = " → Case 2: 한쪽만 성공 ✗ 시장 베타/우연"
        elif s_pf > 1.5 or l_pf > 1.5:
            verdict = " → Case 3: 부분적 edge ~ 낮은 신뢰도"
        elif s_pf > 1.0 and l_pf > 1.0:
            verdict = " → 양쪽 PF > 1.0이나 < 1.5 — 약한 edge"
        else:
            verdict = " → 양쪽 모두 약함"
        print(verdict)

    # --- Holding-period comparison ---
    print("\n" + rule)
    print(" [보유시간 비교]")
    print(rule)
    for direction in ("SHORT", "LONG"):
        best_1h = top_pf(select("1h", direction, 1))
        best_4h = top_pf(select("4h", direction, 1))

        print(f"\n [{direction}]")
        if best_1h:
            print(f" 1h Best: PF={best_1h['pf']:.2f} ({best_1h['desc']}, {best_1h['trades']}건)")
        if best_4h:
            print(f" 4h Best: PF={best_4h['pf']:.2f} ({best_4h['desc']}, {best_4h['trades']}건)")
        if best_1h and best_4h:
            if best_4h["pf"] > best_1h["pf"]:
                print(" → 4h가 더 강함 (상관분석 r=-0.1734과 일치)")
            else:
                print(" → 1h가 더 강함 (주의: 상관분석은 4h 기준)")

    # --- Final verdict ---
    print("\n" + rule)
    print(" [최종 판정]")
    print(rule)

    # Look for Phase 2 candidates among combinations with at least 20 trades.
    phase2 = []
    for hold_label in ("4h", "1h"):
        best_s = top_pf(select(hold_label, "SHORT", 20))
        best_l = top_pf(select(hold_label, "LONG", 20))
        if not (best_s and best_l):
            continue
        short_ok = best_s["pf"] > 1.5
        long_ok = best_l["pf"] > 1.5
        if short_ok and long_ok:
            phase2.append(("Case1", hold_label, best_s, best_l))
        elif short_ok or long_ok:
            phase2.append(("Case3", hold_label, best_s, best_l))

    if phase2:
        print("\n 🟢 Phase 2 후보 발견!")
        for case, hl, bs, bl in phase2:
            print(f" [{case}] {hl}: SHORT PF={bs['pf']:.2f}({bs['trades']}건), "
                  f"LONG PF={bl['pf']:.2f}({bl['trades']}건)")
        print("\n → Phase 2 (Bot Simulation) 진행 권장")
        print(" → 단, 29일 OI 데이터 + 448행 제한 감안")
    else:
        ranked = sorted(
            [(r["desc"], r["pf"], r["trades"]) for r in results if r["trades"] > 0],
            key=lambda x: x[1],
            reverse=True,
        )
        best = ranked[0] if ranked else ("N/A", 0, 0)
        above_1 = [r for r in results if r["pf"] > 1.0 and r["trades"] >= 20]
        if above_1:
            print(f"\n 🟡 PF > 1.0 조합 존재 ({len(above_1)}개), 단 < 1.5")
            for r in sorted(above_1, key=lambda x: x["pf"], reverse=True):
                emoji, _ = confidence(r["trades"])
                print(f" {r['desc']}: PF={r['pf']:.2f}, {r['trades']}{emoji}")
            print("\n → 약한 edge. 4월 데이터 축적 후 재검증 권장.")
        else:
            print("\n 🔴 PF > 1.0 조합 없음 (20건 이상)")
            print(f" Best: {best[0]} (PF={best[1]:.2f}, {best[2]}건)")
            print("\n → FR × OI 시그널도 비용 후 edge 없음")

    # --- Detail of the best combination (more than 10 trades) ---
    detailed = [r for r in results if r["trades"] > 10 and "df_trades" in r]
    if detailed:
        best_r = top_pf(detailed)
        print(f"\n[참고] Best 조합 상세: {best_r['desc']}")
        print("-" * 60)
        tdf = best_r["df_trades"]
        print(f" 기간: {tdf['entry_time'].min()} ~ {tdf['exit_time'].max()}")
        print(f" 평균 피처값: {tdf['feature_val'].mean():.8f}")
        winners = tdf[tdf["net_return_bps"] > 0]
        losers = tdf[tdf["net_return_bps"] <= 0]
        if len(winners) > 0:
            print(f" 수익 거래 평균: {winners['net_return_bps'].mean():.1f} bps ({len(winners)}건)")
        if len(losers) > 0:
            print(f" 손실 거래 평균: {losers['net_return_bps'].mean():.1f} bps ({len(losers)}건)")

    print("\n" + rule)
    print(" 분석 완료.")
    print(rule)
# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())