diff --git a/src/bot.py b/src/bot.py index 4eb79b7..7e6ed28 100644 --- a/src/bot.py +++ b/src/bot.py @@ -22,6 +22,7 @@ class TradingBot: self.current_trade_side: str | None = None # "LONG" | "SHORT" self._entry_price: float | None = None self._entry_quantity: float | None = None + self._is_reentering: bool = False # _close_and_reenter 중 콜백 상태 초기화 방지 self._prev_oi: float | None = None # OI 변화율 계산용 이전 값 self.stream = MultiSymbolStream( symbols=[config.symbol, "BTCUSDT", "ETHUSDT"], @@ -225,6 +226,10 @@ class TradingBot: f"순수익={net_pnl:+.4f}, 차이={diff:+.4f} USDT" ) + # _close_and_reenter 중이면 신규 포지션 상태를 덮어쓰지 않는다 + if self._is_reentering: + return + # Flat 상태로 초기화 self.current_trade_side = None self._entry_price = None @@ -249,23 +254,28 @@ class TradingBot: funding_rate: float = 0.0, ) -> None: """기존 포지션을 청산하고, ML 필터 통과 시 반대 방향으로 즉시 재진입한다.""" - await self._close_position(position) + # 재진입 플래그: User Data Stream 콜백이 신규 포지션 상태를 초기화하지 않도록 보호 + self._is_reentering = True + try: + await self._close_position(position) - if not self.risk.can_open_new_position(): - logger.info("최대 포지션 수 도달 — 재진입 건너뜀") - return - - if self.ml_filter.is_model_loaded(): - features = build_features( - df, signal, - btc_df=btc_df, eth_df=eth_df, - oi_change=oi_change, funding_rate=funding_rate, - ) - if not self.ml_filter.should_enter(features): - logger.info(f"ML 필터 차단: {signal} 재진입 무시") + if not self.risk.can_open_new_position(): + logger.info("최대 포지션 수 도달 — 재진입 건너뜀") return - await self._open_position(signal, df) + if self.ml_filter.is_model_loaded(): + features = build_features( + df, signal, + btc_df=btc_df, eth_df=eth_df, + oi_change=oi_change, funding_rate=funding_rate, + ) + if not self.ml_filter.should_enter(features): + logger.info(f"ML 필터 차단: {signal} 재진입 무시") + return + + await self._open_position(signal, df) + finally: + self._is_reentering = False async def run(self): logger.info(f"봇 시작: {self.config.symbol}, 레버리지 {self.config.leverage}x") @@ -275,7 +285,7 @@ class TradingBot: logger.info(f"기준 잔고 설정: {balance:.2f} USDT (동적 증거금 비율 기준점)") user_stream = UserDataStream( - exchange=self.exchange, + symbol=self.config.symbol, on_order_filled=self._on_position_closed, ) diff --git a/src/user_data_stream.py b/src/user_data_stream.py index d05b2b7..41e0b04 100644 --- a/src/user_data_stream.py +++ b/src/user_data_stream.py @@ -3,8 +3,7 @@ from typing import Callable from binance import AsyncClient, BinanceSocketManager from loguru import logger -_KEEPALIVE_INTERVAL = 30 * 60 # 30분 (listenKey 만료 60분의 절반) -_RECONNECT_DELAY = 5 # 재연결 대기 초 +_RECONNECT_DELAY = 5 # 재연결 대기 초 _CLOSE_ORDER_TYPES = {"TAKE_PROFIT_MARKET", "STOP_MARKET"} @@ -13,20 +12,18 @@ class UserDataStream: """ Binance Futures User Data Stream을 구독하여 주문 체결 이벤트를 처리한다. - - listenKey 30분 keepalive 백그라운드 태스크 + - python-binance BinanceSocketManager의 내장 keepalive 활용 - 네트워크 단절 시 무한 재연결 루프 - - ORDER_TRADE_UPDATE 이벤트에서 청산 주문만 필터링하여 콜백 호출 + - ORDER_TRADE_UPDATE 이벤트에서 지정 심볼의 청산 주문만 필터링하여 콜백 호출 """ def __init__( self, - exchange, # BinanceFuturesClient 인스턴스 + symbol: str, # 감시할 심볼 (예: "XRPUSDT") on_order_filled: Callable, # bot._on_position_closed 콜백 ): - self._exchange = exchange + self._symbol = symbol.upper() self._on_order_filled = on_order_filled - self._listen_key: str | None = None - self._keepalive_task: asyncio.Task | None = None async def start(self, api_key: str, api_secret: str) -> None: """User Data Stream 메인 루프 — 봇 종료 시까지 실행.""" @@ -41,30 +38,16 @@ class UserDataStream: await client.close_connection() async def _run_loop(self, bm: BinanceSocketManager) -> None: - """listenKey 발급 → 연결 → 재연결 무한 루프.""" + """연결 → 재연결 무한 루프. BinanceSocketManager가 listenKey keepalive를 내부 처리한다.""" while True: try: - self._listen_key = await self._exchange.create_listen_key() - logger.info(f"User Data Stream listenKey 발급: {self._listen_key[:8]}...") - - self._keepalive_task = asyncio.create_task( - self._keepalive_loop(self._listen_key) - ) - - async with bm.futures_user_socket(self._listen_key) as stream: - logger.info("User Data Stream 연결 완료") + async with bm.futures_user_socket() as stream: + logger.info(f"User Data Stream 연결 완료 (심볼 필터: {self._symbol})") async for msg in stream: await self._handle_message(msg) except asyncio.CancelledError: logger.info("User Data Stream 정상 종료") - if self._listen_key: - try: - await self._exchange.delete_listen_key(self._listen_key) - except Exception: - pass - if self._keepalive_task: - self._keepalive_task.cancel() raise except Exception as e: @@ -72,22 +55,8 @@ class UserDataStream: f"User Data Stream 끊김: {e} — " f"{_RECONNECT_DELAY}초 후 재연결" ) - if self._keepalive_task: - self._keepalive_task.cancel() - self._keepalive_task = None await asyncio.sleep(_RECONNECT_DELAY) - async def _keepalive_loop(self, listen_key: str) -> None: - """30분마다 listenKey를 갱신한다.""" - while True: - await asyncio.sleep(_KEEPALIVE_INTERVAL) - try: - await self._exchange.keepalive_listen_key(listen_key) - logger.debug("listenKey 갱신 완료") - except Exception as e: - logger.warning(f"listenKey 갱신 실패: {e} — 재연결 루프가 처리") - break - async def _handle_message(self, msg: dict) -> None: """ORDER_TRADE_UPDATE 이벤트에서 청산 주문을 필터링하여 콜백을 호출한다.""" if msg.get("e") != "ORDER_TRADE_UPDATE": @@ -95,6 +64,10 @@ class UserDataStream: order = msg.get("o", {}) + # 심볼 필터링: 봇이 관리하는 심볼만 처리 + if order.get("s", "") != self._symbol: + return + # x: Execution Type, X: Order Status if order.get("x") != "TRADE" or order.get("X") != "FILLED": return