From e2b045482596d4dc87b6b8937475b6cb2a82520c Mon Sep 17 00:00:00 2001
From: 21in7
Date: Sun, 22 Mar 2026 22:20:30 +0900
Subject: [PATCH] feat: add L/S ratio collector service for top_acct and global ratios

Collect top trader account L/S ratio and global L/S ratio every 15 minutes
for XRP, BTC, ETH (6 API calls/cycle) and persist to per-symbol parquet
files. Deployed as a separate Docker service reusing the bot image.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 Jenkinsfile                      |   7 +-
 docker-compose.yml               |  16 ++
 scripts/collect_ls_ratio.py      | 121 +++++++++++++++
 scripts/collect_ls_ratio_loop.sh |  27 ++++
 scripts/taker_ratio_analysis.py  | 256 +++++++++++++++++++++++++++++++
 5 files changed, 425 insertions(+), 2 deletions(-)
 create mode 100644 scripts/collect_ls_ratio.py
 create mode 100755 scripts/collect_ls_ratio_loop.sh
 create mode 100644 scripts/taker_ratio_analysis.py

diff --git a/Jenkinsfile b/Jenkinsfile
index 36967ad..f7b2389 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -50,7 +50,7 @@ pipeline {
                         env.DASH_API_CHANGED = 'true'
                         env.DASH_UI_CHANGED = 'true'
                     } else {
-                        env.BOT_CHANGED = (changes =~ /(?m)^(src\/|main\.py|requirements\.txt|Dockerfile)/).find() ? 'true' : 'false'
+                        env.BOT_CHANGED = (changes =~ /(?m)^(src\/|scripts\/|main\.py|requirements\.txt|Dockerfile)/).find() ? 'true' : 'false'
                         env.DASH_API_CHANGED = (changes =~ /(?m)^dashboard\/api\//).find() ? 'true' : 'false'
                         env.DASH_UI_CHANGED = (changes =~ /(?m)^dashboard\/ui\//).find() ? 'true' : 'false'
                     }
@@ -123,7 +123,10 @@ pipeline {
 
                     // 변경된 서비스만 pull & recreate (나머지는 중단 없음)
                     def services = []
-                    if (env.BOT_CHANGED == 'true') services.add('cointrader')
+                    if (env.BOT_CHANGED == 'true') {
+                        services.add('cointrader')
+                        services.add('ls-ratio-collector')
+                    }
                     if (env.DASH_API_CHANGED == 'true') services.add('dashboard-api')
                     if (env.DASH_UI_CHANGED == 'true') services.add('dashboard-ui')
 
diff --git a/docker-compose.yml b/docker-compose.yml
index 731b80e..5d81132 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -52,5 +52,21 @@ services:
         max-size: "10m"
         max-file: "3"
 
+  ls-ratio-collector:
+    image: git.gihyeon.com/gihyeon/cointrader:latest
+    container_name: ls-ratio-collector
+    restart: unless-stopped
+    environment:
+      - TZ=Asia/Seoul
+      - PYTHONUNBUFFERED=1
+    volumes:
+      - ./data:/app/data
+    entrypoint: ["sh", "scripts/collect_ls_ratio_loop.sh"]
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "5m"
+        max-file: "3"
+
 volumes:
   dashboard-data:
diff --git a/scripts/collect_ls_ratio.py b/scripts/collect_ls_ratio.py
new file mode 100644
index 0000000..7eed033
--- /dev/null
+++ b/scripts/collect_ls_ratio.py
@@ -0,0 +1,121 @@
+"""
+Long/Short Ratio 장기 수집 스크립트.
+15분마다 cron 실행하여 Binance Trading Data API에서
+top_acct_ls_ratio, global_ls_ratio를 data/{symbol}/ls_ratio_15m.parquet에 누적한다.
+
+수집 대상:
+  - topLongShortAccountRatio × 3심볼 (XRPUSDT, BTCUSDT, ETHUSDT)
+  - globalLongShortAccountRatio × 3심볼 (XRPUSDT, BTCUSDT, ETHUSDT)
+  → 총 API 호출 6회/15분 (rate limit 무관)
+
+사용법:
+  python scripts/collect_ls_ratio.py
+  python scripts/collect_ls_ratio.py --symbols XRPUSDT BTCUSDT
+"""
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+import argparse
+import asyncio
+from datetime import datetime, timezone
+
+import aiohttp
+import pandas as pd
+
+BASE_URL = "https://fapi.binance.com"
+DEFAULT_SYMBOLS = ["XRPUSDT", "BTCUSDT", "ETHUSDT"]
+
+ENDPOINTS = {
+    "top_acct_ls_ratio": "/futures/data/topLongShortAccountRatio",
+    "global_ls_ratio": "/futures/data/globalLongShortAccountRatio",
+}
+
+
+async def fetch_latest(session: aiohttp.ClientSession, symbol: str) -> dict | None:
+    """심볼 하나에 대해 두 ratio의 최신 1건씩 가져온다."""
+    row = {"timestamp": None, "symbol": symbol}
+
+    for col_name, endpoint in ENDPOINTS.items():
+        url = f"{BASE_URL}{endpoint}"
+        params = {"symbol": symbol, "period": "15m", "limit": 1}
+        try:
+            async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+                data = await resp.json()
+                if isinstance(data, list) and data:
+                    row[col_name] = float(data[0]["longShortRatio"])
+                    # 타임스탬프는 첫 번째 응답에서 설정
+                    if row["timestamp"] is None:
+                        row["timestamp"] = pd.Timestamp(
+                            int(data[0]["timestamp"]), unit="ms", tz="UTC"
+                        )
+                else:
+                    print(f"[WARN] {symbol} {col_name}: unexpected response: {data}")
+                    return None
+        except Exception as e:
+            print(f"[ERROR] {symbol} {col_name}: {e}")
+            return None
+
+    return row
+
+
+async def collect(symbols: list[str]):
+    """모든 심볼 데이터를 수집하고 parquet에 추가한다."""
+    async with aiohttp.ClientSession() as session:
+        tasks = [fetch_latest(session, sym) for sym in symbols]
+        results = await asyncio.gather(*tasks)
+
+    now = datetime.now(timezone.utc)
+    collected = 0
+
+    for row in results:
+        if row is None:
+            continue
+
+        symbol = row["symbol"]
+        out_path = Path(f"data/{symbol.lower()}/ls_ratio_15m.parquet")
+        out_path.parent.mkdir(parents=True, exist_ok=True)
+
+        new_df = pd.DataFrame([{
+            "timestamp": row["timestamp"],
+            "top_acct_ls_ratio": row["top_acct_ls_ratio"],
+            "global_ls_ratio": row["global_ls_ratio"],
+        }])
+
+        if out_path.exists():
+            existing = pd.read_parquet(out_path)
+            # 중복 방지: 동일 timestamp가 이미 있으면 스킵
+            if row["timestamp"] in existing["timestamp"].values:
+                print(f"[SKIP] {symbol} ts={row['timestamp']} already exists")
+                continue
+            combined = pd.concat([existing, new_df], ignore_index=True)
+        else:
+            combined = new_df
+
+        combined.to_parquet(out_path, index=False)
+        collected += 1
+        print(
+            f"[{now.isoformat()}] {symbol}: "
+            f"top_acct={row['top_acct_ls_ratio']:.4f}, "
+            f"global={row['global_ls_ratio']:.4f} "
+            f"→ {out_path} ({len(combined)} rows)"
+        )
+
+    if collected == 0:
+        print(f"[{now.isoformat()}] No new data collected")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="L/S Ratio 장기 수집")
+    parser.add_argument(
+        "--symbols", nargs="+", default=DEFAULT_SYMBOLS,
+        help="수집 대상 심볼 (기본: XRPUSDT BTCUSDT ETHUSDT)",
+    )
+    args = parser.parse_args()
+    asyncio.run(collect(args.symbols))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/collect_ls_ratio_loop.sh b/scripts/collect_ls_ratio_loop.sh
new file mode 100755
index 0000000..182a9ee
--- /dev/null
+++ b/scripts/collect_ls_ratio_loop.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+# 15분 경계에 맞춰 collect_ls_ratio.py를 반복 실행한다.
+# Docker 컨테이너 entrypoint용.
+
+set -e
+
+echo "[collect_ls_ratio] Starting loop (interval: 15m)"
+
+while true; do
+    # 현재 분/초를 기준으로 다음 15분 경계(00/15/30/45)까지 대기
+    now_min=$(date -u +%M | sed 's/^0//')
+    now_sec=$(date -u +%S | sed 's/^0//')
+    # 다음 15분 경계까지 남은 분
+    remainder=$((now_min % 15))
+    wait_min=$((15 - remainder))
+    # 초 단위로 변환 (경계 직후 10초 여유)
+    wait_sec=$(( wait_min * 60 - now_sec + 10 ))
+    if [ "$wait_sec" -le 10 ]; then
+        wait_sec=$((wait_sec + 900))
+    fi
+
+    echo "[collect_ls_ratio] Next run in ${wait_sec}s ($(date -u))"
+    sleep "$wait_sec"
+
+    echo "[collect_ls_ratio] Running collection... ($(date -u))"
+    python scripts/collect_ls_ratio.py || echo "[collect_ls_ratio] ERROR: collection failed"
+done
diff --git a/scripts/taker_ratio_analysis.py b/scripts/taker_ratio_analysis.py
new file mode 100644
index 0000000..759fba4
--- /dev/null
+++ b/scripts/taker_ratio_analysis.py
@@ -0,0 +1,256 @@
+"""
+Taker Buy/Sell Ratio vs Next-Candle Price Change Correlation Analysis
+- Taker Buy Ratio (from klines + Trading Data API)
+- Long/Short Ratio (global)
+- Top Trader Long/Short Ratio (accounts & positions)
+
+Usage: python scripts/taker_ratio_analysis.py [SYMBOL1] [SYMBOL2] ...
+Default: XRPUSDT BTCUSDT ETHUSDT
+"""
+
+import asyncio
+import aiohttp
+import pandas as pd
+import numpy as np
+from datetime import datetime, timedelta, timezone
+import sys
+
+BASE = "https://fapi.binance.com"
+SYMBOLS = sys.argv[1:] if len(sys.argv) > 1 else ["XRPUSDT", "BTCUSDT", "ETHUSDT"]
+INTERVAL = "15m"
+DAYS = 30
+
+async def fetch_json(session, url, params):
+    async with session.get(url, params=params) as resp:
+        return await resp.json()
+
+async def fetch_klines(session, symbol, start_ms, end_ms):
+    all_klines = []
+    current = start_ms
+    while current < end_ms:
+        params = {"symbol": symbol, "interval": INTERVAL, "startTime": current, "endTime": end_ms, "limit": 1500}
+        data = await fetch_json(session, f"{BASE}/fapi/v1/klines", params)
+        if not data:
+            break
+        all_klines.extend(data)
+        current = data[-1][0] + 1
+    return all_klines
+
+async def fetch_ratio(session, url, symbol):
+    params = {"symbol": symbol, "period": INTERVAL, "limit": 500}
+    data = await fetch_json(session, url, params)
+    return data if isinstance(data, list) else []
+
+async def analyze_symbol(session, symbol, start_ms, end_ms):
+    """Fetch and analyze a single symbol"""
+    klines, ls_ratio, top_acct, top_pos, taker = await asyncio.gather(
+        fetch_klines(session, symbol, start_ms, end_ms),
+        fetch_ratio(session, f"{BASE}/futures/data/globalLongShortAccountRatio", symbol),
+        fetch_ratio(session, f"{BASE}/futures/data/topLongShortAccountRatio", symbol),
+        fetch_ratio(session, f"{BASE}/futures/data/topLongShortPositionRatio", symbol),
+        fetch_ratio(session, f"{BASE}/futures/data/takerlongshortRatio", symbol),
+    )
+
+    print(f"\n {symbol}: Klines={len(klines)}, L/S={len(ls_ratio)}, TopAcct={len(top_acct)}, TopPos={len(top_pos)}, Taker={len(taker)}")
+    # Build DataFrame
+    df_k = pd.DataFrame(klines, columns=[
+        "open_time","open","high","low","close","volume",
+        "close_time","quote_vol","trades","taker_buy_vol","taker_buy_quote_vol","ignore"
+    ])
+    df_k["open_time"] = pd.to_datetime(df_k["open_time"], unit="ms")
+    for c in ["open","high","low","close","volume","taker_buy_vol","taker_buy_quote_vol","quote_vol"]:
+        df_k[c] = df_k[c].astype(float)
+
+    df_k["kline_taker_buy_ratio"] = (df_k["taker_buy_vol"] / df_k["volume"]).replace([np.inf, -np.inf], np.nan)
+    df_k["next_return"] = df_k["close"].shift(-1) / df_k["close"] - 1
+    df_k["next_4_return"] = df_k["close"].shift(-4) / df_k["close"] - 1
+    df_k = df_k.set_index("open_time")
+
+    def join_ratio(data, col_name):
+        if not data:
+            return
+        df = pd.DataFrame(data)
+        df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
+        if "buySellRatio" in df.columns:
+            df["buySellRatio"] = df["buySellRatio"].astype(float)
+            df["buyVol"] = df["buyVol"].astype(float)
+            df["sellVol"] = df["sellVol"].astype(float)
+            df = df.set_index("timestamp")
+            df_k.update(df_k.join(df[["buySellRatio","buyVol","sellVol"]], how="left"))
+            for c in ["buySellRatio","buyVol","sellVol"]:
+                if c not in df_k.columns:
+                    df_k[c] = np.nan
+            joined = df_k.join(df[["buySellRatio","buyVol","sellVol"]], how="left", rsuffix="_new")
+            for c in ["buySellRatio","buyVol","sellVol"]:
+                if f"{c}_new" in joined.columns:
+                    df_k[c] = joined[f"{c}_new"]
+        else:
+            df["longShortRatio"] = df["longShortRatio"].astype(float)
+            df = df.set_index("timestamp").rename(columns={"longShortRatio": col_name})
+            df_k[col_name] = df_k.join(df[[col_name]], how="left")[col_name]
+
+    join_ratio(taker, "buySellRatio")
+    join_ratio(ls_ratio, "global_ls_ratio")
+    join_ratio(top_acct, "top_acct_ls_ratio")
+    join_ratio(top_pos, "top_pos_ls_ratio")
+
+    return df_k
+
+def print_analysis(symbol, df_k):
+    """Print analysis results for a symbol"""
+    print("\n" + "="*70)
+    print(f"{symbol} {INTERVAL} Taker/Ratio → Price Correlation Analysis ({DAYS} days klines, ~5 days ratios)")
+    print("="*70)
+
+    features = ["kline_taker_buy_ratio", "buySellRatio", "global_ls_ratio",
+                "top_acct_ls_ratio", "top_pos_ls_ratio"]
+    available = [f for f in features
+                 if f in df_k.columns and df_k[f].notna().sum() > 20]
+
+    # 1. Correlation
+    print("\n[1] Pearson Correlation with Next-Candle Returns")
+    print("-"*55)
+    print(f"{'Feature':<25} {'next_15m':>12} {'next_1h':>12}")
+    print("-"*55)
+    for feat in available:
+        c1 = df_k[feat].corr(df_k["next_return"])
+        c4 = df_k[feat].corr(df_k["next_4_return"])
+        print(f"{feat:<25} {c1:>12.4f} {c4:>12.4f}")
+
+    # 2. Quintile - Taker
+    print("\n[2] Taker Buy Ratio Quintile → Next Returns")
+    print("-"*60)
+    for ratio_col in ["kline_taker_buy_ratio", "buySellRatio"]:
+        if ratio_col not in available:
+            continue
+        valid = df_k[[ratio_col, "next_return", "next_4_return"]].dropna()
+        try:
+            valid["quintile"] = pd.qcut(valid[ratio_col], 5, labels=["Q1(sell)","Q2","Q3","Q4","Q5(buy)"])
+        except ValueError:
+            continue
+        print(f"\n {ratio_col}:")
+        print(f" {'Quintile':<12} {'mean_ratio':>12} {'next_15m_bps':>14} {'next_1h_bps':>13} {'count':>7} {'win_rate':>10}")
+        for q in ["Q1(sell)","Q2","Q3","Q4","Q5(buy)"]:
+            grp = valid[valid["quintile"] == q]
+            if len(grp) == 0:
+                continue
+            mr = grp[ratio_col].mean()
+            r1 = grp["next_return"].mean() * 10000
+            r4 = grp["next_4_return"].mean() * 10000
+            wr = (grp["next_return"] > 0).mean() * 100
+            print(f" {q:<12} {mr:>12.4f} {r1:>14.2f} {r4:>13.2f} {len(grp):>7} {wr:>9.1f}%")
+
+    # 3. Extreme analysis
+    print("\n[3] Extreme Taker Buy Ratio Analysis (top/bottom 10%)")
+    print("-"*60)
+    for ratio_col in ["kline_taker_buy_ratio", "buySellRatio"]:
+        if ratio_col not in available:
+            continue
+        valid = df_k[[ratio_col, "next_return", "next_4_return"]].dropna()
+        p10 = valid[ratio_col].quantile(0.10)
+        p90 = valid[ratio_col].quantile(0.90)
+        bottom = valid[valid[ratio_col] <= p10]
+        top = valid[valid[ratio_col] >= p90]
+        mid = valid[(valid[ratio_col] > p10) & (valid[ratio_col] < p90)]
+
+        print(f"\n {ratio_col}:")
+        print(f" {'Group':<18} {'mean_ratio':>12} {'next_15m_bps':>14} {'next_1h_bps':>13} {'win_rate':>10} {'count':>7}")
+        for name, grp in [("Bottom 10% (sell)", bottom), ("Middle 80%", mid), ("Top 10% (buy)", top)]:
+            if len(grp) == 0:
+                continue
+            mr = grp[ratio_col].mean()
+            r1 = grp["next_return"].mean() * 10000
+            r4 = grp["next_4_return"].mean() * 10000
+            wr = (grp["next_return"] > 0).mean() * 100
+            print(f" {name:<18} {mr:>12.4f} {r1:>14.2f} {r4:>13.2f} {wr:>9.1f}% {len(grp):>7}")
+
+    # 4. L/S ratio quintile
+    print("\n[4] Long/Short Ratio Quintile → Next Returns")
+    print("-"*60)
+    for ratio_col in ["global_ls_ratio", "top_acct_ls_ratio", "top_pos_ls_ratio"]:
+        if ratio_col not in available:
+            continue
+        valid = df_k[[ratio_col, "next_return", "next_4_return"]].dropna()
+        if len(valid) < 20:
+            continue
+        try:
+            valid["quintile"] = pd.qcut(valid[ratio_col], 5, labels=["Q1(short)","Q2","Q3","Q4","Q5(long)"], duplicates="drop")
+        except ValueError:
+            continue
+        print(f"\n {ratio_col}:")
+        print(f" {'Quintile':<12} {'mean_ratio':>12} {'next_15m_bps':>14} {'next_1h_bps':>13} {'win_rate':>10} {'count':>7}")
+        for q in valid["quintile"].cat.categories:
+            grp = valid[valid["quintile"] == q]
+            if len(grp) == 0:
+                continue
+            mr = grp[ratio_col].mean()
+            r1 = grp["next_return"].mean() * 10000
+            r4 = grp["next_4_return"].mean() * 10000
+            wr = (grp["next_return"] > 0).mean() * 100
+            print(f" {q:<12} {mr:>12.4f} {r1:>14.2f} {r4:>13.2f} {wr:>9.1f}% {len(grp):>7}")
+
+    # 5. Contrarian vs Momentum
+    print("\n[5] Contrarian vs Momentum Signal Test")
+    print("-"*60)
+    for ratio_col, label in [("kline_taker_buy_ratio", "Taker Buy Ratio"),
+                             ("global_ls_ratio", "Global L/S Ratio"),
+                             ("top_acct_ls_ratio", "Top Trader Acct Ratio"),
+                             ("top_pos_ls_ratio", "Top Trader Pos Ratio")]:
+        if ratio_col not in available:
+            continue
+        valid = df_k[[ratio_col, "next_return", "next_4_return"]].dropna()
+        median = valid[ratio_col].median()
+        high = valid[valid[ratio_col] > median]
+        low = valid[valid[ratio_col] <= median]
+        h_wr = (high["next_return"] > 0).mean() * 100
+        l_wr = (low["next_return"] > 0).mean() * 100
+        h_r = high["next_return"].mean() * 10000
+        l_r = low["next_return"].mean() * 10000
+        signal = "Momentum" if h_r > l_r else "Contrarian"
+        print(f"\n {label}:")
+        print(f" Above median → next 15m: {h_r:+.2f} bps (win {h_wr:.1f}%)")
+        print(f" Below median → next 15m: {l_r:+.2f} bps (win {l_wr:.1f}%)")
+        print(f" → Signal type: {signal}")

+    # 6. Stats
+    print("\n[6] Feature Statistics Summary")
+    print("-"*60)
+    for feat in available:
+        s = df_k[feat].dropna()
+        print(f" {feat}: mean={s.mean():.4f}, std={s.std():.4f}, min={s.min():.4f}, max={s.max():.4f}, n={len(s)}")
+    print(f"\n Total klines: {len(df_k)}")
+    print(f" Period: {df_k.index[0]} ~ {df_k.index[-1]}")
+
+async def main():
+    end_dt = datetime.now(timezone.utc)
+    start_dt = end_dt - timedelta(days=DAYS)
+    start_ms = int(start_dt.timestamp() * 1000)
+    end_ms = int(end_dt.timestamp() * 1000)
+
+    print(f"Fetching {DAYS} days of {INTERVAL} data for {', '.join(SYMBOLS)}...")
+
+    async with aiohttp.ClientSession() as session:
+        results = await asyncio.gather(
+            *[analyze_symbol(session, sym, start_ms, end_ms) for sym in SYMBOLS]
+        )
+
+    for sym, df in zip(SYMBOLS, results):
+        print_analysis(sym, df)
+
+    # Cross-symbol comparison
+    if len(SYMBOLS) > 1:
+        print("\n" + "="*70)
+        print("CROSS-SYMBOL COMPARISON SUMMARY")
+        print("="*70)
+        print(f"\n{'Symbol':<12} {'taker_buy→15m':>14} {'taker_buy→1h':>13} {'global_ls→1h':>13} {'top_acct→1h':>13} {'top_pos→1h':>12}")
+        print("-"*78)
+        for sym, df in zip(SYMBOLS, results):
+            tb = df["kline_taker_buy_ratio"].corr(df["next_return"]) if "kline_taker_buy_ratio" in df.columns else float('nan')
+            tb4 = df["kline_taker_buy_ratio"].corr(df["next_4_return"]) if "kline_taker_buy_ratio" in df.columns else float('nan')
+            gl = df["global_ls_ratio"].corr(df["next_4_return"]) if "global_ls_ratio" in df.columns and df["global_ls_ratio"].notna().sum() > 20 else float('nan')
+            ta = df["top_acct_ls_ratio"].corr(df["next_4_return"]) if "top_acct_ls_ratio" in df.columns and df["top_acct_ls_ratio"].notna().sum() > 20 else float('nan')
+            tp = df["top_pos_ls_ratio"].corr(df["next_4_return"]) if "top_pos_ls_ratio" in df.columns and df["top_pos_ls_ratio"].notna().sum() > 20 else float('nan')
+            print(f"{sym:<12} {tb:>14.4f} {tb4:>13.4f} {gl:>13.4f} {ta:>13.4f} {tp:>12.4f}")
+
+if __name__ == "__main__":
+    asyncio.run(main())