Files
cointrader/docs/plans/2026-03-02-user-data-stream-tp-sl-detection-design.md
21in7 52affb5532 feat: implement User Data Stream for real-time TP/SL detection and PnL tracking
- Introduced User Data Stream to detect TP/SL executions in real-time.
- Added a new class `UserDataStream` for managing the stream and handling events.
- Updated `bot.py` to initialize and run the User Data Stream in parallel with the candle stream.
- Enhanced `notifier.py` to send detailed Discord notifications including estimated vs actual PnL.
- Added methods in `exchange.py` for managing listenKey lifecycle (create, keepalive, delete).
- Refactored PnL recording and notification logic to streamline handling of position closures.

Made-with: Cursor
2026-03-02 16:33:08 +09:00

9.8 KiB
Raw Blame History

User Data Stream TP/SL 감지 설계

날짜: 2026-03-02
목적: Binance Futures User Data Stream을 도입하여 TP/SL 작동을 실시간 감지하고, 순수익(Net PnL)을 기록하며, Discord에 상세 청산 알림을 전송한다.


배경 및 문제

기존 봇은 매 캔들 마감마다 get_position()을 폴링하여 포지션 소멸 여부를 확인하는 방식이었다. 이 구조의 한계:

  1. TP/SL 작동 후 최대 15분 지연 — 캔들 마감 전까지 감지 불가
  2. 청산 원인 구분 불가 — TP인지 SL인지 수동 청산인지 알 수 없음
  3. PnL 기록 누락_close_position()을 봇이 직접 호출하지 않으면 record_pnl() 미실행
  4. Discord 알림 누락 — 동일 이유로 notify_close() 미호출

선택한 접근 방식

방식 A: python-binance 내장 User Data Stream + 30분 수동 keepalive 보강

  • 기존 BinanceSocketManager 활용으로 추가 의존성 없음
  • futures_user_socket(listenKey)로 User Data Stream 연결
  • 별도 30분 keepalive 백그라운드 태스크로 안정성 보강
  • while True: try-except 무한 재연결 루프로 네트워크 단절 복구

전체 아키텍처

파일 변경 목록

파일 변경 유형 내용
src/user_data_stream.py 신규 User Data Stream 전담 클래스
src/bot.py 수정 UserDataStream 초기화, run() 병렬 실행, _on_position_closed() 콜백, _entry_price/_entry_quantity 상태 추가
src/exchange.py 수정 create_listen_key(), keepalive_listen_key(), delete_listen_key() 메서드 추가
src/notifier.py 수정 notify_close()close_reason, estimated_pnl, net_pnl 파라미터 추가
src/risk_manager.py 수정 record_pnl()이 net_pnl을 받도록 유지 (인터페이스 변경 없음)

실행 흐름

bot.run()
  └── AsyncClient 단일 인스턴스 생성
        └── asyncio.gather()
              ├── MultiSymbolStream.start(client)   ← 기존 캔들 스트림
              └── UserDataStream.start()             ← 신규
                    ├── [백그라운드] _keepalive_loop()  30분마다 PUT /listenKey
                    └── [메인루프]  while True:
                                    try:
                                      listenKey 발급
                                      futures_user_socket() 연결
                                      async for msg: _handle_message()
                                    except CancelledError: break
                                    except Exception: sleep(5) → 재연결

섹션 1: UserDataStream 클래스 (src/user_data_stream.py)

상수

KEEPALIVE_INTERVAL = 30 * 60   # 30분 (listenKey 만료 60분의 절반)
RECONNECT_DELAY    = 5         # 재연결 대기 초

listenKey 생명주기

단계 API 시점
발급 POST /fapi/v1/listenKey 연결 시작 / 재연결 시
갱신 PUT /fapi/v1/listenKey 30분마다 (백그라운드 태스크)
삭제 DELETE /fapi/v1/listenKey 봇 정상 종료 시 (CancelledError)

재연결 로직

while True:
    try:
        listen_key = await exchange.create_listen_key()
        keepalive_task = asyncio.create_task(_keepalive_loop(listen_key))
        async with bm.futures_user_socket(listen_key):
            async for msg:
                await _handle_message(msg)
    except asyncio.CancelledError:
        await exchange.delete_listen_key(listen_key)
        keepalive_task.cancel()
        break
    except Exception as e:
        logger.warning(f"User Data Stream 끊김: {e}, {RECONNECT_DELAY}초 후 재연결")
        keepalive_task.cancel()
        await asyncio.sleep(RECONNECT_DELAY)
        # while True 상단으로 돌아가 listenKey 재발급

keepalive 백그라운드 태스크

async def _keepalive_loop(listen_key: str):
    while True:
        await asyncio.sleep(KEEPALIVE_INTERVAL)
        try:
            await exchange.keepalive_listen_key(listen_key)
            logger.debug("listenKey 갱신 완료")
        except Exception:
            logger.warning("listenKey 갱신 실패 → 재연결 루프가 처리")
            break  # 재연결 루프가 새 태스크 생성

섹션 2: 이벤트 파싱 로직

페이로드 구조 (Binance Futures ORDER_TRADE_UPDATE)

주문 상세 정보는 최상위가 아닌 내부 "o" 딕셔너리에 중첩되어 있다.

{
  "e": "ORDER_TRADE_UPDATE",
  "o": {
    "x": "TRADE",          // Execution Type
    "X": "FILLED",         // Order Status
    "o": "TAKE_PROFIT_MARKET",  // Order Type
    "R": true,             // reduceOnly
    "rp": "0.48210000",    // realizedProfit
    "n": "0.02100000",     // commission
    "ap": "1.3393"         // average price (체결가)
  }
}

판단 트리

msg["e"] == "ORDER_TRADE_UPDATE"?
  └── order = msg["o"]
      order["x"] == "TRADE" AND order["X"] == "FILLED"?
        └── 청산 주문인가?
            (order["R"] == true OR float(order["rp"]) != 0
             OR order["o"] in {"TAKE_PROFIT_MARKET", "STOP_MARKET"})
              ├── NO  → 무시 (진입 주문)
              └── YES → close_reason 판별:
                          "TAKE_PROFIT_MARKET" → "TP"
                          "STOP_MARKET"        → "SL"
                          그 외               → "MANUAL"
                        net_pnl = float(rp) - abs(float(n))
                        exit_price = float(order["ap"])
                        await on_order_filled(net_pnl, close_reason, exit_price)

섹션 3: _on_position_closed() 콜백 (src/bot.py)

진입가 상태 저장

_open_position() 내부에서 진입가와 수량을 인스턴스 변수로 저장한다. 청산 시점에는 포지션이 이미 사라져 있으므로 사전 저장이 필수다.

# __init__에 추가
self._entry_price: float | None = None
self._entry_quantity: float | None = None

# _open_position() 내부에서 저장
self._entry_price = price
self._entry_quantity = quantity

예상 PnL 계산

def _calc_estimated_pnl(self, exit_price: float) -> float:
    if self._entry_price is None or self._entry_quantity is None:
        return 0.0
    if self.current_trade_side == "LONG":
        return (exit_price - self._entry_price) * self._entry_quantity
    else:  # SHORT
        return (self._entry_price - exit_price) * self._entry_quantity

콜백 전체 흐름

async def _on_position_closed(
    self,
    net_pnl: float,
    close_reason: str,   # "TP" | "SL" | "MANUAL"
    exit_price: float,
):
    estimated_pnl = self._calc_estimated_pnl(exit_price)
    diff = net_pnl - estimated_pnl  # 슬리피지 + 수수료 차이

    # RiskManager에 순수익 기록
    self.risk.record_pnl(net_pnl)

    # Discord 알림
    self.notifier.notify_close(
        symbol=self.config.symbol,
        side=self.current_trade_side or "UNKNOWN",
        close_reason=close_reason,
        exit_price=exit_price,
        estimated_pnl=estimated_pnl,
        net_pnl=net_pnl,
        diff=diff,
    )

    logger.success(
        f"포지션 청산({close_reason}): 예상={estimated_pnl:+.4f}, "
        f"순수익={net_pnl:+.4f}, 차이={diff:+.4f} USDT"
    )

    # 봇 상태 초기화 (Flat 상태로 복귀)
    self.current_trade_side = None
    self._entry_price = None
    self._entry_quantity = None

기존 _close_position() 변경

봇이 직접 청산하는 경우(_close_and_reenter)에도 User Data Stream의 ORDER_TRADE_UPDATE가 발생한다. 중복 처리 방지를 위해 _close_position()에서 notify_close()record_pnl() 호출을 제거한다. 모든 청산 후처리는 _on_position_closed() 콜백 하나로 일원화한다.


섹션 4: Discord 알림 포맷 (src/notifier.py)

notify_close() 시그니처 변경

def notify_close(
    self,
    symbol: str,
    side: str,
    close_reason: str,    # "TP" | "SL" | "MANUAL"
    exit_price: float,
    estimated_pnl: float,
    net_pnl: float,
    diff: float,          # net_pnl - estimated_pnl
) -> None:

알림 포맷

✅ [XRPUSDT] SHORT TP 청산
청산가:          `1.3393`
예상 수익:       `+0.4821 USDT`
실제 순수익:     `+0.4612 USDT`
차이(슬리피지+수수료): `-0.0209 USDT`
청산 원인 이모지
TP
SL
MANUAL 🔶

섹션 5: src/exchange.py 추가 메서드

async def create_listen_key(self) -> str:
    """POST /fapi/v1/listenKey — listenKey 신규 발급"""

async def keepalive_listen_key(self, listen_key: str) -> None:
    """PUT /fapi/v1/listenKey — listenKey 만료 연장"""

async def delete_listen_key(self, listen_key: str) -> None:
    """DELETE /fapi/v1/listenKey — listenKey 삭제 (정상 종료 시)"""

데이터 흐름 요약

Binance WebSocket
  → ORDER_TRADE_UPDATE (FILLED, reduceOnly)
  → UserDataStream._handle_message()
  → net_pnl = rp - |commission|
  → bot._on_position_closed(net_pnl, close_reason, exit_price)
      ├── estimated_pnl = (exit - entry) × qty  (봇 계산)
      ├── diff = net_pnl - estimated_pnl
      ├── risk.record_pnl(net_pnl)              → 일일 PnL 누적
      ├── notifier.notify_close(...)             → Discord 알림
      └── 상태 초기화 (current_trade_side, _entry_price, _entry_quantity = None)

제외 범위 (YAGNI)

  • DB 영구 저장 (SQLite/Postgres) — 현재 로그 기반으로 충분
  • 진입 주문 체결 알림 (TRADE + not reduceOnly) — 기존 notify_open()으로 커버
  • 부분 청산(partial fill) 처리 — 현재 봇은 전량 청산만 사용