fix: critical production issues — WebSocket reconnect, ghost positions, ONNX NaN

- fix(data_stream): add reconnect loop to MultiSymbolStream matching UserDataStream pattern
  Prevents bot-wide crash on WebSocket disconnect (#3 Critical)

- fix(data_stream): increase buffer_size 200→300 and preload 200→300
  Ensures z-score window (288) has sufficient data (#5 Important)

- fix(bot): sync risk manager when Binance has no position but local state does
  Prevents ghost entries in open_positions blocking future trades (#1 Critical)

- fix(ml_filter): add np.nan_to_num for ONNX input to handle NaN features
  Prevents all signals being blocked during initial ~2h warmup (#2 Critical)

- fix(bot): replace _close_handled_by_sync with current_trade_side==None guard
  Eliminates race window in SYNC PnL double recording (#4 Important)

- feat(bot): add _ensure_sl_tp_orders in _recover_position
  Detects and re-places missing SL/TP orders on bot restart (#6 Important)

- feat(exchange): add get_open_orders method for SL/TP verification

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-19 23:37:47 +09:00
parent e3a78974b3
commit e648ae7ca0
4 changed files with 109 additions and 16 deletions

View File

@@ -77,7 +77,6 @@ class TradingBot:
self._entry_quantity: float | None = None
self._is_reentering: bool = False # _close_and_reenter 중 콜백 상태 초기화 방지
self._close_event = asyncio.Event() # 콜백 청산 완료 대기용
self._close_handled_by_sync: bool = False # SYNC 감지 시 콜백 중복 방지
self._prev_oi: float | None = None # OI 변화율 계산용 이전 값
self._oi_history: deque = deque(maxlen=96) # z-score 윈도우(96=1일분 15분봉)
self._funding_history: deque = deque(maxlen=96)
@@ -200,7 +199,8 @@ class TradingBot:
await self.process_candle(primary_df, btc_df=btc_df, eth_df=eth_df)
async def _recover_position(self) -> None:
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구.
SL/TP 주문이 누락된 경우 ATR 기반으로 재배치한다."""
position = await self.exchange.get_position()
if position is not None:
amt = float(position["positionAmt"])
@@ -212,6 +212,8 @@ class TradingBot:
f"[{self.symbol}] 기존 포지션 복구: {self.current_trade_side} | "
f"진입가={entry:.4f} | 수량={abs(amt)}"
)
# SL/TP 주문 존재 여부 확인 후 누락 시 재배치
await self._ensure_sl_tp_orders(position)
self.notifier.notify_info(
f"봇 재시작 - 기존 포지션 감지: {self.current_trade_side} "
f"진입가={entry:.4f} 수량={abs(amt)}"
@@ -219,6 +221,55 @@ class TradingBot:
else:
logger.info(f"[{self.symbol}] 기존 포지션 없음 - 신규 진입 대기")
async def _ensure_sl_tp_orders(self, position: dict) -> None:
"""포지션에 SL/TP 주문이 없으면 ATR 기반으로 재배치한다."""
try:
open_orders = await self.exchange.get_open_orders()
has_sl = any(o.get("type") == "STOP_MARKET" for o in open_orders)
has_tp = any(o.get("type") == "TAKE_PROFIT_MARKET" for o in open_orders)
if has_sl and has_tp:
return
missing = []
if not has_sl:
missing.append("SL")
if not has_tp:
missing.append("TP")
logger.warning(f"[{self.symbol}] {'/'.join(missing)} 주문 누락 감지 — 재배치")
# 캔들 데이터로 ATR 기반 SL/TP 계산
primary_df = self.stream.get_dataframe(self.symbol)
if primary_df is None:
logger.warning(f"[{self.symbol}] 캔들 데이터 부족 — SL/TP 재배치 건너뜀")
return
ind = Indicators(primary_df)
df_ind = ind.calculate_all()
entry = self._entry_price
qty = self._entry_quantity
sl, tp = ind.get_atr_stop(
df_ind, self.current_trade_side, entry,
atr_sl_mult=self.strategy.atr_sl_mult,
atr_tp_mult=self.strategy.atr_tp_mult,
)
sl_side = "SELL" if self.current_trade_side == "LONG" else "BUY"
if not has_sl:
await self.exchange.place_order(
side=sl_side, quantity=qty,
order_type="STOP_MARKET",
stop_price=self.exchange._round_price(sl),
reduce_only=True,
)
logger.info(f"[{self.symbol}] SL 재배치: {sl:.4f}")
if not has_tp:
await self.exchange.place_order(
side=sl_side, quantity=qty,
order_type="TAKE_PROFIT_MARKET",
stop_price=self.exchange._round_price(tp),
reduce_only=True,
)
logger.info(f"[{self.symbol}] TP 재배치: {tp:.4f}")
except Exception as e:
logger.warning(f"[{self.symbol}] SL/TP 재배치 실패: {e}")
async def _init_oi_history(self) -> None:
"""봇 시작 시 최근 OI 변화율 히스토리를 조회하여 deque를 채운다."""
try:
@@ -311,7 +362,16 @@ class TradingBot:
position = await self.exchange.get_position()
if position is None and raw_signal != "HOLD":
self.current_trade_side = None
# Binance에 포지션이 없는데 로컬에 남아있으면 risk manager 동기화
if self.current_trade_side is not None:
logger.warning(
f"[{self.symbol}] 포지션 불일치: 로컬={self.current_trade_side}, "
f"바이낸스=없음 — risk manager 동기화"
)
await self.risk.close_position(self.symbol, 0.0)
self.current_trade_side = None
self._entry_price = None
self._entry_quantity = None
if not await self.risk.can_open_new_position(self.symbol, raw_signal):
logger.info(f"[{self.symbol}] 포지션 오픈 불가")
return
@@ -429,9 +489,9 @@ class TradingBot:
exit_price: float,
) -> None:
"""User Data Stream에서 청산 감지 시 호출되는 콜백."""
# SYNC 핸들러가 이미 처리한 경우 중복 기록 방지
if self._close_handled_by_sync:
logger.debug(f"[{self.symbol}] SYNC에서 이미 처리된 청산 — 콜백 건너뜀")
# 이미 Flat 상태면 중복 처리 방지 (SYNC 또는 process_candle에서 먼저 처리됨)
if self.current_trade_side is None and not self._is_reentering:
logger.debug(f"[{self.symbol}] 이미 Flat 상태 — 콜백 건너뜀")
self._close_event.set()
return
@@ -488,8 +548,6 @@ class TradingBot:
f"[{self.symbol}] 포지션 불일치 감지: "
f"봇={self.current_trade_side}, 바이낸스=포지션 없음 — 상태 동기화"
)
# 콜백 중복 방지 플래그 설정
self._close_handled_by_sync = True
# Binance income API에서 실제 PnL 조회
realized_pnl = 0.0
commission = 0.0
@@ -531,7 +589,6 @@ class TradingBot:
self._entry_price = None
self._entry_quantity = None
self._close_event.set()
self._close_handled_by_sync = False
continue
except Exception as e:
logger.debug(f"[{self.symbol}] 포지션 동기화 확인 실패 (무시): {e}")

View File

@@ -10,8 +10,10 @@ from loguru import logger
_MIN_CANDLES_FOR_SIGNAL = 100
# 초기 구동 시 REST API로 가져올 과거 캔들 수.
# 15분봉 200개 = 50시간치 — EMA50(12.5h) 대비 4배 여유.
_PRELOAD_LIMIT = 200
# z-score 윈도우(288) + EMA50(50) 안정화 여유분. 15분봉 300개 = 75시간.
_PRELOAD_LIMIT = 300
_RECONNECT_DELAY = 5 # WebSocket 재연결 대기 초
@@ -105,7 +107,7 @@ class MultiSymbolStream:
self,
symbols: list[str],
interval: str = "15m",
buffer_size: int = 200,
buffer_size: int = 300,
on_candle: Callable = None,
):
self.symbols = [s.lower() for s in symbols]
@@ -199,9 +201,34 @@ class MultiSymbolStream:
]
logger.info(f"Combined WebSocket 시작: {streams}")
try:
async with bm.futures_multiplex_socket(streams) as stream:
while True:
msg = await stream.recv()
await self.handle_message(msg)
await self._run_loop(bm, streams)
finally:
await client.close_connection()
async def _run_loop(self, bm: BinanceSocketManager, streams: list[str]) -> None:
"""WebSocket 연결 → 재연결 무한 루프."""
while True:
try:
async with bm.futures_multiplex_socket(streams) as stream:
logger.info("Kline WebSocket 연결 완료")
while True:
msg = await stream.recv()
if isinstance(msg, dict) and msg.get("e") == "error":
logger.warning(
f"Kline WebSocket 에러 수신: {msg.get('m', msg)} — 재연결"
)
break
await self.handle_message(msg)
except asyncio.CancelledError:
logger.info("Kline WebSocket 정상 종료")
raise
except Exception as e:
logger.warning(
f"Kline WebSocket 끊김: {e}"
f"{_RECONNECT_DELAY}초 후 재연결"
)
await asyncio.sleep(_RECONNECT_DELAY)

View File

@@ -145,6 +145,14 @@ class BinanceFuturesClient:
return p
return None
async def get_open_orders(self) -> list[dict]:
"""현재 심볼의 오픈 주문 목록을 조회한다."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
lambda: self.client.futures_get_open_orders(symbol=self.symbol),
)
async def cancel_all_orders(self):
"""오픈 주문을 모두 취소한다."""
loop = asyncio.get_running_loop()

View File

@@ -139,6 +139,7 @@ class MLFilter:
if self._onnx_session is not None:
input_name = self._onnx_session.get_inputs()[0].name
X = features[FEATURE_COLS].values.astype(np.float32).reshape(1, -1)
X = np.nan_to_num(X, nan=0.0)
proba = float(self._onnx_session.run(None, {input_name: X})[0][0])
else:
available = [c for c in FEATURE_COLS if c in features.index]