feat: add dual-layer kill switch (Fast Kill + Slow Kill)

- Fast Kill: 8 consecutive net losses → block new entries for symbol
- Slow Kill: last 15 trades PF < 0.75 → block new entries for symbol
- Trade history persisted to data/trade_history/{symbol}.jsonl (survives restart)
- Boot-time retrospective check restores kill state from history
- Manual reset via RESET_KILL_SWITCH_{SYMBOL}=True in .env + restart
- Entry blocked, exits (SL/TP/manual) always work normally
- Discord alert on kill switch activation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-18 23:48:52 +09:00
parent f890009a92
commit 4930140b19

View File

@@ -1,5 +1,8 @@
import asyncio
import json
import os
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd
from loguru import logger
@@ -13,6 +16,12 @@ from src.ml_filter import MLFilter
from src.ml_features import build_features_aligned
from src.user_data_stream import UserDataStream
# ── 킬스위치 상수 ──────────────────────────────────────────────────
_FAST_KILL_STREAK = 8 # 연속 손실 N회 → 즉시 중단
_SLOW_KILL_WINDOW = 15 # 최근 N거래 PF 산출
_SLOW_KILL_PF_THRESHOLD = 0.75 # PF < 이 값이면 중단
_TRADE_HISTORY_DIR = Path("data/trade_history")
class TradingBot:
def __init__(self, config: Config, symbol: str = None, risk: RiskManager = None):
@@ -42,11 +51,108 @@ class TradingBot:
self._prev_oi: float | None = None # OI 변화율 계산용 이전 값
self._oi_history: deque = deque(maxlen=5)
self._latest_ret_1: float = 0.0
self._killed: bool = False # 킬스위치 발동 상태
self._trade_history: list[dict] = [] # 최근 거래 이력 (net_pnl 기록)
self.stream = MultiSymbolStream(
symbols=[self.symbol] + config.correlation_symbols,
interval="15m",
on_candle=self._on_candle_closed,
)
# 부팅 시 거래 이력 복원 및 킬스위치 소급 검증
self._restore_trade_history()
self._restore_kill_switch()
# ── 킬스위치 ──────────────────────────────────────────────────────
def _trade_history_path(self) -> Path:
return _TRADE_HISTORY_DIR / f"{self.symbol.lower()}.jsonl"
def _restore_trade_history(self) -> None:
"""부팅 시 파일에서 거래 이력을 복원한다."""
path = self._trade_history_path()
if not path.exists():
return
try:
with open(path) as f:
for line in f:
line = line.strip()
if line:
self._trade_history.append(json.loads(line))
logger.info(f"[{self.symbol}] 거래 이력 복원: {len(self._trade_history)}")
except Exception as e:
logger.warning(f"[{self.symbol}] 거래 이력 복원 실패: {e}")
def _restore_kill_switch(self) -> None:
"""부팅 시 .env 리셋 플래그 확인 후, 이력 기반으로 킬스위치 소급 검증."""
reset_key = f"RESET_KILL_SWITCH_{self.symbol}"
if os.environ.get(reset_key, "").lower() == "true":
logger.info(f"[{self.symbol}] 킬스위치 수동 해제 감지 ({reset_key}=True)")
self._killed = False
return
# 소급 검증
if self._check_kill_switch(silent=True):
logger.warning(f"[{self.symbol}] 부팅 시 킬스위치 조건 충족 — 신규 진입 차단")
def _append_trade(self, net_pnl: float, close_reason: str) -> None:
"""거래 기록을 메모리 + 파일에 추가한다."""
record = {
"net_pnl": round(net_pnl, 4),
"reason": close_reason,
"ts": datetime.now(timezone.utc).isoformat(),
}
self._trade_history.append(record)
# 파일에 append (JSONL)
try:
_TRADE_HISTORY_DIR.mkdir(parents=True, exist_ok=True)
with open(self._trade_history_path(), "a") as f:
f.write(json.dumps(record) + "\n")
except Exception as e:
logger.warning(f"[{self.symbol}] 거래 기록 저장 실패: {e}")
def _check_kill_switch(self, silent: bool = False) -> bool:
"""킬스위치 조건을 검사하고, 발동 시 True를 반환한다.
Fast Kill: 최근 8연속 순손실
Slow Kill: 최근 15거래 PF < 0.75
"""
trades = self._trade_history
if not trades:
return False
# Fast Kill: 8연속 순손실
if len(trades) >= _FAST_KILL_STREAK:
recent = trades[-_FAST_KILL_STREAK:]
if all(t["net_pnl"] < 0 for t in recent):
reason = f"Fast Kill ({_FAST_KILL_STREAK}연속 순손실)"
self._trigger_kill_switch(reason, silent)
return True
# Slow Kill: 최근 15거래 PF < 0.75
if len(trades) >= _SLOW_KILL_WINDOW:
recent = trades[-_SLOW_KILL_WINDOW:]
gross_profit = sum(t["net_pnl"] for t in recent if t["net_pnl"] > 0)
gross_loss = abs(sum(t["net_pnl"] for t in recent if t["net_pnl"] < 0))
if gross_loss > 0:
pf = gross_profit / gross_loss
if pf < _SLOW_KILL_PF_THRESHOLD:
reason = f"Slow Kill (최근 {_SLOW_KILL_WINDOW}거래 PF={pf:.2f})"
self._trigger_kill_switch(reason, silent)
return True
return False
def _trigger_kill_switch(self, reason: str, silent: bool = False) -> None:
"""킬스위치 발동: 상태 변경 + 알림."""
self._killed = True
msg = (
f"🚨 [KILL SWITCH] {self.symbol} 신규 진입 중단\n"
f"사유: {reason}\n"
f"기존 포지션 SL/TP는 정상 작동합니다.\n"
f"해제: RESET_KILL_SWITCH_{self.symbol}=True 후 봇 재시작"
)
logger.error(msg)
if not silent:
self.notifier.notify_info(msg)
async def _on_candle_closed(self, candle: dict):
primary_df = self.stream.get_dataframe(self.symbol)
@@ -138,6 +244,10 @@ class TradingBot:
logger.warning(f"[{self.symbol}] 리스크 한도 초과 - 거래 중단")
return
# 킬스위치: 신규 진입만 차단, 기존 포지션 모니터링은 계속
if self._killed:
return
ind = Indicators(df)
df_with_indicators = ind.calculate_all()
raw_signal, signal_detail = ind.get_signal(
@@ -295,6 +405,10 @@ class TradingBot:
f"순수익={net_pnl:+.4f}, 차이={diff:+.4f} USDT"
)
# 거래 기록 저장 + 킬스위치 검사 (청산 후 항상 수행)
self._append_trade(net_pnl, close_reason)
self._check_kill_switch()
# _close_and_reenter 중이면 신규 포지션 상태를 덮어쓰지 않는다
if self._is_reentering:
return
@@ -356,6 +470,8 @@ class TradingBot:
f"rp={realized_pnl:+.4f}, commission={commission:.4f}, "
f"net_pnl={net_pnl:+.4f}"
)
self._append_trade(net_pnl, "SYNC")
self._check_kill_switch()
self.current_trade_side = None
self._entry_price = None
self._entry_quantity = None
@@ -410,6 +526,10 @@ class TradingBot:
self._entry_quantity = None
await self.risk.close_position(self.symbol, 0.0) if prev_side and self.symbol not in self.risk.open_positions else None
if self._killed:
logger.info(f"[{self.symbol}] 킬스위치 활성 — 재진입 건너뜀 (청산만 수행)")
return
if not await self.risk.can_open_new_position(self.symbol, signal):
logger.info(f"[{self.symbol}] 최대 포지션 수 도달 — 재진입 건너뜀")
return