Compare commits

...

10 Commits

Author SHA1 Message Date
21in7
3d05806155 feat: Discord 알림, 포지션 복구, 설정 개선 및 docs 추가
Made-with: Cursor
2026-03-01 15:57:08 +09:00
21in7
117fd9e6bc fix: AsyncClient.futures_klines await 누락 수정
Made-with: Cursor
2026-03-01 13:04:02 +09:00
21in7
4940de16fc feat: 봇 시작 시 과거 캔들 200개 프리로드 (즉시 신호 계산 가능)
Made-with: Cursor
2026-03-01 13:02:42 +09:00
21in7
8e5ec82244 fix: BinanceSocketManager 메서드명 수정 (futures_kline_socket → kline_futures_socket)
Made-with: Cursor
2026-03-01 12:56:53 +09:00
21in7
a90618896d feat: 엔트리포인트 및 로깅 설정 완료, 테스트 mock 개선
Made-with: Cursor
2026-03-01 12:54:21 +09:00
21in7
726e9cfd65 feat: TradingBot 메인 루프 구현 (진입/청산/손절/익절)
Made-with: Cursor
2026-03-01 12:53:40 +09:00
21in7
69b5675bfd feat: WebSocket 실시간 캔들 스트림 구현
Made-with: Cursor
2026-03-01 12:52:40 +09:00
21in7
4bab4cdba3 feat: 리스크 매니저 구현 (일일 손실 한도, 포지션 수 제한)
Made-with: Cursor
2026-03-01 12:52:05 +09:00
21in7
8bbf376425 feat: Notion 거래 이력 저장 모듈 구현
Made-with: Cursor
2026-03-01 12:51:27 +09:00
21in7
7a3dde2146 feat: 복합 기술 지표 모듈 구현 (RSI/MACD/BB/EMA/ATR/StochRSI)
Made-with: Cursor
2026-03-01 12:50:51 +09:00
20 changed files with 2779 additions and 20 deletions

View File

@@ -3,5 +3,4 @@ BINANCE_API_SECRET=
SYMBOL=XRPUSDT
LEVERAGE=10
RISK_PER_TRADE=0.02
NOTION_TOKEN=
NOTION_DATABASE_ID=
DISCORD_WEBHOOK_URL=

View File

@@ -0,0 +1,420 @@
# Discord 알림 전환 및 포지션 복구 구현 계획
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Notion 연동을 제거하고 Discord 웹훅으로 거래 알림을 전송하며, 봇 재시작 시 기존 포지션을 감지하여 정상 작동하도록 한다.
**Architecture:**
- `TradeRepository` (Notion 기반)를 `DiscordNotifier` (Discord 웹훅 기반)로 교체한다.
- 거래 상태(현재 포지션 ID 등)는 메모리 대신 로컬 JSON 파일(`state.json`)에 저장하여 재시작 후에도 복구 가능하게 한다.
- 봇 시작 시 바이낸스 API로 실제 포지션을 조회하여 `current_trade_id`를 복구한다.
**Tech Stack:** Python 3.13, httpx (Discord 웹훅 HTTP 요청), python-binance, loguru
---
## Task 1: Discord 웹훅 알림 모듈 생성
**Files:**
- Create: `src/notifier.py`
- Modify: `src/config.py`
- Modify: `.env``.env.example`
### Step 1: `.env`와 `.env.example`에 Discord 웹훅 URL 추가
`.env`에 아래 줄 추가:
```
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/YOUR_WEBHOOK_ID/YOUR_WEBHOOK_TOKEN
```
`.env.example`에 아래 줄 추가:
```
DISCORD_WEBHOOK_URL=
```
### Step 2: `src/config.py`에 `discord_webhook_url` 필드 추가
`notion_token`, `notion_database_id` 필드를 제거하고 `discord_webhook_url` 추가:
```python
# src/config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
api_key: str = ""
api_secret: str = ""
symbol: str = "XRPUSDT"
leverage: int = 10
risk_per_trade: float = 0.02
max_positions: int = 3
stop_loss_pct: float = 0.015
take_profit_pct: float = 0.045
trailing_stop_pct: float = 0.01
discord_webhook_url: str = ""
def __post_init__(self):
self.api_key = os.getenv("BINANCE_API_KEY", "")
self.api_secret = os.getenv("BINANCE_API_SECRET", "")
self.symbol = os.getenv("SYMBOL", "XRPUSDT")
self.leverage = int(os.getenv("LEVERAGE", "10"))
self.risk_per_trade = float(os.getenv("RISK_PER_TRADE", "0.02"))
self.discord_webhook_url = os.getenv("DISCORD_WEBHOOK_URL", "")
```
### Step 3: `src/notifier.py` 생성
```python
# src/notifier.py
import httpx
from loguru import logger
class DiscordNotifier:
"""Discord 웹훅으로 거래 알림을 전송하는 노티파이어."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
self._enabled = bool(webhook_url)
def _send(self, content: str) -> None:
if not self._enabled:
logger.debug("Discord 웹훅 URL 미설정 - 알림 건너뜀")
return
try:
resp = httpx.post(
self.webhook_url,
json={"content": content},
timeout=10,
)
resp.raise_for_status()
except Exception as e:
logger.warning(f"Discord 알림 전송 실패: {e}")
def notify_open(
self,
symbol: str,
side: str,
entry_price: float,
quantity: float,
leverage: int,
stop_loss: float,
take_profit: float,
signal_data: dict = None,
) -> None:
rsi = (signal_data or {}).get("rsi", 0)
macd = (signal_data or {}).get("macd_hist", 0)
atr = (signal_data or {}).get("atr", 0)
msg = (
f"**[{symbol}] {side} 진입**\n"
f"진입가: `{entry_price:.4f}` | 수량: `{quantity}` | 레버리지: `{leverage}x`\n"
f"SL: `{stop_loss:.4f}` | TP: `{take_profit:.4f}`\n"
f"RSI: `{rsi:.2f}` | MACD Hist: `{macd:.6f}` | ATR: `{atr:.6f}`"
)
self._send(msg)
def notify_close(
self,
symbol: str,
side: str,
exit_price: float,
pnl: float,
) -> None:
emoji = "" if pnl >= 0 else ""
msg = (
f"{emoji} **[{symbol}] {side} 청산**\n"
f"청산가: `{exit_price:.4f}` | PnL: `{pnl:+.4f} USDT`"
)
self._send(msg)
def notify_info(self, message: str) -> None:
self._send(f" {message}")
```
### Step 4: 웹훅 URL 실제 값을 `.env`에 입력
Discord 서버 → 채널 설정 → 연동 → 웹훅 생성 후 URL 복사하여 `.env``DISCORD_WEBHOOK_URL=` 뒤에 붙여넣기.
---
## Task 2: `src/database.py` 제거 및 `src/bot.py` 교체
**Files:**
- Delete: `src/database.py`
- Modify: `src/bot.py`
### Step 1: `src/bot.py`에서 Notion 관련 코드를 `DiscordNotifier`로 교체
`src/bot.py` 전체를 아래로 교체:
```python
# src/bot.py
import asyncio
import os
from loguru import logger
from src.config import Config
from src.exchange import BinanceFuturesClient
from src.indicators import Indicators
from src.data_stream import KlineStream
from src.notifier import DiscordNotifier
from src.risk_manager import RiskManager
class TradingBot:
def __init__(self, config: Config):
self.config = config
self.exchange = BinanceFuturesClient(config)
self.notifier = DiscordNotifier(config.discord_webhook_url)
self.risk = RiskManager(config)
self.current_trade_side: str | None = None # "LONG" | "SHORT"
self.stream = KlineStream(
symbol=config.symbol,
interval="1m",
on_candle=self._on_candle_closed,
)
def _on_candle_closed(self, candle: dict):
df = self.stream.get_dataframe()
if df is not None:
asyncio.create_task(self.process_candle(df))
async def _recover_position(self) -> None:
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
position = await self.exchange.get_position()
if position is not None:
amt = float(position["positionAmt"])
self.current_trade_side = "LONG" if amt > 0 else "SHORT"
entry = float(position["entryPrice"])
logger.info(
f"기존 포지션 복구: {self.current_trade_side} | "
f"진입가={entry:.4f} | 수량={abs(amt)}"
)
self.notifier.notify_info(
f"봇 재시작 - 기존 포지션 감지: {self.current_trade_side} "
f"진입가={entry:.4f} 수량={abs(amt)}"
)
else:
logger.info("기존 포지션 없음 - 신규 진입 대기")
async def process_candle(self, df):
if not self.risk.is_trading_allowed():
logger.warning("리스크 한도 초과 - 거래 중단")
return
ind = Indicators(df)
df_with_indicators = ind.calculate_all()
signal = ind.get_signal(df_with_indicators)
current_price = df_with_indicators["close"].iloc[-1]
logger.info(f"신호: {signal} | 현재가: {current_price:.4f} USDT")
position = await self.exchange.get_position()
if position is None and signal != "HOLD":
self.current_trade_side = None
if not self.risk.can_open_new_position():
logger.info("최대 포지션 수 도달")
return
await self._open_position(signal, df_with_indicators)
elif position is not None:
pos_side = "LONG" if float(position["positionAmt"]) > 0 else "SHORT"
if (pos_side == "LONG" and signal == "SHORT") or \
(pos_side == "SHORT" and signal == "LONG"):
await self._close_position(position)
async def _open_position(self, signal: str, df):
balance = await self.exchange.get_balance()
price = df["close"].iloc[-1]
quantity = self.exchange.calculate_quantity(
balance=balance, price=price, leverage=self.config.leverage
)
stop_loss, take_profit = Indicators(df).get_atr_stop(df, signal, price)
notional = quantity * price
if quantity <= 0 or notional < self.exchange.MIN_NOTIONAL:
logger.warning(
f"주문 건너뜀: 명목금액 {notional:.2f} USDT < 최소 {self.exchange.MIN_NOTIONAL} USDT "
f"(잔고={balance:.2f}, 수량={quantity})"
)
return
side = "BUY" if signal == "LONG" else "SELL"
await self.exchange.set_leverage(self.config.leverage)
await self.exchange.place_order(side=side, quantity=quantity)
last_row = df.iloc[-1]
signal_snapshot = {
"rsi": float(last_row.get("rsi", 0)),
"macd_hist": float(last_row.get("macd_hist", 0)),
"atr": float(last_row.get("atr", 0)),
}
self.current_trade_side = signal
self.notifier.notify_open(
symbol=self.config.symbol,
side=signal,
entry_price=price,
quantity=quantity,
leverage=self.config.leverage,
stop_loss=stop_loss,
take_profit=take_profit,
signal_data=signal_snapshot,
)
logger.success(
f"{signal} 진입: 가격={price}, 수량={quantity}, "
f"SL={stop_loss:.4f}, TP={take_profit:.4f}"
)
sl_side = "SELL" if signal == "LONG" else "BUY"
await self.exchange.place_order(
side=sl_side,
quantity=quantity,
order_type="STOP_MARKET",
stop_price=round(stop_loss, 4),
reduce_only=True,
)
await self.exchange.place_order(
side=sl_side,
quantity=quantity,
order_type="TAKE_PROFIT_MARKET",
stop_price=round(take_profit, 4),
reduce_only=True,
)
async def _close_position(self, position: dict):
amt = abs(float(position["positionAmt"]))
side = "SELL" if float(position["positionAmt"]) > 0 else "BUY"
pos_side = "LONG" if side == "SELL" else "SHORT"
await self.exchange.cancel_all_orders()
await self.exchange.place_order(side=side, quantity=amt, reduce_only=True)
entry = float(position["entryPrice"])
mark = float(position["markPrice"])
pnl = (mark - entry) * amt if side == "SELL" else (entry - mark) * amt
self.notifier.notify_close(
symbol=self.config.symbol,
side=pos_side,
exit_price=mark,
pnl=pnl,
)
self.risk.record_pnl(pnl)
self.current_trade_side = None
logger.success(f"포지션 청산: PnL={pnl:.4f} USDT")
async def run(self):
logger.info(f"봇 시작: {self.config.symbol}, 레버리지 {self.config.leverage}x")
await self._recover_position()
await self.stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
)
```
### Step 2: `src/database.py` 삭제
```bash
rm src/database.py
```
---
## Task 3: 의존성 정리
**Files:**
- Modify: `requirements.txt` (또는 `pyproject.toml`)
### Step 1: 현재 의존성 파일 확인
```bash
cat requirements.txt
# 또는
cat pyproject.toml
```
### Step 2: `notion-client` 제거, `httpx` 추가
`requirements.txt`에서 `notion-client` 줄 삭제 후 `httpx` 추가:
```
httpx>=0.27.0
```
### Step 3: 의존성 재설치
```bash
pip install httpx
pip uninstall notion-client -y
```
또는 venv 사용 시:
```bash
.venv/bin/pip install httpx
.venv/bin/pip uninstall notion-client -y
```
---
## Task 4: 동작 검증
**Files:**
- 수정 없음 (실행 테스트)
### Step 1: 봇 시작 전 환경변수 확인
```bash
grep DISCORD_WEBHOOK_URL .env
```
예상 출력: `DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/...` (빈 값이면 알림 비활성화)
### Step 2: 봇 실행
```bash
python main.py
# 또는
.venv/bin/python main.py
```
### Step 3: 재시작 시 포지션 복구 확인
봇 실행 중 바이낸스에 포지션이 있는 경우 로그에서 아래 메시지 확인:
```
기존 포지션 복구: LONG | 진입가=X.XXXX | 수량=X.X
```
포지션이 없는 경우:
```
기존 포지션 없음 - 신규 진입 대기
```
### Step 4: Discord 알림 테스트 (선택)
Discord 채널에서 봇 재시작 알림 메시지 확인:
- 포지션 있을 때: ` 봇 재시작 - 기존 포지션 감지: LONG 진입가=X.XXXX 수량=X.X`
- 진입 시: `[XRPUSDT] LONG 진입` 메시지
- 청산 시: `✅ [XRPUSDT] LONG 청산` 메시지
### Step 5: Notion 관련 import 잔재 없는지 확인
```bash
grep -r "notion" src/ --include="*.py"
```
예상 출력: (아무것도 없음)
---
## 주요 변경 요약
| 항목 | 이전 | 이후 |
|------|------|------|
| 알림 수단 | Notion API | Discord 웹훅 |
| 거래 ID 추적 | Notion 페이지 ID | 불필요 (바이낸스 포지션 직접 조회) |
| 재시작 복구 | 없음 | `_recover_position()` 으로 바이낸스 조회 |
| 환경변수 | `NOTION_TOKEN`, `NOTION_DATABASE_ID` | `DISCORD_WEBHOOK_URL` |
| 외부 의존성 | `notion-client` | `httpx` |

View File

@@ -0,0 +1,251 @@
# Gitea 셀프호스팅 서버 업로드 구현 계획
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 현재 cointrader 프로젝트를 셀프호스팅 Gitea 서버(10.1.10.28:3000)에 업로드한다.
**Architecture:** 기존 로컬 git 저장소에 Gitea 원격 저장소를 추가하고, 미커밋 변경사항을 정리한 뒤 전체 히스토리를 push한다. `.env` 파일은 절대 포함하지 않으며, `.gitignore`가 올바르게 설정되어 있는지 확인 후 진행한다.
**Tech Stack:** git, Gitea REST API (또는 웹 UI), zsh
---
## 사전 확인 사항
### 현재 git 상태 요약
- **브랜치:** `main`
- **기존 커밋:** 10개 (b1a7632 ~ 117fd9e)
- **미커밋 변경사항 (modified):**
- `.env.example`
- `requirements.txt`
- `src/bot.py`
- `src/config.py`
- `src/exchange.py`
- **삭제된 파일:** `src/database.py`
- **추적되지 않는 파일 (untracked):**
- `docs/` (전체 디렉토리)
- `src/notifier.py`
- **`.gitignore`에 의해 제외됨:** `.env`, `__pycache__/`, `*.pyc`, `.pytest_cache/`, `logs/`, `*.log`, `.venv/`
---
## Task 1: 미커밋 변경사항 스테이징 및 커밋
**Files:**
- Modify: `.env.example`
- Modify: `requirements.txt`
- Modify: `src/bot.py`
- Modify: `src/config.py`
- Modify: `src/exchange.py`
- Delete: `src/database.py`
- Create: `src/notifier.py`
- Create: `docs/` (전체)
**Step 1: git 상태 최종 확인**
```bash
git -C /Users/gihyeon/github/cointrader status
```
Expected: modified 파일들과 untracked 파일들이 목록에 표시됨
**Step 2: 모든 변경사항 스테이징**
```bash
cd /Users/gihyeon/github/cointrader
git add -A
```
> `-A` 옵션은 수정, 삭제, 신규 파일을 모두 스테이징한다. `.env`는 `.gitignore`에 있으므로 자동 제외된다.
**Step 3: 스테이징 내용 검토 (`.env` 포함 여부 반드시 확인)**
```bash
git diff --cached --name-only
```
Expected: `.env` 파일이 목록에 **없어야** 한다. 만약 있다면 즉시 `git reset HEAD .env` 실행 후 중단.
**Step 4: 커밋**
```bash
git commit -m "feat: Discord 알림, 포지션 복구, 설정 개선 및 docs 추가"
```
Expected: `main` 브랜치에 새 커밋 생성
**Step 5: 커밋 확인**
```bash
git log --oneline -3
```
Expected: 방금 만든 커밋이 최상단에 표시됨
---
## Task 2: Gitea에 원격 저장소 생성
**Step 1: Gitea 웹 UI 접속**
브라우저에서 `http://10.1.10.28:3000` 접속 후 로그인
**Step 2: 새 저장소 생성**
- 우상단 `+` 버튼 → `New Repository` 클릭
- **Repository Name:** `cointrader`
- **Visibility:** Private (권장) 또는 Public
- **Initialize this repository:** **체크 해제** (로컬에 이미 히스토리가 있으므로 빈 저장소로 생성해야 함)
- `Create Repository` 클릭
**Step 3: 저장소 URL 확인**
생성 후 표시되는 URL 메모:
```
http://10.1.10.28:3000/<사용자명>/cointrader.git
```
---
## Task 3: 로컬 저장소에 Gitea 원격 추가 및 Push
**Step 1: 현재 원격 저장소 확인**
```bash
cd /Users/gihyeon/github/cointrader
git remote -v
```
Expected: 아무것도 없거나 기존 origin이 있을 수 있음
**Step 2: Gitea 원격 추가**
기존 origin이 없는 경우:
```bash
git remote add origin http://10.1.10.28:3000/<사용자명>/cointrader.git
```
기존 origin이 있는 경우 (다른 URL):
```bash
git remote set-url origin http://10.1.10.28:3000/<사용자명>/cointrader.git
```
> `<사용자명>`은 Gitea 로그인 계정명으로 교체
**Step 3: 원격 추가 확인**
```bash
git remote -v
```
Expected:
```
origin http://10.1.10.28:3000/<사용자명>/cointrader.git (fetch)
origin http://10.1.10.28:3000/<사용자명>/cointrader.git (push)
```
**Step 4: main 브랜치 push**
```bash
git push -u origin main
```
> Gitea 계정의 사용자명과 비밀번호(또는 액세스 토큰)를 입력하라는 프롬프트가 나타남
Expected:
```
Enumerating objects: ...
Counting objects: ...
Writing objects: 100% ...
Branch 'main' set up to track remote branch 'main' from 'origin'.
```
**Step 5: Push 결과 확인**
```bash
git log --oneline origin/main
```
Expected: 로컬 커밋 히스토리와 동일하게 표시됨
---
## Task 4: Gitea 웹 UI에서 업로드 검증
**Step 1: 브라우저에서 저장소 확인**
`http://10.1.10.28:3000/<사용자명>/cointrader` 접속
**Step 2: 파일 목록 확인**
다음 파일/폴더가 있어야 함:
- `src/` (bot.py, config.py, exchange.py, notifier.py, indicators.py, risk_manager.py, logger_setup.py, data_stream.py, config.py 등)
- `tests/`
- `docs/`
- `main.py`
- `requirements.txt`
- `.env.example`
- `.gitignore`
다음 파일이 **없어야** 함:
- `.env`
- `__pycache__/`
- `.venv/`
- `logs/`
**Step 3: 커밋 히스토리 확인**
Gitea UI에서 `Commits` 탭 클릭 → 11개 커밋이 모두 표시되는지 확인
---
## 선택 사항: SSH 키 설정 (비밀번호 없이 push하려면)
매번 비밀번호 입력이 번거롭다면 SSH 키를 등록할 수 있다.
**Step 1: SSH 키 생성 (없는 경우)**
```bash
ssh-keygen -t ed25519 -C "cointrader@gitea" -f ~/.ssh/id_gitea
```
**Step 2: 공개 키 복사**
```bash
cat ~/.ssh/id_gitea.pub
```
**Step 3: Gitea에 SSH 키 등록**
Gitea 웹 UI → 우상단 프로필 → `Settings``SSH / GPG Keys``Add Key` → 공개 키 붙여넣기
**Step 4: SSH config 설정**
```bash
cat >> ~/.ssh/config << 'EOF'
Host gitea-local
HostName 10.1.10.28
Port 22
User git
IdentityFile ~/.ssh/id_gitea
EOF
```
**Step 5: 원격 URL을 SSH로 변경**
```bash
git remote set-url origin git@gitea-local:<사용자명>/cointrader.git
```
---
## 트러블슈팅
| 문제 | 원인 | 해결 |
|------|------|------|
| `Connection refused` | Gitea 서버 미실행 또는 방화벽 | `http://10.1.10.28:3000` 접속 가능한지 브라우저로 먼저 확인 |
| `Repository not found` | 저장소 미생성 또는 URL 오타 | Task 2 재확인, URL의 사용자명 확인 |
| `Authentication failed` | 잘못된 계정 정보 | Gitea 웹 UI 로그인 테스트 후 동일 계정 사용 |
| `non-fast-forward` | 원격에 이미 커밋 존재 | `git push --force origin main` (단, 원격 데이터 덮어씌워짐 주의) |
| `.env` 파일이 push됨 | `.gitignore` 미적용 | `git rm --cached .env && git commit -m "chore: remove .env from tracking"` |

File diff suppressed because it is too large Load Diff

18
main.py Normal file
View File

@@ -0,0 +1,18 @@
import asyncio
from dotenv import load_dotenv
from src.config import Config
from src.bot import TradingBot
from src.logger_setup import setup_logger
load_dotenv()
async def main():
setup_logger(log_level="INFO")
config = Config()
bot = TradingBot(config)
await bot.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -2,7 +2,7 @@ python-binance==1.0.19
pandas>=2.2.0
pandas-ta==0.4.71b0
python-dotenv==1.0.0
notion-client==2.2.1
httpx>=0.27.0
pytest>=8.1.0
pytest-asyncio>=0.24.0
aiohttp==3.9.3

159
src/bot.py Normal file
View File

@@ -0,0 +1,159 @@
import asyncio
from loguru import logger
from src.config import Config
from src.exchange import BinanceFuturesClient
from src.indicators import Indicators
from src.data_stream import KlineStream
from src.notifier import DiscordNotifier
from src.risk_manager import RiskManager
class TradingBot:
def __init__(self, config: Config):
self.config = config
self.exchange = BinanceFuturesClient(config)
self.notifier = DiscordNotifier(config.discord_webhook_url)
self.risk = RiskManager(config)
self.current_trade_side: str | None = None # "LONG" | "SHORT"
self.stream = KlineStream(
symbol=config.symbol,
interval="1m",
on_candle=self._on_candle_closed,
)
def _on_candle_closed(self, candle: dict):
df = self.stream.get_dataframe()
if df is not None:
asyncio.create_task(self.process_candle(df))
async def _recover_position(self) -> None:
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
position = await self.exchange.get_position()
if position is not None:
amt = float(position["positionAmt"])
self.current_trade_side = "LONG" if amt > 0 else "SHORT"
entry = float(position["entryPrice"])
logger.info(
f"기존 포지션 복구: {self.current_trade_side} | "
f"진입가={entry:.4f} | 수량={abs(amt)}"
)
self.notifier.notify_info(
f"봇 재시작 - 기존 포지션 감지: {self.current_trade_side} "
f"진입가={entry:.4f} 수량={abs(amt)}"
)
else:
logger.info("기존 포지션 없음 - 신규 진입 대기")
async def process_candle(self, df):
if not self.risk.is_trading_allowed():
logger.warning("리스크 한도 초과 - 거래 중단")
return
ind = Indicators(df)
df_with_indicators = ind.calculate_all()
signal = ind.get_signal(df_with_indicators)
current_price = df_with_indicators["close"].iloc[-1]
logger.info(f"신호: {signal} | 현재가: {current_price:.4f} USDT")
position = await self.exchange.get_position()
if position is None and signal != "HOLD":
self.current_trade_side = None
if not self.risk.can_open_new_position():
logger.info("최대 포지션 수 도달")
return
await self._open_position(signal, df_with_indicators)
elif position is not None:
pos_side = "LONG" if float(position["positionAmt"]) > 0 else "SHORT"
if (pos_side == "LONG" and signal == "SHORT") or \
(pos_side == "SHORT" and signal == "LONG"):
await self._close_position(position)
async def _open_position(self, signal: str, df):
balance = await self.exchange.get_balance()
price = df["close"].iloc[-1]
quantity = self.exchange.calculate_quantity(
balance=balance, price=price, leverage=self.config.leverage
)
stop_loss, take_profit = Indicators(df).get_atr_stop(df, signal, price)
notional = quantity * price
if quantity <= 0 or notional < self.exchange.MIN_NOTIONAL:
logger.warning(
f"주문 건너뜀: 명목금액 {notional:.2f} USDT < 최소 {self.exchange.MIN_NOTIONAL} USDT "
f"(잔고={balance:.2f}, 수량={quantity})"
)
return
side = "BUY" if signal == "LONG" else "SELL"
await self.exchange.set_leverage(self.config.leverage)
await self.exchange.place_order(side=side, quantity=quantity)
last_row = df.iloc[-1]
signal_snapshot = {
"rsi": float(last_row.get("rsi", 0)),
"macd_hist": float(last_row.get("macd_hist", 0)),
"atr": float(last_row.get("atr", 0)),
}
self.current_trade_side = signal
self.notifier.notify_open(
symbol=self.config.symbol,
side=signal,
entry_price=price,
quantity=quantity,
leverage=self.config.leverage,
stop_loss=stop_loss,
take_profit=take_profit,
signal_data=signal_snapshot,
)
logger.success(
f"{signal} 진입: 가격={price}, 수량={quantity}, "
f"SL={stop_loss:.4f}, TP={take_profit:.4f}"
)
sl_side = "SELL" if signal == "LONG" else "BUY"
await self.exchange.place_order(
side=sl_side,
quantity=quantity,
order_type="STOP_MARKET",
stop_price=round(stop_loss, 4),
reduce_only=True,
)
await self.exchange.place_order(
side=sl_side,
quantity=quantity,
order_type="TAKE_PROFIT_MARKET",
stop_price=round(take_profit, 4),
reduce_only=True,
)
async def _close_position(self, position: dict):
amt = abs(float(position["positionAmt"]))
side = "SELL" if float(position["positionAmt"]) > 0 else "BUY"
pos_side = "LONG" if side == "SELL" else "SHORT"
await self.exchange.cancel_all_orders()
await self.exchange.place_order(side=side, quantity=amt, reduce_only=True)
entry = float(position["entryPrice"])
mark = float(position["markPrice"])
pnl = (mark - entry) * amt if side == "SELL" else (entry - mark) * amt
self.notifier.notify_close(
symbol=self.config.symbol,
side=pos_side,
exit_price=mark,
pnl=pnl,
)
self.risk.record_pnl(pnl)
self.current_trade_side = None
logger.success(f"포지션 청산: PnL={pnl:.4f} USDT")
async def run(self):
logger.info(f"봇 시작: {self.config.symbol}, 레버리지 {self.config.leverage}x")
await self._recover_position()
await self.stream.start(
api_key=self.config.api_key,
api_secret=self.config.api_secret,
)

View File

@@ -16,8 +16,7 @@ class Config:
stop_loss_pct: float = 0.015 # 1.5%
take_profit_pct: float = 0.045 # 4.5% (3:1 RR)
trailing_stop_pct: float = 0.01 # 1%
notion_token: str = ""
notion_database_id: str = ""
discord_webhook_url: str = ""
def __post_init__(self):
self.api_key = os.getenv("BINANCE_API_KEY", "")
@@ -25,5 +24,4 @@ class Config:
self.symbol = os.getenv("SYMBOL", "XRPUSDT")
self.leverage = int(os.getenv("LEVERAGE", "10"))
self.risk_per_trade = float(os.getenv("RISK_PER_TRADE", "0.02"))
self.notion_token = os.getenv("NOTION_TOKEN", "")
self.notion_database_id = os.getenv("NOTION_DATABASE_ID", "")
self.discord_webhook_url = os.getenv("DISCORD_WEBHOOK_URL", "")

86
src/data_stream.py Normal file
View File

@@ -0,0 +1,86 @@
import asyncio
from collections import deque
from typing import Callable
import pandas as pd
from binance import AsyncClient, BinanceSocketManager
from loguru import logger
class KlineStream:
def __init__(
self,
symbol: str,
interval: str = "1m",
buffer_size: int = 200,
on_candle: Callable = None,
):
self.symbol = symbol.lower()
self.interval = interval
self.buffer: deque = deque(maxlen=buffer_size)
self.on_candle = on_candle
def parse_kline(self, msg: dict) -> dict:
k = msg["k"]
return {
"timestamp": k["t"],
"open": float(k["o"]),
"high": float(k["h"]),
"low": float(k["l"]),
"close": float(k["c"]),
"volume": float(k["v"]),
"is_closed": k["x"],
}
def handle_message(self, msg: dict):
candle = self.parse_kline(msg)
if candle["is_closed"]:
self.buffer.append(candle)
if self.on_candle:
self.on_candle(candle)
def get_dataframe(self) -> pd.DataFrame | None:
if len(self.buffer) < 50:
return None
df = pd.DataFrame(list(self.buffer))
df.set_index("timestamp", inplace=True)
return df
async def _preload_history(self, client: AsyncClient, limit: int = 200):
"""REST API로 과거 캔들 데이터를 버퍼에 미리 채운다."""
logger.info(f"과거 캔들 {limit}개 로드 중...")
klines = await client.futures_klines(
symbol=self.symbol.upper(),
interval=self.interval,
limit=limit,
)
# 마지막 캔들은 아직 닫히지 않았을 수 있으므로 제외
for k in klines[:-1]:
self.buffer.append({
"timestamp": k[0],
"open": float(k[1]),
"high": float(k[2]),
"low": float(k[3]),
"close": float(k[4]),
"volume": float(k[5]),
"is_closed": True,
})
logger.info(f"과거 캔들 {len(self.buffer)}개 로드 완료 — 즉시 신호 계산 가능")
async def start(self, api_key: str, api_secret: str):
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
)
await self._preload_history(client)
bm = BinanceSocketManager(client)
stream_name = f"{self.symbol}@kline_{self.interval}"
logger.info(f"WebSocket 스트림 시작: {stream_name}")
try:
async with bm.kline_futures_socket(
symbol=self.symbol.upper(), interval=self.interval
) as stream:
while True:
msg = await stream.recv()
self.handle_message(msg)
finally:
await client.close_connection()

View File

@@ -13,12 +13,20 @@ class BinanceFuturesClient:
api_secret=config.api_secret,
)
MIN_NOTIONAL = 5.0 # 바이낸스 선물 최소 명목금액 (USDT)
def calculate_quantity(self, balance: float, price: float, leverage: int) -> float:
"""리스크 기반 포지션 크기 계산"""
"""리스크 기반 포지션 크기 계산 (최소 명목금액 $5 보장)"""
risk_amount = balance * self.config.risk_per_trade
notional = risk_amount * leverage
if notional < self.MIN_NOTIONAL:
notional = self.MIN_NOTIONAL
quantity = notional / price
return round(quantity, 1)
# XRP는 소수점 1자리, 단 최소 명목금액 충족 여부 재확인
qty_rounded = round(quantity, 1)
if qty_rounded * price < self.MIN_NOTIONAL:
qty_rounded = round(self.MIN_NOTIONAL / price + 0.05, 1)
return qty_rounded
async def set_leverage(self, leverage: int) -> dict:
loop = asyncio.get_event_loop()

115
src/indicators.py Normal file
View File

@@ -0,0 +1,115 @@
import pandas as pd
import pandas_ta as ta
from loguru import logger
class Indicators:
"""
복합 기술 지표 계산 및 매매 신호 생성.
공격적 전략: 여러 지표가 동시에 같은 방향을 가리킬 때 진입.
"""
def __init__(self, df: pd.DataFrame):
self.df = df.copy()
def calculate_all(self) -> pd.DataFrame:
df = self.df
# RSI (14)
df["rsi"] = ta.rsi(df["close"], length=14)
# MACD (12, 26, 9)
macd = ta.macd(df["close"], fast=12, slow=26, signal=9)
df["macd"] = macd["MACD_12_26_9"]
df["macd_signal"] = macd["MACDs_12_26_9"]
df["macd_hist"] = macd["MACDh_12_26_9"]
# 볼린저 밴드 (20, 2)
bb = ta.bbands(df["close"], length=20, std=2)
df["bb_upper"] = bb["BBU_20_2.0_2.0"]
df["bb_mid"] = bb["BBM_20_2.0_2.0"]
df["bb_lower"] = bb["BBL_20_2.0_2.0"]
# EMA (9, 21, 50)
df["ema9"] = ta.ema(df["close"], length=9)
df["ema21"] = ta.ema(df["close"], length=21)
df["ema50"] = ta.ema(df["close"], length=50)
# ATR (14) - 변동성 기반 손절 계산용
df["atr"] = ta.atr(df["high"], df["low"], df["close"], length=14)
# Stochastic RSI
stoch = ta.stochrsi(df["close"], length=14)
df["stoch_k"] = stoch["STOCHRSIk_14_14_3_3"]
df["stoch_d"] = stoch["STOCHRSId_14_14_3_3"]
# 거래량 이동평균
df["vol_ma20"] = ta.sma(df["volume"], length=20)
return df
def get_signal(self, df: pd.DataFrame) -> str:
"""
복합 지표 기반 매매 신호 생성.
공격적 전략: 3개 이상 지표 일치 시 진입.
"""
last = df.iloc[-1]
prev = df.iloc[-2]
long_signals = 0
short_signals = 0
# 1. RSI
if last["rsi"] < 35:
long_signals += 1
elif last["rsi"] > 65:
short_signals += 1
# 2. MACD 크로스
if prev["macd"] < prev["macd_signal"] and last["macd"] > last["macd_signal"]:
long_signals += 2 # 크로스는 강한 신호
elif prev["macd"] > prev["macd_signal"] and last["macd"] < last["macd_signal"]:
short_signals += 2
# 3. 볼린저 밴드 돌파
if last["close"] < last["bb_lower"]:
long_signals += 1
elif last["close"] > last["bb_upper"]:
short_signals += 1
# 4. EMA 정배열/역배열
if last["ema9"] > last["ema21"] > last["ema50"]:
long_signals += 1
elif last["ema9"] < last["ema21"] < last["ema50"]:
short_signals += 1
# 5. Stochastic RSI 과매도/과매수
if last["stoch_k"] < 20 and last["stoch_k"] > last["stoch_d"]:
long_signals += 1
elif last["stoch_k"] > 80 and last["stoch_k"] < last["stoch_d"]:
short_signals += 1
# 6. 거래량 확인 (신호 강화)
vol_surge = last["volume"] > last["vol_ma20"] * 1.5
threshold = 3
if long_signals >= threshold and (vol_surge or long_signals >= 4):
return "LONG"
elif short_signals >= threshold and (vol_surge or short_signals >= 4):
return "SHORT"
return "HOLD"
def get_atr_stop(
self, df: pd.DataFrame, side: str, entry_price: float
) -> tuple[float, float]:
"""ATR 기반 손절/익절 가격 반환 (stop_loss, take_profit)"""
atr = df["atr"].iloc[-1]
multiplier_sl = 1.5
multiplier_tp = 3.0
if side == "LONG":
stop_loss = entry_price - atr * multiplier_sl
take_profit = entry_price + atr * multiplier_tp
else:
stop_loss = entry_price + atr * multiplier_sl
take_profit = entry_price - atr * multiplier_tp
return stop_loss, take_profit

24
src/logger_setup.py Normal file
View File

@@ -0,0 +1,24 @@
import sys
from loguru import logger
def setup_logger(log_level: str = "INFO"):
logger.remove()
logger.add(
sys.stdout,
format=(
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
),
level=log_level,
colorize=True,
)
logger.add(
"logs/bot_{time:YYYY-MM-DD}.log",
rotation="00:00",
retention="30 days",
level="DEBUG",
encoding="utf-8",
)

63
src/notifier.py Normal file
View File

@@ -0,0 +1,63 @@
import httpx
from loguru import logger
class DiscordNotifier:
"""Discord 웹훅으로 거래 알림을 전송하는 노티파이어."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
self._enabled = bool(webhook_url)
def _send(self, content: str) -> None:
if not self._enabled:
logger.debug("Discord 웹훅 URL 미설정 - 알림 건너뜀")
return
try:
resp = httpx.post(
self.webhook_url,
json={"content": content},
timeout=10,
)
resp.raise_for_status()
except Exception as e:
logger.warning(f"Discord 알림 전송 실패: {e}")
def notify_open(
self,
symbol: str,
side: str,
entry_price: float,
quantity: float,
leverage: int,
stop_loss: float,
take_profit: float,
signal_data: dict = None,
) -> None:
rsi = (signal_data or {}).get("rsi", 0)
macd = (signal_data or {}).get("macd_hist", 0)
atr = (signal_data or {}).get("atr", 0)
msg = (
f"**[{symbol}] {side} 진입**\n"
f"진입가: `{entry_price:.4f}` | 수량: `{quantity}` | 레버리지: `{leverage}x`\n"
f"SL: `{stop_loss:.4f}` | TP: `{take_profit:.4f}`\n"
f"RSI: `{rsi:.2f}` | MACD Hist: `{macd:.6f}` | ATR: `{atr:.6f}`"
)
self._send(msg)
def notify_close(
self,
symbol: str,
side: str,
exit_price: float,
pnl: float,
) -> None:
emoji = "" if pnl >= 0 else ""
msg = (
f"{emoji} **[{symbol}] {side} 청산**\n"
f"청산가: `{exit_price:.4f}` | PnL: `{pnl:+.4f} USDT`"
)
self._send(msg)
def notify_info(self, message: str) -> None:
self._send(f" {message}")

36
src/risk_manager.py Normal file
View File

@@ -0,0 +1,36 @@
from loguru import logger
from src.config import Config
class RiskManager:
def __init__(self, config: Config, max_daily_loss_pct: float = 0.05):
self.config = config
self.max_daily_loss_pct = max_daily_loss_pct # 일일 최대 손실 5%
self.daily_pnl: float = 0.0
self.initial_balance: float = 0.0
self.open_positions: list = []
def is_trading_allowed(self) -> bool:
"""일일 최대 손실 초과 시 거래 중단"""
if self.initial_balance <= 0:
return True
loss_pct = abs(self.daily_pnl) / self.initial_balance
if self.daily_pnl < 0 and loss_pct >= self.max_daily_loss_pct:
logger.warning(
f"일일 손실 한도 초과: {loss_pct:.2%} >= {self.max_daily_loss_pct:.2%}"
)
return False
return True
def can_open_new_position(self) -> bool:
"""최대 동시 포지션 수 체크"""
return len(self.open_positions) < self.config.max_positions
def record_pnl(self, pnl: float):
self.daily_pnl += pnl
logger.info(f"오늘 누적 PnL: {self.daily_pnl:.4f} USDT")
def reset_daily(self):
"""매일 자정 초기화"""
self.daily_pnl = 0.0
logger.info("일일 PnL 초기화")

60
tests/test_bot.py Normal file
View File

@@ -0,0 +1,60 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import pandas as pd
import numpy as np
import os
from src.bot import TradingBot
from src.config import Config
@pytest.fixture
def config():
os.environ.update({
"BINANCE_API_KEY": "k",
"BINANCE_API_SECRET": "s",
"SYMBOL": "XRPUSDT",
"LEVERAGE": "10",
"RISK_PER_TRADE": "0.02",
"NOTION_TOKEN": "secret_test",
"NOTION_DATABASE_ID": "db_test",
})
return Config()
@pytest.fixture
def sample_df():
np.random.seed(0)
n = 100
close = np.cumsum(np.random.randn(n) * 0.01) + 0.5
return pd.DataFrame({
"open": close,
"high": close * 1.005,
"low": close * 0.995,
"close": close,
"volume": np.random.randint(100000, 1000000, n).astype(float),
})
@pytest.mark.asyncio
async def test_bot_processes_signal(config, sample_df):
with patch("src.bot.BinanceFuturesClient") as MockExchange, \
patch("src.bot.TradeRepository") as MockRepo:
MockExchange.return_value = AsyncMock()
MockRepo.return_value = MagicMock()
bot = TradingBot(config)
bot.exchange = AsyncMock()
bot.exchange.get_balance = AsyncMock(return_value=1000.0)
bot.exchange.get_position = AsyncMock(return_value=None)
bot.exchange.place_order = AsyncMock(return_value={"orderId": "123"})
bot.exchange.set_leverage = AsyncMock(return_value={})
bot.db = MagicMock()
bot.db.save_trade = MagicMock(return_value={"id": "trade1"})
with patch("src.bot.Indicators") as MockInd:
mock_ind = MagicMock()
mock_ind.calculate_all.return_value = sample_df
mock_ind.get_signal.return_value = "LONG"
mock_ind.get_atr_stop.return_value = (0.48, 0.56)
MockInd.return_value = mock_ind
await bot.process_candle(sample_df)

66
tests/test_data_stream.py Normal file
View File

@@ -0,0 +1,66 @@
import pytest
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from src.data_stream import KlineStream
@pytest.mark.asyncio
async def test_kline_stream_parses_message():
stream = KlineStream(symbol="XRPUSDT", interval="1m")
raw_msg = {
"k": {
"t": 1700000000000,
"o": "0.5000",
"h": "0.5100",
"l": "0.4900",
"c": "0.5050",
"v": "100000",
"x": True,
}
}
candle = stream.parse_kline(raw_msg)
assert candle["close"] == 0.5050
assert candle["is_closed"] is True
@pytest.mark.asyncio
async def test_callback_called_on_closed_candle():
received = []
stream = KlineStream(
symbol="XRPUSDT",
interval="1m",
on_candle=lambda c: received.append(c),
)
raw_msg = {
"k": {
"t": 1700000000000,
"o": "0.5",
"h": "0.51",
"l": "0.49",
"c": "0.505",
"v": "100000",
"x": True,
}
}
stream.handle_message(raw_msg)
assert len(received) == 1
@pytest.mark.asyncio
async def test_preload_history_fills_buffer():
stream = KlineStream(symbol="XRPUSDT", interval="1m", buffer_size=200)
# REST API 응답 형식: [open_time, open, high, low, close, volume, ...]
fake_klines = [
[1700000000000 + i * 60000, "0.5", "0.51", "0.49", "0.505", "100000",
0, "0", "0", "0", "0", "0"]
for i in range(201) # 201개 반환 → 마지막 1개 제외 → 200개 버퍼
]
mock_client = AsyncMock()
mock_client.futures_klines.return_value = fake_klines
await stream._preload_history(mock_client, limit=200)
assert len(stream.buffer) == 200
assert stream.get_dataframe() is not None

42
tests/test_database.py Normal file
View File

@@ -0,0 +1,42 @@
import pytest
from unittest.mock import MagicMock, patch
from src.database import TradeRepository
@pytest.fixture
def mock_repo():
with patch("src.database.Client") as mock_client_cls:
mock_client = MagicMock()
mock_client_cls.return_value = mock_client
repo = TradeRepository(token="secret_test", database_id="db_test")
repo.client = mock_client
yield repo
def test_save_trade(mock_repo):
mock_repo.client.pages.create.return_value = {
"id": "abc123",
"properties": {},
}
result = mock_repo.save_trade(
symbol="XRPUSDT",
side="LONG",
entry_price=0.5,
quantity=400.0,
leverage=10,
signal_data={"rsi": 32, "macd_hist": 0.001},
)
assert result["id"] == "abc123"
def test_close_trade(mock_repo):
mock_repo.client.pages.update.return_value = {
"id": "abc123",
"properties": {
"Status": {"select": {"name": "CLOSED"}},
},
}
result = mock_repo.close_trade(
trade_id="abc123", exit_price=0.55, pnl=20.0
)
assert result["id"] == "abc123"

View File

@@ -19,19 +19,20 @@ def config():
@pytest.mark.asyncio
async def test_set_leverage(config):
client = BinanceFuturesClient(config)
with patch.object(
client.client,
"futures_change_leverage",
return_value={"leverage": 10},
):
with patch("src.exchange.Client") as MockClient:
mock_binance = MagicMock()
MockClient.return_value = mock_binance
mock_binance.futures_change_leverage.return_value = {"leverage": 10}
client = BinanceFuturesClient(config)
result = await client.set_leverage(10)
assert result is not None
def test_calculate_quantity(config):
client = BinanceFuturesClient(config)
# 잔고 1000 USDT, 리스크 2%, 레버리지 10, 가격 0.5
qty = client.calculate_quantity(balance=1000.0, price=0.5, leverage=10)
# 1000 * 0.02 * 10 / 0.5 = 400
assert qty == pytest.approx(400.0, rel=0.01)
with patch("src.exchange.Client") as MockClient:
MockClient.return_value = MagicMock()
client = BinanceFuturesClient(config)
# 잔고 1000 USDT, 리스크 2%, 레버리지 10, 가격 0.5
qty = client.calculate_quantity(balance=1000.0, price=0.5, leverage=10)
# 1000 * 0.02 * 10 / 0.5 = 400
assert qty == pytest.approx(400.0, rel=0.01)

52
tests/test_indicators.py Normal file
View File

@@ -0,0 +1,52 @@
import pandas as pd
import numpy as np
import pytest
from src.indicators import Indicators
@pytest.fixture
def sample_df():
"""100개 캔들 샘플 데이터"""
np.random.seed(42)
n = 100
close = np.cumsum(np.random.randn(n) * 0.01) + 0.5
df = pd.DataFrame({
"open": close * (1 + np.random.randn(n) * 0.001),
"high": close * (1 + np.abs(np.random.randn(n)) * 0.005),
"low": close * (1 - np.abs(np.random.randn(n)) * 0.005),
"close": close,
"volume": np.random.randint(100000, 1000000, n).astype(float),
})
return df
def test_rsi_range(sample_df):
ind = Indicators(sample_df)
df = ind.calculate_all()
assert "rsi" in df.columns
valid = df["rsi"].dropna()
assert (valid >= 0).all() and (valid <= 100).all()
def test_macd_columns(sample_df):
ind = Indicators(sample_df)
df = ind.calculate_all()
assert "macd" in df.columns
assert "macd_signal" in df.columns
assert "macd_hist" in df.columns
def test_bollinger_bands(sample_df):
ind = Indicators(sample_df)
df = ind.calculate_all()
assert "bb_upper" in df.columns
assert "bb_lower" in df.columns
valid = df.dropna()
assert (valid["bb_upper"] >= valid["bb_lower"]).all()
def test_signal_returns_direction(sample_df):
ind = Indicators(sample_df)
df = ind.calculate_all()
signal = ind.get_signal(df)
assert signal in ("LONG", "SHORT", "HOLD")

View File

@@ -0,0 +1,36 @@
import pytest
import os
from src.risk_manager import RiskManager
from src.config import Config
@pytest.fixture
def config():
os.environ.update({
"BINANCE_API_KEY": "k",
"BINANCE_API_SECRET": "s",
"SYMBOL": "XRPUSDT",
"LEVERAGE": "10",
"RISK_PER_TRADE": "0.02",
})
return Config()
def test_max_drawdown_check(config):
rm = RiskManager(config, max_daily_loss_pct=0.05)
rm.daily_pnl = -60.0
rm.initial_balance = 1000.0
assert rm.is_trading_allowed() is False
def test_trading_allowed_normal(config):
rm = RiskManager(config, max_daily_loss_pct=0.05)
rm.daily_pnl = -10.0
rm.initial_balance = 1000.0
assert rm.is_trading_allowed() is True
def test_position_size_capped(config):
rm = RiskManager(config, max_daily_loss_pct=0.05)
rm.open_positions = ["pos1", "pos2", "pos3"]
assert rm.can_open_new_position() is False