Files
cointrader/scripts/tune_hyperparams.py
21in7 6d82febab7 feat: implement Active Config pattern for automatic param promotion
- tune_hyperparams.py: 탐색 완료 후 Best AUC > Baseline AUC 이면
  models/active_lgbm_params.json 자동 갱신
- tune_hyperparams.py: 베이스라인을 active 파일 기준으로 측정
  (active 없으면 코드 내 기본값 사용)
- train_model.py: _load_lgbm_params()에 active 파일 자동 탐색 추가
  우선순위: --tuned-params > active_lgbm_params.json > 하드코딩 기본값
- models/active_lgbm_params.json: 현재 best 파라미터로 초기화
- .gitignore: tune_results_*.json 제외, active 파일은 git 추적 유지

Made-with: Cursor
2026-03-02 14:56:42 +09:00

453 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Optuna를 사용한 LightGBM 하이퍼파라미터 자동 탐색.
사용법:
python scripts/tune_hyperparams.py # 기본 (50 trials, 5폴드)
python scripts/tune_hyperparams.py --trials 10 --folds 3 # 빠른 테스트
python scripts/tune_hyperparams.py --data data/combined_15m.parquet --trials 100
python scripts/tune_hyperparams.py --no-baseline # 베이스라인 측정 건너뜀
결과:
- 콘솔: Best Params + Walk-Forward 리포트
- JSON: models/tune_results_YYYYMMDD_HHMMSS.json
"""
import sys
import warnings
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import argparse
import json
import time
from datetime import datetime
import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner
from sklearn.metrics import roc_auc_score
from src.ml_features import FEATURE_COLS
from src.dataset_builder import generate_dataset_vectorized
# ──────────────────────────────────────────────
# 데이터 로드 및 데이터셋 생성 (1회 캐싱)
# ──────────────────────────────────────────────
def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
parquet 로드 → 벡터화 데이터셋 생성 → (X, y, w) numpy 배열 반환.
study 시작 전 1회만 호출하여 모든 trial이 공유한다.
"""
print(f"데이터 로드: {data_path}")
df_raw = pd.read_parquet(data_path)
print(f"캔들 수: {len(df_raw):,}, 컬럼: {list(df_raw.columns)}")
base_cols = ["open", "high", "low", "close", "volume"]
btc_df = eth_df = None
if "close_btc" in df_raw.columns:
btc_df = df_raw[[c + "_btc" for c in base_cols]].copy()
btc_df.columns = base_cols
print("BTC 피처 활성화")
if "close_eth" in df_raw.columns:
eth_df = df_raw[[c + "_eth" for c in base_cols]].copy()
eth_df.columns = base_cols
print("ETH 피처 활성화")
df = df_raw[base_cols].copy()
print("\n데이터셋 생성 중 (1회만 실행)...")
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0)
if dataset.empty or "label" not in dataset.columns:
raise ValueError("데이터셋 생성 실패: 샘플 0개")
actual_feature_cols = [c for c in FEATURE_COLS if c in dataset.columns]
X = dataset[actual_feature_cols].values.astype(np.float32)
y = dataset["label"].values.astype(np.int8)
w = dataset["sample_weight"].values.astype(np.float32)
pos = int(y.sum())
neg = int((y == 0).sum())
print(f"데이터셋 완성: {len(dataset):,}개 샘플 (양성={pos}, 음성={neg})")
print(f"사용 피처: {len(actual_feature_cols)}\n")
return X, y, w
# ──────────────────────────────────────────────
# Walk-Forward 교차검증
# ──────────────────────────────────────────────
def _walk_forward_cv(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
params: dict,
n_splits: int,
train_ratio: float,
trial: "optuna.Trial | None" = None,
) -> tuple[float, list[float]]:
"""
Walk-Forward 교차검증으로 평균 AUC를 반환한다.
trial이 제공되면 각 폴드 후 Optuna에 중간 값을 보고하여 Pruning을 활성화한다.
"""
n = len(X)
step = max(1, int(n * (1 - train_ratio) / n_splits))
train_end_start = int(n * train_ratio)
fold_aucs: list[float] = []
for fold_idx in range(n_splits):
tr_end = train_end_start + fold_idx * step
val_end = tr_end + step
if val_end > n:
break
X_tr, y_tr, w_tr = X[:tr_end], y[:tr_end], w[:tr_end]
X_val, y_val = X[tr_end:val_end], y[tr_end:val_end]
# 클래스 불균형 처리: 언더샘플링 (시간 순서 유지)
pos_idx = np.where(y_tr == 1)[0]
neg_idx = np.where(y_tr == 0)[0]
if len(neg_idx) > len(pos_idx) and len(pos_idx) > 0:
rng = np.random.default_rng(42)
neg_idx = rng.choice(neg_idx, size=len(pos_idx), replace=False)
bal_idx = np.sort(np.concatenate([pos_idx, neg_idx]))
if len(bal_idx) < 20 or len(np.unique(y_val)) < 2:
fold_aucs.append(0.5)
continue
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model.fit(X_tr[bal_idx], y_tr[bal_idx], sample_weight=w_tr[bal_idx])
proba = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, proba) if len(np.unique(y_val)) > 1 else 0.5
fold_aucs.append(float(auc))
# Optuna Pruning: 중간 값 보고
if trial is not None:
trial.report(float(np.mean(fold_aucs)), step=fold_idx)
if trial.should_prune():
raise optuna.TrialPruned()
mean_auc = float(np.mean(fold_aucs)) if fold_aucs else 0.5
return mean_auc, fold_aucs
# ──────────────────────────────────────────────
# Optuna 목적 함수
# ──────────────────────────────────────────────
def make_objective(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
n_splits: int,
train_ratio: float,
):
"""클로저로 데이터셋을 캡처한 목적 함수를 반환한다."""
def objective(trial: optuna.Trial) -> float:
# ── 하이퍼파라미터 샘플링 ──
n_estimators = trial.suggest_int("n_estimators", 100, 600)
learning_rate = trial.suggest_float("learning_rate", 0.01, 0.2, log=True)
max_depth = trial.suggest_int("max_depth", 2, 7)
# 핵심 제약: num_leaves <= 2^max_depth - 1 (leaf-wise 과적합 방지)
# 360개 수준의 소규모 데이터셋에서 num_leaves가 크면 암기 발생
max_leaves_upper = min(31, 2 ** max_depth - 1)
num_leaves = trial.suggest_int("num_leaves", 7, max(7, max_leaves_upper))
min_child_samples = trial.suggest_int("min_child_samples", 10, 50)
subsample = trial.suggest_float("subsample", 0.5, 1.0)
colsample_bytree = trial.suggest_float("colsample_bytree", 0.5, 1.0)
reg_alpha = trial.suggest_float("reg_alpha", 1e-4, 1.0, log=True)
reg_lambda = trial.suggest_float("reg_lambda", 1e-4, 1.0, log=True)
# weight_scale: 데이터셋을 1회 캐싱하는 구조이므로
# time_weight_decay 효과를 sample_weight 스케일로 근사한다.
weight_scale = trial.suggest_float("weight_scale", 0.5, 2.0)
w_scaled = (w * weight_scale).astype(np.float32)
params = {
"n_estimators": n_estimators,
"learning_rate": learning_rate,
"max_depth": max_depth,
"num_leaves": num_leaves,
"min_child_samples": min_child_samples,
"subsample": subsample,
"colsample_bytree": colsample_bytree,
"reg_alpha": reg_alpha,
"reg_lambda": reg_lambda,
}
mean_auc, fold_aucs = _walk_forward_cv(
X, y, w_scaled, params,
n_splits=n_splits,
train_ratio=train_ratio,
trial=trial,
)
# 폴드별 AUC를 user_attrs에 저장 (결과 리포트용)
trial.set_user_attr("fold_aucs", fold_aucs)
return mean_auc
return objective
# ──────────────────────────────────────────────
# 베이스라인 AUC 측정 (현재 고정 파라미터)
# ──────────────────────────────────────────────
def measure_baseline(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
n_splits: int,
train_ratio: float,
) -> tuple[float, list[float]]:
"""현재 실전 파라미터(active 파일 또는 하드코딩 기본값)로 베이스라인 AUC를 측정한다."""
active_path = Path("models/active_lgbm_params.json")
if active_path.exists():
with open(active_path, "r", encoding="utf-8") as f:
tune_data = json.load(f)
best_params = dict(tune_data["best_trial"]["params"])
best_params.pop("weight_scale", None)
baseline_params = best_params
print(f"베이스라인 측정 중 (active 파일: {active_path})...")
else:
baseline_params = {
"n_estimators": 434,
"learning_rate": 0.123659,
"max_depth": 6,
"num_leaves": 14,
"min_child_samples": 10,
"subsample": 0.929062,
"colsample_bytree": 0.946330,
"reg_alpha": 0.573971,
"reg_lambda": 0.000157,
}
print("베이스라인 측정 중 (active 파일 없음 → 코드 내 기본 파라미터)...")
return _walk_forward_cv(X, y, w, baseline_params, n_splits=n_splits, train_ratio=train_ratio)
# ──────────────────────────────────────────────
# 결과 출력 및 저장
# ──────────────────────────────────────────────
def print_report(
study: optuna.Study,
baseline_auc: float,
baseline_folds: list[float],
elapsed_sec: float,
output_path: Path,
) -> None:
"""콘솔에 최종 리포트를 출력한다."""
best = study.best_trial
best_auc = best.value
best_folds = best.user_attrs.get("fold_aucs", [])
improvement = best_auc - baseline_auc
improvement_pct = (improvement / baseline_auc * 100) if baseline_auc > 0 else 0.0
elapsed_min = int(elapsed_sec // 60)
elapsed_s = int(elapsed_sec % 60)
sep = "=" * 64
dash = "-" * 64
completed = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
print(f"\n{sep}")
print(f" Optuna 튜닝 완료 | {len(study.trials)} trials "
f"(완료={len(completed)}, 조기종료={len(pruned)}) | "
f"소요: {elapsed_min}{elapsed_s}")
print(sep)
print(f" Best AUC : {best_auc:.4f} (Trial #{best.number})")
if baseline_auc > 0:
sign = "+" if improvement >= 0 else ""
print(f" Baseline : {baseline_auc:.4f} (현재 train_model.py 고정값)")
print(f" 개선폭 : {sign}{improvement:.4f} ({sign}{improvement_pct:.1f}%)")
print(dash)
print(" Best Parameters:")
for k, v in best.params.items():
if isinstance(v, float):
print(f" {k:<22}: {v:.6f}")
else:
print(f" {k:<22}: {v}")
print(dash)
print(" Walk-Forward 폴드별 AUC (Best Trial):")
for i, auc in enumerate(best_folds, 1):
print(f" 폴드 {i}: {auc:.4f}")
if best_folds:
arr = np.array(best_folds)
print(f" 평균: {arr.mean():.4f} ± {arr.std():.4f}")
if baseline_folds:
print(dash)
print(" Baseline 폴드별 AUC:")
for i, auc in enumerate(baseline_folds, 1):
print(f" 폴드 {i}: {auc:.4f}")
arr = np.array(baseline_folds)
print(f" 평균: {arr.mean():.4f} ± {arr.std():.4f}")
print(dash)
print(f" 결과 저장: {output_path}")
print(f" 다음 단계: python scripts/train_model.py (파라미터 수동 반영 후)")
print(sep)
def save_results(
study: optuna.Study,
baseline_auc: float,
baseline_folds: list[float],
elapsed_sec: float,
data_path: str,
) -> Path:
"""결과를 JSON 파일로 저장하고 경로를 반환한다."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = Path(f"models/tune_results_{timestamp}.json")
output_path.parent.mkdir(exist_ok=True)
best = study.best_trial
all_trials = []
for t in study.trials:
if t.state == optuna.trial.TrialState.COMPLETE:
all_trials.append({
"number": t.number,
"auc": round(t.value, 6),
"fold_aucs": [round(a, 6) for a in t.user_attrs.get("fold_aucs", [])],
"params": {
k: (round(v, 6) if isinstance(v, float) else v)
for k, v in t.params.items()
},
})
result = {
"timestamp": datetime.now().isoformat(),
"data_path": data_path,
"n_trials_total": len(study.trials),
"n_trials_complete": len(all_trials),
"elapsed_sec": round(elapsed_sec, 1),
"baseline": {
"auc": round(baseline_auc, 6),
"fold_aucs": [round(a, 6) for a in baseline_folds],
},
"best_trial": {
"number": best.number,
"auc": round(best.value, 6),
"fold_aucs": [round(a, 6) for a in best.user_attrs.get("fold_aucs", [])],
"params": {
k: (round(v, 6) if isinstance(v, float) else v)
for k, v in best.params.items()
},
},
"all_trials": all_trials,
}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
return output_path
# ──────────────────────────────────────────────
# 메인
# ──────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Optuna LightGBM 하이퍼파라미터 튜닝")
parser.add_argument("--data", default="data/combined_15m.parquet", help="학습 데이터 경로")
parser.add_argument("--trials", type=int, default=50, help="Optuna trial 수 (기본: 50)")
parser.add_argument("--folds", type=int, default=5, help="Walk-Forward 폴드 수 (기본: 5)")
parser.add_argument("--train-ratio", type=float, default=0.6, help="학습 구간 비율 (기본: 0.6)")
parser.add_argument("--no-baseline", action="store_true", help="베이스라인 측정 건너뜀")
args = parser.parse_args()
# 1. 데이터셋 로드 (1회)
X, y, w = load_dataset(args.data)
# 2. 베이스라인 측정
if args.no_baseline:
baseline_auc, baseline_folds = 0.0, []
print("베이스라인 측정 건너뜀 (--no-baseline)\n")
else:
baseline_auc, baseline_folds = measure_baseline(X, y, w, args.folds, args.train_ratio)
print(
f"베이스라인 AUC: {baseline_auc:.4f} "
f"(폴드별: {[round(a, 4) for a in baseline_folds]})\n"
)
# 3. Optuna study 실행
optuna.logging.set_verbosity(optuna.logging.WARNING)
sampler = TPESampler(seed=42)
pruner = MedianPruner(n_startup_trials=5, n_warmup_steps=2)
study = optuna.create_study(
direction="maximize",
sampler=sampler,
pruner=pruner,
study_name="lgbm_wf_auc",
)
objective = make_objective(X, y, w, n_splits=args.folds, train_ratio=args.train_ratio)
print(f"Optuna 탐색 시작: {args.trials} trials, {args.folds}폴드 Walk-Forward")
print("(trial 완료마다 진행 상황 출력)\n")
start_time = time.time()
def _progress_callback(study: optuna.Study, trial: optuna.trial.FrozenTrial) -> None:
if trial.state == optuna.trial.TrialState.COMPLETE:
best_so_far = study.best_value
leaves = trial.params.get("num_leaves", "?")
depth = trial.params.get("max_depth", "?")
print(
f" Trial #{trial.number:3d} | AUC={trial.value:.4f} "
f"| Best={best_so_far:.4f} "
f"| leaves={leaves} depth={depth}"
)
elif trial.state == optuna.trial.TrialState.PRUNED:
print(f" Trial #{trial.number:3d} | PRUNED (조기 종료)")
study.optimize(
objective,
n_trials=args.trials,
callbacks=[_progress_callback],
show_progress_bar=False,
)
elapsed = time.time() - start_time
# 4. 결과 저장 및 출력
output_path = save_results(study, baseline_auc, baseline_folds, elapsed, args.data)
print_report(study, baseline_auc, baseline_folds, elapsed, output_path)
# 5. 성능 개선 시 active 파일 자동 갱신
import shutil
active_path = Path("models/active_lgbm_params.json")
if not args.no_baseline and study.best_value > baseline_auc:
shutil.copy(output_path, active_path)
improvement = study.best_value - baseline_auc
print(f"[MLOps] AUC +{improvement:.4f} 개선 → {active_path} 자동 갱신 완료")
print(f"[MLOps] 다음 train_model.py 실행 시 새 파라미터가 자동 적용됩니다.\n")
elif args.no_baseline:
print("[MLOps] --no-baseline 모드: 성능 비교 없이 active 파일 유지\n")
else:
print(f"[MLOps] 성능 개선 없음 (Best={study.best_value:.4f} ≤ Baseline={baseline_auc:.4f}) → active 파일 유지\n")
if __name__ == "__main__":
main()