fix: resolve 6 warning issues from code review
5. Add daily PnL reset loop — UTC midnight auto-reset via
_daily_reset_loop in main.py, prevents stale daily_pnl accumulation
6. Fix set_base_balance race condition — call once in main.py before
spawning bots, instead of each bot calling independently
7. Remove realized_pnl != 0 from close detection — prevents entry
orders with small rp values being misclassified as closes
8. Rename xrp_btc_rs/xrp_eth_rs → primary_btc_rs/primary_eth_rs —
generic column names for multi-symbol support (dataset_builder,
ml_features, and tests updated consistently)
9. Replace asyncio.get_event_loop() → get_running_loop() — fixes
DeprecationWarning on Python 3.10+
10. Parallelize candle preload — asyncio.gather for all symbols
instead of sequential REST calls, ~3x faster startup
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,7 +10,7 @@
|
||||
| 심각도 | 건수 |
|
||||
|--------|------|
|
||||
| 🔴 심각 (버그 / 실제 자금 손실 위험) | 4 (✅ 전부 수정 완료) |
|
||||
| 🟡 경고 (논리 오류 / 운영 리스크) | 6 |
|
||||
| 🟡 경고 (논리 오류 / 운영 리스크) | 6 (✅ 전부 수정 완료) |
|
||||
| 🔵 개선 (코드 품질 / 유지보수) | 5 |
|
||||
|
||||
아키텍처 설계 자체(멀티심볼 독립 인스턴스, 공유 RiskManager)는 합리적이다. 문제는 멀티심볼 확장 과정에서 공유 상태(`RiskManager`)에 대한 동시성 처리가 불완전하고, 자금 관련 계산 로직(마진 비율, PnL 폴백)에 실제 버그가 존재한다는 점이다.
|
||||
|
||||
24
main.py
24
main.py
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from src.config import Config
|
||||
@@ -9,18 +10,39 @@ from src.logger_setup import setup_logger
|
||||
load_dotenv()
|
||||
|
||||
|
||||
async def _daily_reset_loop(risk: RiskManager):
|
||||
"""매일 UTC 자정에 daily_pnl을 초기화한다."""
|
||||
while True:
|
||||
now = datetime.now(timezone.utc)
|
||||
next_midnight = (now + timedelta(days=1)).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0,
|
||||
)
|
||||
await asyncio.sleep((next_midnight - now).total_seconds())
|
||||
risk.reset_daily()
|
||||
|
||||
|
||||
async def main():
|
||||
setup_logger(log_level="INFO")
|
||||
config = Config()
|
||||
risk = RiskManager(config)
|
||||
|
||||
# 기준 잔고를 main에서 한 번만 설정 (경쟁 조건 방지)
|
||||
from src.exchange import BinanceFuturesClient
|
||||
exchange = BinanceFuturesClient(config, symbol=config.symbols[0])
|
||||
balance = await exchange.get_balance()
|
||||
risk.set_base_balance(balance)
|
||||
logger.info(f"기준 잔고 설정: {balance:.2f} USDT")
|
||||
|
||||
bots = []
|
||||
for symbol in config.symbols:
|
||||
bot = TradingBot(config, symbol=symbol, risk=risk)
|
||||
bots.append(bot)
|
||||
|
||||
logger.info(f"멀티심볼 봇 시작: {config.symbols} ({len(bots)}개 인스턴스)")
|
||||
await asyncio.gather(*[bot.run() for bot in bots])
|
||||
await asyncio.gather(
|
||||
*[bot.run() for bot in bots],
|
||||
_daily_reset_loop(risk),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -432,9 +432,6 @@ class TradingBot:
|
||||
logger.info(f"[{self.symbol}] 봇 시작, 레버리지 {self.config.leverage}x")
|
||||
await self._recover_position()
|
||||
await self._init_oi_history()
|
||||
balance = await self.exchange.get_balance()
|
||||
self.risk.set_base_balance(balance)
|
||||
logger.info(f"[{self.symbol}] 기준 잔고 설정: {balance:.2f} USDT (동적 증거금 비율 기준점)")
|
||||
|
||||
user_stream = UserDataStream(
|
||||
symbol=self.symbol,
|
||||
|
||||
@@ -161,26 +161,31 @@ class MultiSymbolStream:
|
||||
df.set_index("timestamp", inplace=True)
|
||||
return df
|
||||
|
||||
async def _preload_one(self, client: AsyncClient, symbol: str, limit: int):
|
||||
"""단일 심볼의 과거 캔들을 버퍼에 채운다."""
|
||||
logger.info(f"{symbol.upper()} 과거 캔들 {limit}개 로드 중...")
|
||||
klines = await client.futures_klines(
|
||||
symbol=symbol.upper(),
|
||||
interval=self.interval,
|
||||
limit=limit,
|
||||
)
|
||||
for k in klines[:-1]:
|
||||
self.buffers[symbol].append({
|
||||
"timestamp": k[0],
|
||||
"open": float(k[1]),
|
||||
"high": float(k[2]),
|
||||
"low": float(k[3]),
|
||||
"close": float(k[4]),
|
||||
"volume": float(k[5]),
|
||||
"is_closed": True,
|
||||
})
|
||||
logger.info(f"{symbol.upper()} {len(self.buffers[symbol])}개 로드 완료")
|
||||
|
||||
async def _preload_history(self, client: AsyncClient, limit: int = _PRELOAD_LIMIT):
|
||||
"""REST API로 모든 심볼의 과거 캔들을 버퍼에 미리 채운다."""
|
||||
for symbol in self.symbols:
|
||||
logger.info(f"{symbol.upper()} 과거 캔들 {limit}개 로드 중...")
|
||||
klines = await client.futures_klines(
|
||||
symbol=symbol.upper(),
|
||||
interval=self.interval,
|
||||
limit=limit,
|
||||
)
|
||||
for k in klines[:-1]:
|
||||
self.buffers[symbol].append({
|
||||
"timestamp": k[0],
|
||||
"open": float(k[1]),
|
||||
"high": float(k[2]),
|
||||
"low": float(k[3]),
|
||||
"close": float(k[4]),
|
||||
"volume": float(k[5]),
|
||||
"is_closed": True,
|
||||
})
|
||||
logger.info(f"{symbol.upper()} {len(self.buffers[symbol])}개 로드 완료")
|
||||
"""REST API로 모든 심볼의 과거 캔들을 병렬로 버퍼에 미리 채운다."""
|
||||
await asyncio.gather(*[
|
||||
self._preload_one(client, symbol, limit) for symbol in self.symbols
|
||||
])
|
||||
|
||||
async def start(self, api_key: str, api_secret: str):
|
||||
client = await AsyncClient.create(
|
||||
|
||||
@@ -266,15 +266,15 @@ def _calc_features_vectorized(
|
||||
eth_r3 = _align(eth_ret_3, n).astype(np.float32)
|
||||
eth_r5 = _align(eth_ret_5, n).astype(np.float32)
|
||||
|
||||
xrp_r1 = ret_1.astype(np.float32)
|
||||
xrp_btc_rs_raw = np.divide(
|
||||
xrp_r1, btc_r1,
|
||||
out=np.zeros_like(xrp_r1),
|
||||
primary_r1 = ret_1.astype(np.float32)
|
||||
primary_btc_rs_raw = np.divide(
|
||||
primary_r1, btc_r1,
|
||||
out=np.zeros_like(primary_r1),
|
||||
where=(btc_r1 != 0),
|
||||
).astype(np.float32)
|
||||
xrp_eth_rs_raw = np.divide(
|
||||
xrp_r1, eth_r1,
|
||||
out=np.zeros_like(xrp_r1),
|
||||
primary_eth_rs_raw = np.divide(
|
||||
primary_r1, eth_r1,
|
||||
out=np.zeros_like(primary_r1),
|
||||
where=(eth_r1 != 0),
|
||||
).astype(np.float32)
|
||||
|
||||
@@ -285,8 +285,8 @@ def _calc_features_vectorized(
|
||||
"eth_ret_1": _rolling_zscore(eth_r1),
|
||||
"eth_ret_3": _rolling_zscore(eth_r3),
|
||||
"eth_ret_5": _rolling_zscore(eth_r5),
|
||||
"xrp_btc_rs": _rolling_zscore(xrp_btc_rs_raw),
|
||||
"xrp_eth_rs": _rolling_zscore(xrp_eth_rs_raw),
|
||||
"primary_btc_rs": _rolling_zscore(primary_btc_rs_raw),
|
||||
"primary_eth_rs": _rolling_zscore(primary_eth_rs_raw),
|
||||
}, index=d.index)
|
||||
result = pd.concat([result, extra], axis=1)
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ class BinanceFuturesClient:
|
||||
return qty_rounded
|
||||
|
||||
async def set_leverage(self, leverage: int) -> dict:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
return await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.client.futures_change_leverage(
|
||||
@@ -82,7 +82,7 @@ class BinanceFuturesClient:
|
||||
)
|
||||
|
||||
async def get_balance(self) -> float:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
balances = await loop.run_in_executor(
|
||||
None, self.client.futures_account_balance
|
||||
)
|
||||
@@ -100,7 +100,7 @@ class BinanceFuturesClient:
|
||||
stop_price: float = None,
|
||||
reduce_only: bool = False,
|
||||
) -> dict:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
params = dict(
|
||||
symbol=self.symbol,
|
||||
@@ -123,7 +123,7 @@ class BinanceFuturesClient:
|
||||
raise
|
||||
|
||||
async def get_position(self) -> dict | None:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
positions = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.client.futures_position_information(
|
||||
@@ -137,7 +137,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def cancel_all_orders(self):
|
||||
"""오픈 주문을 모두 취소한다."""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.client.futures_cancel_all_open_orders(
|
||||
@@ -147,7 +147,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def get_recent_income(self, limit: int = 5) -> list[dict]:
|
||||
"""최근 REALIZED_PNL + COMMISSION 내역을 조회한다."""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
rows = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -168,7 +168,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def get_open_interest(self) -> float | None:
|
||||
"""현재 미결제약정(OI)을 조회한다. 오류 시 None 반환."""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -181,7 +181,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def get_funding_rate(self) -> float | None:
|
||||
"""현재 펀딩비를 조회한다. 오류 시 None 반환."""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -194,7 +194,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def get_oi_history(self, limit: int = 5) -> list[float]:
|
||||
"""최근 OI 변화율 히스토리를 조회한다 (봇 초기화용). 실패 시 빈 리스트."""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -218,7 +218,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def create_listen_key(self) -> str:
|
||||
"""POST /fapi/v1/listenKey — listenKey 신규 발급"""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.client.futures_stream_get_listen_key(),
|
||||
@@ -227,7 +227,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def keepalive_listen_key(self, listen_key: str) -> None:
|
||||
"""PUT /fapi/v1/listenKey — listenKey 만료 연장 (60분 → 리셋)"""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.client.futures_stream_keepalive(listenKey=listen_key),
|
||||
@@ -235,7 +235,7 @@ class BinanceFuturesClient:
|
||||
|
||||
async def delete_listen_key(self, listen_key: str) -> None:
|
||||
"""DELETE /fapi/v1/listenKey — listenKey 삭제 (정상 종료 시)"""
|
||||
loop = asyncio.get_event_loop()
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
|
||||
@@ -7,7 +7,7 @@ FEATURE_COLS = [
|
||||
"ret_1", "ret_3", "ret_5", "signal_strength", "side",
|
||||
"btc_ret_1", "btc_ret_3", "btc_ret_5",
|
||||
"eth_ret_1", "eth_ret_3", "eth_ret_5",
|
||||
"xrp_btc_rs", "xrp_eth_rs",
|
||||
"primary_btc_rs", "primary_eth_rs",
|
||||
# 시장 미시구조: OI 변화율(z-score), 펀딩비(z-score)
|
||||
"oi_change", "funding_rate",
|
||||
# OI 파생 피처
|
||||
@@ -28,11 +28,11 @@ def _calc_ret(closes: pd.Series, n: int) -> float:
|
||||
return (closes.iloc[-1] - prev) / prev if prev != 0 else 0.0
|
||||
|
||||
|
||||
def _calc_rs(xrp_ret: float, other_ret: float) -> float:
|
||||
"""상대강도 = xrp_ret / other_ret. 분모 0이면 0.0."""
|
||||
def _calc_rs(primary_ret: float, other_ret: float) -> float:
|
||||
"""상대강도 = primary_ret / other_ret. 분모 0이면 0.0."""
|
||||
if other_ret == 0.0:
|
||||
return 0.0
|
||||
return xrp_ret / other_ret
|
||||
return primary_ret / other_ret
|
||||
|
||||
|
||||
def _rolling_zscore_last(arr: np.ndarray, window: int = _ZSCORE_WINDOW) -> float:
|
||||
@@ -144,8 +144,8 @@ def build_features(
|
||||
"eth_ret_1": float(eth_ret_1),
|
||||
"eth_ret_3": float(eth_ret_3),
|
||||
"eth_ret_5": float(eth_ret_5),
|
||||
"xrp_btc_rs": float(_calc_rs(ret_1, btc_ret_1)),
|
||||
"xrp_eth_rs": float(_calc_rs(ret_1, eth_ret_1)),
|
||||
"primary_btc_rs": float(_calc_rs(ret_1, btc_ret_1)),
|
||||
"primary_eth_rs": float(_calc_rs(ret_1, eth_ret_1)),
|
||||
})
|
||||
|
||||
# 실시간에서 실제 값이 제공되면 사용, 없으면 0으로 채운다
|
||||
@@ -293,8 +293,8 @@ def build_features_aligned(
|
||||
"eth_ret_1": _rolling_zscore_last(eth_r1),
|
||||
"eth_ret_3": _rolling_zscore_last(eth_r3),
|
||||
"eth_ret_5": _rolling_zscore_last(eth_r5),
|
||||
"xrp_btc_rs": _rolling_zscore_last(rs_btc),
|
||||
"xrp_eth_rs": _rolling_zscore_last(rs_eth),
|
||||
"primary_btc_rs": _rolling_zscore_last(rs_btc),
|
||||
"primary_eth_rs": _rolling_zscore_last(rs_eth),
|
||||
})
|
||||
|
||||
# OI/펀딩비 z-score (실시간 값이 제공되면 히스토리 끝에 추가하여 z-score)
|
||||
|
||||
@@ -85,8 +85,8 @@ class UserDataStream:
|
||||
is_reduce = order.get("R", False)
|
||||
realized_pnl = float(order.get("rp", "0"))
|
||||
|
||||
# 청산 주문 판별: reduceOnly이거나, TP/SL 타입이거나, rp != 0
|
||||
is_close = is_reduce or order_type in _CLOSE_ORDER_TYPES or realized_pnl != 0
|
||||
# 청산 주문 판별: reduceOnly이거나 TP/SL 타입
|
||||
is_close = is_reduce or order_type in _CLOSE_ORDER_TYPES
|
||||
if not is_close:
|
||||
return
|
||||
|
||||
|
||||
@@ -203,11 +203,11 @@ def test_rs_zero_denominator():
|
||||
signal_arr = _calc_signals(d)
|
||||
feat = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
|
||||
|
||||
assert "xrp_btc_rs" in feat.columns, "xrp_btc_rs 컬럼이 있어야 함"
|
||||
assert not feat["xrp_btc_rs"].isin([np.inf, -np.inf]).any(), \
|
||||
"xrp_btc_rs에 inf가 있으면 안 됨"
|
||||
assert not feat["xrp_btc_rs"].isna().all(), \
|
||||
"xrp_btc_rs가 전부 nan이면 안 됨"
|
||||
assert "primary_btc_rs" in feat.columns, "primary_btc_rs 컬럼이 있어야 함"
|
||||
assert not feat["primary_btc_rs"].isin([np.inf, -np.inf]).any(), \
|
||||
"primary_btc_rs에 inf가 있으면 안 됨"
|
||||
assert not feat["primary_btc_rs"].isna().all(), \
|
||||
"primary_btc_rs가 전부 nan이면 안 됨"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -48,7 +48,7 @@ def test_build_features_rs_zero_when_btc_ret_zero():
|
||||
btc_df["close"] = 50000.0 # 모든 캔들 동일
|
||||
eth_df = _make_df(10, base_price=3000.0)
|
||||
features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df)
|
||||
assert features["xrp_btc_rs"] == 0.0
|
||||
assert features["primary_btc_rs"] == 0.0
|
||||
|
||||
def test_feature_cols_has_24_items():
|
||||
"""Legacy test — updated to 26 after OI derived features added."""
|
||||
|
||||
Reference in New Issue
Block a user