From d1af736bfcfd8c4d27e614a34941bbdfdc9d2714 Mon Sep 17 00:00:00 2001 From: 21in7 Date: Sun, 1 Mar 2026 19:30:17 +0900 Subject: [PATCH] feat: implement BTC/ETH correlation features for improved model accuracy - Added a new design document outlining the integration of BTC/ETH candle data as additional features in the XRP ML filter, enhancing prediction accuracy. - Introduced `MultiSymbolStream` for combined WebSocket data retrieval of XRP, BTC, and ETH. - Expanded feature set from 13 to 21 by including 8 new BTC/ETH-related features. - Updated various scripts and modules to support the new feature set and data handling. - Enhanced training and deployment scripts to accommodate the new dataset structure. This commit lays the groundwork for improved model performance by leveraging the correlation between BTC and ETH with XRP. --- ...-01-btc-eth-correlation-features-design.md | 119 +++ ...03-01-btc-eth-correlation-features-plan.md | 815 ++++++++++++++++++ models/training_log.json | 7 + scripts/deploy_model.sh | 13 +- scripts/fetch_history.py | 139 ++- scripts/train_and_deploy.sh | 15 +- scripts/train_model.py | 28 +- src/bot.py | 18 +- src/data_stream.py | 103 +++ src/dataset_builder.py | 60 +- src/ml_features.py | 57 +- tests/test_bot.py | 13 + tests/test_data_stream.py | 37 + tests/test_dataset_builder.py | 34 +- tests/test_ml_features.py | 58 +- 15 files changed, 1448 insertions(+), 68 deletions(-) create mode 100644 docs/plans/2026-03-01-btc-eth-correlation-features-design.md create mode 100644 docs/plans/2026-03-01-btc-eth-correlation-features-plan.md diff --git a/docs/plans/2026-03-01-btc-eth-correlation-features-design.md b/docs/plans/2026-03-01-btc-eth-correlation-features-design.md new file mode 100644 index 0000000..e250c9f --- /dev/null +++ b/docs/plans/2026-03-01-btc-eth-correlation-features-design.md @@ -0,0 +1,119 @@ +# BTC/ETH 상관관계 피처 추가 설계 문서 + +**날짜:** 2026-03-01 + +## 목적 + +XRP 선물 ML 필터에 BTC/ETH 캔들 데이터를 추가 피처로 활용하여 모델 예측 정확도를 향상시킨다. XRP는 BTC/ETH의 움직임에 강하게 연동되는 경향이 있으므로, 이 컨텍스트를 ML 피처로 명시적으로 제공한다. + +--- + +## 아키텍처 개요 + +### 변경 전 + +``` +KlineStream(XRPUSDT) → bot.process_candle() → Indicators → MLFilter(13개 피처) +``` + +### 변경 후 + +``` +MultiSymbolStream(XRP+BTC+ETH, Combined WebSocket) + ↓ XRP 캔들 닫힐 때 +bot.process_candle(xrp_df, btc_df, eth_df) + ↓ +Indicators(XRP) → build_features(xrp_df, btc_df, eth_df, signal) + ↓ +MLFilter(13 + 8 = 21개 피처) +``` + +--- + +## 추가 피처 8개 + +| 피처 | 설명 | +|---|---| +| `btc_ret_1` | BTC 1캔들 수익률 | +| `btc_ret_3` | BTC 3캔들 수익률 | +| `btc_ret_5` | BTC 5캔들 수익률 | +| `eth_ret_1` | ETH 1캔들 수익률 | +| `eth_ret_3` | ETH 3캔들 수익률 | +| `eth_ret_5` | ETH 5캔들 수익률 | +| `xrp_btc_rs` | XRP ret_1 / BTC ret_1 (XRP 상대강도 vs BTC) | +| `xrp_eth_rs` | XRP ret_1 / ETH ret_1 (XRP 상대강도 vs ETH) | + +기존 13개 피처(`rsi`, `macd_hist`, `bb_pct`, `ema_align`, `stoch_k`, `stoch_d`, `atr_pct`, `vol_ratio`, `ret_1`, `ret_3`, `ret_5`, `signal_strength`, `side`)는 그대로 유지. + +--- + +## 변경 파일 목록 + +| 파일 | 변경 유형 | 내용 | +|---|---|---| +| `src/data_stream.py` | 수정 | `KlineStream` → `MultiSymbolStream` (Combined WebSocket) | +| `src/ml_features.py` | 수정 | `build_features(xrp_df, btc_df, eth_df, signal)` — 피처 21개로 확장 | +| `scripts/fetch_history.py` | 수정 | BTC/ETH 동시 수집 후 타임스탬프 기준 병합 저장 | +| `scripts/train_model.py` | 수정 | 병합된 데이터셋으로 21개 피처 학습 | +| `src/bot.py` | 수정 | `MultiSymbolStream` 사용, `process_candle`에 btc_df/eth_df 전달 | +| `src/dataset_builder.py` | 수정 | 레이블 생성 시 BTC/ETH 피처 포함 | + +--- + +## 데이터 흐름 + +### 실시간 (봇 운영) + +``` +Binance Combined WebSocket + ├── xrpusdt@kline_1m → xrp_buffer (deque 200) + ├── btcusdt@kline_1m → btc_buffer (deque 200) + └── ethusdt@kline_1m → eth_buffer (deque 200) + ↓ XRP 캔들 닫힐 때만 트리거 + bot.process_candle(xrp_df, btc_df, eth_df) +``` + +### 학습 데이터 수집 + +``` +fetch_history.py → XRPUSDT + BTCUSDT + ETHUSDT 각 90일 수집 + → 타임스탬프 기준 inner join 병합 + → data/combined_1m.parquet 저장 +train_model.py → 21개 피처로 LightGBM 재학습 + → models/lgbm_filter.pkl 교체 +``` + +--- + +## 에러 처리 + +| 상황 | 처리 방법 | +|---|---| +| BTC/ETH 버퍼 50개 미만 (봇 시작 직후) | btc/eth 피처 전부 0.0으로 채움, 거래는 정상 진행 | +| Combined WebSocket 연결 끊김 | 예외 발생 → 상위에서 재연결 | +| BTC/ETH ret 분모가 0 | `xrp_btc_rs`, `xrp_eth_rs` = 0.0으로 처리 | +| 기존 모델(13개 피처) 파일이 남아있는 경우 | 피처 수 불일치 → MLFilter 폴백(신호 통과)으로 자동 처리 | + +--- + +## 재학습 순서 + +기존 `lgbm_filter.pkl`(13개 피처)은 새 데이터셋(21개 피처) 재학습 후 자동 교체된다. +**봇 재시작 전 반드시 아래 순서로 실행:** + +```bash +# 1. 3심볼 과거 데이터 수집 +python scripts/fetch_history.py --symbols XRPUSDT BTCUSDT ETHUSDT --days 90 + +# 2. 21피처 모델 재학습 +python scripts/train_model.py + +# 3. 봇 재시작 +``` + +--- + +## 폴백 정책 + +- BTC/ETH 버퍼가 비어있으면 해당 피처를 0.0으로 채워 기존 XRP 피처만으로 동작 +- 모델 파일이 없으면 ML 필터 전체를 건너뜀 (기존 정책 유지) diff --git a/docs/plans/2026-03-01-btc-eth-correlation-features-plan.md b/docs/plans/2026-03-01-btc-eth-correlation-features-plan.md new file mode 100644 index 0000000..81952d7 --- /dev/null +++ b/docs/plans/2026-03-01-btc-eth-correlation-features-plan.md @@ -0,0 +1,815 @@ +# BTC/ETH 상관관계 피처 추가 구현 계획 + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** BTC/ETH 캔들 데이터를 XRP ML 필터의 추가 피처(21개)로 활용해 모델 예측 정확도를 향상시킨다. + +**Architecture:** 바이낸스 Combined WebSocket으로 XRP/BTC/ETH 3개 심볼을 단일 연결로 수신하고, XRP 캔들이 닫힐 때 BTC/ETH 버퍼의 수익률·상대강도 8개 피처를 기존 13개 피처에 추가해 LightGBM에 전달한다. 학습 데이터도 3심볼을 타임스탬프 기준으로 병합해 동일한 21개 피처로 재학습한다. + +**Tech Stack:** Python 3.12, python-binance (AsyncClient + BinanceSocketManager), LightGBM, pandas, joblib + +--- + +## Task 1: `MultiSymbolStream` — Combined WebSocket으로 3심볼 수신 + +**Files:** +- Modify: `src/data_stream.py` +- Test: `tests/test_data_stream.py` + +### Step 1: 실패하는 테스트 작성 + +`tests/test_data_stream.py` 파일에 아래 테스트를 추가한다. + +```python +from src.data_stream import MultiSymbolStream + +def test_multi_symbol_stream_has_three_buffers(): + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + ) + assert "xrpusdt" in stream.buffers + assert "btcusdt" in stream.buffers + assert "ethusdt" in stream.buffers + +def test_multi_symbol_stream_get_dataframe_returns_none_when_empty(): + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + ) + assert stream.get_dataframe("XRPUSDT") is None + +def test_multi_symbol_stream_get_dataframe_returns_df_when_full(): + import pandas as pd + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + buffer_size=200, + ) + candle = { + "timestamp": 1000, "open": 1.0, "high": 1.1, + "low": 0.9, "close": 1.05, "volume": 100.0, "is_closed": True, + } + for i in range(50): + c = candle.copy() + c["timestamp"] = 1000 + i + stream.buffers["xrpusdt"].append(c) + df = stream.get_dataframe("XRPUSDT") + assert df is not None + assert len(df) == 50 +``` + +### Step 2: 테스트 실패 확인 + +```bash +pytest tests/test_data_stream.py::test_multi_symbol_stream_has_three_buffers -v +``` + +Expected: `FAILED` — `MultiSymbolStream` not defined + +### Step 3: `MultiSymbolStream` 구현 + +`src/data_stream.py` 파일에 기존 `KlineStream` 클래스 아래에 추가한다. + +```python +class MultiSymbolStream: + """ + 바이낸스 Combined WebSocket으로 여러 심볼의 캔들을 단일 연결로 수신한다. + XRP 캔들이 닫힐 때 on_candle 콜백을 호출한다. + """ + + def __init__( + self, + symbols: list[str], + interval: str = "1m", + buffer_size: int = 200, + on_candle: Callable = None, + ): + self.symbols = [s.lower() for s in symbols] + self.interval = interval + self.on_candle = on_candle + self.buffers: dict[str, deque] = { + s: deque(maxlen=buffer_size) for s in self.symbols + } + # 첫 번째 심볼이 주 심볼 (XRP) + self.primary_symbol = self.symbols[0] + + def parse_kline(self, msg: dict) -> dict: + k = msg["k"] + return { + "timestamp": k["t"], + "open": float(k["o"]), + "high": float(k["h"]), + "low": float(k["l"]), + "close": float(k["c"]), + "volume": float(k["v"]), + "is_closed": k["x"], + } + + def handle_message(self, msg: dict): + # Combined stream 메시지는 {"stream": "...", "data": {...}} 형태 + if "stream" in msg: + data = msg["data"] + else: + data = msg + + if data.get("e") != "kline": + return + + symbol = data["s"].lower() + candle = self.parse_kline(data) + + if candle["is_closed"] and symbol in self.buffers: + self.buffers[symbol].append(candle) + if symbol == self.primary_symbol and self.on_candle: + self.on_candle(candle) + + def get_dataframe(self, symbol: str) -> pd.DataFrame | None: + key = symbol.lower() + buf = self.buffers.get(key) + if buf is None or len(buf) < 50: + return None + df = pd.DataFrame(list(buf)) + df.set_index("timestamp", inplace=True) + return df + + async def _preload_history(self, client: AsyncClient, limit: int = 200): + """REST API로 모든 심볼의 과거 캔들을 버퍼에 미리 채운다.""" + for symbol in self.symbols: + logger.info(f"{symbol.upper()} 과거 캔들 {limit}개 로드 중...") + klines = await client.futures_klines( + symbol=symbol.upper(), + interval=self.interval, + limit=limit, + ) + for k in klines[:-1]: + self.buffers[symbol].append({ + "timestamp": k[0], + "open": float(k[1]), + "high": float(k[2]), + "low": float(k[3]), + "close": float(k[4]), + "volume": float(k[5]), + "is_closed": True, + }) + logger.info(f"{symbol.upper()} {len(self.buffers[symbol])}개 로드 완료") + + async def start(self, api_key: str, api_secret: str): + client = await AsyncClient.create( + api_key=api_key, + api_secret=api_secret, + ) + await self._preload_history(client) + bm = BinanceSocketManager(client) + streams = [ + f"{s}@kline_{self.interval}" for s in self.symbols + ] + logger.info(f"Combined WebSocket 시작: {streams}") + try: + async with bm.futures_multiplex_socket(streams) as stream: + while True: + msg = await stream.recv() + self.handle_message(msg) + finally: + await client.close_connection() +``` + +### Step 4: 테스트 통과 확인 + +```bash +pytest tests/test_data_stream.py -v +``` + +Expected: 모든 테스트 PASS + +### Step 5: 커밋 + +```bash +git add src/data_stream.py tests/test_data_stream.py +git commit -m "feat: add MultiSymbolStream for combined BTC/ETH/XRP WebSocket" +``` + +--- + +## Task 2: `build_features` — BTC/ETH 피처 8개 추가 + +**Files:** +- Modify: `src/ml_features.py` +- Test: `tests/test_ml_features.py` + +### Step 1: 실패하는 테스트 작성 + +`tests/test_ml_features.py`에 아래 테스트를 추가한다. + +```python +import pandas as pd +import numpy as np +from src.ml_features import build_features, FEATURE_COLS + +def _make_df(n=10, base_price=1.0): + """테스트용 더미 캔들 DataFrame 생성.""" + closes = [base_price * (1 + i * 0.001) for i in range(n)] + return pd.DataFrame({ + "close": closes, "high": [c * 1.01 for c in closes], + "low": [c * 0.99 for c in closes], + "volume": [1000.0] * n, + "rsi": [50.0] * n, "macd": [0.0] * n, "macd_signal": [0.0] * n, + "macd_hist": [0.0] * n, "bb_upper": [c * 1.02 for c in closes], + "bb_lower": [c * 0.98 for c in closes], "ema9": closes, + "ema21": closes, "ema50": closes, "atr": [0.01] * n, + "stoch_k": [50.0] * n, "stoch_d": [50.0] * n, + "vol_ma20": [1000.0] * n, + }) + +def test_build_features_with_btc_eth_has_21_features(): + xrp_df = _make_df(10, base_price=1.0) + btc_df = _make_df(10, base_price=50000.0) + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + assert len(features) == 21 + +def test_build_features_without_btc_eth_has_13_features(): + xrp_df = _make_df(10, base_price=1.0) + features = build_features(xrp_df, "LONG") + assert len(features) == 13 + +def test_build_features_btc_ret_1_correct(): + xrp_df = _make_df(10, base_price=1.0) + btc_df = _make_df(10, base_price=50000.0) + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + btc_closes = btc_df["close"] + expected_btc_ret_1 = (btc_closes.iloc[-1] - btc_closes.iloc[-2]) / btc_closes.iloc[-2] + assert abs(features["btc_ret_1"] - expected_btc_ret_1) < 1e-6 + +def test_build_features_rs_zero_when_btc_ret_zero(): + xrp_df = _make_df(10, base_price=1.0) + # BTC 가격이 변하지 않으면 ret=0, RS=0 + btc_df = _make_df(10, base_price=50000.0) + btc_df["close"] = 50000.0 # 모든 캔들 동일 + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + assert features["xrp_btc_rs"] == 0.0 + +def test_feature_cols_has_21_items(): + from src.ml_features import FEATURE_COLS + assert len(FEATURE_COLS) == 21 +``` + +### Step 2: 테스트 실패 확인 + +```bash +pytest tests/test_ml_features.py -v +``` + +Expected: 여러 테스트 FAIL + +### Step 3: `ml_features.py` 수정 + +`src/ml_features.py` 전체를 아래로 교체한다. + +```python +import pandas as pd +import numpy as np + +FEATURE_COLS = [ + "rsi", "macd_hist", "bb_pct", "ema_align", + "stoch_k", "stoch_d", "atr_pct", "vol_ratio", + "ret_1", "ret_3", "ret_5", "signal_strength", "side", + "btc_ret_1", "btc_ret_3", "btc_ret_5", + "eth_ret_1", "eth_ret_3", "eth_ret_5", + "xrp_btc_rs", "xrp_eth_rs", +] + + +def _calc_ret(closes: pd.Series, n: int) -> float: + """n캔들 전 대비 수익률. 데이터 부족 시 0.0.""" + if len(closes) < n + 1: + return 0.0 + prev = closes.iloc[-(n + 1)] + return (closes.iloc[-1] - prev) / prev if prev != 0 else 0.0 + + +def _calc_rs(xrp_ret: float, other_ret: float) -> float: + """상대강도 = xrp_ret / other_ret. 분모 0이면 0.0.""" + if other_ret == 0.0: + return 0.0 + return xrp_ret / other_ret + + +def build_features( + df: pd.DataFrame, + signal: str, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.Series: + """ + 기술 지표가 계산된 DataFrame의 마지막 행에서 ML 피처를 추출한다. + btc_df, eth_df가 제공되면 21개 피처를, 없으면 13개 피처를 반환한다. + signal: "LONG" | "SHORT" + """ + last = df.iloc[-1] + close = last["close"] + + bb_upper = last.get("bb_upper", close) + bb_lower = last.get("bb_lower", close) + bb_range = bb_upper - bb_lower + bb_pct = (close - bb_lower) / bb_range if bb_range > 0 else 0.5 + + ema9 = last.get("ema9", close) + ema21 = last.get("ema21", close) + ema50 = last.get("ema50", close) + if ema9 > ema21 > ema50: + ema_align = 1 + elif ema9 < ema21 < ema50: + ema_align = -1 + else: + ema_align = 0 + + atr = last.get("atr", 0) + atr_pct = atr / close if close > 0 else 0 + + vol_ma20 = last.get("vol_ma20", last.get("volume", 1)) + vol_ratio = last["volume"] / vol_ma20 if vol_ma20 > 0 else 1.0 + + closes = df["close"] + ret_1 = _calc_ret(closes, 1) + ret_3 = _calc_ret(closes, 3) + ret_5 = _calc_ret(closes, 5) + + prev = df.iloc[-2] if len(df) >= 2 else last + strength = 0 + rsi = last.get("rsi", 50) + macd = last.get("macd", 0) + macd_sig = last.get("macd_signal", 0) + prev_macd = prev.get("macd", 0) + prev_macd_sig = prev.get("macd_signal", 0) + stoch_k = last.get("stoch_k", 50) + stoch_d = last.get("stoch_d", 50) + + if signal == "LONG": + if rsi < 35: strength += 1 + if prev_macd < prev_macd_sig and macd > macd_sig: strength += 2 + if close < last.get("bb_lower", close): strength += 1 + if ema_align == 1: strength += 1 + if stoch_k < 20 and stoch_k > stoch_d: strength += 1 + else: + if rsi > 65: strength += 1 + if prev_macd > prev_macd_sig and macd < macd_sig: strength += 2 + if close > last.get("bb_upper", close): strength += 1 + if ema_align == -1: strength += 1 + if stoch_k > 80 and stoch_k < stoch_d: strength += 1 + + base = { + "rsi": float(rsi), + "macd_hist": float(last.get("macd_hist", 0)), + "bb_pct": float(bb_pct), + "ema_align": float(ema_align), + "stoch_k": float(stoch_k), + "stoch_d": float(last.get("stoch_d", 50)), + "atr_pct": float(atr_pct), + "vol_ratio": float(vol_ratio), + "ret_1": float(ret_1), + "ret_3": float(ret_3), + "ret_5": float(ret_5), + "signal_strength": float(strength), + "side": 1.0 if signal == "LONG" else 0.0, + } + + if btc_df is not None and eth_df is not None: + btc_ret_1 = _calc_ret(btc_df["close"], 1) + btc_ret_3 = _calc_ret(btc_df["close"], 3) + btc_ret_5 = _calc_ret(btc_df["close"], 5) + eth_ret_1 = _calc_ret(eth_df["close"], 1) + eth_ret_3 = _calc_ret(eth_df["close"], 3) + eth_ret_5 = _calc_ret(eth_df["close"], 5) + + base.update({ + "btc_ret_1": float(btc_ret_1), + "btc_ret_3": float(btc_ret_3), + "btc_ret_5": float(btc_ret_5), + "eth_ret_1": float(eth_ret_1), + "eth_ret_3": float(eth_ret_3), + "eth_ret_5": float(eth_ret_5), + "xrp_btc_rs": float(_calc_rs(ret_1, btc_ret_1)), + "xrp_eth_rs": float(_calc_rs(ret_1, eth_ret_1)), + }) + + return pd.Series(base) +``` + +### Step 4: 테스트 통과 확인 + +```bash +pytest tests/test_ml_features.py -v +``` + +Expected: 모든 테스트 PASS + +### Step 5: 커밋 + +```bash +git add src/ml_features.py tests/test_ml_features.py +git commit -m "feat: extend build_features to 21 features with BTC/ETH correlation" +``` + +--- + +## Task 3: `dataset_builder.py` — BTC/ETH 피처 벡터화 추가 + +**Files:** +- Modify: `src/dataset_builder.py` +- Test: `tests/test_dataset_builder.py` + +### Step 1: 실패하는 테스트 작성 + +`tests/test_dataset_builder.py`에 아래 테스트를 추가한다. + +```python +def test_generate_dataset_vectorized_with_btc_eth_has_21_feature_cols(): + """BTC/ETH DataFrame을 전달하면 결과 컬럼이 21개 피처 + label이어야 한다.""" + import pandas as pd + import numpy as np + from src.dataset_builder import generate_dataset_vectorized + from src.ml_features import FEATURE_COLS + + np.random.seed(42) + n = 500 + closes = np.cumprod(1 + np.random.randn(n) * 0.001) * 1.0 + xrp_df = pd.DataFrame({ + "open": closes * 0.999, "high": closes * 1.005, + "low": closes * 0.995, "close": closes, + "volume": np.random.rand(n) * 1000 + 500, + }) + btc_df = xrp_df.copy() * 50000 + eth_df = xrp_df.copy() * 3000 + + result = generate_dataset_vectorized(xrp_df, btc_df=btc_df, eth_df=eth_df) + if not result.empty: + assert set(FEATURE_COLS).issubset(set(result.columns)) + assert len(result.columns) == len(FEATURE_COLS) + 1 # +1 for label +``` + +### Step 2: 테스트 실패 확인 + +```bash +pytest tests/test_dataset_builder.py::test_generate_dataset_vectorized_with_btc_eth_has_21_feature_cols -v +``` + +Expected: FAIL — `generate_dataset_vectorized()` does not accept btc_df/eth_df + +### Step 3: `dataset_builder.py` 수정 + +`_calc_features_vectorized` 함수와 `generate_dataset_vectorized` 함수를 수정한다. + +`_calc_features_vectorized` 시그니처와 반환부에 BTC/ETH 피처 추가: + +```python +def _calc_features_vectorized( + d: pd.DataFrame, + signal_arr: np.ndarray, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.DataFrame: + # ... 기존 코드 유지 ... + + # BTC/ETH 피처 계산 (제공된 경우) + if btc_df is not None and eth_df is not None: + btc_ret_1 = btc_df["close"].pct_change(1).fillna(0).values + btc_ret_3 = btc_df["close"].pct_change(3).fillna(0).values + btc_ret_5 = btc_df["close"].pct_change(5).fillna(0).values + eth_ret_1 = eth_df["close"].pct_change(1).fillna(0).values + eth_ret_3 = eth_df["close"].pct_change(3).fillna(0).values + eth_ret_5 = eth_df["close"].pct_change(5).fillna(0).values + + # 타임스탬프 정렬: XRP 인덱스 기준으로 BTC/ETH 값을 맞춤 + # 길이가 다를 경우 짧은 쪽에 맞춰 앞을 0으로 패딩 + def _align(arr: np.ndarray, target_len: int) -> np.ndarray: + if len(arr) >= target_len: + return arr[-target_len:] + return np.concatenate([np.zeros(target_len - len(arr)), arr]) + + n = len(d) + btc_r1 = _align(btc_ret_1, n).astype(np.float32) + btc_r3 = _align(btc_ret_3, n).astype(np.float32) + btc_r5 = _align(btc_ret_5, n).astype(np.float32) + eth_r1 = _align(eth_ret_1, n).astype(np.float32) + eth_r3 = _align(eth_ret_3, n).astype(np.float32) + eth_r5 = _align(eth_ret_5, n).astype(np.float32) + + xrp_r1 = ret_1.astype(np.float32) + xrp_btc_rs = np.where(btc_r1 != 0, xrp_r1 / btc_r1, 0.0).astype(np.float32) + xrp_eth_rs = np.where(eth_r1 != 0, xrp_r1 / eth_r1, 0.0).astype(np.float32) + + extra = pd.DataFrame({ + "btc_ret_1": btc_r1, "btc_ret_3": btc_r3, "btc_ret_5": btc_r5, + "eth_ret_1": eth_r1, "eth_ret_3": eth_r3, "eth_ret_5": eth_r5, + "xrp_btc_rs": xrp_btc_rs, "xrp_eth_rs": xrp_eth_rs, + }, index=d.index) + result = pd.concat([result, extra], axis=1) # result는 기존 13개 피처 DataFrame + + return result +``` + +`generate_dataset_vectorized` 시그니처 변경: + +```python +def generate_dataset_vectorized( + df: pd.DataFrame, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.DataFrame: + # ... + feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df) + # ... + feat_final = feat_all.iloc[final_idx][FEATURE_COLS].copy() + # ... +``` + +### Step 4: 테스트 통과 확인 + +```bash +pytest tests/test_dataset_builder.py -v +``` + +Expected: 모든 테스트 PASS + +### Step 5: 커밋 + +```bash +git add src/dataset_builder.py tests/test_dataset_builder.py +git commit -m "feat: add BTC/ETH features to vectorized dataset builder" +``` + +--- + +## Task 4: `fetch_history.py` — 3심볼 동시 수집 및 병합 + +**Files:** +- Modify: `scripts/fetch_history.py` + +### Step 1: 수정 내용 + +`fetch_history.py`의 `main()` 함수를 수정해 `--symbols` 인자로 여러 심볼을 받고, 타임스탬프 기준 inner join으로 병합 후 저장한다. + +```python +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--symbols", nargs="+", default=["XRPUSDT"]) + parser.add_argument("--interval", default="1m") + parser.add_argument("--days", type=int, default=90) + parser.add_argument("--output", default="data/xrpusdt_1m.parquet") + args = parser.parse_args() + + if len(args.symbols) == 1: + # 단일 심볼: 기존 동작 유지 + df = asyncio.run(fetch_klines(args.symbols[0], args.interval, args.days)) + df.to_parquet(args.output) + print(f"저장 완료: {args.output} ({len(df)}행)") + else: + # 멀티 심볼: 각각 수집 후 병합 + dfs = {} + for symbol in args.symbols: + print(f"{symbol} 수집 중...") + dfs[symbol] = asyncio.run(fetch_klines(symbol, args.interval, args.days)) + + # 타임스탬프 기준 inner join + primary = args.symbols[0] + merged = dfs[primary].copy() + for symbol in args.symbols[1:]: + suffix = "_" + symbol.lower().replace("usdt", "") + merged = merged.join( + dfs[symbol].add_suffix(suffix), + how="inner", + ) + + output = args.output.replace("xrpusdt", "combined") + merged.to_parquet(output) + print(f"병합 저장 완료: {output} ({len(merged)}행, {len(merged.columns)}컬럼)") +``` + +### Step 2: 동작 확인 (dry run — API 키 없이 구조만 확인) + +```bash +python scripts/fetch_history.py --help +``` + +Expected: `--symbols` 인자가 출력됨 + +### Step 3: 커밋 + +```bash +git add scripts/fetch_history.py +git commit -m "feat: fetch_history supports multi-symbol collection and merge" +``` + +--- + +## Task 5: `train_model.py` — 병합 데이터셋으로 21피처 학습 + +**Files:** +- Modify: `scripts/train_model.py` + +### Step 1: 수정 내용 + +`train()` 함수가 병합된 parquet을 받아 BTC/ETH 컬럼을 분리해 `generate_dataset_vectorized`에 전달하도록 수정한다. + +```python +def train(data_path: str): + print(f"데이터 로드: {data_path}") + df_raw = pd.read_parquet(data_path) + print(f"캔들 수: {len(df_raw)}, 컬럼: {list(df_raw.columns)}") + + # 병합 데이터셋 여부 판별 + btc_df = None + eth_df = None + base_cols = ["open", "high", "low", "close", "volume"] + + if "close_btc" in df_raw.columns: + btc_df = df_raw[[c + "_btc" for c in base_cols]].copy() + btc_df.columns = base_cols + print("BTC 피처 활성화") + + if "close_eth" in df_raw.columns: + eth_df = df_raw[[c + "_eth" for c in base_cols]].copy() + eth_df.columns = base_cols + print("ETH 피처 활성화") + + df = df_raw[base_cols].copy() + + print("데이터셋 생성 중...") + dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df) + + # ... 이하 기존 학습 코드 동일 (X = dataset[FEATURE_COLS] 부분에서 자동으로 21개 사용) ... +``` + +### Step 2: 커밋 + +```bash +git add scripts/train_model.py +git commit -m "feat: train_model uses merged dataset with BTC/ETH features" +``` + +--- + +## Task 6: `bot.py` — `MultiSymbolStream` 연결 및 피처 전달 + +**Files:** +- Modify: `src/bot.py` +- Test: `tests/test_bot.py` + +### Step 1: 실패하는 테스트 작성 + +`tests/test_bot.py`에 아래 테스트를 추가한다. + +```python +def test_bot_uses_multi_symbol_stream(): + from src.bot import TradingBot + from src.config import Config + from src.data_stream import MultiSymbolStream + + config = Config() + bot = TradingBot(config) + assert isinstance(bot.stream, MultiSymbolStream) + +def test_bot_stream_has_btc_eth_buffers(): + from src.bot import TradingBot + from src.config import Config + + config = Config() + bot = TradingBot(config) + assert "btcusdt" in bot.stream.buffers + assert "ethusdt" in bot.stream.buffers +``` + +### Step 2: 테스트 실패 확인 + +```bash +pytest tests/test_bot.py::test_bot_uses_multi_symbol_stream -v +``` + +Expected: FAIL + +### Step 3: `bot.py` 수정 + +`__init__` 에서 `KlineStream` → `MultiSymbolStream`으로 교체하고, `process_candle`에 BTC/ETH df를 전달한다. + +```python +# import 변경 +from src.data_stream import MultiSymbolStream # KlineStream 대신 + +class TradingBot: + def __init__(self, config: Config): + self.config = config + self.exchange = BinanceFuturesClient(config) + self.notifier = DiscordNotifier(config.discord_webhook_url) + self.risk = RiskManager(config) + self.ml_filter = MLFilter() + self.current_trade_side: str | None = None + self.stream = MultiSymbolStream( + symbols=[config.symbol, "BTCUSDT", "ETHUSDT"], + interval="1m", + on_candle=self._on_candle_closed, + ) + + def _on_candle_closed(self, candle: dict): + xrp_df = self.stream.get_dataframe(self.config.symbol) + btc_df = self.stream.get_dataframe("BTCUSDT") + eth_df = self.stream.get_dataframe("ETHUSDT") + if xrp_df is not None: + asyncio.create_task(self.process_candle(xrp_df, btc_df=btc_df, eth_df=eth_df)) + + async def process_candle( + self, + df, + btc_df=None, + eth_df=None, + ): + if not self.risk.is_trading_allowed(): + logger.warning("리스크 한도 초과 - 거래 중단") + return + + ind = Indicators(df) + df_with_indicators = ind.calculate_all() + signal = ind.get_signal(df_with_indicators) + + if signal != "HOLD" and self.ml_filter.is_model_loaded(): + features = build_features(df_with_indicators, signal, btc_df=btc_df, eth_df=eth_df) + if not self.ml_filter.should_enter(features): + logger.info(f"ML 필터 차단: {signal} 신호 무시") + signal = "HOLD" + + # ... 이하 기존 코드 동일 ... + + async def run(self): + logger.info(f"봇 시작: {self.config.symbol}, 레버리지 {self.config.leverage}x") + await self._recover_position() + await self.stream.start( + api_key=self.config.api_key, + api_secret=self.config.api_secret, + ) +``` + +### Step 4: 테스트 통과 확인 + +```bash +pytest tests/test_bot.py -v +``` + +Expected: 모든 테스트 PASS + +### Step 5: 커밋 + +```bash +git add src/bot.py tests/test_bot.py +git commit -m "feat: bot uses MultiSymbolStream and passes BTC/ETH df to build_features" +``` + +--- + +## Task 7: 전체 테스트 통과 및 재학습 실행 + +### Step 1: 전체 테스트 실행 + +```bash +pytest tests/ -v +``` + +Expected: 모든 테스트 PASS + +### Step 2: 3심볼 데이터 수집 + +```bash +python scripts/fetch_history.py \ + --symbols XRPUSDT BTCUSDT ETHUSDT \ + --days 90 \ + --output data/xrpusdt_1m.parquet +``` + +Expected: `data/combined_1m.parquet` 생성 + +### Step 3: 21피처 모델 재학습 + +```bash +python scripts/train_model.py --data data/combined_1m.parquet +``` + +Expected: `models/lgbm_filter.pkl` 교체, AUC 출력 + +### Step 4: 최종 커밋 + +```bash +git add models/training_log.json +git commit -m "chore: retrain model with 21 BTC/ETH correlation features" +``` + +--- + +## 실행 순서 요약 + +``` +Task 1 → Task 2 → Task 3 → Task 4 → Task 5 → Task 6 → Task 7 +(Stream) (피처) (데이터셋) (수집) (학습) (봇) (검증) +``` + +각 Task는 독립적으로 테스트 가능하며, Task 7 이전까지는 기존 봇이 정상 동작한다. diff --git a/models/training_log.json b/models/training_log.json index 98a4fa6..58ec1dc 100644 --- a/models/training_log.json +++ b/models/training_log.json @@ -24,5 +24,12 @@ "auc": 0.5405, "samples": 1704, "model_path": "models/lgbm_filter.pkl" + }, + { + "date": "2026-03-01T19:29:21.454533", + "auc": 0.5321, + "samples": 1696, + "features": 21, + "model_path": "models/lgbm_filter.pkl" } ] \ No newline at end of file diff --git a/scripts/deploy_model.sh b/scripts/deploy_model.sh index 90ba606..f11a457 100755 --- a/scripts/deploy_model.sh +++ b/scripts/deploy_model.sh @@ -54,6 +54,13 @@ fi echo "=== 전송 완료 ===" echo "" -echo "봇이 실행 중이라면 아래 명령으로 모델을 즉시 리로드할 수 있습니다:" -echo " docker exec cointrader python -c \\" -echo " \"from src.ml_filter import MLFilter; f=MLFilter(); f.reload_model(); print('리로드 완료')\"" + +# 봇 컨테이너가 실행 중이면 모델 핫리로드, 아니면 건너뜀 +echo "=== 핫리로드 시도 ===" +if ssh "${LXC_HOST}" "docker inspect -f '{{.State.Running}}' cointrader 2>/dev/null | grep -q true"; then + ssh "${LXC_HOST}" "docker exec cointrader python -c \ + \"from src.ml_filter import MLFilter; f=MLFilter(); f.reload_model(); print('리로드 완료')\"" + echo "=== 핫리로드 완료 ===" +else + echo " cointrader 컨테이너가 실행 중이 아닙니다. 건너뜁니다." +fi diff --git a/scripts/fetch_history.py b/scripts/fetch_history.py index d3404b3..a5355d4 100644 --- a/scripts/fetch_history.py +++ b/scripts/fetch_history.py @@ -1,6 +1,7 @@ """ 바이낸스 선물 REST API로 과거 캔들 데이터를 수집해 parquet으로 저장한다. 사용법: python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90 + python scripts/fetch_history.py --symbols XRPUSDT BTCUSDT ETHUSDT --days 90 """ import sys from pathlib import Path @@ -8,7 +9,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) import asyncio import argparse -from datetime import datetime, timedelta +from datetime import datetime, timezone, timedelta import pandas as pd from binance import AsyncClient from dotenv import load_dotenv @@ -16,32 +17,41 @@ import os load_dotenv() +# 요청 사이 딜레이 (초). 바이낸스 선물 기본 한도: 2400 req/min = 40 req/s +# 1500개씩 가져오므로 90일 1m 데이터 = ~65회 요청/심볼 +# 심볼 간 딜레이 없이 연속 요청하면 레이트 리밋(-1003) 발생 +_REQUEST_DELAY = 0.3 # 초당 ~3.3 req → 안전 마진 충분 -async def fetch_klines(symbol: str, interval: str, days: int) -> pd.DataFrame: - client = await AsyncClient.create( - api_key=os.getenv("BINANCE_API_KEY", ""), - api_secret=os.getenv("BINANCE_API_SECRET", ""), - ) - try: - start_ts = int((datetime.utcnow() - timedelta(days=days)).timestamp() * 1000) - all_klines = [] - while True: - klines = await client.futures_klines( - symbol=symbol, - interval=interval, - startTime=start_ts, - limit=1500, - ) - if not klines: - break - all_klines.extend(klines) - last_ts = klines[-1][0] - if last_ts >= int(datetime.utcnow().timestamp() * 1000): - break - start_ts = last_ts + 1 - print(f"수집 중... {len(all_klines)}개") - finally: - await client.close_connection() + +def _now_ms() -> int: + return int(datetime.now(timezone.utc).timestamp() * 1000) + + +async def _fetch_klines_with_client( + client: AsyncClient, + symbol: str, + interval: str, + days: int, +) -> pd.DataFrame: + """기존 클라이언트를 재사용해 단일 심볼 캔들을 수집한다.""" + start_ts = int((datetime.now(timezone.utc) - timedelta(days=days)).timestamp() * 1000) + all_klines = [] + while True: + klines = await client.futures_klines( + symbol=symbol, + interval=interval, + startTime=start_ts, + limit=1500, + ) + if not klines: + break + all_klines.extend(klines) + last_ts = klines[-1][0] + if last_ts >= _now_ms(): + break + start_ts = last_ts + 1 + print(f" [{symbol}] 수집 중... {len(all_klines):,}개") + await asyncio.sleep(_REQUEST_DELAY) df = pd.DataFrame(all_klines, columns=[ "timestamp", "open", "high", "low", "close", "volume", @@ -51,22 +61,87 @@ async def fetch_klines(symbol: str, interval: str, days: int) -> pd.DataFrame: df = df[["timestamp", "open", "high", "low", "close", "volume"]].copy() for col in ["open", "high", "low", "close", "volume"]: df[col] = df[col].astype(float) - df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True) df.set_index("timestamp", inplace=True) return df +async def fetch_klines(symbol: str, interval: str, days: int) -> pd.DataFrame: + """단일 심볼 수집 (하위 호환용).""" + client = await AsyncClient.create( + api_key=os.getenv("BINANCE_API_KEY", ""), + api_secret=os.getenv("BINANCE_API_SECRET", ""), + ) + try: + return await _fetch_klines_with_client(client, symbol, interval, days) + finally: + await client.close_connection() + + +async def fetch_klines_all( + symbols: list[str], + interval: str, + days: int, +) -> dict[str, pd.DataFrame]: + """ + 단일 클라이언트로 여러 심볼을 순차 수집한다. + asyncio.run()을 심볼마다 반복하면 연결 오버헤드와 레이트 리밋 위험이 있으므로 + 하나의 연결 안에서 심볼 간 딜레이를 두고 순차 처리한다. + """ + client = await AsyncClient.create( + api_key=os.getenv("BINANCE_API_KEY", ""), + api_secret=os.getenv("BINANCE_API_SECRET", ""), + ) + dfs = {} + try: + for i, symbol in enumerate(symbols): + print(f"\n[{i+1}/{len(symbols)}] {symbol} 수집 시작...") + dfs[symbol] = await _fetch_klines_with_client(client, symbol, interval, days) + print(f" [{symbol}] 완료: {len(dfs[symbol]):,}행") + # 심볼 간 추가 대기: 레이트 리밋 카운터가 리셋될 시간 확보 + if i < len(symbols) - 1: + print(f" 다음 심볼 수집 전 5초 대기...") + await asyncio.sleep(5) + finally: + await client.close_connection() + return dfs + + def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--symbol", default="XRPUSDT") + parser = argparse.ArgumentParser( + description="바이낸스 선물 과거 캔들 수집. 단일 심볼 또는 멀티 심볼 병합 저장." + ) + parser.add_argument("--symbols", nargs="+", default=["XRPUSDT"]) + parser.add_argument("--symbol", default=None, help="단일 심볼 (--symbols 미사용 시)") parser.add_argument("--interval", default="1m") parser.add_argument("--days", type=int, default=90) parser.add_argument("--output", default="data/xrpusdt_1m.parquet") args = parser.parse_args() - df = asyncio.run(fetch_klines(args.symbol, args.interval, args.days)) - df.to_parquet(args.output) - print(f"저장 완료: {args.output} ({len(df)}행)") + # 하위 호환: --symbol 단독 사용 시 symbols로 통합 + if args.symbol and args.symbols == ["XRPUSDT"]: + args.symbols = [args.symbol] + + if len(args.symbols) == 1: + df = asyncio.run(fetch_klines(args.symbols[0], args.interval, args.days)) + df.to_parquet(args.output) + print(f"저장 완료: {args.output} ({len(df):,}행)") + else: + # 멀티 심볼: 단일 클라이언트로 순차 수집 후 타임스탬프 기준 inner join 병합 + dfs = asyncio.run(fetch_klines_all(args.symbols, args.interval, args.days)) + + primary = args.symbols[0] + merged = dfs[primary].copy() + for symbol in args.symbols[1:]: + suffix = "_" + symbol.lower().replace("usdt", "") + merged = merged.join( + dfs[symbol].add_suffix(suffix), + how="inner", + ) + + output = args.output.replace("xrpusdt", "combined") + merged.to_parquet(output) + print(f"\n병합 저장 완료: {output} ({len(merged):,}행, {len(merged.columns)}컬럼)") if __name__ == "__main__": diff --git a/scripts/train_and_deploy.sh b/scripts/train_and_deploy.sh index d1382e7..6f851ae 100755 --- a/scripts/train_and_deploy.sh +++ b/scripts/train_and_deploy.sh @@ -16,19 +16,24 @@ PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" cd "$PROJECT_ROOT" -echo "=== [1/3] 데이터 수집 ===" -python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90 --output data/xrpusdt_1m.parquet +echo "=== [1/3] 데이터 수집 (XRP + BTC + ETH 3심볼) ===" +python scripts/fetch_history.py \ + --symbols XRPUSDT BTCUSDT ETHUSDT \ + --interval 1m \ + --days 90 \ + --output data/xrpusdt_1m.parquet +# 결과: data/combined_1m.parquet (타임스탬프 기준 병합) echo "" -echo "=== [2/3] 모델 학습 ===" +echo "=== [2/3] 모델 학습 (21개 피처: XRP 13 + BTC/ETH 상관관계 8) ===" # TRAIN_BACKEND=mlx 로 설정하면 Apple Silicon GPU(Metal)를 사용한다 (기본: lgbm) BACKEND="${TRAIN_BACKEND:-lgbm}" if [ "$BACKEND" = "mlx" ]; then echo " 백엔드: MLX (Apple Silicon GPU)" - python scripts/train_mlx_model.py --data data/xrpusdt_1m.parquet + python scripts/train_mlx_model.py --data data/combined_1m.parquet else echo " 백엔드: LightGBM (CPU)" - python scripts/train_model.py --data data/xrpusdt_1m.parquet + python scripts/train_model.py --data data/combined_1m.parquet fi echo "" diff --git a/scripts/train_model.py b/scripts/train_model.py index f0081e8..bd6226a 100644 --- a/scripts/train_model.py +++ b/scripts/train_model.py @@ -148,11 +148,28 @@ def generate_dataset(df: pd.DataFrame, n_jobs: int | None = None) -> pd.DataFram def train(data_path: str): print(f"데이터 로드: {data_path}") - df = pd.read_parquet(data_path) - print(f"캔들 수: {len(df)}") + df_raw = pd.read_parquet(data_path) + print(f"캔들 수: {len(df_raw)}, 컬럼: {list(df_raw.columns)}") + + # 병합 데이터셋 여부 판별 + btc_df = None + eth_df = None + base_cols = ["open", "high", "low", "close", "volume"] + + if "close_btc" in df_raw.columns: + btc_df = df_raw[[c + "_btc" for c in base_cols]].copy() + btc_df.columns = base_cols + print("BTC 피처 활성화") + + if "close_eth" in df_raw.columns: + eth_df = df_raw[[c + "_eth" for c in base_cols]].copy() + eth_df.columns = base_cols + print("ETH 피처 활성화") + + df = df_raw[base_cols].copy() print("데이터셋 생성 중...") - dataset = generate_dataset_vectorized(df) + dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df) if dataset.empty or "label" not in dataset.columns: raise ValueError(f"데이터셋 생성 실패: 샘플 0개. 위 오류 메시지를 확인하세요.") @@ -162,7 +179,9 @@ def train(data_path: str): if len(dataset) < 200: raise ValueError(f"학습 샘플 부족: {len(dataset)}개 (최소 200 필요)") - X = dataset[FEATURE_COLS] + actual_feature_cols = [c for c in FEATURE_COLS if c in dataset.columns] + print(f"사용 피처: {len(actual_feature_cols)}개 {actual_feature_cols}") + X = dataset[actual_feature_cols] y = dataset["label"] split = int(len(X) * 0.8) @@ -208,6 +227,7 @@ def train(data_path: str): "date": datetime.now().isoformat(), "auc": round(auc, 4), "samples": len(dataset), + "features": len(actual_feature_cols), "model_path": str(MODEL_PATH), }) with open(LOG_PATH, "w") as f: diff --git a/src/bot.py b/src/bot.py index 5557898..1521500 100644 --- a/src/bot.py +++ b/src/bot.py @@ -3,7 +3,7 @@ from loguru import logger from src.config import Config from src.exchange import BinanceFuturesClient from src.indicators import Indicators -from src.data_stream import KlineStream +from src.data_stream import MultiSymbolStream from src.notifier import DiscordNotifier from src.risk_manager import RiskManager from src.ml_filter import MLFilter @@ -18,16 +18,18 @@ class TradingBot: self.risk = RiskManager(config) self.ml_filter = MLFilter() self.current_trade_side: str | None = None # "LONG" | "SHORT" - self.stream = KlineStream( - symbol=config.symbol, + self.stream = MultiSymbolStream( + symbols=[config.symbol, "BTCUSDT", "ETHUSDT"], interval="1m", on_candle=self._on_candle_closed, ) def _on_candle_closed(self, candle: dict): - df = self.stream.get_dataframe() - if df is not None: - asyncio.create_task(self.process_candle(df)) + xrp_df = self.stream.get_dataframe(self.config.symbol) + btc_df = self.stream.get_dataframe("BTCUSDT") + eth_df = self.stream.get_dataframe("ETHUSDT") + if xrp_df is not None: + asyncio.create_task(self.process_candle(xrp_df, btc_df=btc_df, eth_df=eth_df)) async def _recover_position(self) -> None: """재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구.""" @@ -47,7 +49,7 @@ class TradingBot: else: logger.info("기존 포지션 없음 - 신규 진입 대기") - async def process_candle(self, df): + async def process_candle(self, df, btc_df=None, eth_df=None): if not self.risk.is_trading_allowed(): logger.warning("리스크 한도 초과 - 거래 중단") return @@ -57,7 +59,7 @@ class TradingBot: signal = ind.get_signal(df_with_indicators) if signal != "HOLD" and self.ml_filter.is_model_loaded(): - features = build_features(df_with_indicators, signal) + features = build_features(df_with_indicators, signal, btc_df=btc_df, eth_df=eth_df) if not self.ml_filter.should_enter(features): logger.info(f"ML 필터 차단: {signal} 신호 무시") signal = "HOLD" diff --git a/src/data_stream.py b/src/data_stream.py index a5c2509..5d1f0ef 100644 --- a/src/data_stream.py +++ b/src/data_stream.py @@ -6,6 +6,7 @@ from binance import AsyncClient, BinanceSocketManager from loguru import logger + class KlineStream: def __init__( self, @@ -84,3 +85,105 @@ class KlineStream: self.handle_message(msg) finally: await client.close_connection() + + +class MultiSymbolStream: + """ + 바이낸스 Combined WebSocket으로 여러 심볼의 캔들을 단일 연결로 수신한다. + XRP 캔들이 닫힐 때 on_candle 콜백을 호출한다. + """ + + def __init__( + self, + symbols: list[str], + interval: str = "1m", + buffer_size: int = 200, + on_candle: Callable = None, + ): + self.symbols = [s.lower() for s in symbols] + self.interval = interval + self.on_candle = on_candle + self.buffers: dict[str, deque] = { + s: deque(maxlen=buffer_size) for s in self.symbols + } + # 첫 번째 심볼이 주 심볼 (XRP) + self.primary_symbol = self.symbols[0] + + def parse_kline(self, msg: dict) -> dict: + k = msg["k"] + return { + "timestamp": k["t"], + "open": float(k["o"]), + "high": float(k["h"]), + "low": float(k["l"]), + "close": float(k["c"]), + "volume": float(k["v"]), + "is_closed": k["x"], + } + + def handle_message(self, msg: dict): + # Combined stream 메시지는 {"stream": "...", "data": {...}} 형태 + if "stream" in msg: + data = msg["data"] + else: + data = msg + + if data.get("e") != "kline": + return + + symbol = data["s"].lower() + candle = self.parse_kline(data) + + if candle["is_closed"] and symbol in self.buffers: + self.buffers[symbol].append(candle) + if symbol == self.primary_symbol and self.on_candle: + self.on_candle(candle) + + def get_dataframe(self, symbol: str) -> pd.DataFrame | None: + key = symbol.lower() + buf = self.buffers.get(key) + if buf is None or len(buf) < 50: + return None + df = pd.DataFrame(list(buf)) + df.set_index("timestamp", inplace=True) + return df + + async def _preload_history(self, client: AsyncClient, limit: int = 200): + """REST API로 모든 심볼의 과거 캔들을 버퍼에 미리 채운다.""" + for symbol in self.symbols: + logger.info(f"{symbol.upper()} 과거 캔들 {limit}개 로드 중...") + klines = await client.futures_klines( + symbol=symbol.upper(), + interval=self.interval, + limit=limit, + ) + for k in klines[:-1]: + self.buffers[symbol].append({ + "timestamp": k[0], + "open": float(k[1]), + "high": float(k[2]), + "low": float(k[3]), + "close": float(k[4]), + "volume": float(k[5]), + "is_closed": True, + }) + logger.info(f"{symbol.upper()} {len(self.buffers[symbol])}개 로드 완료") + + async def start(self, api_key: str, api_secret: str): + client = await AsyncClient.create( + api_key=api_key, + api_secret=api_secret, + ) + await self._preload_history(client) + bm = BinanceSocketManager(client) + streams = [ + f"{s}@kline_{self.interval}" for s in self.symbols + ] + logger.info(f"Combined WebSocket 시작: {streams}") + try: + async with bm.futures_multiplex_socket(streams) as stream: + while True: + msg = await stream.recv() + self.handle_message(msg) + finally: + await client.close_connection() diff --git a/src/dataset_builder.py b/src/dataset_builder.py index f63cb6a..eb8b44c 100644 --- a/src/dataset_builder.py +++ b/src/dataset_builder.py @@ -115,7 +115,12 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray: return signal_arr -def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.DataFrame: +def _calc_features_vectorized( + d: pd.DataFrame, + signal_arr: np.ndarray, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.DataFrame: """ 신호 발생 인덱스에서 ml_features.py build_features() 로직을 pandas 벡터 연산으로 재현한다. @@ -178,7 +183,7 @@ def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.Dat side = np.where(signal_arr == "LONG", 1.0, 0.0).astype(np.float32) - return pd.DataFrame({ + result = pd.DataFrame({ "rsi": rsi.values.astype(np.float32), "macd_hist": macd_hist.values.astype(np.float32), "bb_pct": bb_pct.astype(np.float32), @@ -195,6 +200,41 @@ def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.Dat "_signal": signal_arr, # 레이블 계산용 임시 컬럼 }, index=d.index) + # BTC/ETH 피처 계산 (제공된 경우) + if btc_df is not None and eth_df is not None: + btc_ret_1 = btc_df["close"].pct_change(1).fillna(0).values + btc_ret_3 = btc_df["close"].pct_change(3).fillna(0).values + btc_ret_5 = btc_df["close"].pct_change(5).fillna(0).values + eth_ret_1 = eth_df["close"].pct_change(1).fillna(0).values + eth_ret_3 = eth_df["close"].pct_change(3).fillna(0).values + eth_ret_5 = eth_df["close"].pct_change(5).fillna(0).values + + def _align(arr: np.ndarray, target_len: int) -> np.ndarray: + if len(arr) >= target_len: + return arr[-target_len:] + return np.concatenate([np.zeros(target_len - len(arr)), arr]) + + n = len(d) + btc_r1 = _align(btc_ret_1, n).astype(np.float32) + btc_r3 = _align(btc_ret_3, n).astype(np.float32) + btc_r5 = _align(btc_ret_5, n).astype(np.float32) + eth_r1 = _align(eth_ret_1, n).astype(np.float32) + eth_r3 = _align(eth_ret_3, n).astype(np.float32) + eth_r5 = _align(eth_ret_5, n).astype(np.float32) + + xrp_r1 = ret_1.astype(np.float32) + xrp_btc_rs = np.where(btc_r1 != 0, xrp_r1 / btc_r1, 0.0).astype(np.float32) + xrp_eth_rs = np.where(eth_r1 != 0, xrp_r1 / eth_r1, 0.0).astype(np.float32) + + extra = pd.DataFrame({ + "btc_ret_1": btc_r1, "btc_ret_3": btc_r3, "btc_ret_5": btc_r5, + "eth_ret_1": eth_r1, "eth_ret_3": eth_r3, "eth_ret_5": eth_r5, + "xrp_btc_rs": xrp_btc_rs, "xrp_eth_rs": xrp_eth_rs, + }, index=d.index) + result = pd.concat([result, extra], axis=1) + + return result + def _calc_labels_vectorized( d: pd.DataFrame, @@ -261,22 +301,28 @@ def _calc_labels_vectorized( return np.array(labels, dtype=np.int8), np.array(valid_mask, dtype=bool) -def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame: +def generate_dataset_vectorized( + df: pd.DataFrame, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.DataFrame: """ 전체 시계열을 1회 계산해 학습 데이터셋을 생성한다. 기존 generate_dataset()의 drop-in 대체제. + btc_df, eth_df가 제공되면 21개 피처로 확장한다. """ print(" [1/3] 전체 시계열 지표 계산 (1회)...") d = _calc_indicators(df) print(" [2/3] 신호 마스킹 및 피처 추출...") signal_arr = _calc_signals(d) - feat_all = _calc_features_vectorized(d, signal_arr) + feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df) # 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만 + available_cols_for_nan_check = [c for c in FEATURE_COLS if c in feat_all.columns] valid_rows = ( (signal_arr != "HOLD") & - (~feat_all[FEATURE_COLS].isna().any(axis=1).values) & + (~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) & (np.arange(len(d)) >= WARMUP) & (np.arange(len(d)) < len(d) - LOOKAHEAD) ) @@ -287,7 +333,9 @@ def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame: labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx) final_idx = sig_idx[valid_mask] - feat_final = feat_all.iloc[final_idx][FEATURE_COLS].copy() + # btc_df/eth_df 제공 여부에 따라 실제 존재하는 피처 컬럼만 선택 + available_feature_cols = [c for c in FEATURE_COLS if c in feat_all.columns] + feat_final = feat_all.iloc[final_idx][available_feature_cols].copy() feat_final["label"] = labels return feat_final.reset_index(drop=True) diff --git a/src/ml_features.py b/src/ml_features.py index 743609f..b4e338c 100644 --- a/src/ml_features.py +++ b/src/ml_features.py @@ -5,12 +5,36 @@ FEATURE_COLS = [ "rsi", "macd_hist", "bb_pct", "ema_align", "stoch_k", "stoch_d", "atr_pct", "vol_ratio", "ret_1", "ret_3", "ret_5", "signal_strength", "side", + "btc_ret_1", "btc_ret_3", "btc_ret_5", + "eth_ret_1", "eth_ret_3", "eth_ret_5", + "xrp_btc_rs", "xrp_eth_rs", ] -def build_features(df: pd.DataFrame, signal: str) -> pd.Series: +def _calc_ret(closes: pd.Series, n: int) -> float: + """n캔들 전 대비 수익률. 데이터 부족 시 0.0.""" + if len(closes) < n + 1: + return 0.0 + prev = closes.iloc[-(n + 1)] + return (closes.iloc[-1] - prev) / prev if prev != 0 else 0.0 + + +def _calc_rs(xrp_ret: float, other_ret: float) -> float: + """상대강도 = xrp_ret / other_ret. 분모 0이면 0.0.""" + if other_ret == 0.0: + return 0.0 + return xrp_ret / other_ret + + +def build_features( + df: pd.DataFrame, + signal: str, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, +) -> pd.Series: """ 기술 지표가 계산된 DataFrame의 마지막 행에서 ML 피처를 추출한다. + btc_df, eth_df가 제공되면 21개 피처를, 없으면 13개 피처를 반환한다. signal: "LONG" | "SHORT" """ last = df.iloc[-1] @@ -38,9 +62,9 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series: vol_ratio = last["volume"] / vol_ma20 if vol_ma20 > 0 else 1.0 closes = df["close"] - ret_1 = (close - closes.iloc[-2]) / closes.iloc[-2] if len(closes) >= 2 else 0.0 - ret_3 = (close - closes.iloc[-4]) / closes.iloc[-4] if len(closes) >= 4 else 0.0 - ret_5 = (close - closes.iloc[-6]) / closes.iloc[-6] if len(closes) >= 6 else 0.0 + ret_1 = _calc_ret(closes, 1) + ret_3 = _calc_ret(closes, 3) + ret_5 = _calc_ret(closes, 5) prev = df.iloc[-2] if len(df) >= 2 else last strength = 0 @@ -65,7 +89,7 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series: if ema_align == -1: strength += 1 if stoch_k > 80 and stoch_k < stoch_d: strength += 1 - return pd.Series({ + base = { "rsi": float(rsi), "macd_hist": float(last.get("macd_hist", 0)), "bb_pct": float(bb_pct), @@ -79,4 +103,25 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series: "ret_5": float(ret_5), "signal_strength": float(strength), "side": 1.0 if signal == "LONG" else 0.0, - }) + } + + if btc_df is not None and eth_df is not None: + btc_ret_1 = _calc_ret(btc_df["close"], 1) + btc_ret_3 = _calc_ret(btc_df["close"], 3) + btc_ret_5 = _calc_ret(btc_df["close"], 5) + eth_ret_1 = _calc_ret(eth_df["close"], 1) + eth_ret_3 = _calc_ret(eth_df["close"], 3) + eth_ret_5 = _calc_ret(eth_df["close"], 5) + + base.update({ + "btc_ret_1": float(btc_ret_1), + "btc_ret_3": float(btc_ret_3), + "btc_ret_5": float(btc_ret_5), + "eth_ret_1": float(eth_ret_1), + "eth_ret_3": float(eth_ret_3), + "eth_ret_5": float(eth_ret_5), + "xrp_btc_rs": float(_calc_rs(ret_1, btc_ret_1)), + "xrp_eth_rs": float(_calc_rs(ret_1, eth_ret_1)), + }) + + return pd.Series(base) diff --git a/tests/test_bot.py b/tests/test_bot.py index 8d5f2de..ac35f07 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -35,6 +35,19 @@ def sample_df(): }) +def test_bot_uses_multi_symbol_stream(config): + from src.data_stream import MultiSymbolStream + with patch("src.bot.BinanceFuturesClient"): + bot = TradingBot(config) + assert isinstance(bot.stream, MultiSymbolStream) + +def test_bot_stream_has_btc_eth_buffers(config): + with patch("src.bot.BinanceFuturesClient"): + bot = TradingBot(config) + assert "btcusdt" in bot.stream.buffers + assert "ethusdt" in bot.stream.buffers + + @pytest.mark.asyncio async def test_bot_processes_signal(config, sample_df): with patch("src.bot.BinanceFuturesClient") as MockExchange: diff --git a/tests/test_data_stream.py b/tests/test_data_stream.py index 26b2ed4..935c3f2 100644 --- a/tests/test_data_stream.py +++ b/tests/test_data_stream.py @@ -2,6 +2,43 @@ import pytest import asyncio from unittest.mock import AsyncMock, patch, MagicMock from src.data_stream import KlineStream +from src.data_stream import MultiSymbolStream + + +def test_multi_symbol_stream_has_three_buffers(): + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + ) + assert "xrpusdt" in stream.buffers + assert "btcusdt" in stream.buffers + assert "ethusdt" in stream.buffers + +def test_multi_symbol_stream_get_dataframe_returns_none_when_empty(): + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + ) + assert stream.get_dataframe("XRPUSDT") is None + +def test_multi_symbol_stream_get_dataframe_returns_df_when_full(): + import pandas as pd + stream = MultiSymbolStream( + symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"], + interval="1m", + buffer_size=200, + ) + candle = { + "timestamp": 1000, "open": 1.0, "high": 1.1, + "low": 0.9, "close": 1.05, "volume": 100.0, "is_closed": True, + } + for i in range(50): + c = candle.copy() + c["timestamp"] = 1000 + i + stream.buffers["xrpusdt"].append(c) + df = stream.get_dataframe("XRPUSDT") + assert df is not None + assert len(df) == 50 @pytest.mark.asyncio diff --git a/tests/test_dataset_builder.py b/tests/test_dataset_builder.py index 311d8c3..653971f 100644 --- a/tests/test_dataset_builder.py +++ b/tests/test_dataset_builder.py @@ -29,12 +29,16 @@ def test_returns_dataframe(sample_df): def test_has_required_columns(sample_df): - """FEATURE_COLS + label 컬럼이 모두 있어야 한다.""" - from src.ml_features import FEATURE_COLS + """기본 13개 피처 + label 컬럼이 모두 있어야 한다.""" + BASE_FEATURE_COLS = [ + "rsi", "macd_hist", "bb_pct", "ema_align", + "stoch_k", "stoch_d", "atr_pct", "vol_ratio", + "ret_1", "ret_3", "ret_5", "signal_strength", "side", + ] result = generate_dataset_vectorized(sample_df) if len(result) > 0: assert "label" in result.columns - for col in FEATURE_COLS: + for col in BASE_FEATURE_COLS: assert col in result.columns, f"컬럼 없음: {col}" @@ -45,6 +49,30 @@ def test_label_is_binary(sample_df): assert set(result["label"].unique()).issubset({0, 1}) +def test_generate_dataset_vectorized_with_btc_eth_has_21_feature_cols(): + """BTC/ETH DataFrame을 전달하면 결과 컬럼이 21개 피처 + label이어야 한다.""" + import pandas as pd + import numpy as np + from src.dataset_builder import generate_dataset_vectorized + from src.ml_features import FEATURE_COLS + + np.random.seed(42) + n = 500 + closes = np.cumprod(1 + np.random.randn(n) * 0.001) * 1.0 + xrp_df = pd.DataFrame({ + "open": closes * 0.999, "high": closes * 1.005, + "low": closes * 0.995, "close": closes, + "volume": np.random.rand(n) * 1000 + 500, + }) + btc_df = xrp_df.copy() * 50000 + eth_df = xrp_df.copy() * 3000 + + result = generate_dataset_vectorized(xrp_df, btc_df=btc_df, eth_df=eth_df) + if not result.empty: + assert set(FEATURE_COLS).issubset(set(result.columns)) + assert len(result.columns) == len(FEATURE_COLS) + 1 # +1 for label + + def test_matches_original_generate_dataset(sample_df): """벡터화 버전과 기존 버전의 샘플 수가 유사해야 한다. diff --git a/tests/test_ml_features.py b/tests/test_ml_features.py index 6b48f96..b7a645f 100644 --- a/tests/test_ml_features.py +++ b/tests/test_ml_features.py @@ -4,6 +4,56 @@ import pytest from src.ml_features import build_features, FEATURE_COLS +def _make_df(n=10, base_price=1.0): + """테스트용 더미 캔들 DataFrame 생성.""" + closes = [base_price * (1 + i * 0.001) for i in range(n)] + return pd.DataFrame({ + "close": closes, "high": [c * 1.01 for c in closes], + "low": [c * 0.99 for c in closes], + "volume": [1000.0] * n, + "rsi": [50.0] * n, "macd": [0.0] * n, "macd_signal": [0.0] * n, + "macd_hist": [0.0] * n, "bb_upper": [c * 1.02 for c in closes], + "bb_lower": [c * 0.98 for c in closes], "ema9": closes, + "ema21": closes, "ema50": closes, "atr": [0.01] * n, + "stoch_k": [50.0] * n, "stoch_d": [50.0] * n, + "vol_ma20": [1000.0] * n, + }) + + +def test_build_features_with_btc_eth_has_21_features(): + xrp_df = _make_df(10, base_price=1.0) + btc_df = _make_df(10, base_price=50000.0) + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + assert len(features) == 21 + +def test_build_features_without_btc_eth_has_13_features(): + xrp_df = _make_df(10, base_price=1.0) + features = build_features(xrp_df, "LONG") + assert len(features) == 13 + +def test_build_features_btc_ret_1_correct(): + xrp_df = _make_df(10, base_price=1.0) + btc_df = _make_df(10, base_price=50000.0) + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + btc_closes = btc_df["close"] + expected_btc_ret_1 = (btc_closes.iloc[-1] - btc_closes.iloc[-2]) / btc_closes.iloc[-2] + assert abs(features["btc_ret_1"] - expected_btc_ret_1) < 1e-6 + +def test_build_features_rs_zero_when_btc_ret_zero(): + xrp_df = _make_df(10, base_price=1.0) + btc_df = _make_df(10, base_price=50000.0) + btc_df["close"] = 50000.0 # 모든 캔들 동일 + eth_df = _make_df(10, base_price=3000.0) + features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df) + assert features["xrp_btc_rs"] == 0.0 + +def test_feature_cols_has_21_items(): + from src.ml_features import FEATURE_COLS + assert len(FEATURE_COLS) == 21 + + def make_df(n=100): """테스트용 최소 DataFrame 생성""" np.random.seed(42) @@ -27,13 +77,19 @@ def test_build_features_returns_series(): assert isinstance(features, pd.Series) +BASE_FEATURE_COLS = [ + "rsi", "macd_hist", "bb_pct", "ema_align", + "stoch_k", "stoch_d", "atr_pct", "vol_ratio", + "ret_1", "ret_3", "ret_5", "signal_strength", "side", +] + def test_build_features_has_all_cols(): from src.indicators import Indicators df = make_df(100) ind = Indicators(df) df_ind = ind.calculate_all() features = build_features(df_ind, signal="LONG") - for col in FEATURE_COLS: + for col in BASE_FEATURE_COLS: assert col in features.index, f"피처 누락: {col}"