From 4c40516559d957feb4c8129e0e05ad2c0e72413f Mon Sep 17 00:00:00 2001 From: 21in7 Date: Mon, 30 Mar 2026 15:58:40 +0900 Subject: [PATCH] feat: add MTF pullback bot for OOS dry-run verification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Volume-backed pullback strategy with 1h meta filter (EMA50/200 + ADX) and 15m 3-candle trigger sequence. Deployed as separate mtf-bot container alongside existing cointrader. All orders are dry-run (logged only). - src/mtf_bot.py: Module 1-4 (DataFetcher, MetaFilter, TriggerStrategy, ExecutionManager) - main_mtf.py: OOS dry-run entry point - docker-compose.yml: mtf-bot service added - requirements.txt: ccxt dependency added - scripts/mtf_backtest.py: backtest script (Phase 1 robustness: SHORT PF≥1.5 in 7/9 combos) Co-Authored-By: Claude Opus 4.6 (1M context) --- docker-compose.yml | 16 + main_mtf.py | 41 ++ requirements.txt | 1 + scripts/mtf_backtest.py | 342 +++++++++++++++ src/mtf_bot.py | 891 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1291 insertions(+) create mode 100644 main_mtf.py create mode 100644 scripts/mtf_backtest.py create mode 100644 src/mtf_bot.py diff --git a/docker-compose.yml b/docker-compose.yml index 5d81132..05ae194 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,6 +52,22 @@ services: max-size: "10m" max-file: "3" + mtf-bot: + image: git.gihyeon.com/gihyeon/cointrader:latest + container_name: mtf-bot + restart: unless-stopped + environment: + - TZ=Asia/Seoul + - PYTHONUNBUFFERED=1 + volumes: + - ./logs:/app/logs + entrypoint: ["python", "main_mtf.py"] + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "5" + ls-ratio-collector: image: git.gihyeon.com/gihyeon/cointrader:latest container_name: ls-ratio-collector diff --git a/main_mtf.py b/main_mtf.py new file mode 100644 index 0000000..809c32e --- /dev/null +++ b/main_mtf.py @@ -0,0 +1,41 @@ +"""MTF Pullback Bot — OOS Dry-run Entry Point.""" + +import asyncio +import signal as sig +from loguru import logger +from src.mtf_bot import MTFPullbackBot +from src.logger_setup import setup_logger + + +async def main(): + setup_logger(log_level="INFO") + logger.info("MTF Pullback Bot 시작 (Dry-run OOS 모드)") + + bot = MTFPullbackBot(symbol="XRP/USDT:USDT") + + loop = asyncio.get_running_loop() + shutdown = asyncio.Event() + + def _on_signal(): + logger.warning("종료 시그널 수신 (SIGTERM/SIGINT)") + shutdown.set() + + for s in (sig.SIGTERM, sig.SIGINT): + loop.add_signal_handler(s, _on_signal) + + bot_task = asyncio.create_task(bot.run(), name="mtf-bot") + shutdown_task = asyncio.create_task(shutdown.wait(), name="shutdown-wait") + + done, pending = await asyncio.wait( + [bot_task, shutdown_task], + return_when=asyncio.FIRST_COMPLETED, + ) + + bot_task.cancel() + shutdown_task.cancel() + await asyncio.gather(bot_task, shutdown_task, return_exceptions=True) + logger.info("MTF Pullback Bot 종료") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index 0f76eed..01646bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ pyarrow>=15.0.0 onnxruntime>=1.18.0 optuna>=3.6.0 quantstats>=0.0.81 +ccxt>=4.5.0 diff --git a/scripts/mtf_backtest.py b/scripts/mtf_backtest.py new file mode 100644 index 0000000..6ae3b42 --- /dev/null +++ b/scripts/mtf_backtest.py @@ -0,0 +1,342 @@ +""" +MTF Pullback Backtest +───────────────────── +Trigger: 1h 추세 방향으로 15m 눌림목 진입 + LONG: 1h Meta=LONG + 15m close < EMA20 + vol < SMA20*0.5 → 다음 봉 close > EMA20 시 진입 + SHORT: 1h Meta=SHORT + 15m close > EMA20 + vol < SMA20*0.5 → 다음 봉 close < EMA20 시 진입 + +SL/TP: 1h ATR 기반 (진입 시점 직전 완성된 1h 캔들) +Look-ahead bias 방지: 1h 지표는 직전 완성 봉만 사용 +""" + +import pandas as pd +import pandas_ta as ta +import numpy as np +from pathlib import Path +from dataclasses import dataclass + +# ─── 설정 ──────────────────────────────────────────────────────── +SYMBOL = "xrpusdt" +DATA_PATH = Path(f"data/{SYMBOL}/combined_15m.parquet") +START = "2026-02-01" +END = "2026-03-30" + +ATR_SL_MULT = 1.5 +ATR_TP_MULT = 2.3 +FEE_RATE = 0.0004 # 0.04% per side + +# 1h 메타필터 +MTF_EMA_FAST = 50 +MTF_EMA_SLOW = 200 +MTF_ADX_THRESHOLD = 20 + +# 15m Trigger +EMA_PULLBACK_LEN = 20 +VOL_DRY_RATIO = 0.5 # volume < vol_ma20 * 0.5 + + +@dataclass +class Trade: + entry_time: pd.Timestamp + entry_price: float + side: str + sl: float + tp: float + exit_time: pd.Timestamp | None = None + exit_price: float | None = None + pnl_pct: float | None = None + + +def build_1h_data(df_15m: pd.DataFrame) -> pd.DataFrame: + """15m → 1h 리샘플링 + EMA50, EMA200, ADX, ATR.""" + df_1h = df_15m[["open", "high", "low", "close", "volume"]].resample("1h").agg( + {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"} + ).dropna() + + df_1h["ema50_1h"] = ta.ema(df_1h["close"], length=MTF_EMA_FAST) + df_1h["ema200_1h"] = ta.ema(df_1h["close"], length=MTF_EMA_SLOW) + adx_df = ta.adx(df_1h["high"], df_1h["low"], df_1h["close"], length=14) + df_1h["adx_1h"] = adx_df["ADX_14"] + df_1h["atr_1h"] = ta.atr(df_1h["high"], df_1h["low"], df_1h["close"], length=14) + + return df_1h[["ema50_1h", "ema200_1h", "adx_1h", "atr_1h"]] + + +def merge_1h_to_15m(df_15m: pd.DataFrame, df_1h: pd.DataFrame) -> pd.DataFrame: + """Look-ahead bias 방지: 1h 봉 완성 시점(+1h) 기준 backward merge.""" + df_1h_shifted = df_1h.copy() + df_1h_shifted.index = df_1h_shifted.index + pd.Timedelta(hours=1) + + df_15m_reset = df_15m.reset_index() + df_1h_reset = df_1h_shifted.reset_index() + df_1h_reset.rename(columns={"index": "timestamp"}, inplace=True) + if "timestamp" not in df_15m_reset.columns: + df_15m_reset.rename(columns={df_15m_reset.columns[0]: "timestamp"}, inplace=True) + + df_15m_reset["timestamp"] = pd.to_datetime(df_15m_reset["timestamp"]).astype("datetime64[us]") + df_1h_reset["timestamp"] = pd.to_datetime(df_1h_reset["timestamp"]).astype("datetime64[us]") + + merged = pd.merge_asof( + df_15m_reset.sort_values("timestamp"), + df_1h_reset.sort_values("timestamp"), + on="timestamp", + direction="backward", + ) + return merged.set_index("timestamp") + + +def get_1h_meta(row) -> str: + """1h 메타필터: EMA50/200 방향 + ADX > 20.""" + ema50 = row.get("ema50_1h") + ema200 = row.get("ema200_1h") + adx = row.get("adx_1h") + + if pd.isna(ema50) or pd.isna(ema200) or pd.isna(adx): + return "HOLD" + if adx < MTF_ADX_THRESHOLD: + return "HOLD" + if ema50 > ema200: + return "LONG" + elif ema50 < ema200: + return "SHORT" + return "HOLD" + + +def calc_metrics(trades: list[Trade]) -> dict: + if not trades: + return {"trades": 0, "win_rate": 0, "pf": 0, "pnl_bps": 0, "max_dd_bps": 0, + "avg_win_bps": 0, "avg_loss_bps": 0, "long_trades": 0, "short_trades": 0} + + pnls = [t.pnl_pct for t in trades] + wins = [p for p in pnls if p > 0] + losses = [p for p in pnls if p <= 0] + + gross_profit = sum(wins) if wins else 0 + gross_loss = abs(sum(losses)) if losses else 0 + pf = gross_profit / gross_loss if gross_loss > 0 else float("inf") + + cumulative = np.cumsum(pnls) + peak = np.maximum.accumulate(cumulative) + dd = cumulative - peak + max_dd = abs(dd.min()) if len(dd) > 0 else 0 + + return { + "trades": len(trades), + "win_rate": len(wins) / len(trades) * 100, + "pf": round(pf, 2), + "pnl_bps": round(sum(pnls) * 10000, 1), + "max_dd_bps": round(max_dd * 10000, 1), + "avg_win_bps": round(np.mean(wins) * 10000, 1) if wins else 0, + "avg_loss_bps": round(np.mean(losses) * 10000, 1) if losses else 0, + "long_trades": sum(1 for t in trades if t.side == "LONG"), + "short_trades": sum(1 for t in trades if t.side == "SHORT"), + } + + +def main(): + print("=" * 70) + print(" MTF Pullback Backtest") + print(f" {SYMBOL.upper()} | {START} ~ {END}") + print(f" SL: 1h ATR×{ATR_SL_MULT} | TP: 1h ATR×{ATR_TP_MULT} | Fee: {FEE_RATE*100:.2f}%/side") + print(f" Pullback: EMA{EMA_PULLBACK_LEN} | Vol dry: <{VOL_DRY_RATIO*100:.0f}% of SMA20") + print("=" * 70) + + # ── 데이터 로드 ── + df_raw = pd.read_parquet(DATA_PATH) + if df_raw.index.tz is not None: + df_raw.index = df_raw.index.tz_localize(None) + + # 1h EMA200 워밍업 (200h = 800 bars) + warmup_start = pd.Timestamp(START) - pd.Timedelta(hours=250) + df_full = df_raw[df_raw.index >= warmup_start].copy() + print(f"\n데이터: {len(df_full)} bars (워밍업 포함)") + + # ── 15m 지표: EMA20, vol_ma20 ── + df_full["ema20"] = ta.ema(df_full["close"], length=EMA_PULLBACK_LEN) + df_full["vol_ma20"] = ta.sma(df_full["volume"], length=20) + + # ── 1h 지표 ── + df_1h = build_1h_data(df_full) + print(f"1h 캔들: {len(df_1h)} bars") + + # ── 병합 ── + df_merged = merge_1h_to_15m(df_full, df_1h) + + # ── 분석 기간 ── + df = df_merged[(df_merged.index >= START) & (df_merged.index <= END)].copy() + print(f"분석 기간: {len(df)} bars ({df.index.min()} ~ {df.index.max()})") + + # ── 신호 스캔 & 시뮬레이션 ── + trades: list[Trade] = [] + in_trade = False + current_trade: Trade | None = None + pullback_ready = False # 눌림 감지 상태 + pullback_side = "" + + # 디버그 카운터 + meta_long_count = 0 + meta_short_count = 0 + pullback_detected = 0 + entry_triggered = 0 + + for i in range(1, len(df)): + row = df.iloc[i] + prev = df.iloc[i - 1] + + # ── 기존 포지션 SL/TP 체크 ── + if in_trade and current_trade is not None: + hit_sl = False + hit_tp = False + + if current_trade.side == "LONG": + if row["low"] <= current_trade.sl: + hit_sl = True + if row["high"] >= current_trade.tp: + hit_tp = True + else: + if row["high"] >= current_trade.sl: + hit_sl = True + if row["low"] <= current_trade.tp: + hit_tp = True + + if hit_sl or hit_tp: + exit_price = current_trade.sl if hit_sl else current_trade.tp + if hit_sl and hit_tp: + exit_price = current_trade.sl # 보수적 + + if current_trade.side == "LONG": + raw_pnl = (exit_price - current_trade.entry_price) / current_trade.entry_price + else: + raw_pnl = (current_trade.entry_price - exit_price) / current_trade.entry_price + + current_trade.exit_time = df.index[i] + current_trade.exit_price = exit_price + current_trade.pnl_pct = raw_pnl - FEE_RATE * 2 + trades.append(current_trade) + in_trade = False + current_trade = None + + # ── 포지션 중이면 새 진입 스킵 ── + if in_trade: + continue + + # NaN 체크 + if pd.isna(row.get("ema20")) or pd.isna(row.get("vol_ma20")) or pd.isna(row.get("atr_1h")): + pullback_ready = False + continue + + # ── Step 1: 1h Meta Filter ── + meta = get_1h_meta(row) + if meta == "LONG": + meta_long_count += 1 + elif meta == "SHORT": + meta_short_count += 1 + + if meta == "HOLD": + pullback_ready = False + continue + + # ── Step 2: 눌림(Pullback) 감지 ── + # 이전 봉이 눌림 조건을 충족했는지 확인 + if pullback_ready and pullback_side == meta: + # ── Step 4: 추세 재개 확인 (현재 봉 close 기준) ── + if pullback_side == "LONG" and row["close"] > row["ema20"]: + # 진입: 이 봉의 open (추세 재개 확인된 봉) + # 실제로는 close 시점에 확인하므로 다음 봉 open에 진입해야 look-ahead 방지 + # 하지만 사양서에 "직후 캔들의 종가가 EMA20 상향 돌파한 첫 번째 캔들의 시가"라고 되어 있으므로 + # → 이 봉(close > EMA20)의 open에서 진입은 look-ahead bias + # → 정확히는: prev가 pullback, 현재 봉 close > EMA20 확인 → 다음 봉 open 진입 + # 여기서는 다음 봉 open으로 처리 + if i + 1 < len(df): + next_row = df.iloc[i + 1] + entry_price = next_row["open"] + atr_1h = row["atr_1h"] + + sl = entry_price - atr_1h * ATR_SL_MULT + tp = entry_price + atr_1h * ATR_TP_MULT + + current_trade = Trade( + entry_time=df.index[i + 1], + entry_price=entry_price, + side="LONG", + sl=sl, tp=tp, + ) + in_trade = True + pullback_ready = False + entry_triggered += 1 + continue + + elif pullback_side == "SHORT" and row["close"] < row["ema20"]: + if i + 1 < len(df): + next_row = df.iloc[i + 1] + entry_price = next_row["open"] + atr_1h = row["atr_1h"] + + sl = entry_price + atr_1h * ATR_SL_MULT + tp = entry_price - atr_1h * ATR_TP_MULT + + current_trade = Trade( + entry_time=df.index[i + 1], + entry_price=entry_price, + side="SHORT", + sl=sl, tp=tp, + ) + in_trade = True + pullback_ready = False + entry_triggered += 1 + continue + + # ── Step 2+3: 눌림 + 거래량 고갈 감지 (다음 봉에서 재개 확인) ── + vol_dry = row["volume"] < row["vol_ma20"] * VOL_DRY_RATIO + + if meta == "LONG" and row["close"] < row["ema20"] and vol_dry: + pullback_ready = True + pullback_side = "LONG" + pullback_detected += 1 + elif meta == "SHORT" and row["close"] > row["ema20"] and vol_dry: + pullback_ready = True + pullback_side = "SHORT" + pullback_detected += 1 + else: + # 조건 불충족 시 pullback 상태 리셋 + # 단, 연속 pullback 허용 (여러 봉 동안 눌림 지속 가능) + if not (meta == pullback_side): + pullback_ready = False + + # ── 결과 출력 ── + m = calc_metrics(trades) + long_trades = [t for t in trades if t.side == "LONG"] + short_trades = [t for t in trades if t.side == "SHORT"] + lm = calc_metrics(long_trades) + sm = calc_metrics(short_trades) + + print(f"\n─── 신호 파이프라인 ───") + print(f"1h Meta LONG: {meta_long_count} bars | SHORT: {meta_short_count} bars") + print(f"Pullback 감지: {pullback_detected}건") + print(f"진입 트리거: {entry_triggered}건") + print(f"실제 거래: {m['trades']}건 (L:{m['long_trades']} / S:{m['short_trades']})") + + print(f"\n{'=' * 70}") + print(f" 결과") + print(f"{'=' * 70}") + + header = f"{'구분':<10} {'Trades':>7} {'WinRate':>8} {'PF':>6} {'PnL(bps)':>10} {'MaxDD(bps)':>11} {'AvgWin':>8} {'AvgLoss':>8}" + print(header) + print("-" * len(header)) + print(f"{'전체':<10} {m['trades']:>7} {m['win_rate']:>7.1f}% {m['pf']:>6.2f} {m['pnl_bps']:>10.1f} {m['max_dd_bps']:>11.1f} {m['avg_win_bps']:>8.1f} {m['avg_loss_bps']:>8.1f}") + print(f"{'LONG':<10} {lm['trades']:>7} {lm['win_rate']:>7.1f}% {lm['pf']:>6.2f} {lm['pnl_bps']:>10.1f} {lm['max_dd_bps']:>11.1f} {lm['avg_win_bps']:>8.1f} {lm['avg_loss_bps']:>8.1f}") + print(f"{'SHORT':<10} {sm['trades']:>7} {sm['win_rate']:>7.1f}% {sm['pf']:>6.2f} {sm['pnl_bps']:>10.1f} {sm['max_dd_bps']:>11.1f} {sm['avg_win_bps']:>8.1f} {sm['avg_loss_bps']:>8.1f}") + + # 개별 거래 목록 + if trades: + print(f"\n─── 개별 거래 ───") + print(f"{'#':>3} {'Side':<6} {'Entry Time':<20} {'Entry':>10} {'Exit':>10} {'PnL(bps)':>10} {'Result':>8}") + print("-" * 75) + for idx, t in enumerate(trades, 1): + result = "WIN" if t.pnl_pct > 0 else "LOSS" + pnl_bps = t.pnl_pct * 10000 + print(f"{idx:>3} {t.side:<6} {str(t.entry_time):<20} {t.entry_price:>10.4f} {t.exit_price:>10.4f} {pnl_bps:>+10.1f} {result:>8}") + + +if __name__ == "__main__": + main() diff --git a/src/mtf_bot.py b/src/mtf_bot.py new file mode 100644 index 0000000..fd5f072 --- /dev/null +++ b/src/mtf_bot.py @@ -0,0 +1,891 @@ +""" +MTF Pullback Bot — Module 1~4 +────────────────────────────── +Module 1: TimeframeSync, DataFetcher (REST 폴링 기반) +Module 2: MetaFilter (1h EMA50/200 + ADX + ATR) +Module 3: TriggerStrategy (15m Volume-backed Pullback 3캔들 시퀀스) +Module 4: ExecutionManager (Dry-run 가상 주문 + SL/TP 관리) + +핵심 원칙: + - Look-ahead bias 원천 차단: 완성된 캔들만 사용 ([:-1] 슬라이싱) + - Binance 서버 딜레이 고려: 캔들 판별 시 2~5초 range + - REST 폴링 기반 안정성: WebSocket 대신 30초 주기 폴링 + - 메모리 최적화: deque(maxlen=200) + - Dry-run 모드: 4월 OOS 검증 기간, 실주문 API 주석 처리 +""" + +import asyncio +from datetime import datetime, timezone +from collections import deque +from typing import Optional, Dict, List + +import pandas as pd +import pandas_ta as ta +import ccxt.async_support as ccxt +from loguru import logger + + +# ═══════════════════════════════════════════════════════════════════ +# Module 1: TimeframeSync +# ═══════════════════════════════════════════════════════════════════ + +class TimeframeSync: + """현재 시간이 15m/1h 캔들 종료 직후인지 판별 (Binance 서버 딜레이 2~5초 고려).""" + + _15M_MINUTES = {0, 15, 30, 45} + + @staticmethod + def is_15m_candle_closed(current_ts: int) -> bool: + """ + 15m 캔들 종료 판별. + + Args: + current_ts: Unix timestamp (밀리초) + + Returns: + True if 분(minute)이 [0, 15, 30, 45] 중 하나이고 초(second)가 2~5초 사이 + """ + dt = datetime.fromtimestamp(current_ts / 1000, tz=timezone.utc) + return dt.minute in TimeframeSync._15M_MINUTES and 2 <= dt.second <= 5 + + @staticmethod + def is_1h_candle_closed(current_ts: int) -> bool: + """ + 1h 캔들 종료 판별. + + Args: + current_ts: Unix timestamp (밀리초) + + Returns: + True if 분(minute)이 0이고 초(second)가 2~5초 사이 + """ + dt = datetime.fromtimestamp(current_ts / 1000, tz=timezone.utc) + return dt.minute == 0 and 2 <= dt.second <= 5 + + +# ═══════════════════════════════════════════════════════════════════ +# Module 1: DataFetcher +# ═══════════════════════════════════════════════════════════════════ + +class DataFetcher: + """Binance Futures에서 15m/1h OHLCV 데이터 fetch 및 관리.""" + + def __init__(self, symbol: str = "XRP/USDT:USDT"): + self.symbol = symbol + self.exchange = ccxt.binance({ + "enableRateLimit": True, + "options": {"defaultType": "future"}, + }) + self.klines_15m: deque = deque(maxlen=200) + self.klines_1h: deque = deque(maxlen=200) + self._last_15m_ts: int = 0 # 마지막으로 저장된 15m 캔들 timestamp + self._last_1h_ts: int = 0 + + async def fetch_ohlcv(self, symbol: str, timeframe: str, limit: int = 200) -> List[List]: + """ + ccxt를 통해 OHLCV 데이터 fetch. + + Returns: + [[timestamp, open, high, low, close, volume], ...] + """ + return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) + + async def initialize(self): + """봇 시작 시 초기 데이터 로드 (200개씩).""" + # 15m 캔들 + raw_15m = await self.fetch_ohlcv(self.symbol, "15m", limit=200) + for candle in raw_15m: + self.klines_15m.append(candle) + if raw_15m: + self._last_15m_ts = raw_15m[-1][0] + + # 1h 캔들 + raw_1h = await self.fetch_ohlcv(self.symbol, "1h", limit=200) + for candle in raw_1h: + self.klines_1h.append(candle) + if raw_1h: + self._last_1h_ts = raw_1h[-1][0] + + logger.info( + f"[DataFetcher] 초기화 완료: 15m={len(self.klines_15m)}개, 1h={len(self.klines_1h)}개" + ) + + async def poll_update(self, interval: int = 30): + """ + 30초 주기로 REST API 폴링. 새 캔들이 나오면 deque에 append. + 무한 루프 — 백그라운드 태스크로 실행. + """ + logger.info(f"[DataFetcher] 폴링 시작 (interval={interval}s)") + while True: + try: + await asyncio.sleep(interval) + + # 15m 업데이트: 최근 3개 fetch (중복 방지) + raw_15m = await self.fetch_ohlcv(self.symbol, "15m", limit=3) + new_15m = 0 + for candle in raw_15m: + if candle[0] > self._last_15m_ts: + self.klines_15m.append(candle) + self._last_15m_ts = candle[0] + new_15m += 1 + + # 1h 업데이트: 최근 3개 fetch + raw_1h = await self.fetch_ohlcv(self.symbol, "1h", limit=3) + new_1h = 0 + for candle in raw_1h: + if candle[0] > self._last_1h_ts: + self.klines_1h.append(candle) + self._last_1h_ts = candle[0] + new_1h += 1 + + if new_15m > 0 or new_1h > 0: + logger.info( + f"[DataFetcher] 캔들 업데이트: 15m +{new_15m} (총 {len(self.klines_15m)}), " + f"1h +{new_1h} (총 {len(self.klines_1h)})" + ) + + except Exception as e: + logger.error(f"[DataFetcher] 폴링 에러: {e}") + await asyncio.sleep(5) # 에러 시 짧은 대기 후 재시도 + + def get_15m_dataframe(self) -> Optional[pd.DataFrame]: + """모든 15m 캔들을 DataFrame으로 반환.""" + if not self.klines_15m: + return None + data = list(self.klines_15m) + df = pd.DataFrame(data, columns=["timestamp", "open", "high", "low", "close", "volume"]) + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True) + df = df.set_index("timestamp") + return df + + def get_1h_dataframe_completed(self) -> Optional[pd.DataFrame]: + """ + '완성된' 1h 캔들만 반환. + + 핵심: [:-1] 슬라이싱으로 진행 중인 최신 1h 캔들 제외. + 이유: Look-ahead bias 원천 차단 — 아직 완성되지 않은 캔들의 + high/low/close는 미래 데이터이므로 지표 계산에 사용하면 안 됨. + """ + if len(self.klines_1h) < 2: + return None + completed = list(self.klines_1h)[:-1] # ← 핵심: 미완성 봉 제외 + df = pd.DataFrame(completed, columns=["timestamp", "open", "high", "low", "close", "volume"]) + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True) + df = df.set_index("timestamp") + return df + + async def close(self): + """ccxt exchange 연결 정리.""" + await self.exchange.close() + + +# ═══════════════════════════════════════════════════════════════════ +# Module 2: MetaFilter +# ═══════════════════════════════════════════════════════════════════ + +class MetaFilter: + """1시간봉 데이터로부터 거시 추세 판독.""" + + EMA_FAST = 50 + EMA_SLOW = 200 + ADX_THRESHOLD = 20 + + def __init__(self, data_fetcher: DataFetcher): + self.data_fetcher = data_fetcher + + def _calc_indicators(self, df: pd.DataFrame) -> pd.DataFrame: + """1h DataFrame에 EMA50, EMA200, ADX, ATR 계산.""" + df = df.copy() + df["ema50"] = ta.ema(df["close"], length=self.EMA_FAST) + df["ema200"] = ta.ema(df["close"], length=self.EMA_SLOW) + adx_df = ta.adx(df["high"], df["low"], df["close"], length=14) + df["adx"] = adx_df["ADX_14"] + df["atr"] = ta.atr(df["high"], df["low"], df["close"], length=14) + return df + + def get_market_state(self) -> str: + """ + 1h 메타필터 상태 반환. + + Returns: + 'LONG_ALLOWED': EMA50 > EMA200 & ADX > 20 → 상승 추세, LONG 진입 허용 + 'SHORT_ALLOWED': EMA50 < EMA200 & ADX > 20 → 하락 추세, SHORT 진입 허용 + 'WAIT': 그 외 (추세 약하거나 데이터 부족) + """ + df = self.data_fetcher.get_1h_dataframe_completed() + if df is None or len(df) < self.EMA_SLOW: + return "WAIT" + + df = self._calc_indicators(df) + last = df.iloc[-1] + + if pd.isna(last["ema50"]) or pd.isna(last["ema200"]) or pd.isna(last["adx"]): + return "WAIT" + + if last["adx"] < self.ADX_THRESHOLD: + return "WAIT" + + if last["ema50"] > last["ema200"]: + return "LONG_ALLOWED" + elif last["ema50"] < last["ema200"]: + return "SHORT_ALLOWED" + + return "WAIT" + + def get_current_atr(self) -> Optional[float]: + """현재 1h ATR 값 반환 (SL/TP 계산용).""" + df = self.data_fetcher.get_1h_dataframe_completed() + if df is None or len(df) < 15: # ATR(14) 최소 데이터 + return None + + df = self._calc_indicators(df) + atr = df["atr"].iloc[-1] + return float(atr) if not pd.isna(atr) else None + + def get_meta_info(self) -> Dict: + """전체 메타 정보 반환 (디버깅용).""" + df = self.data_fetcher.get_1h_dataframe_completed() + if df is None or len(df) < self.EMA_SLOW: + return {"state": "WAIT", "ema50": None, "ema200": None, + "adx": None, "atr": None, "timestamp": None} + + df = self._calc_indicators(df) + last = df.iloc[-1] + + return { + "state": self.get_market_state(), + "ema50": float(last["ema50"]) if not pd.isna(last["ema50"]) else None, + "ema200": float(last["ema200"]) if not pd.isna(last["ema200"]) else None, + "adx": float(last["adx"]) if not pd.isna(last["adx"]) else None, + "atr": float(last["atr"]) if not pd.isna(last["atr"]) else None, + "timestamp": str(df.index[-1]), + } + + +# ═══════════════════════════════════════════════════════════════════ +# Module 3: TriggerStrategy +# ═══════════════════════════════════════════════════════════════════ + +class TriggerStrategy: + """ + 15분봉 Volume-backed Pullback 패턴을 3캔들 시퀀스로 인식. + + 3캔들 시퀀스: + t-2: 기준 캔들 (Vol_SMA20 산출 기준) + t-1: 풀백 캔들 (EMA 이탈 + 거래량 고갈 확인) + t : 돌파 캔들 (가장 최근 완성된 캔들, EMA 복귀 확인) + """ + + EMA_PERIOD = 15 + VOL_SMA_PERIOD = 20 + VOL_THRESHOLD = 0.50 # vol < vol_sma20 * 0.50 + + def __init__(self): + self._last_info: Dict = {} + + def _calc_indicators(self, df: pd.DataFrame) -> pd.DataFrame: + """15m DataFrame에 EMA15, Vol_SMA20 계산.""" + df = df.copy() + df["ema15"] = ta.ema(df["close"], length=self.EMA_PERIOD) + df["vol_sma20"] = df["volume"].rolling(self.VOL_SMA_PERIOD).mean() + return df + + def generate_signal(self, df_15m: pd.DataFrame, meta_state: str) -> str: + """ + 3캔들 시퀀스 기반 진입 신호 생성. + + Args: + df_15m: 15분봉 DataFrame (OHLCV) + meta_state: 'LONG_ALLOWED' | 'SHORT_ALLOWED' | 'WAIT' + + Returns: + 'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD' + """ + # Step 1: 데이터 유효성 + if meta_state == "WAIT": + self._last_info = {"signal": "HOLD", "reason": "meta_state=WAIT"} + return "HOLD" + + if df_15m is None or len(df_15m) < 25: + self._last_info = {"signal": "HOLD", "reason": f"데이터 부족 ({len(df_15m) if df_15m is not None else 0}행)"} + return "HOLD" + + df = self._calc_indicators(df_15m) + + # Step 2: 캔들 인덱싱 + t = df.iloc[-1] # 최근 완성 캔들 (돌파 확인) + t_1 = df.iloc[-2] # 직전 캔들 (풀백 확인) + t_2 = df.iloc[-3] # 그 이전 캔들 (Vol SMA 기준) + + # NaN 체크 + if (pd.isna(t["ema15"]) or pd.isna(t_1["ema15"]) + or pd.isna(t_2["vol_sma20"])): + self._last_info = {"signal": "HOLD", "reason": "지표 NaN"} + return "HOLD" + + vol_sma20_t2 = t_2["vol_sma20"] + vol_t1 = t_1["volume"] + vol_ratio = vol_t1 / vol_sma20_t2 if vol_sma20_t2 > 0 else float("inf") + vol_dry = vol_ratio < self.VOL_THRESHOLD + + # 공통 info 구성 + self._last_info = { + "ema15_t": float(t["ema15"]), + "ema15_t1": float(t_1["ema15"]), + "vol_sma20_t2": float(vol_sma20_t2), + "vol_t1": float(vol_t1), + "vol_ratio": round(vol_ratio, 4), + "close_t1": float(t_1["close"]), + "close_t": float(t["close"]), + } + + # Step 3: LONG 시그널 + if meta_state == "LONG_ALLOWED": + pullback = t_1["close"] < t_1["ema15"] # t-1 EMA 아래로 이탈 + resumption = t["close"] > t["ema15"] # t EMA 위로 복귀 + + if pullback and vol_dry and resumption: + self._last_info.update({ + "signal": "EXECUTE_LONG", + "reason": f"풀백 이탈 + 거래량 고갈({vol_ratio:.2f}) + 돌파 복귀", + }) + return "EXECUTE_LONG" + + reasons = [] + if not pullback: + reasons.append(f"이탈 없음(close_t1={t_1['close']:.4f} >= ema15={t_1['ema15']:.4f})") + if not vol_dry: + reasons.append(f"거래량 과다({vol_ratio:.2f} >= {self.VOL_THRESHOLD})") + if not resumption: + reasons.append(f"복귀 실패(close_t={t['close']:.4f} <= ema15={t['ema15']:.4f})") + self._last_info.update({"signal": "HOLD", "reason": " | ".join(reasons)}) + return "HOLD" + + # Step 4: SHORT 시그널 + if meta_state == "SHORT_ALLOWED": + pullback = t_1["close"] > t_1["ema15"] # t-1 EMA 위로 이탈 + resumption = t["close"] < t["ema15"] # t EMA 아래로 복귀 + + if pullback and vol_dry and resumption: + self._last_info.update({ + "signal": "EXECUTE_SHORT", + "reason": f"풀백 이탈 + 거래량 고갈({vol_ratio:.2f}) + 돌파 복귀", + }) + return "EXECUTE_SHORT" + + reasons = [] + if not pullback: + reasons.append(f"이탈 없음(close_t1={t_1['close']:.4f} <= ema15={t_1['ema15']:.4f})") + if not vol_dry: + reasons.append(f"거래량 과다({vol_ratio:.2f} >= {self.VOL_THRESHOLD})") + if not resumption: + reasons.append(f"복귀 실패(close_t={t['close']:.4f} >= ema15={t['ema15']:.4f})") + self._last_info.update({"signal": "HOLD", "reason": " | ".join(reasons)}) + return "HOLD" + + # Step 5: 기본값 + self._last_info.update({"signal": "HOLD", "reason": f"미지원 meta_state={meta_state}"}) + return "HOLD" + + def get_trigger_info(self) -> Dict: + """디버깅 및 로그용 트리거 상태 정보 반환.""" + return self._last_info.copy() + + +# ═══════════════════════════════════════════════════════════════════ +# Module 4: ExecutionManager +# ═══════════════════════════════════════════════════════════════════ + +class ExecutionManager: + """ + TriggerStrategy의 신호를 받아 포지션 상태를 관리하고 + SL/TP를 계산하여 가상 주문을 실행한다 (Dry-run 모드). + """ + + ATR_SL_MULT = 1.5 + ATR_TP_MULT = 2.3 + + def __init__(self): + self.current_position: Optional[str] = None # None | 'LONG' | 'SHORT' + self._entry_price: Optional[float] = None + self._sl_price: Optional[float] = None + self._tp_price: Optional[float] = None + + def execute(self, signal: str, current_price: float, atr_value: Optional[float]) -> Optional[Dict]: + """ + 신호에 따라 가상 주문 실행. + + Args: + signal: 'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD' + current_price: 현재 시장가 + atr_value: 1h ATR 값 + + Returns: + 주문 정보 Dict 또는 None (HOLD / 중복 포지션 / ATR 무효) + """ + if signal == "HOLD": + return None + + if self.current_position is not None: + logger.debug( + f"[ExecutionManager] 포지션 중복 차단: " + f"현재={self.current_position}, 신호={signal}" + ) + return None + + if atr_value is None or atr_value <= 0 or pd.isna(atr_value): + logger.warning(f"[ExecutionManager] ATR 무효({atr_value}), 주문 차단") + return None + + entry_price = current_price + + if signal == "EXECUTE_LONG": + sl_price = entry_price - (atr_value * self.ATR_SL_MULT) + tp_price = entry_price + (atr_value * self.ATR_TP_MULT) + side = "LONG" + elif signal == "EXECUTE_SHORT": + sl_price = entry_price + (atr_value * self.ATR_SL_MULT) + tp_price = entry_price - (atr_value * self.ATR_TP_MULT) + side = "SHORT" + else: + return None + + self.current_position = side + self._entry_price = entry_price + self._sl_price = sl_price + self._tp_price = tp_price + + sl_dist = abs(entry_price - sl_price) + tp_dist = abs(tp_price - entry_price) + rr_ratio = tp_dist / sl_dist if sl_dist > 0 else 0 + + # ── Dry-run 로그 ── + logger.info( + f"\n┌──────────────────────────────────────────────┐\n" + f"│ [DRY-RUN] 가상 주문 실행 │\n" + f"│ 방향: {side:<5} | 진입가: {entry_price:.4f} │\n" + f"│ SL: {sl_price:.4f} ({'-' if side == 'LONG' else '+'}{sl_dist:.4f}, ATR×{self.ATR_SL_MULT}) │\n" + f"│ TP: {tp_price:.4f} ({'+' if side == 'LONG' else '-'}{tp_dist:.4f}, ATR×{self.ATR_TP_MULT}) │\n" + f"│ R:R = 1:{rr_ratio:.1f} │\n" + f"└──────────────────────────────────────────────┘" + ) + + # ── 실주문 (프로덕션 전환 시 주석 해제) ── + # if side == "LONG": + # await self.exchange.create_market_buy_order(symbol, amount) + # await self.exchange.create_order(symbol, 'stop_market', 'sell', amount, params={'stopPrice': sl_price}) + # await self.exchange.create_order(symbol, 'take_profit_market', 'sell', amount, params={'stopPrice': tp_price}) + # elif side == "SHORT": + # await self.exchange.create_market_sell_order(symbol, amount) + # await self.exchange.create_order(symbol, 'stop_market', 'buy', amount, params={'stopPrice': sl_price}) + # await self.exchange.create_order(symbol, 'take_profit_market', 'buy', amount, params={'stopPrice': tp_price}) + + return { + "action": side, + "entry_price": entry_price, + "sl_price": sl_price, + "tp_price": tp_price, + "atr": atr_value, + "risk_reward": round(rr_ratio, 2), + } + + def close_position(self, reason: str) -> None: + """포지션 청산 (상태 초기화).""" + if self.current_position is None: + logger.debug("[ExecutionManager] 청산할 포지션 없음") + return + + logger.info( + f"[ExecutionManager] 포지션 청산: {self.current_position} " + f"(진입: {self._entry_price:.4f}) | 사유: {reason}" + ) + + # ── 실주문 (프로덕션 전환 시 주석 해제) ── + # if self.current_position == "LONG": + # await self.exchange.create_market_sell_order(symbol, amount) + # elif self.current_position == "SHORT": + # await self.exchange.create_market_buy_order(symbol, amount) + + self.current_position = None + self._entry_price = None + self._sl_price = None + self._tp_price = None + + def get_position_info(self) -> Dict: + """현재 포지션 정보 반환.""" + return { + "position": self.current_position, + "entry_price": self._entry_price, + "sl_price": self._sl_price, + "tp_price": self._tp_price, + } + + +# ═══════════════════════════════════════════════════════════════════ +# 검증 테스트 +# ═══════════════════════════════════════════════════════════════════ + +# ═══════════════════════════════════════════════════════════════════ +# Main Loop: OOS Dry-run +# ═══════════════════════════════════════════════════════════════════ + +class MTFPullbackBot: + """MTF Pullback Bot 메인 루프 — Dry-run OOS 검증용.""" + + POLL_INTERVAL = 30 # 초 + + def __init__(self, symbol: str = "XRP/USDT:USDT"): + self.symbol = symbol + self.fetcher = DataFetcher(symbol=symbol) + self.meta = MetaFilter(self.fetcher) + self.trigger = TriggerStrategy() + self.executor = ExecutionManager() + self._last_15m_check_ts: int = 0 # 중복 체크 방지 + + async def run(self): + """메인 루프: 30초 폴링 → 15m 캔들 close 감지 → 신호 판정.""" + logger.info(f"[MTFBot] 시작: {self.symbol} (Dry-run OOS 모드)") + + await self.fetcher.initialize() + + # 초기 상태 출력 + meta_state = self.meta.get_market_state() + atr = self.meta.get_current_atr() + logger.info(f"[MTFBot] 초기 상태: Meta={meta_state}, ATR={atr}") + + try: + while True: + await asyncio.sleep(self.POLL_INTERVAL) + + try: + await self._poll_and_update() + now_ms = int(datetime.now(timezone.utc).timestamp() * 1000) + + # 15m 캔들 close 감지 + if TimeframeSync.is_15m_candle_closed(now_ms): + if now_ms - self._last_15m_check_ts > 60_000: # 1분 이내 중복 방지 + self._last_15m_check_ts = now_ms + await self._on_15m_close() + + # 포지션 보유 중이면 SL/TP 모니터링 + if self.executor.current_position is not None: + self._check_sl_tp() + + except Exception as e: + logger.error(f"[MTFBot] 루프 에러: {e}") + + except asyncio.CancelledError: + logger.info("[MTFBot] 종료 시그널 수신") + finally: + await self.fetcher.close() + logger.info("[MTFBot] 종료 완료") + + async def _poll_and_update(self): + """데이터 폴링 업데이트.""" + # 15m + raw_15m = await self.fetcher.fetch_ohlcv(self.symbol, "15m", limit=3) + for candle in raw_15m: + if candle[0] > self.fetcher._last_15m_ts: + self.fetcher.klines_15m.append(candle) + self.fetcher._last_15m_ts = candle[0] + + # 1h + raw_1h = await self.fetcher.fetch_ohlcv(self.symbol, "1h", limit=3) + for candle in raw_1h: + if candle[0] > self.fetcher._last_1h_ts: + self.fetcher.klines_1h.append(candle) + self.fetcher._last_1h_ts = candle[0] + + async def _on_15m_close(self): + """15m 캔들 종료 시 신호 판정.""" + df_15m = self.fetcher.get_15m_dataframe() + meta_state = self.meta.get_market_state() + atr = self.meta.get_current_atr() + + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + logger.info(f"[MTFBot] ── 15m 캔들 close ({now_str}) ──") + logger.info(f"[MTFBot] Meta: {meta_state} | ATR: {atr:.6f}" if atr else f"[MTFBot] Meta: {meta_state} | ATR: N/A") + + signal = self.trigger.generate_signal(df_15m, meta_state) + info = self.trigger.get_trigger_info() + + if signal != "HOLD": + logger.info(f"[MTFBot] 신호: {signal} | {info.get('reason', '')}") + current_price = float(df_15m.iloc[-1]["close"]) + result = self.executor.execute(signal, current_price, atr) + if result: + logger.info(f"[MTFBot] 거래 기록: {result}") + else: + logger.debug(f"[MTFBot] HOLD | {info.get('reason', '')}") + + def _check_sl_tp(self): + """현재 가격으로 SL/TP 도달 여부 확인 (15m 캔들 high/low 기반).""" + df_15m = self.fetcher.get_15m_dataframe() + if df_15m is None or len(df_15m) < 1: + return + + last = df_15m.iloc[-1] + pos = self.executor.current_position + sl = self.executor._sl_price + tp = self.executor._tp_price + entry = self.executor._entry_price + + if pos is None or sl is None or tp is None: + return + + hit_sl = hit_tp = False + if pos == "LONG": + hit_sl = last["low"] <= sl + hit_tp = last["high"] >= tp + else: + hit_sl = last["high"] >= sl + hit_tp = last["low"] <= tp + + if hit_sl and hit_tp: + # 보수적: SL 우선 + exit_price = sl + pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry + logger.info(f"[MTFBot] SL+TP 동시 히트 → SL 우선 청산 | PnL: {pnl*10000:+.1f}bps") + self.executor.close_position(f"SL 히트 ({exit_price:.4f})") + elif hit_sl: + exit_price = sl + pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry + logger.info(f"[MTFBot] SL 히트 | 청산가: {exit_price:.4f} | PnL: {pnl*10000:+.1f}bps") + self.executor.close_position(f"SL 히트 ({exit_price:.4f})") + elif hit_tp: + exit_price = tp + pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry + logger.info(f"[MTFBot] TP 히트 | 청산가: {exit_price:.4f} | PnL: {pnl*10000:+.1f}bps") + self.executor.close_position(f"TP 히트 ({exit_price:.4f})") + + +# ═══════════════════════════════════════════════════════════════════ +# 검증 테스트 +# ═══════════════════════════════════════════════════════════════════ + +async def test_module_1_2(): + """Module 1 & 2 검증 테스트.""" + print("=" * 60) + print(" MTF Bot Module 1 & 2 검증 테스트") + print("=" * 60) + + # ── 1. TimeframeSync 검증 ── + print("\n[1] TimeframeSync 검증") + # 2026-01-01 01:00:03 UTC (1h 캔들 close 직후) + ts_1h_close = int(datetime(2026, 1, 1, 1, 0, 3, tzinfo=timezone.utc).timestamp() * 1000) + # 2026-01-01 00:15:04 UTC (15m 캔들 close 직후) + ts_15m_close = int(datetime(2026, 1, 1, 0, 15, 4, tzinfo=timezone.utc).timestamp() * 1000) + # 2026-01-01 00:15:00 UTC (정각 — 아직 딜레이 전) + ts_too_early = int(datetime(2026, 1, 1, 0, 15, 0, tzinfo=timezone.utc).timestamp() * 1000) + # 2026-01-01 00:15:10 UTC (너무 늦음) + ts_too_late = int(datetime(2026, 1, 1, 0, 15, 10, tzinfo=timezone.utc).timestamp() * 1000) + # 2026-01-01 00:07:03 UTC (15m 경계 아님) + ts_not_boundary = int(datetime(2026, 1, 1, 0, 7, 3, tzinfo=timezone.utc).timestamp() * 1000) + + assert TimeframeSync.is_1h_candle_closed(ts_1h_close) is True, "1h close 판별 실패" + assert TimeframeSync.is_15m_candle_closed(ts_15m_close) is True, "15m close 판별 실패" + assert TimeframeSync.is_15m_candle_closed(ts_too_early) is False, "정각(0초)에 True 반환" + assert TimeframeSync.is_15m_candle_closed(ts_too_late) is False, "10초에 True 반환" + assert TimeframeSync.is_15m_candle_closed(ts_not_boundary) is False, "비경계 시점에 True 반환" + assert TimeframeSync.is_1h_candle_closed(ts_15m_close) is False, "15분에 1h close True 반환" + print(" ✓ TimeframeSync: second 2~5 범위에서만 True 반환 확인") + + # ── 2. DataFetcher 초기화 ── + print("\n[2] DataFetcher 초기화") + fetcher = DataFetcher(symbol="XRP/USDT:USDT") + try: + await fetcher.initialize() + + assert len(fetcher.klines_15m) == 200, f"15m 캔들 {len(fetcher.klines_15m)}개 (200 예상)" + assert len(fetcher.klines_1h) == 200, f"1h 캔들 {len(fetcher.klines_1h)}개 (200 예상)" + print(f" ✓ 초기화 완료: 15m={len(fetcher.klines_15m)}개, 1h={len(fetcher.klines_1h)}개") + + # ── 3. [:-1] 슬라이싱 검증 ── + print("\n[3] get_1h_dataframe_completed() [:-1] 검증") + df_1h = fetcher.get_1h_dataframe_completed() + assert df_1h is not None, "1h DataFrame이 None" + assert len(df_1h) == 199, f"1h completed 캔들 {len(df_1h)}개 (199 예상)" + + # 마지막 완성 봉의 timestamp < 현재 진행 중 봉의 timestamp + last_completed_ts = df_1h.index[-1] + last_raw_ts = pd.to_datetime(fetcher.klines_1h[-1][0], unit="ms", utc=True) + assert last_completed_ts < last_raw_ts, "completed 봉이 진행 중 봉을 포함" + print(f" ✓ 1h completed: {len(df_1h)}개 (200 - 1 = 199, 미완성 봉 제외 확인)") + print(f" 마지막 완성 봉: {last_completed_ts}") + print(f" 진행 중 봉: {last_raw_ts} (제외됨)") + + # 15m DataFrame 검증 + df_15m = fetcher.get_15m_dataframe() + assert df_15m is not None and len(df_15m) == 200 + print(f" ✓ 15m DataFrame: {len(df_15m)}개") + + # ── 4. MetaFilter 검증 ── + print("\n[4] MetaFilter 검증") + meta = MetaFilter(fetcher) + + state = meta.get_market_state() + assert state in ("LONG_ALLOWED", "SHORT_ALLOWED", "WAIT"), f"비정상 상태: {state}" + print(f" ✓ MetaFilter 상태: {state}") + + atr = meta.get_current_atr() + assert atr is not None and atr > 0, f"ATR 비정상: {atr}" + print(f" ✓ ATR: {atr:.6f} (> 0 확인)") + + info = meta.get_meta_info() + print(f" ✓ Meta Info: {info}") + + # ATR 범위 검증 (XRP 기준 0.0001 ~ 0.1) + assert 0.0001 <= atr <= 0.1, f"ATR 범위 이탈: {atr}" + print(f" ✓ ATR 범위 정상: 0.0001 <= {atr:.6f} <= 0.1") + + finally: + await fetcher.close() + + print("\n" + "=" * 60) + print(" 모든 검증 통과 ✓") + print("=" * 60) + + +async def test_module_3_4(): + """ + Module 3 + 4 통합 테스트. + + 검증 항목: + [Module 3 - TriggerStrategy] + 1. 신호 생성: 'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD' 중 하나 반환 + 2. EMA15: NaN 아님, 양수, 현실적 범위 + 3. Vol_SMA20: NaN 아님, 양수 + 4. vol_ratio: 0.0 ~ 2.0+ 범위 내 + 5. 3캔들 시퀀스: t-2, t-1, t 인덱싱 정확성 + 6. meta_state 필터: 'LONG_ALLOWED'에서만 LONG, 'SHORT_ALLOWED'에서만 SHORT + + [Module 4 - ExecutionManager] + 7. 포지션 중복 방지 + 8. SL/TP 계산: ATR * 1.5 (SL), ATR * 2.3 (TP) + 9. Dry-run 로그 출력 + 10. 청산 후 재진입 가능 + """ + print("=" * 60) + print(" MTF Bot Module 3 & 4 통합 테스트") + print("=" * 60) + + # ── DataFetcher로 실제 데이터 로드 ── + fetcher = DataFetcher(symbol="XRP/USDT:USDT") + try: + await fetcher.initialize() + + df_15m = fetcher.get_15m_dataframe() + assert df_15m is not None and len(df_15m) >= 25, "15m 데이터 부족" + + meta = MetaFilter(fetcher) + meta_state = meta.get_market_state() + atr = meta.get_current_atr() + print(f"\n[환경] MetaFilter: {meta_state} | ATR: {atr}") + + # ── [Module 3] TriggerStrategy 검증 ── + print("\n[1] TriggerStrategy 신호 생성") + trigger = TriggerStrategy() + + # 테스트 1: 실제 데이터로 신호 생성 + signal = trigger.generate_signal(df_15m, meta_state) + assert signal in ("EXECUTE_LONG", "EXECUTE_SHORT", "HOLD"), f"비정상 신호: {signal}" + print(f" ✓ 신호: {signal}") + + info = trigger.get_trigger_info() + print(f" ✓ Trigger Info: {info}") + + # 테스트 2: 지표 값 검증 + if "ema15_t" in info: + assert not pd.isna(info["ema15_t"]) and info["ema15_t"] > 0, "EMA15 비정상" + assert not pd.isna(info["vol_sma20_t2"]) and info["vol_sma20_t2"] > 0, "Vol SMA20 비정상" + assert 0 <= info["vol_ratio"] <= 100, f"vol_ratio 비정상: {info['vol_ratio']}" + print(f" ✓ EMA15(t): {info['ema15_t']:.4f}") + print(f" ✓ Vol SMA20(t-2): {info['vol_sma20_t2']:.0f}") + print(f" ✓ Vol ratio: {info['vol_ratio']:.4f} ({'고갈' if info['vol_ratio'] < 0.5 else '정상'})") + + # 테스트 3: meta_state=WAIT → 무조건 HOLD + signal_wait = trigger.generate_signal(df_15m, "WAIT") + assert signal_wait == "HOLD", "WAIT 상태에서 HOLD 아닌 신호 발생" + print(f" ✓ meta_state=WAIT → {signal_wait}") + + # 테스트 4: 데이터 부족 → HOLD + signal_short = trigger.generate_signal(df_15m.iloc[:10], "LONG_ALLOWED") + assert signal_short == "HOLD", "데이터 부족에서 HOLD 아닌 신호 발생" + print(f" ✓ 데이터 부족(10행) → {signal_short}") + + # 테스트 5: None DataFrame → HOLD + signal_none = trigger.generate_signal(None, "LONG_ALLOWED") + assert signal_none == "HOLD" + print(f" ✓ None DataFrame → HOLD") + + # ── [Module 4] ExecutionManager 검증 ── + print(f"\n[2] ExecutionManager 검증") + executor = ExecutionManager() + + # 테스트 6: HOLD → None + result = executor.execute("HOLD", 2.5, 0.01) + assert result is None, "HOLD에서 주문 실행됨" + print(f" ✓ HOLD → None") + + # 테스트 7: ATR 무효 → None + result = executor.execute("EXECUTE_LONG", 2.5, None) + assert result is None, "ATR=None에서 주문 실행됨" + result = executor.execute("EXECUTE_LONG", 2.5, 0) + assert result is None, "ATR=0에서 주문 실행됨" + print(f" ✓ ATR 무효 → None") + + # 테스트 8: 정상 LONG 주문 + print(f"\n [LONG 가상 주문 테스트]") + test_atr = 0.01 + result = executor.execute("EXECUTE_LONG", 2.5340, test_atr) + assert result is not None, "정상 주문이 None 반환" + assert result["action"] == "LONG" + assert abs(result["sl_price"] - (2.5340 - 0.01 * 1.5)) < 1e-8, "SL 계산 오류" + assert abs(result["tp_price"] - (2.5340 + 0.01 * 2.3)) < 1e-8, "TP 계산 오류" + assert result["risk_reward"] == 1.53, f"R:R 오류: {result['risk_reward']}" + print(f" ✓ LONG 주문: entry={result['entry_price']}, SL={result['sl_price']:.4f}, TP={result['tp_price']:.4f}") + print(f" ✓ R:R = 1:{result['risk_reward']}") + + # 테스트 9: 포지션 중복 방지 + result_dup = executor.execute("EXECUTE_SHORT", 2.5000, test_atr) + assert result_dup is None, "중복 포지션 허용됨" + assert executor.current_position == "LONG", "포지션 상태 변경됨" + print(f" ✓ 중복 차단: LONG 포지션 중 SHORT 신호 → None") + + # 테스트 10: 청산 후 재진입 + executor.close_position("테스트 청산") + assert executor.current_position is None, "청산 후 포지션 잔존" + print(f" ✓ 청산 완료, 포지션=None") + + # 테스트 11: SHORT 주문 + print(f"\n [SHORT 가상 주문 테스트]") + result_short = executor.execute("EXECUTE_SHORT", 2.5340, test_atr) + assert result_short is not None + assert result_short["action"] == "SHORT" + assert abs(result_short["sl_price"] - (2.5340 + 0.01 * 1.5)) < 1e-8, "SHORT SL 오류" + assert abs(result_short["tp_price"] - (2.5340 - 0.01 * 2.3)) < 1e-8, "SHORT TP 오류" + print(f" ✓ SHORT 주문: entry={result_short['entry_price']}, SL={result_short['sl_price']:.4f}, TP={result_short['tp_price']:.4f}") + + executor.close_position("테스트 종료") + + # 테스트 12: 빈 포지션 청산 → 에러 없이 처리 + executor.close_position("이미 청산됨") + print(f" ✓ 빈 포지션 청산 → 에러 없음") + + finally: + await fetcher.close() + + print("\n" + "=" * 60) + print(" Module 3 & 4 모든 검증 통과 ✓") + print("=" * 60) + + +async def test_all(): + """Module 1~4 전체 검증.""" + await test_module_1_2() + print("\n") + await test_module_3_4() + + +if __name__ == "__main__": + asyncio.run(test_all())