feat: implement BTC/ETH correlation features for improved model accuracy

- Added a new design document outlining the integration of BTC/ETH candle data as additional features in the XRP ML filter, enhancing prediction accuracy.
- Introduced `MultiSymbolStream` for combined WebSocket data retrieval of XRP, BTC, and ETH.
- Expanded feature set from 13 to 21 by including 8 new BTC/ETH-related features.
- Updated various scripts and modules to support the new feature set and data handling.
- Enhanced training and deployment scripts to accommodate the new dataset structure.

This commit lays the groundwork for improved model performance by leveraging XRP's price correlation with BTC and ETH.
This commit is contained in:
21in7
2026-03-01 19:30:17 +09:00
parent c4062c39d3
commit d1af736bfc
15 changed files with 1448 additions and 68 deletions

View File

@@ -3,7 +3,7 @@ from loguru import logger
from src.config import Config
from src.exchange import BinanceFuturesClient
from src.indicators import Indicators
from src.data_stream import KlineStream
from src.data_stream import MultiSymbolStream
from src.notifier import DiscordNotifier
from src.risk_manager import RiskManager
from src.ml_filter import MLFilter
@@ -18,16 +18,18 @@ class TradingBot:
self.risk = RiskManager(config)
self.ml_filter = MLFilter()
self.current_trade_side: str | None = None # "LONG" | "SHORT"
self.stream = KlineStream(
symbol=config.symbol,
self.stream = MultiSymbolStream(
symbols=[config.symbol, "BTCUSDT", "ETHUSDT"],
interval="1m",
on_candle=self._on_candle_closed,
)
def _on_candle_closed(self, candle: dict):
df = self.stream.get_dataframe()
if df is not None:
asyncio.create_task(self.process_candle(df))
xrp_df = self.stream.get_dataframe(self.config.symbol)
btc_df = self.stream.get_dataframe("BTCUSDT")
eth_df = self.stream.get_dataframe("ETHUSDT")
if xrp_df is not None:
asyncio.create_task(self.process_candle(xrp_df, btc_df=btc_df, eth_df=eth_df))
async def _recover_position(self) -> None:
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
@@ -47,7 +49,7 @@ class TradingBot:
else:
logger.info("기존 포지션 없음 - 신규 진입 대기")
async def process_candle(self, df):
async def process_candle(self, df, btc_df=None, eth_df=None):
if not self.risk.is_trading_allowed():
logger.warning("리스크 한도 초과 - 거래 중단")
return
@@ -57,7 +59,7 @@ class TradingBot:
signal = ind.get_signal(df_with_indicators)
if signal != "HOLD" and self.ml_filter.is_model_loaded():
features = build_features(df_with_indicators, signal)
features = build_features(df_with_indicators, signal, btc_df=btc_df, eth_df=eth_df)
if not self.ml_filter.should_enter(features):
logger.info(f"ML 필터 차단: {signal} 신호 무시")
signal = "HOLD"

View File

@@ -6,6 +6,7 @@ from binance import AsyncClient, BinanceSocketManager
from loguru import logger
class KlineStream:
def __init__(
self,
@@ -84,3 +85,105 @@ class KlineStream:
self.handle_message(msg)
finally:
await client.close_connection()
class MultiSymbolStream:
"""
바이낸스 Combined WebSocket으로 여러 심볼의 캔들을 단일 연결로 수신한다.
XRP 캔들이 닫힐 때 on_candle 콜백을 호출한다.
"""
def __init__(
self,
symbols: list[str],
interval: str = "1m",
buffer_size: int = 200,
on_candle: Callable = None,
):
self.symbols = [s.lower() for s in symbols]
self.interval = interval
self.on_candle = on_candle
self.buffers: dict[str, deque] = {
s: deque(maxlen=buffer_size) for s in self.symbols
}
# 첫 번째 심볼이 주 심볼 (XRP)
self.primary_symbol = self.symbols[0]
def parse_kline(self, msg: dict) -> dict:
k = msg["k"]
return {
"timestamp": k["t"],
"open": float(k["o"]),
"high": float(k["h"]),
"low": float(k["l"]),
"close": float(k["c"]),
"volume": float(k["v"]),
"is_closed": k["x"],
}
def handle_message(self, msg: dict):
# Combined stream 메시지는 {"stream": "...", "data": {...}} 형태
if "stream" in msg:
data = msg["data"]
else:
data = msg
if data.get("e") != "kline":
return
symbol = data["s"].lower()
candle = self.parse_kline(data)
if candle["is_closed"] and symbol in self.buffers:
self.buffers[symbol].append(candle)
if symbol == self.primary_symbol and self.on_candle:
self.on_candle(candle)
def get_dataframe(self, symbol: str) -> pd.DataFrame | None:
key = symbol.lower()
buf = self.buffers.get(key)
if buf is None or len(buf) < 50:
return None
df = pd.DataFrame(list(buf))
df.set_index("timestamp", inplace=True)
return df
async def _preload_history(self, client: AsyncClient, limit: int = 200):
"""REST API로 모든 심볼의 과거 캔들을 버퍼에 미리 채운다."""
for symbol in self.symbols:
logger.info(f"{symbol.upper()} 과거 캔들 {limit}개 로드 중...")
klines = await client.futures_klines(
symbol=symbol.upper(),
interval=self.interval,
limit=limit,
)
for k in klines[:-1]:
self.buffers[symbol].append({
"timestamp": k[0],
"open": float(k[1]),
"high": float(k[2]),
"low": float(k[3]),
"close": float(k[4]),
"volume": float(k[5]),
"is_closed": True,
})
logger.info(f"{symbol.upper()} {len(self.buffers[symbol])}개 로드 완료")
async def start(self, api_key: str, api_secret: str):
client = await AsyncClient.create(
api_key=api_key,
api_secret=api_secret,
)
await self._preload_history(client)
bm = BinanceSocketManager(client)
streams = [
f"{s}@kline_{self.interval}" for s in self.symbols
]
logger.info(f"Combined WebSocket 시작: {streams}")
try:
async with bm.futures_multiplex_socket(streams) as stream:
while True:
msg = await stream.recv()
self.handle_message(msg)
finally:
await client.close_connection()

View File

@@ -115,7 +115,12 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray:
return signal_arr
def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.DataFrame:
def _calc_features_vectorized(
d: pd.DataFrame,
signal_arr: np.ndarray,
btc_df: pd.DataFrame | None = None,
eth_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
"""
신호 발생 인덱스에서 ml_features.py build_features() 로직을
pandas 벡터 연산으로 재현한다.
@@ -178,7 +183,7 @@ def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.Dat
side = np.where(signal_arr == "LONG", 1.0, 0.0).astype(np.float32)
return pd.DataFrame({
result = pd.DataFrame({
"rsi": rsi.values.astype(np.float32),
"macd_hist": macd_hist.values.astype(np.float32),
"bb_pct": bb_pct.astype(np.float32),
@@ -195,6 +200,41 @@ def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.Dat
"_signal": signal_arr, # 레이블 계산용 임시 컬럼
}, index=d.index)
# BTC/ETH 피처 계산 (제공된 경우)
if btc_df is not None and eth_df is not None:
btc_ret_1 = btc_df["close"].pct_change(1).fillna(0).values
btc_ret_3 = btc_df["close"].pct_change(3).fillna(0).values
btc_ret_5 = btc_df["close"].pct_change(5).fillna(0).values
eth_ret_1 = eth_df["close"].pct_change(1).fillna(0).values
eth_ret_3 = eth_df["close"].pct_change(3).fillna(0).values
eth_ret_5 = eth_df["close"].pct_change(5).fillna(0).values
def _align(arr: np.ndarray, target_len: int) -> np.ndarray:
if len(arr) >= target_len:
return arr[-target_len:]
return np.concatenate([np.zeros(target_len - len(arr)), arr])
n = len(d)
btc_r1 = _align(btc_ret_1, n).astype(np.float32)
btc_r3 = _align(btc_ret_3, n).astype(np.float32)
btc_r5 = _align(btc_ret_5, n).astype(np.float32)
eth_r1 = _align(eth_ret_1, n).astype(np.float32)
eth_r3 = _align(eth_ret_3, n).astype(np.float32)
eth_r5 = _align(eth_ret_5, n).astype(np.float32)
xrp_r1 = ret_1.astype(np.float32)
xrp_btc_rs = np.where(btc_r1 != 0, xrp_r1 / btc_r1, 0.0).astype(np.float32)
xrp_eth_rs = np.where(eth_r1 != 0, xrp_r1 / eth_r1, 0.0).astype(np.float32)
extra = pd.DataFrame({
"btc_ret_1": btc_r1, "btc_ret_3": btc_r3, "btc_ret_5": btc_r5,
"eth_ret_1": eth_r1, "eth_ret_3": eth_r3, "eth_ret_5": eth_r5,
"xrp_btc_rs": xrp_btc_rs, "xrp_eth_rs": xrp_eth_rs,
}, index=d.index)
result = pd.concat([result, extra], axis=1)
return result
def _calc_labels_vectorized(
d: pd.DataFrame,
@@ -261,22 +301,28 @@ def _calc_labels_vectorized(
return np.array(labels, dtype=np.int8), np.array(valid_mask, dtype=bool)
def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame:
def generate_dataset_vectorized(
df: pd.DataFrame,
btc_df: pd.DataFrame | None = None,
eth_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
"""
전체 시계열을 1회 계산해 학습 데이터셋을 생성한다.
기존 generate_dataset()의 drop-in 대체제.
btc_df, eth_df가 제공되면 21개 피처로 확장한다.
"""
print(" [1/3] 전체 시계열 지표 계산 (1회)...")
d = _calc_indicators(df)
print(" [2/3] 신호 마스킹 및 피처 추출...")
signal_arr = _calc_signals(d)
feat_all = _calc_features_vectorized(d, signal_arr)
feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
# 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만
available_cols_for_nan_check = [c for c in FEATURE_COLS if c in feat_all.columns]
valid_rows = (
(signal_arr != "HOLD") &
(~feat_all[FEATURE_COLS].isna().any(axis=1).values) &
(~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) &
(np.arange(len(d)) >= WARMUP) &
(np.arange(len(d)) < len(d) - LOOKAHEAD)
)
@@ -287,7 +333,9 @@ def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame:
labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx)
final_idx = sig_idx[valid_mask]
feat_final = feat_all.iloc[final_idx][FEATURE_COLS].copy()
# btc_df/eth_df 제공 여부에 따라 실제 존재하는 피처 컬럼만 선택
available_feature_cols = [c for c in FEATURE_COLS if c in feat_all.columns]
feat_final = feat_all.iloc[final_idx][available_feature_cols].copy()
feat_final["label"] = labels
return feat_final.reset_index(drop=True)

View File

@@ -5,12 +5,36 @@ FEATURE_COLS = [
"rsi", "macd_hist", "bb_pct", "ema_align",
"stoch_k", "stoch_d", "atr_pct", "vol_ratio",
"ret_1", "ret_3", "ret_5", "signal_strength", "side",
"btc_ret_1", "btc_ret_3", "btc_ret_5",
"eth_ret_1", "eth_ret_3", "eth_ret_5",
"xrp_btc_rs", "xrp_eth_rs",
]
def build_features(df: pd.DataFrame, signal: str) -> pd.Series:
def _calc_ret(closes: pd.Series, n: int) -> float:
"""n캔들 전 대비 수익률. 데이터 부족 시 0.0."""
if len(closes) < n + 1:
return 0.0
prev = closes.iloc[-(n + 1)]
return (closes.iloc[-1] - prev) / prev if prev != 0 else 0.0
def _calc_rs(xrp_ret: float, other_ret: float) -> float:
"""상대강도 = xrp_ret / other_ret. 분모 0이면 0.0."""
if other_ret == 0.0:
return 0.0
return xrp_ret / other_ret
def build_features(
df: pd.DataFrame,
signal: str,
btc_df: pd.DataFrame | None = None,
eth_df: pd.DataFrame | None = None,
) -> pd.Series:
"""
기술 지표가 계산된 DataFrame의 마지막 행에서 ML 피처를 추출한다.
btc_df, eth_df가 제공되면 21개 피처를, 없으면 13개 피처를 반환한다.
signal: "LONG" | "SHORT"
"""
last = df.iloc[-1]
@@ -38,9 +62,9 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series:
vol_ratio = last["volume"] / vol_ma20 if vol_ma20 > 0 else 1.0
closes = df["close"]
ret_1 = (close - closes.iloc[-2]) / closes.iloc[-2] if len(closes) >= 2 else 0.0
ret_3 = (close - closes.iloc[-4]) / closes.iloc[-4] if len(closes) >= 4 else 0.0
ret_5 = (close - closes.iloc[-6]) / closes.iloc[-6] if len(closes) >= 6 else 0.0
ret_1 = _calc_ret(closes, 1)
ret_3 = _calc_ret(closes, 3)
ret_5 = _calc_ret(closes, 5)
prev = df.iloc[-2] if len(df) >= 2 else last
strength = 0
@@ -65,7 +89,7 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series:
if ema_align == -1: strength += 1
if stoch_k > 80 and stoch_k < stoch_d: strength += 1
return pd.Series({
base = {
"rsi": float(rsi),
"macd_hist": float(last.get("macd_hist", 0)),
"bb_pct": float(bb_pct),
@@ -79,4 +103,25 @@ def build_features(df: pd.DataFrame, signal: str) -> pd.Series:
"ret_5": float(ret_5),
"signal_strength": float(strength),
"side": 1.0 if signal == "LONG" else 0.0,
})
}
if btc_df is not None and eth_df is not None:
btc_ret_1 = _calc_ret(btc_df["close"], 1)
btc_ret_3 = _calc_ret(btc_df["close"], 3)
btc_ret_5 = _calc_ret(btc_df["close"], 5)
eth_ret_1 = _calc_ret(eth_df["close"], 1)
eth_ret_3 = _calc_ret(eth_df["close"], 3)
eth_ret_5 = _calc_ret(eth_df["close"], 5)
base.update({
"btc_ret_1": float(btc_ret_1),
"btc_ret_3": float(btc_ret_3),
"btc_ret_5": float(btc_ret_5),
"eth_ret_1": float(eth_ret_1),
"eth_ret_3": float(eth_ret_3),
"eth_ret_5": float(eth_ret_5),
"xrp_btc_rs": float(_calc_rs(ret_1, btc_ret_1)),
"xrp_eth_rs": float(_calc_rs(ret_1, eth_ret_1)),
})
return pd.Series(base)