feat: enhance data fetching and model training with OI and funding rate integration

- Updated `fetch_history.py` to collect open interest (OI) and funding rate data from Binance, improving the dataset for model training.
- Modified `train_and_deploy.sh` to include options for OI and funding rate collection during data fetching.
- Enhanced `dataset_builder.py` to incorporate OI change and funding rate features with rolling z-score normalization.
- Updated training logs to reflect new metrics and features, ensuring comprehensive tracking of model performance.
- Adjusted feature columns in `ml_features.py` to include OI and funding rate for improved model robustness.
This commit is contained in:
21in7
2026-03-01 22:25:38 +09:00
parent 4245d7cdbf
commit 24d3ba9411
8 changed files with 232 additions and 13 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -162,5 +162,32 @@
"features": 21, "features": 21,
"time_weight_decay": 2.0, "time_weight_decay": 2.0,
"model_path": "models/lgbm_filter.pkl" "model_path": "models/lgbm_filter.pkl"
},
{
"date": "2026-03-01T22:18:59.852831",
"backend": "lgbm",
"auc": 0.5504,
"samples": 533,
"features": 21,
"time_weight_decay": 2.0,
"model_path": "models/lgbm_filter.pkl"
},
{
"date": "2026-03-01T22:19:29.532472",
"backend": "lgbm",
"auc": 0.5504,
"samples": 533,
"features": 21,
"time_weight_decay": 2.0,
"model_path": "models/lgbm_filter.pkl"
},
{
"date": "2026-03-01T22:19:30.938005",
"backend": "mlx",
"auc": 0.5714,
"samples": 533,
"train_sec": 0.1,
"time_weight_decay": 2.0,
"model_path": "models/mlx_filter.weights"
} }
] ]

View File

@@ -2,6 +2,11 @@
바이낸스 선물 REST API로 과거 캔들 데이터를 수집해 parquet으로 저장한다. 바이낸스 선물 REST API로 과거 캔들 데이터를 수집해 parquet으로 저장한다.
사용법: python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90 사용법: python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90
python scripts/fetch_history.py --symbols XRPUSDT BTCUSDT ETHUSDT --days 90 python scripts/fetch_history.py --symbols XRPUSDT BTCUSDT ETHUSDT --days 90
OI/펀딩비 수집 제약:
- OI 히스토리: 바이낸스 API 제한으로 최근 30일치만 제공 (period=15m, limit=500/req)
- 펀딩비: 8시간 주기 → 15분봉에 forward-fill 병합
- 30일 이전 구간은 oi_change=0, funding_rate=0으로 채움
""" """
import sys import sys
from pathlib import Path from pathlib import Path
@@ -9,6 +14,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
import asyncio import asyncio
import argparse import argparse
import aiohttp
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import pandas as pd import pandas as pd
from binance import AsyncClient from binance import AsyncClient
@@ -21,6 +27,7 @@ load_dotenv()
# 1500개씩 가져오므로 90일 1m 데이터 = ~65회 요청/심볼 # 1500개씩 가져오므로 90일 1m 데이터 = ~65회 요청/심볼
# 심볼 간 딜레이 없이 연속 요청하면 레이트 리밋(-1003) 발생 # 심볼 간 딜레이 없이 연속 요청하면 레이트 리밋(-1003) 발생
_REQUEST_DELAY = 0.3 # 초당 ~3.3 req → 안전 마진 충분 _REQUEST_DELAY = 0.3 # 초당 ~3.3 req → 안전 마진 충분
_FAPI_BASE = "https://fapi.binance.com"
def _now_ms() -> int: def _now_ms() -> int:
@@ -107,6 +114,151 @@ async def fetch_klines_all(
return dfs return dfs
async def _fetch_oi_hist(
session: aiohttp.ClientSession,
symbol: str,
period: str = "15m",
) -> pd.DataFrame:
"""
바이낸스 /futures/data/openInterestHist 엔드포인트로 OI 히스토리를 수집한다.
API 제한: 최근 30일치만 제공, 1회 최대 500개.
"""
url = f"{_FAPI_BASE}/futures/data/openInterestHist"
all_rows = []
# 30일 전부터 현재까지 수집
start_ts = int((datetime.now(timezone.utc) - timedelta(days=30)).timestamp() * 1000)
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
print(f" [{symbol}] OI 히스토리 수집 중 (최근 30일)...")
while start_ts < now_ms:
params = {
"symbol": symbol,
"period": period,
"limit": 500,
"startTime": start_ts,
}
async with session.get(url, params=params) as resp:
data = await resp.json()
if not data or not isinstance(data, list):
break
all_rows.extend(data)
last_ts = int(data[-1]["timestamp"])
if last_ts >= now_ms or len(data) < 500:
break
start_ts = last_ts + 1
await asyncio.sleep(_REQUEST_DELAY)
if not all_rows:
print(f" [{symbol}] OI 데이터 없음 — 빈 DataFrame 반환")
return pd.DataFrame(columns=["oi", "oi_value"])
df = pd.DataFrame(all_rows)
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms", utc=True)
df = df.set_index("timestamp")
df = df[["sumOpenInterest", "sumOpenInterestValue"]].copy()
df.columns = ["oi", "oi_value"]
df["oi"] = df["oi"].astype(float)
df["oi_value"] = df["oi_value"].astype(float)
# OI 변화율 (1캔들 전 대비)
df["oi_change"] = df["oi"].pct_change(1).fillna(0)
print(f" [{symbol}] OI 수집 완료: {len(df):,}")
return df[["oi_change"]]
async def _fetch_funding_rate(
session: aiohttp.ClientSession,
symbol: str,
days: int,
) -> pd.DataFrame:
"""
바이낸스 /fapi/v1/fundingRate 엔드포인트로 펀딩비 히스토리를 수집한다.
8시간 주기 데이터 → 15분봉 인덱스에 forward-fill로 병합 예정.
"""
url = f"{_FAPI_BASE}/fapi/v1/fundingRate"
all_rows = []
start_ts = int((datetime.now(timezone.utc) - timedelta(days=days)).timestamp() * 1000)
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
print(f" [{symbol}] 펀딩비 히스토리 수집 중 ({days}일)...")
while start_ts < now_ms:
params = {
"symbol": symbol,
"startTime": start_ts,
"limit": 1000,
}
async with session.get(url, params=params) as resp:
data = await resp.json()
if not data or not isinstance(data, list):
break
all_rows.extend(data)
last_ts = int(data[-1]["fundingTime"])
if last_ts >= now_ms or len(data) < 1000:
break
start_ts = last_ts + 1
await asyncio.sleep(_REQUEST_DELAY)
if not all_rows:
print(f" [{symbol}] 펀딩비 데이터 없음 — 빈 DataFrame 반환")
return pd.DataFrame(columns=["funding_rate"])
df = pd.DataFrame(all_rows)
df["timestamp"] = pd.to_datetime(df["fundingTime"].astype(int), unit="ms", utc=True)
df = df.set_index("timestamp")
df["funding_rate"] = df["fundingRate"].astype(float)
print(f" [{symbol}] 펀딩비 수집 완료: {len(df):,}")
return df[["funding_rate"]]
def _merge_oi_funding(
candles: pd.DataFrame,
oi_df: pd.DataFrame,
funding_df: pd.DataFrame,
) -> pd.DataFrame:
"""
캔들 DataFrame에 OI 변화율과 펀딩비를 병합한다.
- oi_change: 15분봉 인덱스에 nearest merge (없는 구간은 0)
- funding_rate: 8시간 주기 → forward-fill 후 병합 (없는 구간은 0)
"""
result = candles.copy()
# OI 병합: 타임스탬프 기준 reindex + nearest fill
if not oi_df.empty:
oi_reindexed = oi_df.reindex(result.index, method="nearest", tolerance=pd.Timedelta("8min"))
result["oi_change"] = oi_reindexed["oi_change"].fillna(0).astype(float)
else:
result["oi_change"] = 0.0
# 펀딩비 병합: forward-fill (8시간 주기이므로 다음 펀딩 시점까지 이전 값 유지)
if not funding_df.empty:
funding_reindexed = funding_df.reindex(
result.index.union(funding_df.index)
).sort_index()
funding_reindexed = funding_reindexed["funding_rate"].ffill()
result["funding_rate"] = funding_reindexed.reindex(result.index).fillna(0).astype(float)
else:
result["funding_rate"] = 0.0
return result
async def _fetch_oi_and_funding(
symbol: str,
days: int,
candles: pd.DataFrame,
) -> pd.DataFrame:
"""단일 심볼의 OI + 펀딩비를 수집해 캔들에 병합한다."""
async with aiohttp.ClientSession() as session:
oi_df = await _fetch_oi_hist(session, symbol)
await asyncio.sleep(1)
funding_df = await _fetch_funding_rate(session, symbol, days)
return _merge_oi_funding(candles, oi_df, funding_df)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="바이낸스 선물 과거 캔들 수집. 단일 심볼 또는 멀티 심볼 병합 저장." description="바이낸스 선물 과거 캔들 수집. 단일 심볼 또는 멀티 심볼 병합 저장."
@@ -116,6 +268,10 @@ def main():
parser.add_argument("--interval", default="15m") parser.add_argument("--interval", default="15m")
parser.add_argument("--days", type=int, default=365) parser.add_argument("--days", type=int, default=365)
parser.add_argument("--output", default="data/combined_15m.parquet") parser.add_argument("--output", default="data/combined_15m.parquet")
parser.add_argument(
"--no-oi", action="store_true",
help="OI/펀딩비 수집을 건너뜀 (캔들 데이터만 저장)",
)
args = parser.parse_args() args = parser.parse_args()
# 하위 호환: --symbol 단독 사용 시 symbols로 통합 # 하위 호환: --symbol 단독 사용 시 symbols로 통합
@@ -124,8 +280,11 @@ def main():
if len(args.symbols) == 1: if len(args.symbols) == 1:
df = asyncio.run(fetch_klines(args.symbols[0], args.interval, args.days)) df = asyncio.run(fetch_klines(args.symbols[0], args.interval, args.days))
if not args.no_oi:
print(f"\n[OI/펀딩비] {args.symbols[0]} 수집 중...")
df = asyncio.run(_fetch_oi_and_funding(args.symbols[0], args.days, df))
df.to_parquet(args.output) df.to_parquet(args.output)
print(f"저장 완료: {args.output} ({len(df):,}행)") print(f"저장 완료: {args.output} ({len(df):,}, {len(df.columns)}컬럼)")
else: else:
# 멀티 심볼: 단일 클라이언트로 순차 수집 후 타임스탬프 기준 inner join 병합 # 멀티 심볼: 단일 클라이언트로 순차 수집 후 타임스탬프 기준 inner join 병합
dfs = asyncio.run(fetch_klines_all(args.symbols, args.interval, args.days)) dfs = asyncio.run(fetch_klines_all(args.symbols, args.interval, args.days))
@@ -139,6 +298,11 @@ def main():
how="inner", how="inner",
) )
# 주 심볼(XRP)에 대해서만 OI/펀딩비 수집 후 병합
if not args.no_oi:
print(f"\n[OI/펀딩비] {primary} 수집 중...")
merged = asyncio.run(_fetch_oi_and_funding(primary, args.days, merged))
output = args.output.replace("xrpusdt", "combined") output = args.output.replace("xrpusdt", "combined")
merged.to_parquet(output) merged.to_parquet(output)
print(f"\n병합 저장 완료: {output} ({len(merged):,}행, {len(merged.columns)}컬럼)") print(f"\n병합 저장 완료: {output} ({len(merged):,}행, {len(merged.columns)}컬럼)")

View File

@@ -1,10 +1,12 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# 맥미니에서 전체 학습 파이프라인을 실행하고 LXC로 배포한다. # 맥미니에서 전체 학습 파이프라인을 실행하고 LXC로 배포한다.
# 사용법: bash scripts/train_and_deploy.sh [mlx|lgbm] # 사용법: bash scripts/train_and_deploy.sh [mlx|lgbm] [wf-splits]
# #
# 예시: # 예시:
# bash scripts/train_and_deploy.sh # LightGBM (기본값) # bash scripts/train_and_deploy.sh # LightGBM + Walk-Forward 5폴드 (기본값)
# bash scripts/train_and_deploy.sh mlx # MLX GPU 학습 # bash scripts/train_and_deploy.sh mlx # MLX GPU 학습 + Walk-Forward 5폴드
# bash scripts/train_and_deploy.sh lgbm 3 # LightGBM + Walk-Forward 3폴드
# bash scripts/train_and_deploy.sh lgbm 0 # Walk-Forward 건너뜀 (단일 학습만)
set -euo pipefail set -euo pipefail
@@ -20,10 +22,11 @@ else
fi fi
BACKEND="${1:-lgbm}" BACKEND="${1:-lgbm}"
WF_SPLITS="${2:-5}" # 두 번째 인자: Walk-Forward 폴드 수 (0이면 건너뜀)
cd "$PROJECT_ROOT" cd "$PROJECT_ROOT"
echo "=== [1/3] 데이터 수집 (XRP + BTC + ETH 3심볼, 1년치) ===" echo "=== [1/3] 데이터 수집 (XRP + BTC + ETH 3심볼, 1년치 + OI/펀딩비) ==="
python scripts/fetch_history.py \ python scripts/fetch_history.py \
--symbols XRPUSDT BTCUSDT ETHUSDT \ --symbols XRPUSDT BTCUSDT ETHUSDT \
--interval 15m \ --interval 15m \
@@ -31,7 +34,7 @@ python scripts/fetch_history.py \
--output data/combined_15m.parquet --output data/combined_15m.parquet
echo "" echo ""
echo "=== [2/3] 모델 학습 (21개 피처: XRP 13 + BTC/ETH 상관관계 8) ===" echo "=== [2/3] 모델 학습 (23개 피처: XRP 13 + BTC/ETH 8 + OI/펀딩비 2) ==="
DECAY="${TIME_WEIGHT_DECAY:-2.0}" DECAY="${TIME_WEIGHT_DECAY:-2.0}"
if [ "$BACKEND" = "mlx" ]; then if [ "$BACKEND" = "mlx" ]; then
echo " 백엔드: MLX (Apple Silicon GPU), decay=${DECAY}" echo " 백엔드: MLX (Apple Silicon GPU), decay=${DECAY}"
@@ -41,6 +44,17 @@ else
python scripts/train_model.py --data data/combined_15m.parquet --decay "$DECAY" python scripts/train_model.py --data data/combined_15m.parquet --decay "$DECAY"
fi fi
# Walk-Forward 검증 (WF_SPLITS > 0 인 경우, lgbm 백엔드만 지원)
if [ "$WF_SPLITS" -gt 0 ] 2>/dev/null && [ "$BACKEND" != "mlx" ]; then
echo ""
echo "=== [2.5/3] Walk-Forward 검증 (${WF_SPLITS}폴드) ==="
python scripts/train_model.py \
--data data/combined_15m.parquet \
--decay "$DECAY" \
--wf \
--wf-splits "$WF_SPLITS"
fi
echo "" echo ""
echo "=== [3/3] LXC 배포 ===" echo "=== [3/3] LXC 배포 ==="
bash scripts/deploy_model.sh "$BACKEND" bash scripts/deploy_model.sh "$BACKEND"

View File

@@ -115,14 +115,17 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray:
return signal_arr return signal_arr
def _rolling_zscore(arr: np.ndarray, window: int = 200) -> np.ndarray: def _rolling_zscore(arr: np.ndarray, window: int = 288) -> np.ndarray:
"""rolling window z-score 정규화. window 미만 구간은 0으로 채운다. """rolling window z-score 정규화. 15분봉 기준 3일(288캔들) 윈도우.
절대값 피처(수익률, ATR 등)를 레짐 변화에 무관하게 만든다.""" 절대값 피처(ATR, 수익률 등)를 레짐 변화에 무관하게 만든다.
min_periods=1로 초반 데이터도 활용하며, ddof=0(모표준편차)으로 계산한다."""
s = pd.Series(arr.astype(np.float64)) s = pd.Series(arr.astype(np.float64))
mean = s.rolling(window, min_periods=window).mean() r = s.rolling(window=window, min_periods=1)
std = s.rolling(window, min_periods=window).std() mean = r.mean()
z = (s - mean) / std.where(std > 0, other=np.nan) std = r.std(ddof=0)
return z.fillna(0).values.astype(np.float32) std = std.where(std >= 1e-8, other=1e-8)
z = (s - mean) / std
return z.values.astype(np.float32)
def _calc_features_vectorized( def _calc_features_vectorized(
@@ -255,6 +258,14 @@ def _calc_features_vectorized(
}, index=d.index) }, index=d.index)
result = pd.concat([result, extra], axis=1) result = pd.concat([result, extra], axis=1)
# OI 변화율 / 펀딩비 피처 (parquet에 컬럼이 있으면 z-score, 없으면 0)
# OI는 최근 30일치만 제공되므로 이전 구간은 0으로 채워진 채로 들어옴
oi_raw = d["oi_change"].values if "oi_change" in d.columns else np.zeros(len(d))
fr_raw = d["funding_rate"].values if "funding_rate" in d.columns else np.zeros(len(d))
result["oi_change"] = _rolling_zscore(oi_raw.astype(np.float64))
result["funding_rate"] = _rolling_zscore(fr_raw.astype(np.float64))
return result return result

View File

@@ -8,6 +8,9 @@ FEATURE_COLS = [
"btc_ret_1", "btc_ret_3", "btc_ret_5", "btc_ret_1", "btc_ret_3", "btc_ret_5",
"eth_ret_1", "eth_ret_3", "eth_ret_5", "eth_ret_1", "eth_ret_3", "eth_ret_5",
"xrp_btc_rs", "xrp_eth_rs", "xrp_btc_rs", "xrp_eth_rs",
# 시장 미시구조: OI 변화율(z-score), 펀딩비(z-score)
# parquet에 oi_change/funding_rate 컬럼이 없으면 dataset_builder에서 0으로 채움
"oi_change", "funding_rate",
] ]