feat: add upsert_parquet to accumulate OI/funding data incrementally
바이낸스 OI 히스토리 API가 최근 30일치만 제공하는 제약을 우회하기 위해 upsert_parquet() 함수를 추가. 매일 실행 시 기존 parquet의 oi_change/funding_rate가 0.0인 구간만 신규 값으로 덮어써 점진적으로 과거 데이터를 채워나감. --no-upsert 플래그로 기존 덮어쓰기 동작 유지 가능. Made-with: Cursor
This commit is contained in:
@@ -259,6 +259,56 @@ async def _fetch_oi_and_funding(
|
|||||||
return _merge_oi_funding(candles, oi_df, funding_df)
|
return _merge_oi_funding(candles, oi_df, funding_df)
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_parquet(path: "Path | str", new_df: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
기존 parquet 파일에 신규 데이터를 Upsert(병합)한다.
|
||||||
|
|
||||||
|
규칙:
|
||||||
|
- 기존 행의 oi_change / funding_rate가 0.0이면 신규 값으로 덮어씀
|
||||||
|
- 기존 행의 oi_change / funding_rate가 이미 0이 아니면 유지
|
||||||
|
- 신규 타임스탬프 행은 그냥 추가
|
||||||
|
- 결과는 timestamp 기준 오름차순 정렬, 중복 제거
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: 기존 parquet 경로 (없으면 new_df 그대로 반환)
|
||||||
|
new_df: 새로 수집한 DataFrame (timestamp index)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
병합된 DataFrame
|
||||||
|
"""
|
||||||
|
path = Path(path)
|
||||||
|
if not path.exists():
|
||||||
|
return new_df.sort_index()
|
||||||
|
|
||||||
|
existing = pd.read_parquet(path)
|
||||||
|
|
||||||
|
# timestamp index 통일 (tz-aware UTC)
|
||||||
|
if existing.index.tz is None:
|
||||||
|
existing.index = existing.index.tz_localize("UTC")
|
||||||
|
if new_df.index.tz is None:
|
||||||
|
new_df.index = new_df.index.tz_localize("UTC")
|
||||||
|
|
||||||
|
# 기존 데이터에서 oi_change / funding_rate가 0.0인 행만 신규 값으로 업데이트
|
||||||
|
UPSERT_COLS = ["oi_change", "funding_rate"]
|
||||||
|
overlap_idx = existing.index.intersection(new_df.index)
|
||||||
|
|
||||||
|
for col in UPSERT_COLS:
|
||||||
|
if col not in existing.columns or col not in new_df.columns:
|
||||||
|
continue
|
||||||
|
# 겹치는 행 중 기존 값이 0.0인 경우에만 신규 값으로 교체
|
||||||
|
zero_mask = existing.loc[overlap_idx, col] == 0.0
|
||||||
|
update_idx = overlap_idx[zero_mask]
|
||||||
|
if len(update_idx) > 0:
|
||||||
|
existing.loc[update_idx, col] = new_df.loc[update_idx, col]
|
||||||
|
|
||||||
|
# 신규 타임스탬프 행 추가 (기존에 없는 것만)
|
||||||
|
new_only_idx = new_df.index.difference(existing.index)
|
||||||
|
if len(new_only_idx) > 0:
|
||||||
|
existing = pd.concat([existing, new_df.loc[new_only_idx]])
|
||||||
|
|
||||||
|
return existing.sort_index()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="바이낸스 선물 과거 캔들 수집. 단일 심볼 또는 멀티 심볼 병합 저장."
|
description="바이낸스 선물 과거 캔들 수집. 단일 심볼 또는 멀티 심볼 병합 저장."
|
||||||
@@ -272,6 +322,10 @@ def main():
|
|||||||
"--no-oi", action="store_true",
|
"--no-oi", action="store_true",
|
||||||
help="OI/펀딩비 수집을 건너뜀 (캔들 데이터만 저장)",
|
help="OI/펀딩비 수집을 건너뜀 (캔들 데이터만 저장)",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--no-upsert", action="store_true",
|
||||||
|
help="기존 parquet을 Upsert하지 않고 새로 덮어씀 (기본: Upsert 활성화)",
|
||||||
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# 하위 호환: --symbol 단독 사용 시 symbols로 통합
|
# 하위 호환: --symbol 단독 사용 시 symbols로 통합
|
||||||
@@ -283,8 +337,10 @@ def main():
|
|||||||
if not args.no_oi:
|
if not args.no_oi:
|
||||||
print(f"\n[OI/펀딩비] {args.symbols[0]} 수집 중...")
|
print(f"\n[OI/펀딩비] {args.symbols[0]} 수집 중...")
|
||||||
df = asyncio.run(_fetch_oi_and_funding(args.symbols[0], args.days, df))
|
df = asyncio.run(_fetch_oi_and_funding(args.symbols[0], args.days, df))
|
||||||
|
if not args.no_upsert:
|
||||||
|
df = upsert_parquet(args.output, df)
|
||||||
df.to_parquet(args.output)
|
df.to_parquet(args.output)
|
||||||
print(f"저장 완료: {args.output} ({len(df):,}행, {len(df.columns)}컬럼)")
|
print(f"{'Upsert' if not args.no_upsert else '저장'} 완료: {args.output} ({len(df):,}행, {len(df.columns)}컬럼)")
|
||||||
else:
|
else:
|
||||||
# 멀티 심볼: 단일 클라이언트로 순차 수집 후 타임스탬프 기준 inner join 병합
|
# 멀티 심볼: 단일 클라이언트로 순차 수집 후 타임스탬프 기준 inner join 병합
|
||||||
dfs = asyncio.run(fetch_klines_all(args.symbols, args.interval, args.days))
|
dfs = asyncio.run(fetch_klines_all(args.symbols, args.interval, args.days))
|
||||||
@@ -304,8 +360,10 @@ def main():
|
|||||||
merged = asyncio.run(_fetch_oi_and_funding(primary, args.days, merged))
|
merged = asyncio.run(_fetch_oi_and_funding(primary, args.days, merged))
|
||||||
|
|
||||||
output = args.output.replace("xrpusdt", "combined")
|
output = args.output.replace("xrpusdt", "combined")
|
||||||
|
if not args.no_upsert:
|
||||||
|
merged = upsert_parquet(output, merged)
|
||||||
merged.to_parquet(output)
|
merged.to_parquet(output)
|
||||||
print(f"\n병합 저장 완료: {output} ({len(merged):,}행, {len(merged.columns)}컬럼)")
|
print(f"\n{'Upsert' if not args.no_upsert else '병합 저장'} 완료: {output} ({len(merged):,}행, {len(merged.columns)}컬럼)")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
131
tests/test_fetch_history.py
Normal file
131
tests/test_fetch_history.py
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
"""fetch_history.py의 upsert_parquet() 함수 테스트."""
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def _make_parquet(tmp_path: Path, rows: dict) -> Path:
|
||||||
|
"""테스트용 parquet 파일 생성 헬퍼."""
|
||||||
|
df = pd.DataFrame(rows)
|
||||||
|
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
|
||||||
|
df = df.set_index("timestamp")
|
||||||
|
path = tmp_path / "test.parquet"
|
||||||
|
df.to_parquet(path)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_fills_zero_oi_with_real_value(tmp_path):
|
||||||
|
"""기존 행의 oi_change=0.0이 신규 데이터의 실제 값으로 덮어써진다."""
|
||||||
|
from scripts.fetch_history import upsert_parquet
|
||||||
|
|
||||||
|
existing_path = _make_parquet(tmp_path, {
|
||||||
|
"timestamp": ["2026-01-01 00:00", "2026-01-01 00:15"],
|
||||||
|
"close": [1.0, 1.1],
|
||||||
|
"oi_change": [0.0, 0.0],
|
||||||
|
"funding_rate": [0.0, 0.0],
|
||||||
|
})
|
||||||
|
|
||||||
|
new_df = pd.DataFrame({
|
||||||
|
"close": [1.0, 1.1],
|
||||||
|
"oi_change": [0.05, 0.03],
|
||||||
|
"funding_rate": [0.0001, 0.0001],
|
||||||
|
}, index=pd.to_datetime(["2026-01-01 00:00", "2026-01-01 00:15"], utc=True))
|
||||||
|
new_df.index.name = "timestamp"
|
||||||
|
|
||||||
|
result = upsert_parquet(existing_path, new_df)
|
||||||
|
|
||||||
|
assert result.loc["2026-01-01 00:00+00:00", "oi_change"] == pytest.approx(0.05)
|
||||||
|
assert result.loc["2026-01-01 00:15+00:00", "oi_change"] == pytest.approx(0.03)
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_appends_new_rows(tmp_path):
|
||||||
|
"""신규 타임스탬프 행이 기존 데이터 아래에 추가된다."""
|
||||||
|
from scripts.fetch_history import upsert_parquet
|
||||||
|
|
||||||
|
existing_path = _make_parquet(tmp_path, {
|
||||||
|
"timestamp": ["2026-01-01 00:00"],
|
||||||
|
"close": [1.0],
|
||||||
|
"oi_change": [0.05],
|
||||||
|
"funding_rate": [0.0001],
|
||||||
|
})
|
||||||
|
|
||||||
|
new_df = pd.DataFrame({
|
||||||
|
"close": [1.1],
|
||||||
|
"oi_change": [0.03],
|
||||||
|
"funding_rate": [0.0002],
|
||||||
|
}, index=pd.to_datetime(["2026-01-01 00:15"], utc=True))
|
||||||
|
new_df.index.name = "timestamp"
|
||||||
|
|
||||||
|
result = upsert_parquet(existing_path, new_df)
|
||||||
|
|
||||||
|
assert len(result) == 2
|
||||||
|
assert pd.Timestamp("2026-01-01 00:15", tz="UTC") in result.index
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_keeps_nonzero_existing_oi(tmp_path):
|
||||||
|
"""기존 행의 oi_change가 이미 0이 아니면 덮어쓰지 않는다."""
|
||||||
|
from scripts.fetch_history import upsert_parquet
|
||||||
|
|
||||||
|
existing_path = _make_parquet(tmp_path, {
|
||||||
|
"timestamp": ["2026-01-01 00:00"],
|
||||||
|
"close": [1.0],
|
||||||
|
"oi_change": [0.07], # 이미 실제 값 존재
|
||||||
|
"funding_rate": [0.0003],
|
||||||
|
})
|
||||||
|
|
||||||
|
new_df = pd.DataFrame({
|
||||||
|
"close": [1.0],
|
||||||
|
"oi_change": [0.05], # 다른 값으로 덮어쓰려 해도
|
||||||
|
"funding_rate": [0.0001],
|
||||||
|
}, index=pd.to_datetime(["2026-01-01 00:00"], utc=True))
|
||||||
|
new_df.index.name = "timestamp"
|
||||||
|
|
||||||
|
result = upsert_parquet(existing_path, new_df)
|
||||||
|
|
||||||
|
# 기존 값(0.07)이 유지되어야 한다
|
||||||
|
assert result.iloc[0]["oi_change"] == pytest.approx(0.07)
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_no_existing_file_returns_new_df(tmp_path):
|
||||||
|
"""기존 parquet 파일이 없으면 신규 데이터를 그대로 반환한다."""
|
||||||
|
from scripts.fetch_history import upsert_parquet
|
||||||
|
|
||||||
|
nonexistent_path = tmp_path / "nonexistent.parquet"
|
||||||
|
new_df = pd.DataFrame({
|
||||||
|
"close": [1.0, 1.1],
|
||||||
|
"oi_change": [0.05, 0.03],
|
||||||
|
"funding_rate": [0.0001, 0.0001],
|
||||||
|
}, index=pd.to_datetime(["2026-01-01 00:00", "2026-01-01 00:15"], utc=True))
|
||||||
|
new_df.index.name = "timestamp"
|
||||||
|
|
||||||
|
result = upsert_parquet(nonexistent_path, new_df)
|
||||||
|
|
||||||
|
assert len(result) == 2
|
||||||
|
assert result.iloc[0]["oi_change"] == pytest.approx(0.05)
|
||||||
|
|
||||||
|
|
||||||
|
def test_upsert_result_is_sorted_by_timestamp(tmp_path):
|
||||||
|
"""결과 DataFrame이 timestamp 기준 오름차순 정렬되어 있다."""
|
||||||
|
from scripts.fetch_history import upsert_parquet
|
||||||
|
|
||||||
|
existing_path = _make_parquet(tmp_path, {
|
||||||
|
"timestamp": ["2026-01-01 00:15"],
|
||||||
|
"close": [1.1],
|
||||||
|
"oi_change": [0.0],
|
||||||
|
"funding_rate": [0.0],
|
||||||
|
})
|
||||||
|
|
||||||
|
new_df = pd.DataFrame({
|
||||||
|
"close": [1.0, 1.1, 1.2],
|
||||||
|
"oi_change": [0.05, 0.03, 0.02],
|
||||||
|
"funding_rate": [0.0001, 0.0001, 0.0002],
|
||||||
|
}, index=pd.to_datetime(
|
||||||
|
["2026-01-01 00:00", "2026-01-01 00:15", "2026-01-01 00:30"], utc=True
|
||||||
|
))
|
||||||
|
new_df.index.name = "timestamp"
|
||||||
|
|
||||||
|
result = upsert_parquet(existing_path, new_df)
|
||||||
|
|
||||||
|
assert result.index.is_monotonic_increasing
|
||||||
|
assert len(result) == 3
|
||||||
Reference in New Issue
Block a user