fix: remove manual listenKey mgmt, add symbol filter, fix reenter race condition

Made-with: Cursor
This commit is contained in:
21in7
2026-03-02 16:31:40 +09:00
parent 6237efe4d3
commit 05ae88dc61
2 changed files with 37 additions and 54 deletions

View File

@@ -3,8 +3,7 @@ from typing import Callable
from binance import AsyncClient, BinanceSocketManager
from loguru import logger
_KEEPALIVE_INTERVAL = 30 * 60 # 30분 (listenKey 만료 60분의 절반)
_RECONNECT_DELAY = 5 # 재연결 대기 초
_RECONNECT_DELAY = 5 # 재연결 대기 초
_CLOSE_ORDER_TYPES = {"TAKE_PROFIT_MARKET", "STOP_MARKET"}
@@ -13,20 +12,18 @@ class UserDataStream:
"""
Binance Futures User Data Stream을 구독하여 주문 체결 이벤트를 처리한다.
- listenKey 30분 keepalive 백그라운드 태스크
- python-binance BinanceSocketManager의 내장 keepalive 활용
- 네트워크 단절 시 무한 재연결 루프
- ORDER_TRADE_UPDATE 이벤트에서 청산 주문만 필터링하여 콜백 호출
- ORDER_TRADE_UPDATE 이벤트에서 지정 심볼의 청산 주문만 필터링하여 콜백 호출
"""
def __init__(
self,
exchange, # BinanceFuturesClient 인스턴스
symbol: str, # 감시할 심볼 (예: "XRPUSDT")
on_order_filled: Callable, # bot._on_position_closed 콜백
):
self._exchange = exchange
self._symbol = symbol.upper()
self._on_order_filled = on_order_filled
self._listen_key: str | None = None
self._keepalive_task: asyncio.Task | None = None
async def start(self, api_key: str, api_secret: str) -> None:
"""User Data Stream 메인 루프 — 봇 종료 시까지 실행."""
@@ -41,30 +38,16 @@ class UserDataStream:
await client.close_connection()
async def _run_loop(self, bm: BinanceSocketManager) -> None:
"""listenKey 발급 → 연결 → 재연결 무한 루프."""
"""연결 → 재연결 무한 루프. BinanceSocketManager가 listenKey keepalive를 내부 처리한다."""
while True:
try:
self._listen_key = await self._exchange.create_listen_key()
logger.info(f"User Data Stream listenKey 발급: {self._listen_key[:8]}...")
self._keepalive_task = asyncio.create_task(
self._keepalive_loop(self._listen_key)
)
async with bm.futures_user_socket(self._listen_key) as stream:
logger.info("User Data Stream 연결 완료")
async with bm.futures_user_socket() as stream:
logger.info(f"User Data Stream 연결 완료 (심볼 필터: {self._symbol})")
async for msg in stream:
await self._handle_message(msg)
except asyncio.CancelledError:
logger.info("User Data Stream 정상 종료")
if self._listen_key:
try:
await self._exchange.delete_listen_key(self._listen_key)
except Exception:
pass
if self._keepalive_task:
self._keepalive_task.cancel()
raise
except Exception as e:
@@ -72,22 +55,8 @@ class UserDataStream:
f"User Data Stream 끊김: {e}"
f"{_RECONNECT_DELAY}초 후 재연결"
)
if self._keepalive_task:
self._keepalive_task.cancel()
self._keepalive_task = None
await asyncio.sleep(_RECONNECT_DELAY)
async def _keepalive_loop(self, listen_key: str) -> None:
"""30분마다 listenKey를 갱신한다."""
while True:
await asyncio.sleep(_KEEPALIVE_INTERVAL)
try:
await self._exchange.keepalive_listen_key(listen_key)
logger.debug("listenKey 갱신 완료")
except Exception as e:
logger.warning(f"listenKey 갱신 실패: {e} — 재연결 루프가 처리")
break
async def _handle_message(self, msg: dict) -> None:
"""ORDER_TRADE_UPDATE 이벤트에서 청산 주문을 필터링하여 콜백을 호출한다."""
if msg.get("e") != "ORDER_TRADE_UPDATE":
@@ -95,6 +64,10 @@ class UserDataStream:
order = msg.get("o", {})
# 심볼 필터링: 봇이 관리하는 심볼만 처리
if order.get("s", "") != self._symbol:
return
# x: Execution Type, X: Order Status
if order.get("x") != "TRADE" or order.get("X") != "FILLED":
return