feat: strategy parameter sweep and production param optimization

- Add independent backtest engine (backtester.py) with walk-forward support
- Add backtest sanity check validator (backtest_validator.py)
- Add CLI tools: run_backtest.py, strategy_sweep.py (with --combined mode)
- Fix train-serve skew: unify feature z-score normalization (ml_features.py)
- Add strategy params (SL/TP ATR mult, ADX filter, volume multiplier) to
  config.py, indicators.py, dataset_builder.py, bot.py, backtester.py
- Fix WalkForwardBacktester not propagating strategy params to test folds
- Update production defaults: SL=2.0x, TP=2.0x, ADX=25, Vol=2.5
  (3-symbol combined PF: 0.71 → 1.24, MDD: 65.9% → 17.1%)
- Retrain ML models with new strategy parameters

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
21in7
2026-03-06 23:39:43 +09:00
parent 15fb9c158a
commit 02e41881ac
20 changed files with 2153 additions and 33 deletions

837
src/backtester.py Normal file
View File

@@ -0,0 +1,837 @@
"""
독립 백테스트 엔진.
봇 코드(src/bot.py)를 수정하지 않고, 기존 모듈을 재활용하여
풀 파이프라인(지표 → 시그널 → ML 필터 → 진입/청산)을 동기 루프로 시뮬레이션한다.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
from loguru import logger
import warnings
import joblib
import lightgbm as lgb
from src.dataset_builder import (
_calc_indicators, _calc_signals, _calc_features_vectorized,
generate_dataset_vectorized, stratified_undersample,
)
from src.ml_features import FEATURE_COLS
from src.ml_filter import MLFilter
# ── 설정 ─────────────────────────────────────────────────────────────
@dataclass
class BacktestConfig:
    """Settings for one backtest run.

    Percent-style fields (``fee_pct``, ``slippage_pct``) are expressed in
    percent: the helpers that consume them divide by 100.
    """

    # Universe and date range; ``None`` leaves that side of the range open.
    symbols: list[str] = field(default_factory=lambda: ["XRPUSDT"])
    start: str | None = None
    end: str | None = None

    # Account and execution costs.
    initial_balance: float = 1000.0
    leverage: int = 10
    fee_pct: float = 0.04  # taker fee (%)
    slippage_pct: float = 0.01  # slippage (%)

    # ML filter.
    use_ml: bool = True
    ml_threshold: float = 0.55

    # Risk limits.
    max_daily_loss_pct: float = 0.05
    max_positions: int = 3
    max_same_direction: int = 2

    # Margin ratio bounds and how fast the ratio decays as balance grows.
    margin_max_ratio: float = 0.50
    margin_min_ratio: float = 0.20
    margin_decay_rate: float = 0.0006

    # Stop-loss / take-profit distances as ATR multiples.
    atr_sl_mult: float = 2.0
    atr_tp_mult: float = 2.0
    min_notional: float = 5.0

    # Strategy parameters forwarded to signal generation.
    signal_threshold: int = 3
    adx_threshold: float = 25.0
    volume_multiplier: float = 2.5

    # Number of leading candles skipped so indicators can stabilise.
    # Deliberately un-annotated: a plain class attribute, not a dataclass field.
    WARMUP = 60
# ── 포지션 상태 ──────────────────────────────────────────────────────
@dataclass
class Position:
    """Snapshot of one open position, created at entry and consumed at close."""

    symbol: str
    side: str  # "LONG" | "SHORT"
    entry_price: float
    quantity: float
    sl: float  # stop-loss price
    tp: float  # take-profit price
    entry_time: pd.Timestamp
    entry_fee: float
    # Indicator values captured at entry; copied into the trade record on close.
    entry_indicators: dict = field(default_factory=dict)
    # ML probability at entry, or None when no model produced one.
    ml_proba: float | None = None
# ── 동기 RiskManager ─────────────────────────────────────────────────
class BacktestRiskManager:
    """Synchronous risk bookkeeping for the backtest loop.

    Tracks realised PnL for the current day, which symbols are open and on
    which side, and derives the margin ratio from the current balance.
    """

    def __init__(self, cfg: BacktestConfig):
        self.cfg = cfg
        self.daily_pnl: float = 0.0
        self.initial_balance: float = cfg.initial_balance
        self.base_balance: float = cfg.initial_balance
        self.open_positions: dict[str, str] = {}  # {symbol: side}
        self._current_date: str | None = None

    def new_day(self, date_str: str):
        """Reset the daily PnL the first time a new ``date_str`` is seen."""
        if self._current_date == date_str:
            return
        self._current_date = date_str
        self.daily_pnl = 0.0

    def is_trading_allowed(self) -> bool:
        """Return False once today's realised loss reaches the configured limit.

        The loss is measured against the initial balance; a non-positive
        initial balance disables the check.
        """
        if self.initial_balance <= 0:
            return True
        loss_limit_hit = (
            self.daily_pnl < 0
            and abs(self.daily_pnl) / self.initial_balance >= self.cfg.max_daily_loss_pct
        )
        return not loss_limit_hit

    def can_open(self, symbol: str, side: str) -> bool:
        """Check the position-count, one-per-symbol and same-direction limits."""
        held = self.open_positions
        if symbol in held or len(held) >= self.cfg.max_positions:
            return False
        same_side_count = list(held.values()).count(side)
        return same_side_count < self.cfg.max_same_direction

    def register(self, symbol: str, side: str):
        """Record ``symbol`` as open on ``side``."""
        self.open_positions[symbol] = side

    def close(self, symbol: str, pnl: float):
        """Forget ``symbol`` (if tracked) and add ``pnl`` to today's total."""
        self.open_positions.pop(symbol, None)
        self.daily_pnl += pnl

    def get_dynamic_margin_ratio(self, balance: float) -> float:
        """Margin ratio that shrinks linearly as ``balance`` exceeds the base balance.

        The result is clamped to ``[margin_min_ratio, margin_max_ratio]``.
        """
        growth = balance - self.base_balance
        raw_ratio = self.cfg.margin_max_ratio - (growth * self.cfg.margin_decay_rate)
        capped = min(self.cfg.margin_max_ratio, raw_ratio)
        return max(self.cfg.margin_min_ratio, capped)
# ── 유틸 ─────────────────────────────────────────────────────────────
def _apply_slippage(price: float, side: str, slippage_pct: float) -> float:
    """Return ``price`` worsened by market-order slippage.

    A "BUY" fills higher; any other ``side`` is treated as a sell and fills
    lower. ``slippage_pct`` is given in percent.
    """
    direction = 1 if side == "BUY" else -1
    return price * (1 + direction * (slippage_pct / 100.0))
def _calc_fee(price: float, quantity: float, fee_pct: float) -> float:
    """Return the fee charged on a fill; ``fee_pct`` is a percentage of notional."""
    notional = price * quantity
    return notional * fee_pct / 100.0
def _load_data(symbol: str, start: str | None, end: str | None) -> pd.DataFrame:
    """Load ``data/<symbol>/combined_15m.parquet`` and clip it to ``[start, end]``.

    The returned frame has a tz-naive ``DatetimeIndex`` in ascending order.
    Both bounds are inclusive; a falsy bound leaves that side open.

    Raises:
        FileNotFoundError: if the parquet file for ``symbol`` does not exist.
    """
    path = Path(f"data/{symbol.lower()}/combined_15m.parquet")
    if not path.exists():
        raise FileNotFoundError(f"데이터 파일 없음: {path}")

    df = pd.read_parquet(path)
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.set_index("timestamp")
    elif not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index)

    # Sort unconditionally: a file that already carries a DatetimeIndex used to
    # skip sorting, yet callers rely on index[0] / index[-1] and on
    # chronological order.
    df = df.sort_index()

    # Normalise tz-aware indexes to tz-naive.
    # NOTE(review): tz_localize(None) drops the zone but keeps wall-clock time,
    # so this is "UTC-based" only if the stored index is already UTC — confirm.
    if df.index.tz is not None:
        df.index = df.index.tz_localize(None)

    if start:
        df = df[df.index >= pd.Timestamp(start)]
    if end:
        df = df[df.index <= pd.Timestamp(end)]
    return df
def _get_ml_proba(ml_filter: MLFilter | None, features: pd.Series) -> float | None:
    """Return the model's probability for ``features``, or None if unavailable.

    None is returned when there is no filter, when the filter reports no
    loaded model, or when inference fails. The ONNX session is preferred when
    present; otherwise the LightGBM model's positive-class probability is used.

    Callers treat None as "no ML decision", so a failing model lets trades
    through unfiltered. Inference errors are therefore logged, not swallowed.
    """
    if ml_filter is None or not ml_filter.is_model_loaded():
        return None
    try:
        session = ml_filter._onnx_session
        if session is not None:
            input_name = session.get_inputs()[0].name
            X = features[FEATURE_COLS].values.astype(np.float32).reshape(1, -1)
            return float(session.run(None, {input_name: X})[0][0])
        # LightGBM path: a single-row frame that keeps the feature names.
        X = features.to_frame().T
        return float(ml_filter._lgbm_model.predict_proba(X)[0][1])
    except Exception as exc:
        # Best-effort by design: keep the backtest running, but make the
        # silent loss of ML filtering visible.
        logger.warning(f"ML probability inference failed; entry is not ML-filtered: {exc!r}")
        return None
# ── 메인 엔진 ────────────────────────────────────────────────────────
class Backtester:
    """Synchronous, candle-by-candle simulation of the trading pipeline.

    For every symbol the indicators, signals and vectorised features are
    computed up front; the candles of all symbols are then replayed in
    timestamp order. State kept on the instance: cash ``balance``, open
    ``positions`` (at most one per symbol), closed ``trades`` and an
    ``equity_curve`` with one sample per processed candle.
    """

    def __init__(self, cfg: BacktestConfig):
        """Set up account state and, if ``cfg.use_ml``, one file-based ML filter per symbol."""
        self.cfg = cfg
        self.risk = BacktestRiskManager(cfg)
        self.balance = cfg.initial_balance
        self.positions: dict[str, Position] = {}  # {symbol: Position}
        self.trades: list[dict] = []
        self.equity_curve: list[dict] = []
        self._peak_equity: float = cfg.initial_balance

        # Per-symbol ML filter; None means entries for that symbol are unfiltered.
        self.ml_filters: dict[str, MLFilter | None] = {sym: None for sym in cfg.symbols}
        if not cfg.use_ml:
            return
        for sym in cfg.symbols:
            sym_dir = Path(f"models/{sym.lower()}")
            if sym_dir.exists():
                onnx = str(sym_dir / "mlx_filter.weights.onnx")
                lgbm = str(sym_dir / "lgbm_filter.pkl")
            else:
                # No per-symbol directory: fall back to the shared model files.
                onnx = "models/mlx_filter.weights.onnx"
                lgbm = "models/lgbm_filter.pkl"
            mf = MLFilter(onnx_path=onnx, lgbm_path=lgbm, threshold=cfg.ml_threshold)
            self.ml_filters[sym] = mf if mf.is_model_loaded() else None

    def run(self, ml_models: dict[str, object] | None = None) -> dict:
        """Run the backtest and return ``{config, summary, trades, validation}``.

        Args:
            ml_models: optional ``{symbol: lgbm_model}`` of pre-trained models
                (used by walk-forward). When given it replaces the file-based
                filters; a missing or None entry disables ML for that symbol.
                When None, the filters built in ``__init__`` are used.
        """
        all_indicators: dict[str, pd.DataFrame] = {}
        all_signals: dict[str, np.ndarray] = {}
        all_features: dict[str, pd.DataFrame] = {}

        # BTC/ETH correlation data (loaded if available).
        btc_df = self._try_load_corr("BTCUSDT")
        eth_df = self._try_load_corr("ETHUSDT")

        for sym in self.cfg.symbols:
            df = _load_data(sym, self.cfg.start, self.cfg.end)
            df_ind = _calc_indicators(df)
            all_indicators[sym] = df_ind
            sig_arr = _calc_signals(
                df_ind,
                signal_threshold=self.cfg.signal_threshold,
                adx_threshold=self.cfg.adx_threshold,
                volume_multiplier=self.cfg.volume_multiplier,
            )
            all_signals[sym] = sig_arr
            # Pre-compute vectorised features through the same function used
            # when building the training dataset.
            all_features[sym] = _calc_features_vectorized(
                df_ind, sig_arr, btc_df=btc_df, eth_df=eth_df,
            )
            logger.info(f"[{sym}] 데이터 로드: {len(df):,}캔들 ({df.index[0]} ~ {df.index[-1]})")

        # Walk-forward model injection.
        if ml_models is not None:
            self.ml_filters = {}
            for sym in self.cfg.symbols:
                model = ml_models.get(sym)
                if model is None:
                    self.ml_filters[sym] = None
                    continue
                # Build an MLFilter shell without running __init__ (which
                # takes model file paths) and attach the in-memory model.
                mf = MLFilter.__new__(MLFilter)
                mf._disabled = False
                mf._onnx_session = None
                mf._lgbm_model = model
                mf._threshold = self.cfg.ml_threshold
                mf._onnx_path = Path("/dev/null")
                mf._lgbm_path = Path("/dev/null")
                mf._loaded_onnx_mtime = 0.0
                mf._loaded_lgbm_mtime = 0.0
                self.ml_filters[sym] = mf

        # Multi-symbol: one merged, timestamp-ordered event stream.
        events = self._build_events(all_indicators, all_signals)
        logger.info(f"총 이벤트: {len(events):,}")

        for ts, sym, candle_idx in events:
            self.risk.new_day(str(ts.date()))
            df_ind = all_indicators[sym]
            signal = all_signals[sym][candle_idx]
            row = df_ind.iloc[candle_idx]

            self._record_equity(ts)

            # 1) SL/TP for an open position. This runs before the daily-loss
            #    gate: a breached loss limit must stop new trades, not freeze
            #    the protective exits of positions that are already open.
            if sym in self.positions and self._check_sl_tp(sym, row, ts):
                continue

            # 2) Daily loss limit blocks reversals and new entries.
            if not self.risk.is_trading_allowed():
                continue
            if signal == "HOLD":
                continue

            # 3) Opposite signal on an open position: close it, then fall
            #    through to attempt entry in the new direction.
            held = self.positions.get(sym)
            if held is not None:
                is_reversal = (held.side == "LONG" and signal == "SHORT") or \
                              (held.side == "SHORT" and signal == "LONG")
                if not is_reversal:
                    continue
                self._close_position(sym, row["close"], ts, "REVERSE_SIGNAL")

            # 4) Entry (fresh, or re-entry after a reversal).
            if self.risk.can_open(sym, signal):
                self._try_enter(
                    sym, signal, df_ind, candle_idx,
                    all_features[sym], ts=ts,
                )

        # Force-close whatever is still open at the last candle of its symbol.
        for sym in list(self.positions):
            last_df = all_indicators[sym]
            self._close_position(
                sym, last_df["close"].iloc[-1], last_df.index[-1], "END_OF_DATA"
            )

        return self._build_result()

    def _try_load_corr(self, symbol: str) -> pd.DataFrame | None:
        """Best-effort load of correlation candles for ``symbol``; None if unavailable.

        Looks for ``data/<symbol>/combined_15m.parquet`` and falls back to
        ``data/combined_15m.parquet``. The frame is clipped to the configured
        date range, like the primary data.
        """
        path = Path(f"data/{symbol.lower()}/combined_15m.parquet")
        if not path.exists():
            path = Path("data/combined_15m.parquet")
            if not path.exists():
                return None
        try:
            df = pd.read_parquet(path)
            if "timestamp" in df.columns:
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                df = df.set_index("timestamp")
            elif not isinstance(df.index, pd.DatetimeIndex):
                df.index = pd.to_datetime(df.index)
            df = df.sort_index()
            if df.index.tz is not None:
                df.index = df.index.tz_localize(None)
            if self.cfg.start:
                df = df[df.index >= pd.Timestamp(self.cfg.start)]
            if self.cfg.end:
                df = df[df.index <= pd.Timestamp(self.cfg.end)]
            return df
        except Exception as exc:
            # Correlation data is optional, but report why it was dropped.
            logger.warning(f"[{symbol}] correlation data could not be loaded from {path}: {exc!r}")
            return None

    def _build_events(
        self,
        all_indicators: dict[str, pd.DataFrame],
        all_signals: dict[str, np.ndarray],
    ) -> list[tuple[pd.Timestamp, str, int]]:
        """Return ``(timestamp, symbol, candle_index)`` for every post-warmup candle.

        Events are ordered by timestamp, then symbol, so symbols sharing a
        timestamp are always processed in the same order.
        """
        warmup = self.cfg.WARMUP
        events: list[tuple[pd.Timestamp, str, int]] = []
        for sym, df_ind in all_indicators.items():
            events.extend(
                (ts, sym, i)
                for i, ts in enumerate(df_ind.index[warmup:], start=warmup)
            )
        events.sort(key=lambda ev: (ev[0], ev[1]))
        return events

    def _check_sl_tp(self, symbol: str, row: pd.Series, ts: pd.Timestamp) -> bool:
        """Close ``symbol`` if the candle's high/low touched SL or TP; True if closed.

        When both levels fall inside the same candle the stop-loss wins
        (the conservative assumption). Fills happen at the level itself.
        """
        pos = self.positions[symbol]
        high = row["high"]
        low = row["low"]
        if pos.side == "LONG":
            stop_hit = low <= pos.sl
            target_hit = high >= pos.tp
        else:  # SHORT
            stop_hit = high >= pos.sl
            target_hit = low <= pos.tp

        if stop_hit:
            self._close_position(symbol, pos.sl, ts, "STOP_LOSS")
            return True
        if target_hit:
            self._close_position(symbol, pos.tp, ts, "TAKE_PROFIT")
            return True
        return False

    def _try_enter(
        self,
        symbol: str,
        signal: str,
        df_ind: pd.DataFrame,
        candle_idx: int,
        feat_df: pd.DataFrame,
        ts: pd.Timestamp,
    ):
        """Apply the ML filter, size the position and open it at the candle close.

        Returns without side effects when the ATR is unusable, the ML filter
        rejects the signal, or the sized order is below ``min_notional``.
        """
        row = df_ind.iloc[candle_idx]

        # Validate ATR first: SL/TP cannot be placed without it, and bailing
        # out here (not after the fee was charged) keeps the balance untouched.
        # "not atr > 0" also rejects NaN.
        atr = float(row.get("atr", 0))
        if not atr > 0:
            return

        # Look the pre-computed feature row up instead of recomputing it.
        available_cols = [c for c in FEATURE_COLS if c in feat_df.columns]
        features = feat_df.iloc[candle_idx][available_cols]

        # ML filter: a None probability (no model / failed inference) does not block.
        ml_proba = _get_ml_proba(self.ml_filters.get(symbol), features)
        if ml_proba is not None and ml_proba < self.cfg.ml_threshold:
            return

        # Position sizing: balance is split evenly across configured symbols.
        per_symbol_balance = self.balance / len(self.cfg.symbols)
        price = float(row["close"])
        margin_ratio = self.risk.get_dynamic_margin_ratio(self.balance)
        notional = per_symbol_balance * margin_ratio * self.cfg.leverage
        if notional < self.cfg.min_notional:
            notional = self.cfg.min_notional
        # NOTE(review): quantity is rounded to one decimal for every symbol;
        # for high-priced symbols this can round to 0 and the fallback below
        # then jumps to 0.1 units — confirm against real lot sizes.
        quantity = round(notional / price, 1)
        if quantity * price < self.cfg.min_notional:
            quantity = round(self.cfg.min_notional / price + 0.05, 1)
        if quantity <= 0 or quantity * price < self.cfg.min_notional:
            return

        # Market entry: apply slippage, then charge the entry fee immediately.
        order_side = "BUY" if signal == "LONG" else "SELL"
        entry_price = _apply_slippage(price, order_side, self.cfg.slippage_pct)
        entry_fee = _calc_fee(entry_price, quantity, self.cfg.fee_pct)
        self.balance -= entry_fee

        sl_dist = atr * self.cfg.atr_sl_mult
        tp_dist = atr * self.cfg.atr_tp_mult
        if signal == "LONG":
            sl = entry_price - sl_dist
            tp = entry_price + tp_dist
        else:
            sl = entry_price + sl_dist
            tp = entry_price - tp_dist

        self.positions[symbol] = Position(
            symbol=symbol,
            side=signal,
            entry_price=entry_price,
            quantity=quantity,
            sl=sl,
            tp=tp,
            entry_time=ts,
            entry_fee=entry_fee,
            entry_indicators={
                "rsi": float(row.get("rsi", 0)),
                "macd_hist": float(row.get("macd_hist", 0)),
                "atr": float(atr),
                "adx": float(row.get("adx", 0)),
            },
            ml_proba=ml_proba,
        )
        self.risk.register(symbol, signal)

    def _close_position(
        self, symbol: str, exit_price: float, ts: pd.Timestamp, reason: str
    ):
        """Close ``symbol`` at ``exit_price``, settle the balance and record the trade.

        SL/TP exits fill at the given level without slippage; reversal and
        end-of-data exits are treated as market orders and get slippage.
        """
        pos = self.positions.pop(symbol)

        if reason in ("REVERSE_SIGNAL", "END_OF_DATA"):
            close_side = "SELL" if pos.side == "LONG" else "BUY"
            exit_price = _apply_slippage(exit_price, close_side, self.cfg.slippage_pct)

        exit_fee = _calc_fee(exit_price, pos.quantity, self.cfg.fee_pct)
        if pos.side == "LONG":
            gross_pnl = (exit_price - pos.entry_price) * pos.quantity
        else:
            gross_pnl = (pos.entry_price - exit_price) * pos.quantity
        net_pnl = gross_pnl - pos.entry_fee - exit_fee

        # The entry fee already left the balance at entry, so only the gross
        # PnL and the exit fee are settled here. Adding net_pnl would charge
        # the entry fee twice.
        self.balance += gross_pnl - exit_fee
        self.risk.close(symbol, net_pnl)

        self.trades.append({
            "symbol": symbol,
            "side": pos.side,
            "entry_time": str(pos.entry_time),
            "exit_time": str(ts),
            "entry_price": round(pos.entry_price, 6),
            "exit_price": round(exit_price, 6),
            "quantity": pos.quantity,
            "sl": round(pos.sl, 6),
            "tp": round(pos.tp, 6),
            "gross_pnl": round(gross_pnl, 6),
            "entry_fee": round(pos.entry_fee, 6),
            "exit_fee": round(exit_fee, 6),
            "net_pnl": round(net_pnl, 6),
            "close_reason": reason,
            "ml_proba": round(pos.ml_proba, 4) if pos.ml_proba is not None else None,
            "indicators": pos.entry_indicators,
        })

    def _record_equity(self, ts: pd.Timestamp):
        """Append an equity sample for ``ts`` and update the running peak.

        Equity is the realised balance only: no current price is available
        here, so open positions contribute zero unrealised PnL.
        """
        equity = self.balance
        self.equity_curve.append({"timestamp": str(ts), "equity": round(equity, 4)})
        if equity > self._peak_equity:
            self._peak_equity = equity

    def _build_result(self) -> dict:
        """Assemble the result dict, including the validator's report."""
        summary = self._calc_summary()
        from src.backtest_validator import validate
        validation = validate(self.trades, summary, self.cfg)
        return {
            "config": asdict(self.cfg),
            "summary": summary,
            "trades": self.trades,
            "validation": validation,
        }

    def _calc_summary(self) -> dict:
        """Compute aggregate statistics from the closed trades.

        Trades with ``net_pnl <= 0`` count as losses. ``profit_factor`` is
        ``inf`` when there is no losing PnL. Drawdown is measured on the
        per-trade equity path starting from the initial balance.
        """
        if not self.trades:
            return {
                "total_trades": 0,
                "total_pnl": 0.0,
                "return_pct": 0.0,
                "win_rate": 0.0,
                "avg_win": 0.0,
                "avg_loss": 0.0,
                "profit_factor": 0.0,
                "max_drawdown_pct": 0.0,
                "sharpe_ratio": 0.0,
                "total_fees": 0.0,
                "close_reasons": {},
            }

        pnls = [t["net_pnl"] for t in self.trades]
        wins = [p for p in pnls if p > 0]
        losses = [p for p in pnls if p <= 0]
        total_pnl = sum(pnls)
        total_fees = sum(t["entry_fee"] + t["exit_fee"] for t in self.trades)
        gross_profit = sum(wins) if wins else 0.0
        gross_loss = abs(sum(losses)) if losses else 0.0

        # Max drawdown. The path starts at the initial balance so that a
        # losing first trade counts as a drawdown from the starting capital.
        start_equity = self.cfg.initial_balance
        equity = np.concatenate(([start_equity], start_equity + np.cumsum(pnls)))
        peak = np.maximum.accumulate(equity)
        drawdown = (peak - equity) / peak
        mdd = float(np.max(drawdown)) * 100

        # Sharpe ratio, scaled by sqrt(252 days * 96 bars = 24192).
        # NOTE(review): the inputs are per-trade PnLs, not per-bar returns, so
        # this annualisation factor looks inconsistent — confirm the intent.
        sharpe = 0.0
        if len(pnls) > 1:
            pnl_arr = np.array(pnls)
            spread = np.std(pnl_arr)
            if spread > 0:
                sharpe = float(np.mean(pnl_arr) / spread * np.sqrt(24192))

        # Count of trades per close reason.
        reasons: dict[str, int] = {}
        for t in self.trades:
            reasons[t["close_reason"]] = reasons.get(t["close_reason"], 0) + 1

        return {
            "total_trades": len(self.trades),
            "total_pnl": round(total_pnl, 4),
            "return_pct": round(total_pnl / self.cfg.initial_balance * 100, 2),
            "win_rate": round(len(wins) / len(self.trades) * 100, 2),
            "avg_win": round(np.mean(wins), 4) if wins else 0.0,
            "avg_loss": round(np.mean(losses), 4) if losses else 0.0,
            "profit_factor": round(gross_profit / gross_loss, 2) if gross_loss > 0 else float("inf"),
            "max_drawdown_pct": round(mdd, 2),
            "sharpe_ratio": round(sharpe, 2),
            "total_fees": round(total_fees, 4),
            "close_reasons": reasons,
        }
# ── Walk-Forward 백테스트 ─────────────────────────────────────────────
@dataclass
class WalkForwardConfig(BacktestConfig):
    """Backtest settings extended with walk-forward windowing and training knobs."""

    # Window lengths, in calendar months.
    train_months: int = 6  # training window
    test_months: int = 1  # validation window

    # Forwarded unchanged to dataset generation when training each fold.
    time_weight_decay: float = 2.0
    negative_ratio: int = 5
class WalkForwardBacktester:
    """Walk-forward backtest.

    For each window a model is trained on the training span and a backtest is
    run on the span that immediately follows it, so a fold is evaluated only
    on candles at or after the end of its training span.
    """

    def __init__(self, cfg: WalkForwardConfig):
        self.cfg = cfg

    def run(self) -> dict:
        """Train and backtest every fold, then return the aggregated result dict."""
        # Load the full configured period once per symbol.
        all_raw: dict[str, pd.DataFrame] = {
            sym: _load_data(sym, self.cfg.start, self.cfg.end)
            for sym in self.cfg.symbols
        }

        windows = self._build_windows(all_raw)
        logger.info(f"Walk-Forward: {len(windows)}개 윈도우 "
                    f"(학습 {self.cfg.train_months}개월, 검증 {self.cfg.test_months}개월)")

        all_trades: list[dict] = []
        fold_summaries: list[dict] = []

        for fold_no, (train_start, train_end, test_start, test_end) in enumerate(windows, start=1):
            logger.info(f" 폴드 {fold_no}/{len(windows)}: "
                        f"학습 {train_start.date()}~{train_end.date()}, "
                        f"검증 {test_start.date()}~{test_end.date()}")

            # One model per symbol. With use_ml disabled no model is trained,
            # so the injected mapping switches the ML filter off rather than
            # applying it regardless of the flag.
            models = {
                sym: (
                    self._train_model(all_raw[sym], train_start, train_end, sym)
                    if self.cfg.use_ml else None
                )
                for sym in self.cfg.symbols
            }

            bt = Backtester(self._fold_config(test_start, test_end))
            result = bt.run(ml_models=models)

            # Tag each trade with its fold number.
            for t in result["trades"]:
                t["fold"] = fold_no
            all_trades.extend(result["trades"])
            fold_summaries.append({
                "fold": fold_no,
                "train_period": f"{train_start.date()} ~ {train_end.date()}",
                "test_period": f"{test_start.date()} ~ {test_end.date()}",
                "summary": result["summary"],
            })

        return self._aggregate_results(all_trades, fold_summaries)

    def _fold_config(self, test_start: pd.Timestamp, test_end: pd.Timestamp) -> BacktestConfig:
        """Build the test-fold config: every BacktestConfig field, with the fold's range."""
        from dataclasses import fields

        # Copy by introspection so a field added to BacktestConfig later is
        # propagated automatically instead of silently falling back to its default.
        params = {f.name: getattr(self.cfg, f.name) for f in fields(BacktestConfig)}
        # Use full timestamps: truncating to a date could move the test start
        # before train_end when the data does not begin at midnight.
        params["start"] = str(test_start)
        params["end"] = str(test_end)
        return BacktestConfig(**params)

    def _build_windows(
        self, all_raw: dict[str, pd.DataFrame]
    ) -> list[tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
        """Return ``(train_start, train_end, test_start, test_end)`` tuples.

        Windows are laid out over the period common to all symbols. An empty
        list is returned when there is no data to build a window from.
        """
        if not all_raw or any(df.empty for df in all_raw.values()):
            return []

        # Period covered by every symbol.
        start = max(df.index[0] for df in all_raw.values())
        end = min(df.index[-1] for df in all_raw.values())
        train_delta = pd.DateOffset(months=self.cfg.train_months)
        test_delta = pd.DateOffset(months=self.cfg.test_months)

        windows = []
        cursor = start
        while cursor + train_delta + test_delta <= end:
            train_end = cursor + train_delta
            test_end = train_end + test_delta
            windows.append((cursor, train_end, train_end, test_end))
            # The next training window starts where this one ended, so training
            # spans do not overlap.
            # NOTE(review): this advances by train_months, which leaves the
            # periods between consecutive test windows untested — confirm
            # that stepping by test_months was not intended.
            cursor = train_end
        return windows

    def _train_model(
        self,
        raw_df: pd.DataFrame,
        train_start: pd.Timestamp,
        train_end: pd.Timestamp,
        symbol: str,
    ) -> object | None:
        """Train a LightGBM classifier on ``[train_start, train_end)``.

        Returns None on any failure: too few candles, dataset generation
        error, an empty or single-class training sample, or a fit error.
        """
        # Compare on tz-naive values.
        ts_start = train_start.tz_localize(None) if train_start.tz else train_start
        ts_end = train_end.tz_localize(None) if train_end.tz else train_end
        index = raw_df.index
        if index.tz is not None:
            index = index.tz_localize(None)

        train_df = raw_df[(index >= ts_start) & (index < ts_end)]
        if len(train_df) < 200:
            logger.warning(f" [{symbol}] 학습 데이터 부족: {len(train_df)}캔들")
            return None

        base_cols = ["open", "high", "low", "close", "volume"]
        df = train_df[base_cols].copy()

        # BTC/ETH correlation data, when the frame carries the suffixed columns.
        btc_df = eth_df = None
        if "close_btc" in train_df.columns:
            btc_df = train_df[[c + "_btc" for c in base_cols]].copy()
            btc_df.columns = base_cols
        if "close_eth" in train_df.columns:
            eth_df = train_df[[c + "_eth" for c in base_cols]].copy()
            eth_df.columns = base_cols

        try:
            dataset = generate_dataset_vectorized(
                df, btc_df=btc_df, eth_df=eth_df,
                time_weight_decay=self.cfg.time_weight_decay,
                negative_ratio=self.cfg.negative_ratio,
                signal_threshold=self.cfg.signal_threshold,
                adx_threshold=self.cfg.adx_threshold,
                volume_multiplier=self.cfg.volume_multiplier,
            )
        except Exception as e:
            logger.warning(f" [{symbol}] 데이터셋 생성 실패: {e}")
            return None

        if dataset.empty or "label" not in dataset.columns:
            return None

        actual_cols = [c for c in FEATURE_COLS if c in dataset.columns]
        X = dataset[actual_cols].values
        y = dataset["label"].values
        w = dataset["sample_weight"].values
        source = dataset["source"].values if "source" in dataset.columns else np.full(len(X), "signal")

        # Undersample, then make sure both classes survived: a one-class fit
        # cannot provide the positive-class probability read at inference time.
        keep = stratified_undersample(y, source, seed=42)
        if np.unique(y[keep]).size < 2:
            logger.warning(f" [{symbol}] training sample has a single class; skipping model")
            return None

        # LightGBM parameters: active file if present, defaults otherwise.
        model = lgb.LGBMClassifier(**self._load_params(symbol), random_state=42, verbose=-1)
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                model.fit(X[keep], y[keep], sample_weight=w[keep])
        except Exception as e:
            logger.warning(f" [{symbol}] model training failed: {e!r}")
            return None
        return model

    def _load_params(self, symbol: str) -> dict:
        """Return LightGBM parameters for ``symbol``.

        Reads ``models/<symbol>/active_lgbm_params.json`` (or the shared
        ``models/active_lgbm_params.json``) and overlays its
        ``best_trial.params`` on the built-in defaults. ``weight_scale`` is
        dropped before the overlay, so it is never passed on as a parameter.
        """
        params_path = Path(f"models/{symbol.lower()}/active_lgbm_params.json")
        if not params_path.exists():
            params_path = Path("models/active_lgbm_params.json")

        params = {
            "n_estimators": 434,
            "learning_rate": 0.123659,
            "max_depth": 6,
            "num_leaves": 14,
            "min_child_samples": 10,
            "subsample": 0.929062,
            "colsample_bytree": 0.946330,
            "reg_alpha": 0.573971,
            "reg_lambda": 0.000157,
        }
        if params_path.exists():
            with open(params_path, encoding="utf-8") as f:
                data = json.load(f)
            best = dict(data["best_trial"]["params"])
            best.pop("weight_scale", None)
            params.update(best)
        return params

    def _summarize(self, trades: list[dict]) -> dict:
        """Compute aggregate statistics over ``trades`` (all folds concatenated)."""
        if not trades:
            return {"total_trades": 0, "total_pnl": 0.0, "return_pct": 0.0,
                    "win_rate": 0.0, "avg_win": 0.0, "avg_loss": 0.0,
                    "profit_factor": 0.0, "max_drawdown_pct": 0.0,
                    "sharpe_ratio": 0.0, "total_fees": 0.0, "close_reasons": {}}

        pnls = [t["net_pnl"] for t in trades]
        wins = [p for p in pnls if p > 0]
        losses = [p for p in pnls if p <= 0]
        total_pnl = sum(pnls)
        total_fees = sum(t["entry_fee"] + t["exit_fee"] for t in trades)
        gross_profit = sum(wins) if wins else 0.0
        gross_loss = abs(sum(losses)) if losses else 0.0

        # Max drawdown on the per-trade equity path, starting at the initial
        # balance so a losing first trade registers as a drawdown.
        start_equity = self.cfg.initial_balance
        equity = np.concatenate(([start_equity], start_equity + np.cumsum(pnls)))
        peak = np.maximum.accumulate(equity)
        mdd = float(np.max((peak - equity) / peak)) * 100

        # NOTE(review): per-trade PnLs scaled by sqrt(24192), a per-bar
        # annualisation factor — confirm this is the intended Sharpe definition.
        sharpe = 0.0
        if len(pnls) > 1:
            pnl_arr = np.array(pnls)
            spread = np.std(pnl_arr)
            if spread > 0:
                sharpe = float(np.mean(pnl_arr) / spread * np.sqrt(24192))

        reasons: dict[str, int] = {}
        for t in trades:
            reasons[t["close_reason"]] = reasons.get(t["close_reason"], 0) + 1

        return {
            "total_trades": len(trades),
            "total_pnl": round(total_pnl, 4),
            "return_pct": round(total_pnl / self.cfg.initial_balance * 100, 2),
            "win_rate": round(len(wins) / len(trades) * 100, 2),
            "avg_win": round(np.mean(wins), 4) if wins else 0.0,
            "avg_loss": round(np.mean(losses), 4) if losses else 0.0,
            "profit_factor": round(gross_profit / gross_loss, 2) if gross_loss > 0 else float("inf"),
            "max_drawdown_pct": round(mdd, 2),
            "sharpe_ratio": round(sharpe, 2),
            "total_fees": round(total_fees, 4),
            "close_reasons": reasons,
        }

    def _aggregate_results(
        self, all_trades: list[dict], fold_summaries: list[dict]
    ) -> dict:
        """Combine per-fold output into the overall walk-forward result dict."""
        from src.backtest_validator import validate

        summary = self._summarize(all_trades)
        validation = validate(all_trades, summary, self.cfg)
        return {
            "mode": "walk_forward",
            "config": asdict(self.cfg),
            "summary": summary,
            "folds": fold_summaries,
            "trades": all_trades,
            "validation": validation,
        }