feat: shared RiskManager with async lock, same-direction limit, per-symbol tracking

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-05 23:06:41 +09:00
parent 7aef391b69
commit 9318fb887e
2 changed files with 89 additions and 9 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
from loguru import logger
from src.config import Config
@@ -5,10 +6,11 @@ from src.config import Config
class RiskManager:
def __init__(self, config: Config, max_daily_loss_pct: float = 0.05):
self.config = config
self.max_daily_loss_pct = max_daily_loss_pct # 일일 최대 손실 5%
self.max_daily_loss_pct = max_daily_loss_pct
self.daily_pnl: float = 0.0
self.initial_balance: float = 0.0
self.open_positions: list = []
self.open_positions: dict[str, str] = {} # {symbol: side}
self._lock = asyncio.Lock()
def is_trading_allowed(self) -> bool:
"""일일 최대 손실 초과 시 거래 중단"""
@@ -22,9 +24,33 @@ class RiskManager:
return False
return True
def can_open_new_position(self) -> bool:
"""최대 동시 포지션 수 체크"""
return len(self.open_positions) < self.config.max_positions
async def can_open_new_position(self, symbol: str, side: str) -> bool:
"""포지션 오픈 가능 여부 (전체 한도 + 중복 진입 + 동일 방향 제한)"""
async with self._lock:
if len(self.open_positions) >= self.config.max_positions:
logger.info(f"최대 포지션 수 도달: {len(self.open_positions)}/{self.config.max_positions}")
return False
if symbol in self.open_positions:
logger.info(f"{symbol} 이미 포지션 보유 중")
return False
same_dir = sum(1 for s in self.open_positions.values() if s == side)
if same_dir >= self.config.max_same_direction:
logger.info(f"동일 방향({side}) 한도 도달: {same_dir}/{self.config.max_same_direction}")
return False
return True
async def register_position(self, symbol: str, side: str):
"""포지션 등록"""
async with self._lock:
self.open_positions[symbol] = side
logger.info(f"포지션 등록: {symbol} {side} (현재 {len(self.open_positions)}개)")
async def close_position(self, symbol: str, pnl: float):
"""포지션 닫기 + PnL 기록"""
async with self._lock:
self.open_positions.pop(symbol, None)
self.daily_pnl += pnl
logger.info(f"포지션 종료: {symbol}, PnL={pnl:+.4f}, 누적={self.daily_pnl:+.4f}")
def record_pnl(self, pnl: float):
self.daily_pnl += pnl
@@ -36,7 +62,7 @@ class RiskManager:
logger.info("일일 PnL 초기화")
def set_base_balance(self, balance: float) -> None:
"""봇 시작 시 기준 잔고 설정 (동적 비율 계산 기준점)"""
"""봇 시작 시 기준 잔고 설정"""
self.initial_balance = balance
def get_dynamic_margin_ratio(self, balance: float) -> float:

View File

@@ -29,10 +29,13 @@ def test_trading_allowed_normal(config):
assert rm.is_trading_allowed() is True
def test_position_size_capped(config):
@pytest.mark.asyncio
async def test_position_size_capped(config):
rm = RiskManager(config, max_daily_loss_pct=0.05)
rm.open_positions = ["pos1", "pos2", "pos3"]
assert rm.can_open_new_position() is False
await rm.register_position("XRPUSDT", "LONG")
await rm.register_position("TRXUSDT", "SHORT")
await rm.register_position("DOGEUSDT", "LONG")
assert await rm.can_open_new_position("SOLUSDT", "SHORT") is False
# --- 동적 증거금 비율 테스트 ---
@@ -81,3 +84,54 @@ def test_ratio_clamped_at_max(risk):
"""잔고가 기준보다 작아도 최대 비율(50%) 초과하지 않음"""
ratio = risk.get_dynamic_margin_ratio(5.0)
assert ratio == pytest.approx(0.50, abs=1e-6)
# --- 멀티심볼 공유 RiskManager 테스트 ---
@pytest.fixture
def shared_risk(config):
config.max_same_direction = 2
return RiskManager(config)
@pytest.mark.asyncio
async def test_can_open_new_position_async(shared_risk):
"""비동기 포지션 오픈 허용 체크."""
assert await shared_risk.can_open_new_position("XRPUSDT", "LONG") is True
@pytest.mark.asyncio
async def test_register_and_close_position(shared_risk):
"""포지션 등록 후 닫기."""
await shared_risk.register_position("XRPUSDT", "LONG")
assert "XRPUSDT" in shared_risk.open_positions
await shared_risk.close_position("XRPUSDT", pnl=1.5)
assert "XRPUSDT" not in shared_risk.open_positions
assert shared_risk.daily_pnl == 1.5
@pytest.mark.asyncio
async def test_same_symbol_blocked(shared_risk):
"""같은 심볼 중복 진입 차단."""
await shared_risk.register_position("XRPUSDT", "LONG")
assert await shared_risk.can_open_new_position("XRPUSDT", "SHORT") is False
@pytest.mark.asyncio
async def test_max_same_direction_limit(shared_risk):
"""같은 방향 2개 초과 차단."""
await shared_risk.register_position("XRPUSDT", "LONG")
await shared_risk.register_position("TRXUSDT", "LONG")
# 3번째 LONG 차단
assert await shared_risk.can_open_new_position("DOGEUSDT", "LONG") is False
# SHORT은 허용
assert await shared_risk.can_open_new_position("DOGEUSDT", "SHORT") is True
@pytest.mark.asyncio
async def test_max_positions_global_limit(shared_risk):
"""전체 포지션 수 한도 초과 차단."""
shared_risk.config.max_positions = 2
await shared_risk.register_position("XRPUSDT", "LONG")
await shared_risk.register_position("TRXUSDT", "SHORT")
assert await shared_risk.can_open_new_position("DOGEUSDT", "LONG") is False