diff --git a/src/bot.py b/src/bot.py index bb7b020..ced999a 100644 --- a/src/bot.py +++ b/src/bot.py @@ -77,7 +77,6 @@ class TradingBot: self._entry_quantity: float | None = None self._is_reentering: bool = False # _close_and_reenter 중 콜백 상태 초기화 방지 self._close_event = asyncio.Event() # 콜백 청산 완료 대기용 - self._close_handled_by_sync: bool = False # SYNC 감지 시 콜백 중복 방지 self._prev_oi: float | None = None # OI 변화율 계산용 이전 값 self._oi_history: deque = deque(maxlen=96) # z-score 윈도우(96=1일분 15분봉) self._funding_history: deque = deque(maxlen=96) @@ -200,7 +199,8 @@ class TradingBot: await self.process_candle(primary_df, btc_df=btc_df, eth_df=eth_df) async def _recover_position(self) -> None: - """재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구.""" + """재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구. + SL/TP 주문이 누락된 경우 ATR 기반으로 재배치한다.""" position = await self.exchange.get_position() if position is not None: amt = float(position["positionAmt"]) @@ -212,6 +212,8 @@ class TradingBot: f"[{self.symbol}] 기존 포지션 복구: {self.current_trade_side} | " f"진입가={entry:.4f} | 수량={abs(amt)}" ) + # SL/TP 주문 존재 여부 확인 후 누락 시 재배치 + await self._ensure_sl_tp_orders(position) self.notifier.notify_info( f"봇 재시작 - 기존 포지션 감지: {self.current_trade_side} " f"진입가={entry:.4f} 수량={abs(amt)}" @@ -219,6 +221,55 @@ class TradingBot: else: logger.info(f"[{self.symbol}] 기존 포지션 없음 - 신규 진입 대기") + async def _ensure_sl_tp_orders(self, position: dict) -> None: + """포지션에 SL/TP 주문이 없으면 ATR 기반으로 재배치한다.""" + try: + open_orders = await self.exchange.get_open_orders() + has_sl = any(o.get("type") == "STOP_MARKET" for o in open_orders) + has_tp = any(o.get("type") == "TAKE_PROFIT_MARKET" for o in open_orders) + if has_sl and has_tp: + return + missing = [] + if not has_sl: + missing.append("SL") + if not has_tp: + missing.append("TP") + logger.warning(f"[{self.symbol}] {'/'.join(missing)} 주문 누락 감지 — 재배치") + + # 캔들 데이터로 ATR 기반 SL/TP 계산 + primary_df = self.stream.get_dataframe(self.symbol) + if primary_df is None: + logger.warning(f"[{self.symbol}] 캔들 데이터 부족 — SL/TP 재배치 건너뜀") + return + ind = Indicators(primary_df) + df_ind = ind.calculate_all() + entry = self._entry_price + qty = self._entry_quantity + sl, tp = ind.get_atr_stop( + df_ind, self.current_trade_side, entry, + atr_sl_mult=self.strategy.atr_sl_mult, + atr_tp_mult=self.strategy.atr_tp_mult, + ) + sl_side = "SELL" if self.current_trade_side == "LONG" else "BUY" + if not has_sl: + await self.exchange.place_order( + side=sl_side, quantity=qty, + order_type="STOP_MARKET", + stop_price=self.exchange._round_price(sl), + reduce_only=True, + ) + logger.info(f"[{self.symbol}] SL 재배치: {sl:.4f}") + if not has_tp: + await self.exchange.place_order( + side=sl_side, quantity=qty, + order_type="TAKE_PROFIT_MARKET", + stop_price=self.exchange._round_price(tp), + reduce_only=True, + ) + logger.info(f"[{self.symbol}] TP 재배치: {tp:.4f}") + except Exception as e: + logger.warning(f"[{self.symbol}] SL/TP 재배치 실패: {e}") + async def _init_oi_history(self) -> None: """봇 시작 시 최근 OI 변화율 히스토리를 조회하여 deque를 채운다.""" try: @@ -311,7 +362,16 @@ class TradingBot: position = await self.exchange.get_position() if position is None and raw_signal != "HOLD": - self.current_trade_side = None + # Binance에 포지션이 없는데 로컬에 남아있으면 risk manager 동기화 + if self.current_trade_side is not None: + logger.warning( + f"[{self.symbol}] 포지션 불일치: 로컬={self.current_trade_side}, " + f"바이낸스=없음 — risk manager 동기화" + ) + await self.risk.close_position(self.symbol, 0.0) + self.current_trade_side = None + self._entry_price = None + self._entry_quantity = None if not await self.risk.can_open_new_position(self.symbol, raw_signal): logger.info(f"[{self.symbol}] 포지션 오픈 불가") return @@ -429,9 +489,9 @@ class TradingBot: exit_price: float, ) -> None: """User Data Stream에서 청산 감지 시 호출되는 콜백.""" - # SYNC 핸들러가 이미 처리한 경우 중복 기록 방지 - if self._close_handled_by_sync: - logger.debug(f"[{self.symbol}] SYNC에서 이미 처리된 청산 — 콜백 건너뜀") + # 이미 Flat 상태면 중복 처리 방지 (SYNC 또는 process_candle에서 먼저 처리됨) + if self.current_trade_side is None and not self._is_reentering: + logger.debug(f"[{self.symbol}] 이미 Flat 상태 — 콜백 건너뜀") self._close_event.set() return @@ -488,8 +548,6 @@ class TradingBot: f"[{self.symbol}] 포지션 불일치 감지: " f"봇={self.current_trade_side}, 바이낸스=포지션 없음 — 상태 동기화" ) - # 콜백 중복 방지 플래그 설정 - self._close_handled_by_sync = True # Binance income API에서 실제 PnL 조회 realized_pnl = 0.0 commission = 0.0 @@ -531,7 +589,6 @@ class TradingBot: self._entry_price = None self._entry_quantity = None self._close_event.set() - self._close_handled_by_sync = False continue except Exception as e: logger.debug(f"[{self.symbol}] 포지션 동기화 확인 실패 (무시): {e}") diff --git a/src/data_stream.py b/src/data_stream.py index bc0efd7..d500701 100644 --- a/src/data_stream.py +++ b/src/data_stream.py @@ -10,8 +10,10 @@ from loguru import logger _MIN_CANDLES_FOR_SIGNAL = 100 # 초기 구동 시 REST API로 가져올 과거 캔들 수. -# 15분봉 200개 = 50시간치 — EMA50(12.5h) 대비 4배 여유. -_PRELOAD_LIMIT = 200 +# z-score 윈도우(288) + EMA50(50) 안정화 여유분. 15분봉 300개 = 75시간. +_PRELOAD_LIMIT = 300 + +_RECONNECT_DELAY = 5 # WebSocket 재연결 대기 초 @@ -105,7 +107,7 @@ class MultiSymbolStream: self, symbols: list[str], interval: str = "15m", - buffer_size: int = 200, + buffer_size: int = 300, on_candle: Callable = None, ): self.symbols = [s.lower() for s in symbols] @@ -199,9 +201,34 @@ class MultiSymbolStream: ] logger.info(f"Combined WebSocket 시작: {streams}") try: - async with bm.futures_multiplex_socket(streams) as stream: - while True: - msg = await stream.recv() - await self.handle_message(msg) + await self._run_loop(bm, streams) finally: await client.close_connection() + + async def _run_loop(self, bm: BinanceSocketManager, streams: list[str]) -> None: + """WebSocket 연결 → 재연결 무한 루프.""" + while True: + try: + async with bm.futures_multiplex_socket(streams) as stream: + logger.info("Kline WebSocket 연결 완료") + while True: + msg = await stream.recv() + + if isinstance(msg, dict) and msg.get("e") == "error": + logger.warning( + f"Kline WebSocket 에러 수신: {msg.get('m', msg)} — 재연결" + ) + break + + await self.handle_message(msg) + + except asyncio.CancelledError: + logger.info("Kline WebSocket 정상 종료") + raise + + except Exception as e: + logger.warning( + f"Kline WebSocket 끊김: {e} — " + f"{_RECONNECT_DELAY}초 후 재연결" + ) + await asyncio.sleep(_RECONNECT_DELAY) diff --git a/src/exchange.py b/src/exchange.py index 7a1dbb6..b039df7 100644 --- a/src/exchange.py +++ b/src/exchange.py @@ -145,6 +145,14 @@ class BinanceFuturesClient: return p return None + async def get_open_orders(self) -> list[dict]: + """현재 심볼의 오픈 주문 목록을 조회한다.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + lambda: self.client.futures_get_open_orders(symbol=self.symbol), + ) + async def cancel_all_orders(self): """오픈 주문을 모두 취소한다.""" loop = asyncio.get_running_loop() diff --git a/src/ml_filter.py b/src/ml_filter.py index 7e39ea3..fe908c0 100644 --- a/src/ml_filter.py +++ b/src/ml_filter.py @@ -139,6 +139,7 @@ class MLFilter: if self._onnx_session is not None: input_name = self._onnx_session.get_inputs()[0].name X = features[FEATURE_COLS].values.astype(np.float32).reshape(1, -1) + X = np.nan_to_num(X, nan=0.0) proba = float(self._onnx_session.run(None, {input_name: X})[0][0]) else: available = [c for c in FEATURE_COLS if c in features.index]