feat: add L/S ratio collector service for top_acct and global ratios

Collect top trader account L/S ratio and global L/S ratio every 15 minutes
for XRP, BTC, ETH (6 API calls/cycle) and persist to per-symbol parquet files.
Deployed as a separate Docker service reusing the bot image.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-22 22:20:30 +09:00
parent aa5c0afce6
commit e2b0454825
5 changed files with 425 additions and 2 deletions

121
scripts/collect_ls_ratio.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Long/Short Ratio 장기 수집 스크립트.
15분마다 cron 실행하여 Binance Trading Data API에서
top_acct_ls_ratio, global_ls_ratio를 data/{symbol}/ls_ratio_15m.parquet에 누적한다.
수집 대상:
- topLongShortAccountRatio × 3심볼 (XRPUSDT, BTCUSDT, ETHUSDT)
- globalLongShortAccountRatio × 3심볼 (XRPUSDT, BTCUSDT, ETHUSDT)
→ 총 API 호출 6회/15분 (rate limit 무관)
사용법:
python scripts/collect_ls_ratio.py
python scripts/collect_ls_ratio.py --symbols XRPUSDT BTCUSDT
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import argparse
import asyncio
from datetime import datetime, timezone
import aiohttp
import pandas as pd
# Binance USD-M futures REST base URL.
BASE_URL = "https://fapi.binance.com"
# Symbols collected when --symbols is not supplied on the command line.
DEFAULT_SYMBOLS = ["XRPUSDT", "BTCUSDT", "ETHUSDT"]
# Output parquet column name -> Binance Trading Data API endpoint path.
ENDPOINTS = {
    "top_acct_ls_ratio": "/futures/data/topLongShortAccountRatio",
    "global_ls_ratio": "/futures/data/globalLongShortAccountRatio",
}
async def fetch_latest(session: aiohttp.ClientSession, symbol: str) -> dict | None:
    """Fetch the most recent entry of both L/S ratios for one symbol.

    Returns a dict with keys ``timestamp``, ``symbol``,
    ``top_acct_ls_ratio`` and ``global_ls_ratio`` on success, or ``None``
    when any request fails or returns an unexpected payload.
    """
    result: dict = {"timestamp": None, "symbol": symbol}
    request_timeout = aiohttp.ClientTimeout(total=10)
    for column, path in ENDPOINTS.items():
        query = {"symbol": symbol, "period": "15m", "limit": 1}
        try:
            async with session.get(
                f"{BASE_URL}{path}", params=query, timeout=request_timeout
            ) as resp:
                payload = await resp.json()
                if not (isinstance(payload, list) and payload):
                    print(f"[WARN] {symbol} {column}: unexpected response: {payload}")
                    return None
                latest = payload[0]
                result[column] = float(latest["longShortRatio"])
                # The timestamp is taken from the first endpoint's response only.
                if result["timestamp"] is None:
                    result["timestamp"] = pd.Timestamp(
                        int(latest["timestamp"]), unit="ms", tz="UTC"
                    )
        except Exception as exc:
            print(f"[ERROR] {symbol} {column}: {exc}")
            return None
    return result
async def collect(symbols: list[str]):
    """Collect the latest ratios for all *symbols* and append to parquet.

    Each symbol's row is appended to ``data/{symbol}/ls_ratio_15m.parquet``;
    rows whose timestamp already exists in the file are skipped.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_latest(session, sym) for sym in symbols]
        results = await asyncio.gather(*tasks)
        now = datetime.now(timezone.utc)
        collected = 0
        for row in results:
            if row is None:
                continue
            symbol = row["symbol"]
            out_path = Path(f"data/{symbol.lower()}/ls_ratio_15m.parquet")
            out_path.parent.mkdir(parents=True, exist_ok=True)
            new_df = pd.DataFrame([{
                "timestamp": row["timestamp"],
                "top_acct_ls_ratio": row["top_acct_ls_ratio"],
                "global_ls_ratio": row["global_ls_ratio"],
            }])
            if out_path.exists():
                existing = pd.read_parquet(out_path)
                # Duplicate guard. BUGFIX: the previous check used
                # ``row["timestamp"] in existing["timestamp"].values`` — .values
                # strips the timezone to naive datetime64, so a tz-aware
                # Timestamp never matched and duplicates slipped through.
                # Compare against the tz-aware Series directly instead.
                if existing["timestamp"].eq(row["timestamp"]).any():
                    print(f"[SKIP] {symbol} ts={row['timestamp']} already exists")
                    continue
                combined = pd.concat([existing, new_df], ignore_index=True)
            else:
                combined = new_df
            combined.to_parquet(out_path, index=False)
            collected += 1
            print(
                f"[{now.isoformat()}] {symbol}: "
                f"top_acct={row['top_acct_ls_ratio']:.4f}, "
                f"global={row['global_ls_ratio']:.4f} "
                f"{out_path} ({len(combined)} rows)"
            )
        if collected == 0:
            print(f"[{now.isoformat()}] No new data collected")
def main():
    """CLI entry point: parse the symbol list and run one collection pass."""
    arg_parser = argparse.ArgumentParser(description="L/S Ratio 장기 수집")
    arg_parser.add_argument(
        "--symbols",
        nargs="+",
        default=DEFAULT_SYMBOLS,
        help="수집 대상 심볼 (기본: XRPUSDT BTCUSDT ETHUSDT)",
    )
    parsed = arg_parser.parse_args()
    asyncio.run(collect(parsed.symbols))


if __name__ == "__main__":
    main()