Volume-backed pullback strategy with 1h meta filter (EMA50/200 + ADX) and 15m 3-candle trigger sequence. Deployed as separate mtf-bot container alongside existing cointrader. All orders are dry-run (logged only). - src/mtf_bot.py: Module 1-4 (DataFetcher, MetaFilter, TriggerStrategy, ExecutionManager) - main_mtf.py: OOS dry-run entry point - docker-compose.yml: mtf-bot service added - requirements.txt: ccxt dependency added - scripts/mtf_backtest.py: backtest script (Phase 1 robustness: SHORT PF≥1.5 in 7/9 combos) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
343 lines
13 KiB
Python
343 lines
13 KiB
Python
"""
|
||
MTF Pullback Backtest
─────────────────────
Trigger: enter 15m pullbacks in the direction of the 1h trend
  LONG:  1h Meta=LONG  + 15m close < EMA20 + vol < SMA20*0.5 → after a following bar closes > EMA20, enter at the next bar's open
  SHORT: 1h Meta=SHORT + 15m close > EMA20 + vol < SMA20*0.5 → after a following bar closes < EMA20, enter at the next bar's open

SL/TP: based on the 1h ATR (taken from the last 1h candle completed before entry)
Look-ahead bias prevention: 1h indicators use only the last completed candle
|
||
"""
|
||
|
||
import pandas as pd
|
||
import pandas_ta as ta
|
||
import numpy as np
|
||
from pathlib import Path
|
||
from dataclasses import dataclass
|
||
|
||
# ─── Configuration ───────────────────────────────────────────────
SYMBOL = "xrpusdt"
DATA_PATH = Path("data") / SYMBOL / "combined_15m.parquet"
START = "2026-02-01"
END = "2026-03-30"

# Stop-loss / take-profit distances as multiples of the 1h ATR.
ATR_SL_MULT = 1.5
ATR_TP_MULT = 2.3
FEE_RATE = 0.0004  # 0.04% per side

# 1h meta filter
MTF_EMA_FAST = 50
MTF_EMA_SLOW = 200
MTF_ADX_THRESHOLD = 20

# 15m trigger
EMA_PULLBACK_LEN = 20
VOL_DRY_RATIO = 0.5  # volume < vol_ma20 * 0.5
|
||
|
||
|
||
@dataclass
|
||
class Trade:
|
||
entry_time: pd.Timestamp
|
||
entry_price: float
|
||
side: str
|
||
sl: float
|
||
tp: float
|
||
exit_time: pd.Timestamp | None = None
|
||
exit_price: float | None = None
|
||
pnl_pct: float | None = None
|
||
|
||
|
||
def build_1h_data(df_15m: pd.DataFrame) -> pd.DataFrame:
    """Resample 15m OHLCV bars to 1h and compute the 1h indicators.

    Args:
        df_15m: 15m bars with ``open``/``high``/``low``/``close``/``volume``
            columns on an index that supports ``resample("1h")``.

    Returns:
        A frame on the 1h index holding only the indicator columns
        ``ema50_1h``, ``ema200_1h``, ``adx_1h`` and ``atr_1h``.
    """
    ohlcv_rules = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "volume": "sum",
    }
    # Hours without any 15m data aggregate to NaN and are dropped.
    hourly = df_15m[list(ohlcv_rules)].resample("1h").agg(ohlcv_rules).dropna()

    high, low, close = hourly["high"], hourly["low"], hourly["close"]
    hourly["ema50_1h"] = ta.ema(close, length=MTF_EMA_FAST)
    hourly["ema200_1h"] = ta.ema(close, length=MTF_EMA_SLOW)
    hourly["adx_1h"] = ta.adx(high, low, close, length=14)["ADX_14"]
    hourly["atr_1h"] = ta.atr(high, low, close, length=14)

    return hourly[["ema50_1h", "ema200_1h", "adx_1h", "atr_1h"]]
|
||
|
||
|
||
def merge_1h_to_15m(df_15m: pd.DataFrame, df_1h: pd.DataFrame) -> pd.DataFrame:
    """Attach 1h indicator columns to 15m bars without look-ahead bias.

    Each 1h row is re-stamped to its completion time (index + 1 hour) and
    then joined with a backward as-of merge, so a 15m bar only ever sees
    the most recent 1h row whose shifted timestamp is <= its own.

    Args:
        df_15m: 15m frame indexed by bar timestamp.
        df_1h: 1h frame indexed by bar timestamp (before shifting).

    Returns:
        The 15m rows with the 1h columns appended, indexed by ``timestamp``.
        15m bars earlier than the first shifted 1h row get NaN in the 1h
        columns.
    """

    def _with_timestamp_column(frame: pd.DataFrame) -> pd.DataFrame:
        # Move the index into a column called "timestamp" whatever the index
        # was named, so both sides of the merge share the same key. The 1h
        # side previously only handled an unnamed index and raised KeyError
        # for any other index name.
        flat = frame.reset_index()
        if "timestamp" not in flat.columns:
            flat = flat.rename(columns={flat.columns[0]: "timestamp"})
        # merge_asof needs identical key dtypes on both sides.
        flat["timestamp"] = pd.to_datetime(flat["timestamp"]).astype("datetime64[us]")
        return flat.sort_values("timestamp")

    completed_1h = df_1h.copy()
    completed_1h.index = completed_1h.index + pd.Timedelta(hours=1)

    merged = pd.merge_asof(
        _with_timestamp_column(df_15m),
        _with_timestamp_column(completed_1h),
        on="timestamp",
        direction="backward",
    )
    return merged.set_index("timestamp")
|
||
|
||
|
||
def get_1h_meta(row) -> str:
    """Classify the 1h regime for one merged row.

    Reads ``ema50_1h``, ``ema200_1h`` and ``adx_1h`` via ``row.get``.

    Returns:
        ``"LONG"`` when ADX is at least ``MTF_ADX_THRESHOLD`` and EMA50 is
        above EMA200, ``"SHORT"`` when it is below, and ``"HOLD"`` when any
        input is missing/NaN, ADX is under the threshold, or the two EMAs
        are equal.
    """
    fast, slow, trend_strength = (
        row.get(key) for key in ("ema50_1h", "ema200_1h", "adx_1h")
    )

    if any(pd.isna(value) for value in (fast, slow, trend_strength)):
        return "HOLD"
    if trend_strength < MTF_ADX_THRESHOLD or fast == slow:
        return "HOLD"
    return "LONG" if fast > slow else "SHORT"
|
||
|
||
|
||
def calc_metrics(trades: "list[Trade]") -> dict:
    """Summarise a list of closed trades.

    Args:
        trades: Closed trades; each must have ``pnl_pct`` (fractional
            return) and ``side`` set.

    Returns:
        A dict with ``trades``, ``win_rate`` (percent), ``pf`` (profit
        factor, ``inf`` when there are no losses), ``pnl_bps``,
        ``max_dd_bps``, ``avg_win_bps``, ``avg_loss_bps``, ``long_trades``
        and ``short_trades``. All values are 0 for an empty list.
    """
    if not trades:
        return {"trades": 0, "win_rate": 0, "pf": 0, "pnl_bps": 0, "max_dd_bps": 0,
                "avg_win_bps": 0, "avg_loss_bps": 0, "long_trades": 0, "short_trades": 0}

    pnls = [t.pnl_pct for t in trades]
    wins = [p for p in pnls if p > 0]
    losses = [p for p in pnls if p <= 0]  # break-even trades count as losses

    gross_profit = sum(wins)
    gross_loss = abs(sum(losses))
    pf = gross_profit / gross_loss if gross_loss > 0 else float("inf")

    # Start the equity curve at 0 so that losses taken before any profit
    # are measured against the starting equity; otherwise the first trade
    # becomes the initial peak and an opening losing streak is understated.
    equity = np.concatenate(([0.0], np.cumsum(pnls)))
    running_peak = np.maximum.accumulate(equity)
    max_dd = (running_peak - equity).max()

    return {
        "trades": len(trades),
        "win_rate": len(wins) / len(trades) * 100,
        "pf": round(pf, 2),
        "pnl_bps": round(sum(pnls) * 10000, 1),
        "max_dd_bps": round(max_dd * 10000, 1),
        "avg_win_bps": round(np.mean(wins) * 10000, 1) if wins else 0,
        "avg_loss_bps": round(np.mean(losses) * 10000, 1) if losses else 0,
        "long_trades": sum(1 for t in trades if t.side == "LONG"),
        "short_trades": sum(1 for t in trades if t.side == "SHORT"),
    }
|
||
|
||
|
||
def _open_trade(side: str, entry_time, entry_price: float, atr_1h: float) -> "Trade":
    """Create a trade whose SL/TP sit at ATR multiples around the entry price."""
    if side == "LONG":
        sl = entry_price - atr_1h * ATR_SL_MULT
        tp = entry_price + atr_1h * ATR_TP_MULT
    else:
        sl = entry_price + atr_1h * ATR_SL_MULT
        tp = entry_price - atr_1h * ATR_TP_MULT
    return Trade(entry_time=entry_time, entry_price=entry_price, side=side, sl=sl, tp=tp)


def _close_on_stop(trade: "Trade", bar, bar_time) -> bool:
    """Close ``trade`` in place if ``bar`` reaches its SL or TP; return True if closed."""
    if trade.side == "LONG":
        hit_sl = bar["low"] <= trade.sl
        hit_tp = bar["high"] >= trade.tp
    else:
        hit_sl = bar["high"] >= trade.sl
        hit_tp = bar["low"] <= trade.tp

    if not (hit_sl or hit_tp):
        return False

    # When both levels fall inside the same bar, assume the stop was hit
    # first (conservative).
    exit_price = trade.sl if hit_sl else trade.tp

    if trade.side == "LONG":
        raw_pnl = (exit_price - trade.entry_price) / trade.entry_price
    else:
        raw_pnl = (trade.entry_price - exit_price) / trade.entry_price

    trade.exit_time = bar_time
    trade.exit_price = exit_price
    trade.pnl_pct = raw_pnl - FEE_RATE * 2  # fee charged on entry and on exit
    return True


def _print_metrics_row(label: str, m: dict) -> None:
    """Print one row of the results table for a ``calc_metrics`` dict."""
    print(
        f"{label:<10} {m['trades']:>7} {m['win_rate']:>7.1f}% {m['pf']:>6.2f} "
        f"{m['pnl_bps']:>10.1f} {m['max_dd_bps']:>11.1f} "
        f"{m['avg_win_bps']:>8.1f} {m['avg_loss_bps']:>8.1f}"
    )


def main():
    """Run the MTF pullback backtest and print the report to stdout.

    Loads the 15m parquet at ``DATA_PATH``, computes the 15m and 1h
    indicators, walks the bars in ``START``..``END`` one at a time with at
    most one open position, and prints the signal pipeline counters, the
    overall/LONG/SHORT metrics and the list of closed trades. A position
    that is still open when the data ends is not included in the results.
    """
    print("=" * 70)
    print(" MTF Pullback Backtest")
    print(f" {SYMBOL.upper()} | {START} ~ {END}")
    print(f" SL: 1h ATR×{ATR_SL_MULT} | TP: 1h ATR×{ATR_TP_MULT} | Fee: {FEE_RATE*100:.2f}%/side")
    print(f" Pullback: EMA{EMA_PULLBACK_LEN} | Vol dry: <{VOL_DRY_RATIO*100:.0f}% of SMA20")
    print("=" * 70)

    # ── Load data ──
    df_raw = pd.read_parquet(DATA_PATH)
    if df_raw.index.tz is not None:
        df_raw.index = df_raw.index.tz_localize(None)

    # Load 250 extra hours before START so the 1h EMA200 has history.
    warmup_start = pd.Timestamp(START) - pd.Timedelta(hours=250)
    df_full = df_raw[df_raw.index >= warmup_start].copy()
    print(f"\n데이터: {len(df_full)} bars (워밍업 포함)")

    # ── 15m indicators: EMA20, vol_ma20 ──
    df_full["ema20"] = ta.ema(df_full["close"], length=EMA_PULLBACK_LEN)
    df_full["vol_ma20"] = ta.sma(df_full["volume"], length=20)

    # ── 1h indicators ──
    df_1h = build_1h_data(df_full)
    print(f"1h 캔들: {len(df_1h)} bars")

    # ── Merge ──
    df_merged = merge_1h_to_15m(df_full, df_1h)

    # ── Analysis window ──
    # NOTE(review): END is compared as a timestamp, so a date-only END stops
    # at that day's 00:00 bar — confirm this is the intended cut-off.
    df = df_merged[(df_merged.index >= START) & (df_merged.index <= END)].copy()
    print(f"분석 기간: {len(df)} bars ({df.index.min()} ~ {df.index.max()})")

    # ── Signal scan & simulation ──
    trades: list[Trade] = []
    current_trade: Trade | None = None  # None means flat
    pullback_ready = False  # a pullback has been seen and awaits confirmation
    pullback_side = ""

    # Debug counters
    meta_long_count = 0
    meta_short_count = 0
    pullback_detected = 0
    entry_triggered = 0

    n_bars = len(df)
    for i in range(1, n_bars):
        row = df.iloc[i]

        # ── Check SL/TP on the open position ──
        if current_trade is not None:
            if not _close_on_stop(current_trade, row, df.index[i]):
                continue  # still in the trade: no new entries
            trades.append(current_trade)
            current_trade = None

        # Indicators not available yet
        if pd.isna(row.get("ema20")) or pd.isna(row.get("vol_ma20")) or pd.isna(row.get("atr_1h")):
            pullback_ready = False
            continue

        # ── Step 1: 1h meta filter ──
        meta = get_1h_meta(row)
        if meta == "LONG":
            meta_long_count += 1
        elif meta == "SHORT":
            meta_short_count += 1

        if meta == "HOLD":
            pullback_ready = False
            continue

        # ── Trend-resumption check for a previously detected pullback ──
        if pullback_ready and pullback_side == meta:
            if meta == "LONG":
                resumed = row["close"] > row["ema20"]
            else:
                resumed = row["close"] < row["ema20"]

            # The resumption is only known at this bar's close, so the entry
            # is taken at the NEXT bar's open to avoid look-ahead bias.
            if resumed and i + 1 < n_bars:
                current_trade = _open_trade(
                    meta, df.index[i + 1], df.iloc[i + 1]["open"], row["atr_1h"]
                )
                pullback_ready = False
                entry_triggered += 1
                continue

        # ── Steps 2+3: pullback + volume dry-up (confirmed on a later bar) ──
        vol_dry = row["volume"] < row["vol_ma20"] * VOL_DRY_RATIO
        if meta == "LONG":
            against_trend = row["close"] < row["ema20"]
        else:
            against_trend = row["close"] > row["ema20"]

        if against_trend and vol_dry:
            pullback_ready = True
            pullback_side = meta
            pullback_detected += 1
        elif meta != pullback_side:
            # Keep a pending pullback alive across bars while the 1h
            # direction is unchanged; drop it once the direction differs.
            pullback_ready = False

    # ── Report ──
    m = calc_metrics(trades)
    lm = calc_metrics([t for t in trades if t.side == "LONG"])
    sm = calc_metrics([t for t in trades if t.side == "SHORT"])

    print("\n─── 신호 파이프라인 ───")
    print(f"1h Meta LONG: {meta_long_count} bars | SHORT: {meta_short_count} bars")
    print(f"Pullback 감지: {pullback_detected}건")
    print(f"진입 트리거: {entry_triggered}건")
    print(f"실제 거래: {m['trades']}건 (L:{m['long_trades']} / S:{m['short_trades']})")

    print(f"\n{'=' * 70}")
    print(" 결과")
    print(f"{'=' * 70}")

    header = f"{'구분':<10} {'Trades':>7} {'WinRate':>8} {'PF':>6} {'PnL(bps)':>10} {'MaxDD(bps)':>11} {'AvgWin':>8} {'AvgLoss':>8}"
    print(header)
    print("-" * len(header))
    _print_metrics_row("전체", m)
    _print_metrics_row("LONG", lm)
    _print_metrics_row("SHORT", sm)

    # Individual trade list
    if trades:
        print("\n─── 개별 거래 ───")
        print(f"{'#':>3} {'Side':<6} {'Entry Time':<20} {'Entry':>10} {'Exit':>10} {'PnL(bps)':>10} {'Result':>8}")
        print("-" * 75)
        for idx, t in enumerate(trades, 1):
            result = "WIN" if t.pnl_pct > 0 else "LOSS"
            pnl_bps = t.pnl_pct * 10000
            print(f"{idx:>3} {t.side:<6} {str(t.entry_time):<20} {t.entry_price:>10.4f} {t.exit_price:>10.4f} {pnl_bps:>+10.1f} {result:>8}")
|
||
|
||
|
||
# Run the backtest only when this file is executed as a script.
if __name__ == "__main__":
    main()
|