diff --git a/src/bot.py b/src/bot.py index 320da94..522ae29 100644 --- a/src/bot.py +++ b/src/bot.py @@ -1,5 +1,8 @@ import asyncio +import json +import os from collections import deque +from datetime import datetime, timezone from pathlib import Path import pandas as pd from loguru import logger @@ -13,6 +16,12 @@ from src.ml_filter import MLFilter from src.ml_features import build_features_aligned from src.user_data_stream import UserDataStream +# ── 킬스위치 상수 ────────────────────────────────────────────────── +_FAST_KILL_STREAK = 8 # 연속 손실 N회 → 즉시 중단 +_SLOW_KILL_WINDOW = 15 # 최근 N거래 PF 산출 +_SLOW_KILL_PF_THRESHOLD = 0.75 # PF < 이 값이면 중단 +_TRADE_HISTORY_DIR = Path("data/trade_history") + class TradingBot: def __init__(self, config: Config, symbol: str = None, risk: RiskManager = None): @@ -42,11 +51,108 @@ class TradingBot: self._prev_oi: float | None = None # OI 변화율 계산용 이전 값 self._oi_history: deque = deque(maxlen=5) self._latest_ret_1: float = 0.0 + self._killed: bool = False # 킬스위치 발동 상태 + self._trade_history: list[dict] = [] # 최근 거래 이력 (net_pnl 기록) self.stream = MultiSymbolStream( symbols=[self.symbol] + config.correlation_symbols, interval="15m", on_candle=self._on_candle_closed, ) + # 부팅 시 거래 이력 복원 및 킬스위치 소급 검증 + self._restore_trade_history() + self._restore_kill_switch() + + # ── 킬스위치 ────────────────────────────────────────────────────── + + def _trade_history_path(self) -> Path: + return _TRADE_HISTORY_DIR / f"{self.symbol.lower()}.jsonl" + + def _restore_trade_history(self) -> None: + """부팅 시 파일에서 거래 이력을 복원한다.""" + path = self._trade_history_path() + if not path.exists(): + return + try: + with open(path) as f: + for line in f: + line = line.strip() + if line: + self._trade_history.append(json.loads(line)) + logger.info(f"[{self.symbol}] 거래 이력 복원: {len(self._trade_history)}건") + except Exception as e: + logger.warning(f"[{self.symbol}] 거래 이력 복원 실패: {e}") + + def _restore_kill_switch(self) -> None: + """부팅 시 .env 리셋 플래그 확인 후, 이력 기반으로 킬스위치 소급 검증.""" + reset_key = f"RESET_KILL_SWITCH_{self.symbol}" + if os.environ.get(reset_key, "").lower() == "true": + logger.info(f"[{self.symbol}] 킬스위치 수동 해제 감지 ({reset_key}=True)") + self._killed = False + return + # 소급 검증 + if self._check_kill_switch(silent=True): + logger.warning(f"[{self.symbol}] 부팅 시 킬스위치 조건 충족 — 신규 진입 차단") + + def _append_trade(self, net_pnl: float, close_reason: str) -> None: + """거래 기록을 메모리 + 파일에 추가한다.""" + record = { + "net_pnl": round(net_pnl, 4), + "reason": close_reason, + "ts": datetime.now(timezone.utc).isoformat(), + } + self._trade_history.append(record) + # 파일에 append (JSONL) + try: + _TRADE_HISTORY_DIR.mkdir(parents=True, exist_ok=True) + with open(self._trade_history_path(), "a") as f: + f.write(json.dumps(record) + "\n") + except Exception as e: + logger.warning(f"[{self.symbol}] 거래 기록 저장 실패: {e}") + + def _check_kill_switch(self, silent: bool = False) -> bool: + """킬스위치 조건을 검사하고, 발동 시 True를 반환한다. + + Fast Kill: 최근 8연속 순손실 + Slow Kill: 최근 15거래 PF < 0.75 + """ + trades = self._trade_history + if not trades: + return False + + # Fast Kill: 8연속 순손실 + if len(trades) >= _FAST_KILL_STREAK: + recent = trades[-_FAST_KILL_STREAK:] + if all(t["net_pnl"] < 0 for t in recent): + reason = f"Fast Kill ({_FAST_KILL_STREAK}연속 순손실)" + self._trigger_kill_switch(reason, silent) + return True + + # Slow Kill: 최근 15거래 PF < 0.75 + if len(trades) >= _SLOW_KILL_WINDOW: + recent = trades[-_SLOW_KILL_WINDOW:] + gross_profit = sum(t["net_pnl"] for t in recent if t["net_pnl"] > 0) + gross_loss = abs(sum(t["net_pnl"] for t in recent if t["net_pnl"] < 0)) + if gross_loss > 0: + pf = gross_profit / gross_loss + if pf < _SLOW_KILL_PF_THRESHOLD: + reason = f"Slow Kill (최근 {_SLOW_KILL_WINDOW}거래 PF={pf:.2f})" + self._trigger_kill_switch(reason, silent) + return True + + return False + + def _trigger_kill_switch(self, reason: str, silent: bool = False) -> None: + """킬스위치 발동: 상태 변경 + 알림.""" + self._killed = True + msg = ( + f"🚨 [KILL SWITCH] {self.symbol} 신규 진입 중단\n" + f"사유: {reason}\n" + f"기존 포지션 SL/TP는 정상 작동합니다.\n" + f"해제: RESET_KILL_SWITCH_{self.symbol}=True 후 봇 재시작" + ) + logger.error(msg) + if not silent: + self.notifier.notify_info(msg) async def _on_candle_closed(self, candle: dict): primary_df = self.stream.get_dataframe(self.symbol) @@ -138,6 +244,10 @@ class TradingBot: logger.warning(f"[{self.symbol}] 리스크 한도 초과 - 거래 중단") return + # 킬스위치: 신규 진입만 차단, 기존 포지션 모니터링은 계속 + if self._killed: + return + ind = Indicators(df) df_with_indicators = ind.calculate_all() raw_signal, signal_detail = ind.get_signal( @@ -295,6 +405,10 @@ class TradingBot: f"순수익={net_pnl:+.4f}, 차이={diff:+.4f} USDT" ) + # 거래 기록 저장 + 킬스위치 검사 (청산 후 항상 수행) + self._append_trade(net_pnl, close_reason) + self._check_kill_switch() + # _close_and_reenter 중이면 신규 포지션 상태를 덮어쓰지 않는다 if self._is_reentering: return @@ -356,6 +470,8 @@ class TradingBot: f"rp={realized_pnl:+.4f}, commission={commission:.4f}, " f"net_pnl={net_pnl:+.4f}" ) + self._append_trade(net_pnl, "SYNC") + self._check_kill_switch() self.current_trade_side = None self._entry_price = None self._entry_quantity = None @@ -410,6 +526,10 @@ class TradingBot: self._entry_quantity = None await self.risk.close_position(self.symbol, 0.0) if prev_side and self.symbol not in self.risk.open_positions else None + if self._killed: + logger.info(f"[{self.symbol}] 킬스위치 활성 — 재진입 건너뜀 (청산만 수행)") + return + if not await self.risk.can_open_new_position(self.symbol, signal): logger.info(f"[{self.symbol}] 최대 포지션 수 도달 — 재진입 건너뜀") return