feat: add testnet mode support and fix UDS order type classification

- Add BINANCE_TESTNET env var to switch between live/demo API keys
- Add KLINE_INTERVAL env var (default 15m) for configurable candle interval
- Pass testnet flag through to Exchange, DataStream, UDS, Notifier
- Add demo mode in bot: forced LONG entry with fixed 0.5% SL / 2% TP
- Fix UDS close_reason: use ot (original order type) field to correctly
  classify STOP_MARKET/TAKE_PROFIT_MARKET triggers (was MANUAL)
- Add UDS raw event logging with ot field for debugging
- Add backtest market context (BTC/ETH regime, L/S ratio per fold)
- Separate testnet trade history to data/trade_history/testnet/

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-23 13:05:38 +09:00
parent 1135efc5be
commit 0ddd1f6764
9 changed files with 511 additions and 20 deletions

View File

@@ -0,0 +1,242 @@
# Testnet UDS 검증 설계
**일자**: 2026-03-22
**상태**: 설계 완료, 구현 대기
---
## 목적
Binance Futures Testnet에서 User Data Stream(UDS)의 reconnect 동작을 검증한다. 현재 프로덕션 15분봉 설정 그대로 testnet에 연결하여, UDS 연결 → ~30분 후 reconnect → ORDER_TRADE_UPDATE 수신까지 전체 경로가 정상 작동하는지 확인한다.
**이것은 UDS 검증 전용이다.** 1분봉 전환, 125x 레버리지, ML 파이프라인 변경은 포함하지 않는다. 기존 설계(`2026-03-03-testnet-1m-125x`)는 ML OFF 확정 후 전제가 바뀌었으므로 별도 취급한다.
---
## 접근 방식
python-binance 1.0.35에서 `testnet=True` 파라미터가 REST API와 WebSocket(kline + User Data Stream) 모두 자동 라우팅한다. 별도 URL 오버라이드 불필요.
**검증된 라우팅 경로 (python-binance 소스 확인):**
- REST API: `https://testnet.binancefuture.com`
- Kline WebSocket: `wss://stream.binancefuture.com/` (`BinanceSocketManager._get_futures_socket()`에서 `self.testnet` 체크)
- User Data Stream WebSocket: `wss://stream.binancefuture.com/` (`futures_user_socket()`에서 `self.testnet` 체크)
`AsyncClient.create(testnet=True)``BinanceSocketManager(client)``client.testnet` 플래그가 자동 전파.
---
## 수정 대상 파일
| 파일 | 변경 내용 |
|------|----------|
| `src/config.py` | `testnet: bool` 필드 추가, `BINANCE_TESTNET` env var 파싱, testnet이면 testnet API key 사용 |
| `src/exchange.py` | `Client(..., testnet=config.testnet)` 전달 |
| `src/user_data_stream.py` | `AsyncClient.create(..., testnet=testnet)` 전달 |
| `src/data_stream.py` | `AsyncClient.create(..., testnet=testnet)` 전달 (KlineStream + MultiSymbolStream) |
| `src/notifier.py` | testnet일 때 Discord 메시지에 `[TESTNET]` 접두사 추가 |
| `src/bot.py` | testnet 플래그를 각 스트림/notifier에 전달 + trade_history 경로 분리 + 시작 시 TESTNET 경고 로그 |
### 변경하지 않는 것
- 지표 계산 (`src/indicators.py`) — 그대로
- ML 필터 (`src/ml_filter.py`) — NO_ML_FILTER=true 상태 그대로
- 학습 파이프라인 — 변경 없음
- 리스크 매니저 — 그대로
- Discord 알림 — testnet일 때 메시지에 `[TESTNET]` 접두사 추가 (아래 상세 변경 참조)
- `.env` 프로덕션 설정 — 변경 없음 (BINANCE_TESTNET 추가만)
---
## 상세 변경
### 1. Config (`src/config.py`)
```python
# 필드 추가
testnet: bool = False
# __post_init__에서:
self.testnet = os.getenv("BINANCE_TESTNET", "").lower() in ("true", "1", "yes")
if self.testnet:
self.api_key = os.getenv("BINANCE_TESTNET_API_KEY", "")
self.api_secret = os.getenv("BINANCE_TESTNET_API_SECRET", "")
else:
self.api_key = os.getenv("BINANCE_API_KEY", "")
self.api_secret = os.getenv("BINANCE_API_SECRET", "")
```
- testnet이면 `BINANCE_TESTNET_API_KEY/SECRET` 사용
- 나머지 설정(SYMBOLS, LEVERAGE 등)은 동일하게 적용
### 2. Exchange (`src/exchange.py`)
```python
# 현재:
self.client = Client(
api_key=config.api_key,
api_secret=config.api_secret,
)
# 변경:
self.client = Client(
api_key=config.api_key,
api_secret=config.api_secret,
testnet=config.testnet,
)
```
### 3. UserDataStream (`src/user_data_stream.py`)
`_run_loop()` 시그니처에 `testnet` 파라미터 추가:
```python
# start()에 testnet 파라미터 추가
async def start(self, api_key: str, api_secret: str, testnet: bool = False) -> None:
...
await self._run_loop(api_key, api_secret, testnet)
# _run_loop()에서 AsyncClient.create에 전달
async def _run_loop(self, api_key: str, api_secret: str, testnet: bool = False) -> None:
while True:
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
testnet=testnet,
)
```
### 4. DataStream (`src/data_stream.py`)
KlineStream.start()과 MultiSymbolStream.start() 모두 동일 패턴:
```python
async def start(self, api_key: str, api_secret: str, testnet: bool = False):
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
testnet=testnet,
)
```
MultiSymbolStream._run_loop()에서도 reconnect 시 AsyncClient.create에 testnet 전달.
### 5. Notifier (`src/notifier.py`)
testnet일 때 Discord 메시지에 `[TESTNET]` 접두사를 추가하여 프로덕션 알림과 구분:
```python
# Notifier.__init__()에 testnet 파라미터 추가
def __init__(self, webhook_url: str, testnet: bool = False):
self.webhook_url = webhook_url
self.testnet = testnet
# 메시지 전송 시 접두사 추가
async def _send(self, content: str):
if self.testnet:
content = f"[TESTNET] {content}"
...
```
Bot에서 Notifier 생성 시 `testnet=self.config.testnet` 전달.
### 6. Bot (`src/bot.py`)
**시작 로그에 TESTNET 명시 (warning 레벨):**
```python
async def run(self):
if self.config.testnet:
logger.warning("⚠️ TESTNET MODE ENABLED — 실제 자금이 아닌 테스트넷에서 실행 중")
...
```
**stream.start()와 user_stream.start()에 testnet 전달:**
```python
await asyncio.gather(
self.stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
testnet=self.config.testnet,
),
user_stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
testnet=self.config.testnet,
),
self._position_monitor(),
)
```
**trade_history 경로 분리:**
```python
# 현재 (line 24):
_TRADE_HISTORY_DIR = Path("data/trade_history")
# 변경 — _trade_history_path() 메서드에서 분기:
def _trade_history_path(self) -> Path:
base = Path("data/trade_history")
if self.config.testnet:
base = base / "testnet"
base.mkdir(parents=True, exist_ok=True)
return base / f"{self.symbol.lower()}.jsonl"
```
- testnet: `data/trade_history/testnet/xrpusdt.jsonl`
- production: `data/trade_history/xrpusdt.jsonl` (기존과 동일)
- Kill Switch 판정이 testnet 트레이드로 오염되지 않음
---
## .env 설정
```bash
# 기존 프로덕션 설정 유지 + 아래 추가
BINANCE_TESTNET=true # testnet 모드 활성화
BINANCE_TESTNET_API_KEY=xxx # testnet.binancefuture.com에서 발급
BINANCE_TESTNET_API_SECRET=xxx
```
- `BINANCE_TESTNET=true`를 설정하면 testnet 모드로 전환
- 프로덕션 복귀 시 `BINANCE_TESTNET=false` 또는 줄 삭제
**주의**: .env에 이미 `BINANCE_TESTNET_API_KEY`/`BINANCE_TESTNET_API_SECRET` 자리가 마련되어 있음.
---
## 검증 절차
### 1단계: Testnet API 키 발급
- `testnet.binancefuture.com` 접속 → API 키 발급
- `.env`에 설정
### 2단계: 봇 실행 + UDS 검증
```bash
# .env에 BINANCE_TESTNET=true 설정 후
python main.py
```
확인 사항:
1. 시작 로그에 testnet 표시 확인
2. User Data Stream 연결 로그 확인
3. ~30분 대기 → reconnect 발생하는지 확인
4. reconnect 후 ORDER_TRADE_UPDATE 수신되는지 확인
5. trade_history가 `data/trade_history/testnet/` 에 기록되는지 확인
### 3단계: Kill Switch 경로 확인
- testnet 트레이드가 `data/trade_history/testnet/xrpusdt.jsonl`에만 기록되는지 확인
- 프로덕션 `data/trade_history/xrpusdt.jsonl`이 변경되지 않았는지 확인
---
## 주의사항
- **테스트넷 가격은 실제 시장과 다름**: 전략 성과 판단 불가, UDS 동작 검증만 목적
- **trade_history 분리 필수**: testnet 트레이드가 프로덕션 Kill Switch를 오염시키면 안 됨
- **프로덕션 배포 시 BINANCE_TESTNET 제거 확인**: `.env``BINANCE_TESTNET=true`가 남아있으면 프로덕션이 testnet으로 연결됨

View File

@@ -20,6 +20,8 @@ import json
from datetime import datetime
import numpy as np
import pandas as pd
import pandas_ta as ta
from loguru import logger
@@ -107,6 +109,174 @@ def print_fold_table(folds: list[dict]):
print("=" * 90)
def _classify_regime(btc_return: float, btc_avg_adx: float) -> str:
"""BTC ADX와 수익률 기반 시장 레짐 분류."""
if btc_avg_adx >= 25:
return "상승 추세" if btc_return > 0 else "하락 추세"
return "횡보"
def _calc_fold_market_context(
raw_df: pd.DataFrame, test_start: str, test_end: str
) -> dict:
"""폴드 기간의 BTC/ETH 수익률과 시장 레짐 계산."""
ts_start = pd.Timestamp(test_start)
ts_end = pd.Timestamp(test_end)
idx = raw_df.index
if idx.tz is not None:
idx = idx.tz_localize(None)
if ts_start.tz is not None:
ts_start = ts_start.tz_localize(None)
if ts_end.tz is not None:
ts_end = ts_end.tz_localize(None)
fold_df = raw_df[(idx >= ts_start) & (idx < ts_end)]
if len(fold_df) < 20:
return None
# BTC return
btc_start = fold_df["close_btc"].iloc[0]
btc_end = fold_df["close_btc"].iloc[-1]
btc_return = (btc_end - btc_start) / btc_start * 100
# ETH return
eth_start = fold_df["close_eth"].iloc[0]
eth_end = fold_df["close_eth"].iloc[-1]
eth_return = (eth_end - eth_start) / eth_start * 100
# BTC ADX (period average)
adx_df = ta.adx(fold_df["high_btc"], fold_df["low_btc"], fold_df["close_btc"], length=14)
btc_avg_adx = adx_df["ADX_14"].mean()
if np.isnan(btc_avg_adx):
btc_avg_adx = 0.0
regime = _classify_regime(btc_return, btc_avg_adx)
return {
"btc_return_pct": round(btc_return, 1),
"eth_return_pct": round(eth_return, 1),
"btc_avg_adx": round(btc_avg_adx, 1),
"market_regime": regime,
}
def _load_ls_ratio(symbol: str, test_start: str, test_end: str) -> dict | None:
"""폴드 기간의 L/S ratio 평균값 로드. 데이터 없으면 None."""
path = Path(f"data/{symbol.lower()}/ls_ratio_15m.parquet")
if not path.exists():
return None
df = pd.read_parquet(path)
ts_start = pd.Timestamp(test_start)
ts_end = pd.Timestamp(test_end)
# tz 맞추기
if df["timestamp"].dt.tz is not None:
if ts_start.tz is None:
ts_start = ts_start.tz_localize("UTC")
if ts_end.tz is None:
ts_end = ts_end.tz_localize("UTC")
mask = (df["timestamp"] >= ts_start) & (df["timestamp"] < ts_end)
period_df = df[mask]
if period_df.empty:
return None
return {
"top_acct_avg": round(period_df["top_acct_ls_ratio"].mean(), 2),
"global_avg": round(period_df["global_ls_ratio"].mean(), 2),
}
def calc_market_context(folds: list[dict], symbols: list[str]) -> list[dict]:
"""각 폴드에 대한 시장 컨텍스트 계산."""
# XRP parquet에서 BTC/ETH 데이터 로드 (임베딩됨)
primary_sym = symbols[0].lower()
raw_path = Path(f"data/{primary_sym}/combined_15m.parquet")
if not raw_path.exists():
logger.warning(f"데이터 파일 없음: {raw_path}")
return []
raw_df = pd.read_parquet(raw_path)
if "close_btc" not in raw_df.columns or "close_eth" not in raw_df.columns:
logger.warning("BTC/ETH 상관 데이터 없음")
return []
contexts = []
for fold in folds:
test_start = fold.get("test_start")
test_end = fold.get("test_end")
if not test_start or not test_end:
contexts.append({"fold": fold["fold"], "market_context": None})
continue
ctx = _calc_fold_market_context(raw_df, test_start, test_end)
if ctx is None:
contexts.append({"fold": fold["fold"], "market_context": None})
continue
# L/S ratio (XRP, BTC, ETH)
ls_data = {}
for ls_sym in ["xrpusdt", "btcusdt", "ethusdt"]:
ls = _load_ls_ratio(ls_sym, test_start, test_end)
if ls:
ls_data[ls_sym.replace("usdt", "")] = ls
ctx["ls_ratio"] = ls_data if ls_data else None
contexts.append({"fold": fold["fold"], "market_context": ctx})
return contexts
def print_market_context(contexts: list[dict]):
"""시장 컨텍스트 테이블 출력."""
if not contexts:
return
# Market Regime 테이블
print("\n📊 Market Context per Fold")
print(f"{'' * 80}")
print(f" {'Fold':>4} {'BTC Return':>12} {'ETH Return':>12} {'Market Regime':<32}")
print(f"{'' * 80}")
for c in contexts:
ctx = c.get("market_context")
if ctx is None:
print(f" {c['fold']:>4} {'N/A':>12} {'N/A':>12} {'N/A':<32}")
else:
regime_str = f"{ctx['market_regime']} (BTC ADX {ctx['btc_avg_adx']:.0f})"
print(f" {c['fold']:>4} {ctx['btc_return_pct']:>+11.1f}% "
f"{ctx['eth_return_pct']:>+11.1f}% {regime_str:<32}")
print(f"{'' * 80}")
# L/S Ratio 테이블 (데이터 있는 폴드가 하나라도 있으면)
has_ls = any(
c.get("market_context") and c["market_context"].get("ls_ratio")
for c in contexts
)
if has_ls:
print("\n📊 L/S Ratio Context per Fold (period avg)")
print(f"{'' * 80}")
print(f" {'Fold':>4} {'XRP Top/Global':>18} {'BTC Top/Global':>18} {'ETH Top/Global':>18}")
print(f"{'' * 80}")
for c in contexts:
ctx = c.get("market_context")
ls = ctx.get("ls_ratio") if ctx else None
parts = []
for sym in ["xrp", "btc", "eth"]:
if ls and sym in ls:
parts.append(f"{ls[sym]['top_acct_avg']:.2f} / {ls[sym]['global_avg']:.2f}")
else:
parts.append("N/A")
print(f" {c['fold']:>4} {parts[0]:>18} {parts[1]:>18} {parts[2]:>18}")
print(f"{'' * 80}")
else:
print(" L/S ratio 데이터 없음 — collector 데이터 축적 후 표시됩니다")
def save_result(result: dict, cfg):
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
mode = result.get("mode", "standard")
@@ -183,6 +353,11 @@ def compare_ml(symbols: list[str], args):
print_summary(result["summary"], cfg, mode="walk_forward")
if result.get("folds"):
print_fold_table(result["folds"])
# 시장 컨텍스트는 첫 번째 실행에서만 출력 (동일 데이터)
if label == "ML OFF":
contexts = calc_market_context(result["folds"], symbols)
if contexts:
print_market_context(contexts)
_print_comparison(results, symbols)
@@ -343,6 +518,12 @@ def main():
print_summary(result["summary"], cfg, mode="walk_forward")
if result.get("folds"):
print_fold_table(result["folds"])
contexts = calc_market_context(result["folds"], symbols)
if contexts:
print_market_context(contexts)
# JSON에 market_context 추가
for fold, ctx in zip(result["folds"], contexts):
fold["market_context"] = ctx.get("market_context")
save_result(result, cfg)
else:
cfg = BacktestConfig(

View File

@@ -701,6 +701,8 @@ class WalkForwardBacktester:
"fold": i + 1,
"train_period": f"{train_start.date()} ~ {train_end.date()}",
"test_period": f"{test_start.date()} ~ {test_end.date()}",
"test_start": test_start.isoformat(),
"test_end": test_end.isoformat(),
"summary": result["summary"],
})

View File

@@ -58,7 +58,7 @@ class TradingBot:
self.symbol = symbol or config.symbol
self.strategy = config.get_symbol_params(self.symbol)
self.exchange = BinanceFuturesClient(config, symbol=self.symbol)
self.notifier = DiscordNotifier(config.discord_webhook_url)
self.notifier = DiscordNotifier(config.discord_webhook_url, testnet=config.testnet)
self.risk = risk or RiskManager(config)
# 심볼별 모델 디렉토리. 없으면 기존 models/ 루트로 폴백
symbol_model_dir = Path(f"models/{self.symbol.lower()}")
@@ -88,7 +88,7 @@ class TradingBot:
self._trade_history: list[dict] = [] # 최근 거래 이력 (net_pnl 기록)
self.stream = MultiSymbolStream(
symbols=[self.symbol] + config.correlation_symbols,
interval="15m",
interval=config.kline_interval,
on_candle=self._on_candle_closed,
)
# 부팅 시 거래 이력 복원 및 킬스위치 소급 검증
@@ -98,7 +98,11 @@ class TradingBot:
# ── 킬스위치 ──────────────────────────────────────────────────────
def _trade_history_path(self) -> Path:
return _TRADE_HISTORY_DIR / f"{self.symbol.lower()}.jsonl"
base = _TRADE_HISTORY_DIR
if self.config.testnet:
base = base / "testnet"
base.mkdir(parents=True, exist_ok=True)
return base / f"{self.symbol.lower()}.jsonl"
def _restore_trade_history(self) -> None:
"""부팅 시 파일 마지막 N줄만 읽어 거래 이력을 복원한다.
@@ -327,6 +331,23 @@ class TradingBot:
return change
async def process_candle(self, df, btc_df=None, eth_df=None):
# Demo 모드: 시그널/필터 전부 우회, 포지션 없을 때만 1회 LONG 진입 (UDS 검증용)
if self.config.testnet:
ind = Indicators(df)
df_with_indicators = ind.calculate_all()
current_price = df_with_indicators["close"].iloc[-1]
# 로컬 상태 + 바이낸스 포지션 모두 체크
if self.current_trade_side is not None:
logger.info(f"[{self.symbol}] [DEMO] 포지션 보유 중 (로컬) — SL/TP 대기 | 현재가: {current_price:.4f}")
return
position = await self.exchange.get_position()
if position is not None:
logger.info(f"[{self.symbol}] [DEMO] 포지션 보유 중 (바이낸스) — SL/TP 대기 | 현재가: {current_price:.4f}")
return
logger.info(f"[{self.symbol}] [DEMO] 강제 LONG 진입 | 현재가: {current_price:.4f}")
await self._open_position("LONG", df_with_indicators)
return
self.ml_filter.check_and_reload()
# 가격 수익률 계산 (oi_price_spread용)
@@ -418,15 +439,26 @@ class TradingBot:
balance=per_symbol_balance, price=price, leverage=self.config.leverage, margin_ratio=margin_ratio
)
logger.info(f"[{self.symbol}] 포지션 크기: 잔고={per_symbol_balance:.2f}/{balance:.2f} USDT, 증거금비율={margin_ratio:.1%}, 수량={quantity}")
# df는 이미 calculate_all() 적용된 df_with_indicators이므로
# Indicators를 재생성하지 않고 ATR을 직접 사용
atr = df["atr"].iloc[-1]
if signal == "LONG":
stop_loss = price - atr * self.strategy.atr_sl_mult
take_profit = price + atr * self.strategy.atr_tp_mult
# Demo 모드: 고정 퍼센트 SL/TP (ATR이 너무 작아 즉시 트리거 방지)
if self.config.testnet:
sl_pct = 0.005 # 0.5%
tp_pct = 0.02
if signal == "LONG":
stop_loss = price * (1 - sl_pct)
take_profit = price * (1 + tp_pct)
else:
stop_loss = price * (1 + sl_pct)
take_profit = price * (1 - tp_pct)
else:
stop_loss = price + atr * self.strategy.atr_sl_mult
take_profit = price - atr * self.strategy.atr_tp_mult
# df는 이미 calculate_all() 적용된 df_with_indicators이므로
# Indicators를 재생성하지 않고 ATR을 직접 사용
atr = df["atr"].iloc[-1]
if signal == "LONG":
stop_loss = price - atr * self.strategy.atr_sl_mult
take_profit = price + atr * self.strategy.atr_tp_mult
else:
stop_loss = price + atr * self.strategy.atr_sl_mult
take_profit = price - atr * self.strategy.atr_tp_mult
notional = quantity * price
if quantity <= 0 or notional < self.exchange.MIN_NOTIONAL:
@@ -755,6 +787,9 @@ class TradingBot:
self._is_reentering = False
async def run(self):
if self.config.testnet:
logger.warning("⚠️ TESTNET MODE ENABLED — 실제 자금이 아닌 테스트넷에서 실행 중")
s = self.strategy
logger.info(
f"[{self.symbol}] 봇 시작, 레버리지 {self.config.leverage}x | "
@@ -773,10 +808,12 @@ class TradingBot:
self.stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
testnet=self.config.testnet,
),
user_stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
testnet=self.config.testnet,
),
self._position_monitor(),
)

View File

@@ -35,10 +35,18 @@ class Config:
signal_threshold: int = 3
adx_threshold: float = 25.0
volume_multiplier: float = 2.5
kline_interval: str = "15m"
testnet: bool = False
def __post_init__(self):
self.api_key = os.getenv("BINANCE_API_KEY", "")
self.api_secret = os.getenv("BINANCE_API_SECRET", "")
self.testnet = os.getenv("BINANCE_TESTNET", "").lower() in ("true", "1", "yes")
if self.testnet:
self.api_key = os.getenv("BINANCE_DEMO_API_KEY", "")
self.api_secret = os.getenv("BINANCE_DEMO_API_SECRET", "")
else:
self.api_key = os.getenv("BINANCE_API_KEY", "")
self.api_secret = os.getenv("BINANCE_API_SECRET", "")
self.symbol = os.getenv("SYMBOL", "XRPUSDT")
self.leverage = int(os.getenv("LEVERAGE", "10"))
self.discord_webhook_url = os.getenv("DISCORD_WEBHOOK_URL", "")
@@ -52,6 +60,7 @@ class Config:
self.signal_threshold = int(os.getenv("SIGNAL_THRESHOLD", "3"))
self.adx_threshold = float(os.getenv("ADX_THRESHOLD", "25"))
self.volume_multiplier = float(os.getenv("VOL_MULTIPLIER", "2.5"))
self.kline_interval = os.getenv("KLINE_INTERVAL", "15m")
# symbols: SYMBOLS 환경변수 우선, 없으면 SYMBOL에서 변환
symbols_env = os.getenv("SYMBOLS", "")

View File

@@ -77,10 +77,11 @@ class KlineStream:
})
logger.info(f"과거 캔들 {len(self.buffer)}개 로드 완료 — 즉시 신호 계산 가능")
async def start(self, api_key: str, api_secret: str):
async def start(self, api_key: str, api_secret: str, testnet: bool = False):
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
demo=testnet,
)
await self._preload_history(client)
bm = BinanceSocketManager(client)
@@ -189,10 +190,11 @@ class MultiSymbolStream:
self._preload_one(client, symbol, limit) for symbol in self.symbols
])
async def start(self, api_key: str, api_secret: str):
async def start(self, api_key: str, api_secret: str, testnet: bool = False):
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
demo=testnet,
)
await self._preload_history(client)
bm = BinanceSocketManager(client)

View File

@@ -20,6 +20,7 @@ class BinanceFuturesClient:
self.client = Client(
api_key=config.api_key,
api_secret=config.api_secret,
demo=config.testnet,
)
self._qty_precision: int | None = None
self._price_precision: int | None = None

View File

@@ -6,12 +6,15 @@ from loguru import logger
class DiscordNotifier:
"""Discord 웹훅으로 거래 알림을 전송하는 노티파이어."""
def __init__(self, webhook_url: str):
def __init__(self, webhook_url: str, testnet: bool = False):
self.webhook_url = webhook_url
self._enabled = bool(webhook_url)
self._testnet = testnet
def _send(self, content: str) -> None:
"""알림 전송. 이벤트 루프 내에서는 백그라운드 스레드로 실행하여 블로킹 방지."""
if self._testnet:
content = f"[TESTNET] {content}"
if not self._enabled:
logger.debug("Discord 웹훅 URL 미설정 - 알림 건너뜀")
return

View File

@@ -28,11 +28,11 @@ class UserDataStream:
# 부분 체결 누적용: order_id → {rp, commission}
self._partial_fills: dict[int, dict[str, float]] = {}
async def start(self, api_key: str, api_secret: str) -> None:
async def start(self, api_key: str, api_secret: str, testnet: bool = False) -> None:
"""User Data Stream 메인 루프 — 봇 종료 시까지 실행."""
await self._run_loop(api_key, api_secret)
await self._run_loop(api_key, api_secret, testnet)
async def _run_loop(self, api_key: str, api_secret: str) -> None:
async def _run_loop(self, api_key: str, api_secret: str, testnet: bool = False) -> None:
"""연결 → 재연결 무한 루프.
매 재연결마다 AsyncClient + BinanceSocketManager를 새로 생성한다.
@@ -44,6 +44,7 @@ class UserDataStream:
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
demo=testnet,
)
try:
bm = BinanceSocketManager(client)
@@ -93,12 +94,19 @@ class UserDataStream:
if order.get("s", "") != self._symbol:
return
logger.info(
f"[{self._symbol}] UDS 원본: s={order.get('s')} o={order.get('o')} "
f"ot={order.get('ot')} x={order.get('x')} X={order.get('X')} "
f"R={order.get('R')} S={order.get('S')} ap={order.get('ap')} "
f"rp={order.get('rp')}"
)
# x: Execution Type — TRADE만 처리
if order.get("x") != "TRADE":
return
order_status = order.get("X", "")
order_type = order.get("o", "")
order_type = order.get("ot", order.get("o", ""))
is_reduce = order.get("R", False)
order_id = order.get("i", 0)
@@ -107,6 +115,12 @@ class UserDataStream:
if not is_close:
return
logger.info(
f"[{self._symbol}] 청산 주문 상세: "
f"type={order_type}, status={order_status}, "
f"reduce={is_reduce}, id={order_id}"
)
fill_rp = float(order.get("rp", "0"))
fill_commission = abs(float(order.get("n", "0")))