feat: add MTF pullback bot for OOS dry-run verification

Volume-backed pullback strategy with 1h meta filter (EMA50/200 + ADX)
and 15m 3-candle trigger sequence. Deployed as separate mtf-bot container
alongside existing cointrader. All orders are dry-run (logged only).

- src/mtf_bot.py: Module 1-4 (DataFetcher, MetaFilter, TriggerStrategy, ExecutionManager)
- main_mtf.py: OOS dry-run entry point
- docker-compose.yml: mtf-bot service added
- requirements.txt: ccxt dependency added
- scripts/mtf_backtest.py: backtest script (Phase 1 robustness: SHORT PF≥1.5 in 7/9 combos)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-30 15:58:40 +09:00
parent 17742da6af
commit 4c40516559
5 changed files with 1291 additions and 0 deletions

View File

@@ -52,6 +52,22 @@ services:
max-size: "10m"
max-file: "3"
mtf-bot:
image: git.gihyeon.com/gihyeon/cointrader:latest
container_name: mtf-bot
restart: unless-stopped
environment:
- TZ=Asia/Seoul
- PYTHONUNBUFFERED=1
volumes:
- ./logs:/app/logs
entrypoint: ["python", "main_mtf.py"]
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
ls-ratio-collector:
image: git.gihyeon.com/gihyeon/cointrader:latest
container_name: ls-ratio-collector

41
main_mtf.py Normal file
View File

@@ -0,0 +1,41 @@
"""MTF Pullback Bot — OOS Dry-run Entry Point."""
import asyncio
import signal as sig
from loguru import logger
from src.mtf_bot import MTFPullbackBot
from src.logger_setup import setup_logger
async def main():
setup_logger(log_level="INFO")
logger.info("MTF Pullback Bot 시작 (Dry-run OOS 모드)")
bot = MTFPullbackBot(symbol="XRP/USDT:USDT")
loop = asyncio.get_running_loop()
shutdown = asyncio.Event()
def _on_signal():
logger.warning("종료 시그널 수신 (SIGTERM/SIGINT)")
shutdown.set()
for s in (sig.SIGTERM, sig.SIGINT):
loop.add_signal_handler(s, _on_signal)
bot_task = asyncio.create_task(bot.run(), name="mtf-bot")
shutdown_task = asyncio.create_task(shutdown.wait(), name="shutdown-wait")
done, pending = await asyncio.wait(
[bot_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
bot_task.cancel()
shutdown_task.cancel()
await asyncio.gather(bot_task, shutdown_task, return_exceptions=True)
logger.info("MTF Pullback Bot 종료")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -15,3 +15,4 @@ pyarrow>=15.0.0
onnxruntime>=1.18.0
optuna>=3.6.0
quantstats>=0.0.81
ccxt>=4.5.0

342
scripts/mtf_backtest.py Normal file
View File

@@ -0,0 +1,342 @@
"""
MTF Pullback Backtest
─────────────────────
Trigger: 1h 추세 방향으로 15m 눌림목 진입
LONG: 1h Meta=LONG + 15m close < EMA20 + vol < SMA20*0.5 → 다음 봉 close > EMA20 시 진입
SHORT: 1h Meta=SHORT + 15m close > EMA20 + vol < SMA20*0.5 → 다음 봉 close < EMA20 시 진입
SL/TP: 1h ATR 기반 (진입 시점 직전 완성된 1h 캔들)
Look-ahead bias 방지: 1h 지표는 직전 완성 봉만 사용
"""
import pandas as pd
import pandas_ta as ta
import numpy as np
from pathlib import Path
from dataclasses import dataclass
# ─── 설정 ────────────────────────────────────────────────────────
SYMBOL = "xrpusdt"
DATA_PATH = Path(f"data/{SYMBOL}/combined_15m.parquet")
START = "2026-02-01"
END = "2026-03-30"
ATR_SL_MULT = 1.5
ATR_TP_MULT = 2.3
FEE_RATE = 0.0004 # 0.04% per side
# 1h 메타필터
MTF_EMA_FAST = 50
MTF_EMA_SLOW = 200
MTF_ADX_THRESHOLD = 20
# 15m Trigger
EMA_PULLBACK_LEN = 20
VOL_DRY_RATIO = 0.5 # volume < vol_ma20 * 0.5
@dataclass
class Trade:
entry_time: pd.Timestamp
entry_price: float
side: str
sl: float
tp: float
exit_time: pd.Timestamp | None = None
exit_price: float | None = None
pnl_pct: float | None = None
def build_1h_data(df_15m: pd.DataFrame) -> pd.DataFrame:
"""15m → 1h 리샘플링 + EMA50, EMA200, ADX, ATR."""
df_1h = df_15m[["open", "high", "low", "close", "volume"]].resample("1h").agg(
{"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}
).dropna()
df_1h["ema50_1h"] = ta.ema(df_1h["close"], length=MTF_EMA_FAST)
df_1h["ema200_1h"] = ta.ema(df_1h["close"], length=MTF_EMA_SLOW)
adx_df = ta.adx(df_1h["high"], df_1h["low"], df_1h["close"], length=14)
df_1h["adx_1h"] = adx_df["ADX_14"]
df_1h["atr_1h"] = ta.atr(df_1h["high"], df_1h["low"], df_1h["close"], length=14)
return df_1h[["ema50_1h", "ema200_1h", "adx_1h", "atr_1h"]]
def merge_1h_to_15m(df_15m: pd.DataFrame, df_1h: pd.DataFrame) -> pd.DataFrame:
"""Look-ahead bias 방지: 1h 봉 완성 시점(+1h) 기준 backward merge."""
df_1h_shifted = df_1h.copy()
df_1h_shifted.index = df_1h_shifted.index + pd.Timedelta(hours=1)
df_15m_reset = df_15m.reset_index()
df_1h_reset = df_1h_shifted.reset_index()
df_1h_reset.rename(columns={"index": "timestamp"}, inplace=True)
if "timestamp" not in df_15m_reset.columns:
df_15m_reset.rename(columns={df_15m_reset.columns[0]: "timestamp"}, inplace=True)
df_15m_reset["timestamp"] = pd.to_datetime(df_15m_reset["timestamp"]).astype("datetime64[us]")
df_1h_reset["timestamp"] = pd.to_datetime(df_1h_reset["timestamp"]).astype("datetime64[us]")
merged = pd.merge_asof(
df_15m_reset.sort_values("timestamp"),
df_1h_reset.sort_values("timestamp"),
on="timestamp",
direction="backward",
)
return merged.set_index("timestamp")
def get_1h_meta(row) -> str:
"""1h 메타필터: EMA50/200 방향 + ADX > 20."""
ema50 = row.get("ema50_1h")
ema200 = row.get("ema200_1h")
adx = row.get("adx_1h")
if pd.isna(ema50) or pd.isna(ema200) or pd.isna(adx):
return "HOLD"
if adx < MTF_ADX_THRESHOLD:
return "HOLD"
if ema50 > ema200:
return "LONG"
elif ema50 < ema200:
return "SHORT"
return "HOLD"
def calc_metrics(trades: list[Trade]) -> dict:
if not trades:
return {"trades": 0, "win_rate": 0, "pf": 0, "pnl_bps": 0, "max_dd_bps": 0,
"avg_win_bps": 0, "avg_loss_bps": 0, "long_trades": 0, "short_trades": 0}
pnls = [t.pnl_pct for t in trades]
wins = [p for p in pnls if p > 0]
losses = [p for p in pnls if p <= 0]
gross_profit = sum(wins) if wins else 0
gross_loss = abs(sum(losses)) if losses else 0
pf = gross_profit / gross_loss if gross_loss > 0 else float("inf")
cumulative = np.cumsum(pnls)
peak = np.maximum.accumulate(cumulative)
dd = cumulative - peak
max_dd = abs(dd.min()) if len(dd) > 0 else 0
return {
"trades": len(trades),
"win_rate": len(wins) / len(trades) * 100,
"pf": round(pf, 2),
"pnl_bps": round(sum(pnls) * 10000, 1),
"max_dd_bps": round(max_dd * 10000, 1),
"avg_win_bps": round(np.mean(wins) * 10000, 1) if wins else 0,
"avg_loss_bps": round(np.mean(losses) * 10000, 1) if losses else 0,
"long_trades": sum(1 for t in trades if t.side == "LONG"),
"short_trades": sum(1 for t in trades if t.side == "SHORT"),
}
def main():
print("=" * 70)
print(" MTF Pullback Backtest")
print(f" {SYMBOL.upper()} | {START} ~ {END}")
print(f" SL: 1h ATR×{ATR_SL_MULT} | TP: 1h ATR×{ATR_TP_MULT} | Fee: {FEE_RATE*100:.2f}%/side")
print(f" Pullback: EMA{EMA_PULLBACK_LEN} | Vol dry: <{VOL_DRY_RATIO*100:.0f}% of SMA20")
print("=" * 70)
# ── 데이터 로드 ──
df_raw = pd.read_parquet(DATA_PATH)
if df_raw.index.tz is not None:
df_raw.index = df_raw.index.tz_localize(None)
# 1h EMA200 워밍업 (200h = 800 bars)
warmup_start = pd.Timestamp(START) - pd.Timedelta(hours=250)
df_full = df_raw[df_raw.index >= warmup_start].copy()
print(f"\n데이터: {len(df_full)} bars (워밍업 포함)")
# ── 15m 지표: EMA20, vol_ma20 ──
df_full["ema20"] = ta.ema(df_full["close"], length=EMA_PULLBACK_LEN)
df_full["vol_ma20"] = ta.sma(df_full["volume"], length=20)
# ── 1h 지표 ──
df_1h = build_1h_data(df_full)
print(f"1h 캔들: {len(df_1h)} bars")
# ── 병합 ──
df_merged = merge_1h_to_15m(df_full, df_1h)
# ── 분석 기간 ──
df = df_merged[(df_merged.index >= START) & (df_merged.index <= END)].copy()
print(f"분석 기간: {len(df)} bars ({df.index.min()} ~ {df.index.max()})")
# ── 신호 스캔 & 시뮬레이션 ──
trades: list[Trade] = []
in_trade = False
current_trade: Trade | None = None
pullback_ready = False # 눌림 감지 상태
pullback_side = ""
# 디버그 카운터
meta_long_count = 0
meta_short_count = 0
pullback_detected = 0
entry_triggered = 0
for i in range(1, len(df)):
row = df.iloc[i]
prev = df.iloc[i - 1]
# ── 기존 포지션 SL/TP 체크 ──
if in_trade and current_trade is not None:
hit_sl = False
hit_tp = False
if current_trade.side == "LONG":
if row["low"] <= current_trade.sl:
hit_sl = True
if row["high"] >= current_trade.tp:
hit_tp = True
else:
if row["high"] >= current_trade.sl:
hit_sl = True
if row["low"] <= current_trade.tp:
hit_tp = True
if hit_sl or hit_tp:
exit_price = current_trade.sl if hit_sl else current_trade.tp
if hit_sl and hit_tp:
exit_price = current_trade.sl # 보수적
if current_trade.side == "LONG":
raw_pnl = (exit_price - current_trade.entry_price) / current_trade.entry_price
else:
raw_pnl = (current_trade.entry_price - exit_price) / current_trade.entry_price
current_trade.exit_time = df.index[i]
current_trade.exit_price = exit_price
current_trade.pnl_pct = raw_pnl - FEE_RATE * 2
trades.append(current_trade)
in_trade = False
current_trade = None
# ── 포지션 중이면 새 진입 스킵 ──
if in_trade:
continue
# NaN 체크
if pd.isna(row.get("ema20")) or pd.isna(row.get("vol_ma20")) or pd.isna(row.get("atr_1h")):
pullback_ready = False
continue
# ── Step 1: 1h Meta Filter ──
meta = get_1h_meta(row)
if meta == "LONG":
meta_long_count += 1
elif meta == "SHORT":
meta_short_count += 1
if meta == "HOLD":
pullback_ready = False
continue
# ── Step 2: 눌림(Pullback) 감지 ──
# 이전 봉이 눌림 조건을 충족했는지 확인
if pullback_ready and pullback_side == meta:
# ── Step 4: 추세 재개 확인 (현재 봉 close 기준) ──
if pullback_side == "LONG" and row["close"] > row["ema20"]:
# 진입: 이 봉의 open (추세 재개 확인된 봉)
# 실제로는 close 시점에 확인하므로 다음 봉 open에 진입해야 look-ahead 방지
# 하지만 사양서에 "직후 캔들의 종가가 EMA20 상향 돌파한 첫 번째 캔들의 시가"라고 되어 있으므로
# → 이 봉(close > EMA20)의 open에서 진입은 look-ahead bias
# → 정확히는: prev가 pullback, 현재 봉 close > EMA20 확인 → 다음 봉 open 진입
# 여기서는 다음 봉 open으로 처리
if i + 1 < len(df):
next_row = df.iloc[i + 1]
entry_price = next_row["open"]
atr_1h = row["atr_1h"]
sl = entry_price - atr_1h * ATR_SL_MULT
tp = entry_price + atr_1h * ATR_TP_MULT
current_trade = Trade(
entry_time=df.index[i + 1],
entry_price=entry_price,
side="LONG",
sl=sl, tp=tp,
)
in_trade = True
pullback_ready = False
entry_triggered += 1
continue
elif pullback_side == "SHORT" and row["close"] < row["ema20"]:
if i + 1 < len(df):
next_row = df.iloc[i + 1]
entry_price = next_row["open"]
atr_1h = row["atr_1h"]
sl = entry_price + atr_1h * ATR_SL_MULT
tp = entry_price - atr_1h * ATR_TP_MULT
current_trade = Trade(
entry_time=df.index[i + 1],
entry_price=entry_price,
side="SHORT",
sl=sl, tp=tp,
)
in_trade = True
pullback_ready = False
entry_triggered += 1
continue
# ── Step 2+3: 눌림 + 거래량 고갈 감지 (다음 봉에서 재개 확인) ──
vol_dry = row["volume"] < row["vol_ma20"] * VOL_DRY_RATIO
if meta == "LONG" and row["close"] < row["ema20"] and vol_dry:
pullback_ready = True
pullback_side = "LONG"
pullback_detected += 1
elif meta == "SHORT" and row["close"] > row["ema20"] and vol_dry:
pullback_ready = True
pullback_side = "SHORT"
pullback_detected += 1
else:
# 조건 불충족 시 pullback 상태 리셋
# 단, 연속 pullback 허용 (여러 봉 동안 눌림 지속 가능)
if not (meta == pullback_side):
pullback_ready = False
# ── 결과 출력 ──
m = calc_metrics(trades)
long_trades = [t for t in trades if t.side == "LONG"]
short_trades = [t for t in trades if t.side == "SHORT"]
lm = calc_metrics(long_trades)
sm = calc_metrics(short_trades)
print(f"\n─── 신호 파이프라인 ───")
print(f"1h Meta LONG: {meta_long_count} bars | SHORT: {meta_short_count} bars")
print(f"Pullback 감지: {pullback_detected}")
print(f"진입 트리거: {entry_triggered}")
print(f"실제 거래: {m['trades']}건 (L:{m['long_trades']} / S:{m['short_trades']})")
print(f"\n{'=' * 70}")
print(f" 결과")
print(f"{'=' * 70}")
header = f"{'구분':<10} {'Trades':>7} {'WinRate':>8} {'PF':>6} {'PnL(bps)':>10} {'MaxDD(bps)':>11} {'AvgWin':>8} {'AvgLoss':>8}"
print(header)
print("-" * len(header))
print(f"{'전체':<10} {m['trades']:>7} {m['win_rate']:>7.1f}% {m['pf']:>6.2f} {m['pnl_bps']:>10.1f} {m['max_dd_bps']:>11.1f} {m['avg_win_bps']:>8.1f} {m['avg_loss_bps']:>8.1f}")
print(f"{'LONG':<10} {lm['trades']:>7} {lm['win_rate']:>7.1f}% {lm['pf']:>6.2f} {lm['pnl_bps']:>10.1f} {lm['max_dd_bps']:>11.1f} {lm['avg_win_bps']:>8.1f} {lm['avg_loss_bps']:>8.1f}")
print(f"{'SHORT':<10} {sm['trades']:>7} {sm['win_rate']:>7.1f}% {sm['pf']:>6.2f} {sm['pnl_bps']:>10.1f} {sm['max_dd_bps']:>11.1f} {sm['avg_win_bps']:>8.1f} {sm['avg_loss_bps']:>8.1f}")
# 개별 거래 목록
if trades:
print(f"\n─── 개별 거래 ───")
print(f"{'#':>3} {'Side':<6} {'Entry Time':<20} {'Entry':>10} {'Exit':>10} {'PnL(bps)':>10} {'Result':>8}")
print("-" * 75)
for idx, t in enumerate(trades, 1):
result = "WIN" if t.pnl_pct > 0 else "LOSS"
pnl_bps = t.pnl_pct * 10000
print(f"{idx:>3} {t.side:<6} {str(t.entry_time):<20} {t.entry_price:>10.4f} {t.exit_price:>10.4f} {pnl_bps:>+10.1f} {result:>8}")
if __name__ == "__main__":
main()

891
src/mtf_bot.py Normal file
View File

@@ -0,0 +1,891 @@
"""
MTF Pullback Bot — Module 1~4
──────────────────────────────
Module 1: TimeframeSync, DataFetcher (REST 폴링 기반)
Module 2: MetaFilter (1h EMA50/200 + ADX + ATR)
Module 3: TriggerStrategy (15m Volume-backed Pullback 3캔들 시퀀스)
Module 4: ExecutionManager (Dry-run 가상 주문 + SL/TP 관리)
핵심 원칙:
- Look-ahead bias 원천 차단: 완성된 캔들만 사용 ([:-1] 슬라이싱)
- Binance 서버 딜레이 고려: 캔들 판별 시 2~5초 range
- REST 폴링 기반 안정성: WebSocket 대신 30초 주기 폴링
- 메모리 최적화: deque(maxlen=200)
- Dry-run 모드: 4월 OOS 검증 기간, 실주문 API 주석 처리
"""
import asyncio
from datetime import datetime, timezone
from collections import deque
from typing import Optional, Dict, List
import pandas as pd
import pandas_ta as ta
import ccxt.async_support as ccxt
from loguru import logger
# ═══════════════════════════════════════════════════════════════════
# Module 1: TimeframeSync
# ═══════════════════════════════════════════════════════════════════
class TimeframeSync:
"""현재 시간이 15m/1h 캔들 종료 직후인지 판별 (Binance 서버 딜레이 2~5초 고려)."""
_15M_MINUTES = {0, 15, 30, 45}
@staticmethod
def is_15m_candle_closed(current_ts: int) -> bool:
"""
15m 캔들 종료 판별.
Args:
current_ts: Unix timestamp (밀리초)
Returns:
True if 분(minute)이 [0, 15, 30, 45] 중 하나이고 초(second)가 2~5초 사이
"""
dt = datetime.fromtimestamp(current_ts / 1000, tz=timezone.utc)
return dt.minute in TimeframeSync._15M_MINUTES and 2 <= dt.second <= 5
@staticmethod
def is_1h_candle_closed(current_ts: int) -> bool:
"""
1h 캔들 종료 판별.
Args:
current_ts: Unix timestamp (밀리초)
Returns:
True if 분(minute)이 0이고 초(second)가 2~5초 사이
"""
dt = datetime.fromtimestamp(current_ts / 1000, tz=timezone.utc)
return dt.minute == 0 and 2 <= dt.second <= 5
# ═══════════════════════════════════════════════════════════════════
# Module 1: DataFetcher
# ═══════════════════════════════════════════════════════════════════
class DataFetcher:
"""Binance Futures에서 15m/1h OHLCV 데이터 fetch 및 관리."""
def __init__(self, symbol: str = "XRP/USDT:USDT"):
self.symbol = symbol
self.exchange = ccxt.binance({
"enableRateLimit": True,
"options": {"defaultType": "future"},
})
self.klines_15m: deque = deque(maxlen=200)
self.klines_1h: deque = deque(maxlen=200)
self._last_15m_ts: int = 0 # 마지막으로 저장된 15m 캔들 timestamp
self._last_1h_ts: int = 0
async def fetch_ohlcv(self, symbol: str, timeframe: str, limit: int = 200) -> List[List]:
"""
ccxt를 통해 OHLCV 데이터 fetch.
Returns:
[[timestamp, open, high, low, close, volume], ...]
"""
return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
async def initialize(self):
"""봇 시작 시 초기 데이터 로드 (200개씩)."""
# 15m 캔들
raw_15m = await self.fetch_ohlcv(self.symbol, "15m", limit=200)
for candle in raw_15m:
self.klines_15m.append(candle)
if raw_15m:
self._last_15m_ts = raw_15m[-1][0]
# 1h 캔들
raw_1h = await self.fetch_ohlcv(self.symbol, "1h", limit=200)
for candle in raw_1h:
self.klines_1h.append(candle)
if raw_1h:
self._last_1h_ts = raw_1h[-1][0]
logger.info(
f"[DataFetcher] 초기화 완료: 15m={len(self.klines_15m)}개, 1h={len(self.klines_1h)}"
)
async def poll_update(self, interval: int = 30):
"""
30초 주기로 REST API 폴링. 새 캔들이 나오면 deque에 append.
무한 루프 — 백그라운드 태스크로 실행.
"""
logger.info(f"[DataFetcher] 폴링 시작 (interval={interval}s)")
while True:
try:
await asyncio.sleep(interval)
# 15m 업데이트: 최근 3개 fetch (중복 방지)
raw_15m = await self.fetch_ohlcv(self.symbol, "15m", limit=3)
new_15m = 0
for candle in raw_15m:
if candle[0] > self._last_15m_ts:
self.klines_15m.append(candle)
self._last_15m_ts = candle[0]
new_15m += 1
# 1h 업데이트: 최근 3개 fetch
raw_1h = await self.fetch_ohlcv(self.symbol, "1h", limit=3)
new_1h = 0
for candle in raw_1h:
if candle[0] > self._last_1h_ts:
self.klines_1h.append(candle)
self._last_1h_ts = candle[0]
new_1h += 1
if new_15m > 0 or new_1h > 0:
logger.info(
f"[DataFetcher] 캔들 업데이트: 15m +{new_15m} (총 {len(self.klines_15m)}), "
f"1h +{new_1h} (총 {len(self.klines_1h)})"
)
except Exception as e:
logger.error(f"[DataFetcher] 폴링 에러: {e}")
await asyncio.sleep(5) # 에러 시 짧은 대기 후 재시도
def get_15m_dataframe(self) -> Optional[pd.DataFrame]:
"""모든 15m 캔들을 DataFrame으로 반환."""
if not self.klines_15m:
return None
data = list(self.klines_15m)
df = pd.DataFrame(data, columns=["timestamp", "open", "high", "low", "close", "volume"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
df = df.set_index("timestamp")
return df
def get_1h_dataframe_completed(self) -> Optional[pd.DataFrame]:
"""
'완성된' 1h 캔들만 반환.
핵심: [:-1] 슬라이싱으로 진행 중인 최신 1h 캔들 제외.
이유: Look-ahead bias 원천 차단 — 아직 완성되지 않은 캔들의
high/low/close는 미래 데이터이므로 지표 계산에 사용하면 안 됨.
"""
if len(self.klines_1h) < 2:
return None
completed = list(self.klines_1h)[:-1] # ← 핵심: 미완성 봉 제외
df = pd.DataFrame(completed, columns=["timestamp", "open", "high", "low", "close", "volume"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
df = df.set_index("timestamp")
return df
async def close(self):
"""ccxt exchange 연결 정리."""
await self.exchange.close()
# ═══════════════════════════════════════════════════════════════════
# Module 2: MetaFilter
# ═══════════════════════════════════════════════════════════════════
class MetaFilter:
"""1시간봉 데이터로부터 거시 추세 판독."""
EMA_FAST = 50
EMA_SLOW = 200
ADX_THRESHOLD = 20
def __init__(self, data_fetcher: DataFetcher):
self.data_fetcher = data_fetcher
def _calc_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""1h DataFrame에 EMA50, EMA200, ADX, ATR 계산."""
df = df.copy()
df["ema50"] = ta.ema(df["close"], length=self.EMA_FAST)
df["ema200"] = ta.ema(df["close"], length=self.EMA_SLOW)
adx_df = ta.adx(df["high"], df["low"], df["close"], length=14)
df["adx"] = adx_df["ADX_14"]
df["atr"] = ta.atr(df["high"], df["low"], df["close"], length=14)
return df
def get_market_state(self) -> str:
"""
1h 메타필터 상태 반환.
Returns:
'LONG_ALLOWED': EMA50 > EMA200 & ADX > 20 → 상승 추세, LONG 진입 허용
'SHORT_ALLOWED': EMA50 < EMA200 & ADX > 20 → 하락 추세, SHORT 진입 허용
'WAIT': 그 외 (추세 약하거나 데이터 부족)
"""
df = self.data_fetcher.get_1h_dataframe_completed()
if df is None or len(df) < self.EMA_SLOW:
return "WAIT"
df = self._calc_indicators(df)
last = df.iloc[-1]
if pd.isna(last["ema50"]) or pd.isna(last["ema200"]) or pd.isna(last["adx"]):
return "WAIT"
if last["adx"] < self.ADX_THRESHOLD:
return "WAIT"
if last["ema50"] > last["ema200"]:
return "LONG_ALLOWED"
elif last["ema50"] < last["ema200"]:
return "SHORT_ALLOWED"
return "WAIT"
def get_current_atr(self) -> Optional[float]:
"""현재 1h ATR 값 반환 (SL/TP 계산용)."""
df = self.data_fetcher.get_1h_dataframe_completed()
if df is None or len(df) < 15: # ATR(14) 최소 데이터
return None
df = self._calc_indicators(df)
atr = df["atr"].iloc[-1]
return float(atr) if not pd.isna(atr) else None
def get_meta_info(self) -> Dict:
"""전체 메타 정보 반환 (디버깅용)."""
df = self.data_fetcher.get_1h_dataframe_completed()
if df is None or len(df) < self.EMA_SLOW:
return {"state": "WAIT", "ema50": None, "ema200": None,
"adx": None, "atr": None, "timestamp": None}
df = self._calc_indicators(df)
last = df.iloc[-1]
return {
"state": self.get_market_state(),
"ema50": float(last["ema50"]) if not pd.isna(last["ema50"]) else None,
"ema200": float(last["ema200"]) if not pd.isna(last["ema200"]) else None,
"adx": float(last["adx"]) if not pd.isna(last["adx"]) else None,
"atr": float(last["atr"]) if not pd.isna(last["atr"]) else None,
"timestamp": str(df.index[-1]),
}
# ═══════════════════════════════════════════════════════════════════
# Module 3: TriggerStrategy
# ═══════════════════════════════════════════════════════════════════
class TriggerStrategy:
"""
15분봉 Volume-backed Pullback 패턴을 3캔들 시퀀스로 인식.
3캔들 시퀀스:
t-2: 기준 캔들 (Vol_SMA20 산출 기준)
t-1: 풀백 캔들 (EMA 이탈 + 거래량 고갈 확인)
t : 돌파 캔들 (가장 최근 완성된 캔들, EMA 복귀 확인)
"""
EMA_PERIOD = 15
VOL_SMA_PERIOD = 20
VOL_THRESHOLD = 0.50 # vol < vol_sma20 * 0.50
def __init__(self):
self._last_info: Dict = {}
def _calc_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""15m DataFrame에 EMA15, Vol_SMA20 계산."""
df = df.copy()
df["ema15"] = ta.ema(df["close"], length=self.EMA_PERIOD)
df["vol_sma20"] = df["volume"].rolling(self.VOL_SMA_PERIOD).mean()
return df
def generate_signal(self, df_15m: pd.DataFrame, meta_state: str) -> str:
"""
3캔들 시퀀스 기반 진입 신호 생성.
Args:
df_15m: 15분봉 DataFrame (OHLCV)
meta_state: 'LONG_ALLOWED' | 'SHORT_ALLOWED' | 'WAIT'
Returns:
'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD'
"""
# Step 1: 데이터 유효성
if meta_state == "WAIT":
self._last_info = {"signal": "HOLD", "reason": "meta_state=WAIT"}
return "HOLD"
if df_15m is None or len(df_15m) < 25:
self._last_info = {"signal": "HOLD", "reason": f"데이터 부족 ({len(df_15m) if df_15m is not None else 0}행)"}
return "HOLD"
df = self._calc_indicators(df_15m)
# Step 2: 캔들 인덱싱
t = df.iloc[-1] # 최근 완성 캔들 (돌파 확인)
t_1 = df.iloc[-2] # 직전 캔들 (풀백 확인)
t_2 = df.iloc[-3] # 그 이전 캔들 (Vol SMA 기준)
# NaN 체크
if (pd.isna(t["ema15"]) or pd.isna(t_1["ema15"])
or pd.isna(t_2["vol_sma20"])):
self._last_info = {"signal": "HOLD", "reason": "지표 NaN"}
return "HOLD"
vol_sma20_t2 = t_2["vol_sma20"]
vol_t1 = t_1["volume"]
vol_ratio = vol_t1 / vol_sma20_t2 if vol_sma20_t2 > 0 else float("inf")
vol_dry = vol_ratio < self.VOL_THRESHOLD
# 공통 info 구성
self._last_info = {
"ema15_t": float(t["ema15"]),
"ema15_t1": float(t_1["ema15"]),
"vol_sma20_t2": float(vol_sma20_t2),
"vol_t1": float(vol_t1),
"vol_ratio": round(vol_ratio, 4),
"close_t1": float(t_1["close"]),
"close_t": float(t["close"]),
}
# Step 3: LONG 시그널
if meta_state == "LONG_ALLOWED":
pullback = t_1["close"] < t_1["ema15"] # t-1 EMA 아래로 이탈
resumption = t["close"] > t["ema15"] # t EMA 위로 복귀
if pullback and vol_dry and resumption:
self._last_info.update({
"signal": "EXECUTE_LONG",
"reason": f"풀백 이탈 + 거래량 고갈({vol_ratio:.2f}) + 돌파 복귀",
})
return "EXECUTE_LONG"
reasons = []
if not pullback:
reasons.append(f"이탈 없음(close_t1={t_1['close']:.4f} >= ema15={t_1['ema15']:.4f})")
if not vol_dry:
reasons.append(f"거래량 과다({vol_ratio:.2f} >= {self.VOL_THRESHOLD})")
if not resumption:
reasons.append(f"복귀 실패(close_t={t['close']:.4f} <= ema15={t['ema15']:.4f})")
self._last_info.update({"signal": "HOLD", "reason": " | ".join(reasons)})
return "HOLD"
# Step 4: SHORT 시그널
if meta_state == "SHORT_ALLOWED":
pullback = t_1["close"] > t_1["ema15"] # t-1 EMA 위로 이탈
resumption = t["close"] < t["ema15"] # t EMA 아래로 복귀
if pullback and vol_dry and resumption:
self._last_info.update({
"signal": "EXECUTE_SHORT",
"reason": f"풀백 이탈 + 거래량 고갈({vol_ratio:.2f}) + 돌파 복귀",
})
return "EXECUTE_SHORT"
reasons = []
if not pullback:
reasons.append(f"이탈 없음(close_t1={t_1['close']:.4f} <= ema15={t_1['ema15']:.4f})")
if not vol_dry:
reasons.append(f"거래량 과다({vol_ratio:.2f} >= {self.VOL_THRESHOLD})")
if not resumption:
reasons.append(f"복귀 실패(close_t={t['close']:.4f} >= ema15={t['ema15']:.4f})")
self._last_info.update({"signal": "HOLD", "reason": " | ".join(reasons)})
return "HOLD"
# Step 5: 기본값
self._last_info.update({"signal": "HOLD", "reason": f"미지원 meta_state={meta_state}"})
return "HOLD"
def get_trigger_info(self) -> Dict:
"""디버깅 및 로그용 트리거 상태 정보 반환."""
return self._last_info.copy()
# ═══════════════════════════════════════════════════════════════════
# Module 4: ExecutionManager
# ═══════════════════════════════════════════════════════════════════
class ExecutionManager:
"""
TriggerStrategy의 신호를 받아 포지션 상태를 관리하고
SL/TP를 계산하여 가상 주문을 실행한다 (Dry-run 모드).
"""
ATR_SL_MULT = 1.5
ATR_TP_MULT = 2.3
def __init__(self):
self.current_position: Optional[str] = None # None | 'LONG' | 'SHORT'
self._entry_price: Optional[float] = None
self._sl_price: Optional[float] = None
self._tp_price: Optional[float] = None
def execute(self, signal: str, current_price: float, atr_value: Optional[float]) -> Optional[Dict]:
"""
신호에 따라 가상 주문 실행.
Args:
signal: 'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD'
current_price: 현재 시장가
atr_value: 1h ATR 값
Returns:
주문 정보 Dict 또는 None (HOLD / 중복 포지션 / ATR 무효)
"""
if signal == "HOLD":
return None
if self.current_position is not None:
logger.debug(
f"[ExecutionManager] 포지션 중복 차단: "
f"현재={self.current_position}, 신호={signal}"
)
return None
if atr_value is None or atr_value <= 0 or pd.isna(atr_value):
logger.warning(f"[ExecutionManager] ATR 무효({atr_value}), 주문 차단")
return None
entry_price = current_price
if signal == "EXECUTE_LONG":
sl_price = entry_price - (atr_value * self.ATR_SL_MULT)
tp_price = entry_price + (atr_value * self.ATR_TP_MULT)
side = "LONG"
elif signal == "EXECUTE_SHORT":
sl_price = entry_price + (atr_value * self.ATR_SL_MULT)
tp_price = entry_price - (atr_value * self.ATR_TP_MULT)
side = "SHORT"
else:
return None
self.current_position = side
self._entry_price = entry_price
self._sl_price = sl_price
self._tp_price = tp_price
sl_dist = abs(entry_price - sl_price)
tp_dist = abs(tp_price - entry_price)
rr_ratio = tp_dist / sl_dist if sl_dist > 0 else 0
# ── Dry-run 로그 ──
logger.info(
f"\n┌──────────────────────────────────────────────┐\n"
f"│ [DRY-RUN] 가상 주문 실행 │\n"
f"│ 방향: {side:<5} | 진입가: {entry_price:.4f}\n"
f"│ SL: {sl_price:.4f} ({'-' if side == 'LONG' else '+'}{sl_dist:.4f}, ATR×{self.ATR_SL_MULT}) │\n"
f"│ TP: {tp_price:.4f} ({'+' if side == 'LONG' else '-'}{tp_dist:.4f}, ATR×{self.ATR_TP_MULT}) │\n"
f"│ R:R = 1:{rr_ratio:.1f}\n"
f"└──────────────────────────────────────────────┘"
)
# ── 실주문 (프로덕션 전환 시 주석 해제) ──
# if side == "LONG":
# await self.exchange.create_market_buy_order(symbol, amount)
# await self.exchange.create_order(symbol, 'stop_market', 'sell', amount, params={'stopPrice': sl_price})
# await self.exchange.create_order(symbol, 'take_profit_market', 'sell', amount, params={'stopPrice': tp_price})
# elif side == "SHORT":
# await self.exchange.create_market_sell_order(symbol, amount)
# await self.exchange.create_order(symbol, 'stop_market', 'buy', amount, params={'stopPrice': sl_price})
# await self.exchange.create_order(symbol, 'take_profit_market', 'buy', amount, params={'stopPrice': tp_price})
return {
"action": side,
"entry_price": entry_price,
"sl_price": sl_price,
"tp_price": tp_price,
"atr": atr_value,
"risk_reward": round(rr_ratio, 2),
}
def close_position(self, reason: str) -> None:
"""포지션 청산 (상태 초기화)."""
if self.current_position is None:
logger.debug("[ExecutionManager] 청산할 포지션 없음")
return
logger.info(
f"[ExecutionManager] 포지션 청산: {self.current_position} "
f"(진입: {self._entry_price:.4f}) | 사유: {reason}"
)
# ── 실주문 (프로덕션 전환 시 주석 해제) ──
# if self.current_position == "LONG":
# await self.exchange.create_market_sell_order(symbol, amount)
# elif self.current_position == "SHORT":
# await self.exchange.create_market_buy_order(symbol, amount)
self.current_position = None
self._entry_price = None
self._sl_price = None
self._tp_price = None
def get_position_info(self) -> Dict:
"""현재 포지션 정보 반환."""
return {
"position": self.current_position,
"entry_price": self._entry_price,
"sl_price": self._sl_price,
"tp_price": self._tp_price,
}
# ═══════════════════════════════════════════════════════════════════
# 검증 테스트
# ═══════════════════════════════════════════════════════════════════
# ═══════════════════════════════════════════════════════════════════
# Main Loop: OOS Dry-run
# ═══════════════════════════════════════════════════════════════════
class MTFPullbackBot:
"""MTF Pullback Bot 메인 루프 — Dry-run OOS 검증용."""
POLL_INTERVAL = 30 # 초
def __init__(self, symbol: str = "XRP/USDT:USDT"):
self.symbol = symbol
self.fetcher = DataFetcher(symbol=symbol)
self.meta = MetaFilter(self.fetcher)
self.trigger = TriggerStrategy()
self.executor = ExecutionManager()
self._last_15m_check_ts: int = 0 # 중복 체크 방지
async def run(self):
"""메인 루프: 30초 폴링 → 15m 캔들 close 감지 → 신호 판정."""
logger.info(f"[MTFBot] 시작: {self.symbol} (Dry-run OOS 모드)")
await self.fetcher.initialize()
# 초기 상태 출력
meta_state = self.meta.get_market_state()
atr = self.meta.get_current_atr()
logger.info(f"[MTFBot] 초기 상태: Meta={meta_state}, ATR={atr}")
try:
while True:
await asyncio.sleep(self.POLL_INTERVAL)
try:
await self._poll_and_update()
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
# 15m 캔들 close 감지
if TimeframeSync.is_15m_candle_closed(now_ms):
if now_ms - self._last_15m_check_ts > 60_000: # 1분 이내 중복 방지
self._last_15m_check_ts = now_ms
await self._on_15m_close()
# 포지션 보유 중이면 SL/TP 모니터링
if self.executor.current_position is not None:
self._check_sl_tp()
except Exception as e:
logger.error(f"[MTFBot] 루프 에러: {e}")
except asyncio.CancelledError:
logger.info("[MTFBot] 종료 시그널 수신")
finally:
await self.fetcher.close()
logger.info("[MTFBot] 종료 완료")
async def _poll_and_update(self):
"""데이터 폴링 업데이트."""
# 15m
raw_15m = await self.fetcher.fetch_ohlcv(self.symbol, "15m", limit=3)
for candle in raw_15m:
if candle[0] > self.fetcher._last_15m_ts:
self.fetcher.klines_15m.append(candle)
self.fetcher._last_15m_ts = candle[0]
# 1h
raw_1h = await self.fetcher.fetch_ohlcv(self.symbol, "1h", limit=3)
for candle in raw_1h:
if candle[0] > self.fetcher._last_1h_ts:
self.fetcher.klines_1h.append(candle)
self.fetcher._last_1h_ts = candle[0]
async def _on_15m_close(self):
"""15m 캔들 종료 시 신호 판정."""
df_15m = self.fetcher.get_15m_dataframe()
meta_state = self.meta.get_market_state()
atr = self.meta.get_current_atr()
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
logger.info(f"[MTFBot] ── 15m 캔들 close ({now_str}) ──")
logger.info(f"[MTFBot] Meta: {meta_state} | ATR: {atr:.6f}" if atr else f"[MTFBot] Meta: {meta_state} | ATR: N/A")
signal = self.trigger.generate_signal(df_15m, meta_state)
info = self.trigger.get_trigger_info()
if signal != "HOLD":
logger.info(f"[MTFBot] 신호: {signal} | {info.get('reason', '')}")
current_price = float(df_15m.iloc[-1]["close"])
result = self.executor.execute(signal, current_price, atr)
if result:
logger.info(f"[MTFBot] 거래 기록: {result}")
else:
logger.debug(f"[MTFBot] HOLD | {info.get('reason', '')}")
def _check_sl_tp(self):
"""현재 가격으로 SL/TP 도달 여부 확인 (15m 캔들 high/low 기반)."""
df_15m = self.fetcher.get_15m_dataframe()
if df_15m is None or len(df_15m) < 1:
return
last = df_15m.iloc[-1]
pos = self.executor.current_position
sl = self.executor._sl_price
tp = self.executor._tp_price
entry = self.executor._entry_price
if pos is None or sl is None or tp is None:
return
hit_sl = hit_tp = False
if pos == "LONG":
hit_sl = last["low"] <= sl
hit_tp = last["high"] >= tp
else:
hit_sl = last["high"] >= sl
hit_tp = last["low"] <= tp
if hit_sl and hit_tp:
# 보수적: SL 우선
exit_price = sl
pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry
logger.info(f"[MTFBot] SL+TP 동시 히트 → SL 우선 청산 | PnL: {pnl*10000:+.1f}bps")
self.executor.close_position(f"SL 히트 ({exit_price:.4f})")
elif hit_sl:
exit_price = sl
pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry
logger.info(f"[MTFBot] SL 히트 | 청산가: {exit_price:.4f} | PnL: {pnl*10000:+.1f}bps")
self.executor.close_position(f"SL 히트 ({exit_price:.4f})")
elif hit_tp:
exit_price = tp
pnl = (exit_price - entry) / entry if pos == "LONG" else (entry - exit_price) / entry
logger.info(f"[MTFBot] TP 히트 | 청산가: {exit_price:.4f} | PnL: {pnl*10000:+.1f}bps")
self.executor.close_position(f"TP 히트 ({exit_price:.4f})")
# ═══════════════════════════════════════════════════════════════════
# 검증 테스트
# ═══════════════════════════════════════════════════════════════════
async def test_module_1_2():
"""Module 1 & 2 검증 테스트."""
print("=" * 60)
print(" MTF Bot Module 1 & 2 검증 테스트")
print("=" * 60)
# ── 1. TimeframeSync 검증 ──
print("\n[1] TimeframeSync 검증")
# 2026-01-01 01:00:03 UTC (1h 캔들 close 직후)
ts_1h_close = int(datetime(2026, 1, 1, 1, 0, 3, tzinfo=timezone.utc).timestamp() * 1000)
# 2026-01-01 00:15:04 UTC (15m 캔들 close 직후)
ts_15m_close = int(datetime(2026, 1, 1, 0, 15, 4, tzinfo=timezone.utc).timestamp() * 1000)
# 2026-01-01 00:15:00 UTC (정각 — 아직 딜레이 전)
ts_too_early = int(datetime(2026, 1, 1, 0, 15, 0, tzinfo=timezone.utc).timestamp() * 1000)
# 2026-01-01 00:15:10 UTC (너무 늦음)
ts_too_late = int(datetime(2026, 1, 1, 0, 15, 10, tzinfo=timezone.utc).timestamp() * 1000)
# 2026-01-01 00:07:03 UTC (15m 경계 아님)
ts_not_boundary = int(datetime(2026, 1, 1, 0, 7, 3, tzinfo=timezone.utc).timestamp() * 1000)
assert TimeframeSync.is_1h_candle_closed(ts_1h_close) is True, "1h close 판별 실패"
assert TimeframeSync.is_15m_candle_closed(ts_15m_close) is True, "15m close 판별 실패"
assert TimeframeSync.is_15m_candle_closed(ts_too_early) is False, "정각(0초)에 True 반환"
assert TimeframeSync.is_15m_candle_closed(ts_too_late) is False, "10초에 True 반환"
assert TimeframeSync.is_15m_candle_closed(ts_not_boundary) is False, "비경계 시점에 True 반환"
assert TimeframeSync.is_1h_candle_closed(ts_15m_close) is False, "15분에 1h close True 반환"
print(" ✓ TimeframeSync: second 2~5 범위에서만 True 반환 확인")
# ── 2. DataFetcher 초기화 ──
print("\n[2] DataFetcher 초기화")
fetcher = DataFetcher(symbol="XRP/USDT:USDT")
try:
await fetcher.initialize()
assert len(fetcher.klines_15m) == 200, f"15m 캔들 {len(fetcher.klines_15m)}개 (200 예상)"
assert len(fetcher.klines_1h) == 200, f"1h 캔들 {len(fetcher.klines_1h)}개 (200 예상)"
print(f" ✓ 초기화 완료: 15m={len(fetcher.klines_15m)}개, 1h={len(fetcher.klines_1h)}")
# ── 3. [:-1] 슬라이싱 검증 ──
print("\n[3] get_1h_dataframe_completed() [:-1] 검증")
df_1h = fetcher.get_1h_dataframe_completed()
assert df_1h is not None, "1h DataFrame이 None"
assert len(df_1h) == 199, f"1h completed 캔들 {len(df_1h)}개 (199 예상)"
# 마지막 완성 봉의 timestamp < 현재 진행 중 봉의 timestamp
last_completed_ts = df_1h.index[-1]
last_raw_ts = pd.to_datetime(fetcher.klines_1h[-1][0], unit="ms", utc=True)
assert last_completed_ts < last_raw_ts, "completed 봉이 진행 중 봉을 포함"
print(f" ✓ 1h completed: {len(df_1h)}개 (200 - 1 = 199, 미완성 봉 제외 확인)")
print(f" 마지막 완성 봉: {last_completed_ts}")
print(f" 진행 중 봉: {last_raw_ts} (제외됨)")
# 15m DataFrame 검증
df_15m = fetcher.get_15m_dataframe()
assert df_15m is not None and len(df_15m) == 200
print(f" ✓ 15m DataFrame: {len(df_15m)}")
# ── 4. MetaFilter 검증 ──
print("\n[4] MetaFilter 검증")
meta = MetaFilter(fetcher)
state = meta.get_market_state()
assert state in ("LONG_ALLOWED", "SHORT_ALLOWED", "WAIT"), f"비정상 상태: {state}"
print(f" ✓ MetaFilter 상태: {state}")
atr = meta.get_current_atr()
assert atr is not None and atr > 0, f"ATR 비정상: {atr}"
print(f" ✓ ATR: {atr:.6f} (> 0 확인)")
info = meta.get_meta_info()
print(f" ✓ Meta Info: {info}")
# ATR 범위 검증 (XRP 기준 0.0001 ~ 0.1)
assert 0.0001 <= atr <= 0.1, f"ATR 범위 이탈: {atr}"
print(f" ✓ ATR 범위 정상: 0.0001 <= {atr:.6f} <= 0.1")
finally:
await fetcher.close()
print("\n" + "=" * 60)
print(" 모든 검증 통과 ✓")
print("=" * 60)
async def test_module_3_4():
"""
Module 3 + 4 통합 테스트.
검증 항목:
[Module 3 - TriggerStrategy]
1. 신호 생성: 'EXECUTE_LONG' | 'EXECUTE_SHORT' | 'HOLD' 중 하나 반환
2. EMA15: NaN 아님, 양수, 현실적 범위
3. Vol_SMA20: NaN 아님, 양수
4. vol_ratio: 0.0 ~ 2.0+ 범위 내
5. 3캔들 시퀀스: t-2, t-1, t 인덱싱 정확성
6. meta_state 필터: 'LONG_ALLOWED'에서만 LONG, 'SHORT_ALLOWED'에서만 SHORT
[Module 4 - ExecutionManager]
7. 포지션 중복 방지
8. SL/TP 계산: ATR * 1.5 (SL), ATR * 2.3 (TP)
9. Dry-run 로그 출력
10. 청산 후 재진입 가능
"""
print("=" * 60)
print(" MTF Bot Module 3 & 4 통합 테스트")
print("=" * 60)
# ── DataFetcher로 실제 데이터 로드 ──
fetcher = DataFetcher(symbol="XRP/USDT:USDT")
try:
await fetcher.initialize()
df_15m = fetcher.get_15m_dataframe()
assert df_15m is not None and len(df_15m) >= 25, "15m 데이터 부족"
meta = MetaFilter(fetcher)
meta_state = meta.get_market_state()
atr = meta.get_current_atr()
print(f"\n[환경] MetaFilter: {meta_state} | ATR: {atr}")
# ── [Module 3] TriggerStrategy 검증 ──
print("\n[1] TriggerStrategy 신호 생성")
trigger = TriggerStrategy()
# 테스트 1: 실제 데이터로 신호 생성
signal = trigger.generate_signal(df_15m, meta_state)
assert signal in ("EXECUTE_LONG", "EXECUTE_SHORT", "HOLD"), f"비정상 신호: {signal}"
print(f" ✓ 신호: {signal}")
info = trigger.get_trigger_info()
print(f" ✓ Trigger Info: {info}")
# 테스트 2: 지표 값 검증
if "ema15_t" in info:
assert not pd.isna(info["ema15_t"]) and info["ema15_t"] > 0, "EMA15 비정상"
assert not pd.isna(info["vol_sma20_t2"]) and info["vol_sma20_t2"] > 0, "Vol SMA20 비정상"
assert 0 <= info["vol_ratio"] <= 100, f"vol_ratio 비정상: {info['vol_ratio']}"
print(f" ✓ EMA15(t): {info['ema15_t']:.4f}")
print(f" ✓ Vol SMA20(t-2): {info['vol_sma20_t2']:.0f}")
print(f" ✓ Vol ratio: {info['vol_ratio']:.4f} ({'고갈' if info['vol_ratio'] < 0.5 else '정상'})")
# 테스트 3: meta_state=WAIT → 무조건 HOLD
signal_wait = trigger.generate_signal(df_15m, "WAIT")
assert signal_wait == "HOLD", "WAIT 상태에서 HOLD 아닌 신호 발생"
print(f" ✓ meta_state=WAIT → {signal_wait}")
# 테스트 4: 데이터 부족 → HOLD
signal_short = trigger.generate_signal(df_15m.iloc[:10], "LONG_ALLOWED")
assert signal_short == "HOLD", "데이터 부족에서 HOLD 아닌 신호 발생"
print(f" ✓ 데이터 부족(10행) → {signal_short}")
# 테스트 5: None DataFrame → HOLD
signal_none = trigger.generate_signal(None, "LONG_ALLOWED")
assert signal_none == "HOLD"
print(f" ✓ None DataFrame → HOLD")
# ── [Module 4] ExecutionManager 검증 ──
print(f"\n[2] ExecutionManager 검증")
executor = ExecutionManager()
# 테스트 6: HOLD → None
result = executor.execute("HOLD", 2.5, 0.01)
assert result is None, "HOLD에서 주문 실행됨"
print(f" ✓ HOLD → None")
# 테스트 7: ATR 무효 → None
result = executor.execute("EXECUTE_LONG", 2.5, None)
assert result is None, "ATR=None에서 주문 실행됨"
result = executor.execute("EXECUTE_LONG", 2.5, 0)
assert result is None, "ATR=0에서 주문 실행됨"
print(f" ✓ ATR 무효 → None")
# 테스트 8: 정상 LONG 주문
print(f"\n [LONG 가상 주문 테스트]")
test_atr = 0.01
result = executor.execute("EXECUTE_LONG", 2.5340, test_atr)
assert result is not None, "정상 주문이 None 반환"
assert result["action"] == "LONG"
assert abs(result["sl_price"] - (2.5340 - 0.01 * 1.5)) < 1e-8, "SL 계산 오류"
assert abs(result["tp_price"] - (2.5340 + 0.01 * 2.3)) < 1e-8, "TP 계산 오류"
assert result["risk_reward"] == 1.53, f"R:R 오류: {result['risk_reward']}"
print(f" ✓ LONG 주문: entry={result['entry_price']}, SL={result['sl_price']:.4f}, TP={result['tp_price']:.4f}")
print(f" ✓ R:R = 1:{result['risk_reward']}")
# 테스트 9: 포지션 중복 방지
result_dup = executor.execute("EXECUTE_SHORT", 2.5000, test_atr)
assert result_dup is None, "중복 포지션 허용됨"
assert executor.current_position == "LONG", "포지션 상태 변경됨"
print(f" ✓ 중복 차단: LONG 포지션 중 SHORT 신호 → None")
# 테스트 10: 청산 후 재진입
executor.close_position("테스트 청산")
assert executor.current_position is None, "청산 후 포지션 잔존"
print(f" ✓ 청산 완료, 포지션=None")
# 테스트 11: SHORT 주문
print(f"\n [SHORT 가상 주문 테스트]")
result_short = executor.execute("EXECUTE_SHORT", 2.5340, test_atr)
assert result_short is not None
assert result_short["action"] == "SHORT"
assert abs(result_short["sl_price"] - (2.5340 + 0.01 * 1.5)) < 1e-8, "SHORT SL 오류"
assert abs(result_short["tp_price"] - (2.5340 - 0.01 * 2.3)) < 1e-8, "SHORT TP 오류"
print(f" ✓ SHORT 주문: entry={result_short['entry_price']}, SL={result_short['sl_price']:.4f}, TP={result_short['tp_price']:.4f}")
executor.close_position("테스트 종료")
# 테스트 12: 빈 포지션 청산 → 에러 없이 처리
executor.close_position("이미 청산됨")
print(f" ✓ 빈 포지션 청산 → 에러 없음")
finally:
await fetcher.close()
print("\n" + "=" * 60)
print(" Module 3 & 4 모든 검증 통과 ✓")
print("=" * 60)
async def test_all():
"""Module 1~4 전체 검증."""
await test_module_1_2()
print("\n")
await test_module_3_4()
if __name__ == "__main__":
asyncio.run(test_all())