diff --git a/CLAUDE.md b/CLAUDE.md index 16f9c4d..fe83ffc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -130,3 +130,4 @@ All design documents and implementation plans are stored in `docs/plans/` with t | 2026-03-04 | `oi-derived-features` (design + plan) | Completed | | 2026-03-05 | `multi-symbol-trading` (design + plan) | Completed | | 2026-03-06 | `multi-symbol-dashboard` (design + plan) | Completed | +| 2026-03-06 | `strategy-parameter-sweep` (plan) | Completed | diff --git a/docs/plans/2026-03-06-strategy-parameter-sweep-plan.md b/docs/plans/2026-03-06-strategy-parameter-sweep-plan.md new file mode 100644 index 0000000..4f8d691 --- /dev/null +++ b/docs/plans/2026-03-06-strategy-parameter-sweep-plan.md @@ -0,0 +1,80 @@ +# Strategy Parameter Sweep Plan + +**Date**: 2026-03-06 +**Status**: Completed + +## Goal + +Find profitable parameter combinations for the base technical indicator strategy (ML OFF) using walk-forward backtesting, targeting PF >= 1.0 as foundation for ML redesign. + +## Background + +Walk-forward backtest revealed the current XRP strategy is unprofitable (PF 0.71, -641 PnL). The strategy parameter sweep systematically tests 324 combinations of 5 parameters to find profitable regimes. 
+ +## Parameters Swept + +| Parameter | Values | Description | +|-----------|--------|-------------| +| `atr_sl_mult` | 1.0, 1.5, 2.0 | Stop-loss ATR multiplier | +| `atr_tp_mult` | 2.0, 3.0, 4.0 | Take-profit ATR multiplier | +| `signal_threshold` | 3, 4, 5 | Min weighted indicator score for entry | +| `adx_threshold` | 0, 20, 25, 30 | ADX filter (0=disabled, N=require ADX>=N) | +| `volume_multiplier` | 1.5, 2.0, 2.5 | Volume surge detection multiplier | + +Total combinations: 3 x 3 x 3 x 4 x 3 = **324** + +## Implementation + +### Files Modified +- `src/indicators.py` — `get_signal()` accepts `signal_threshold`, `adx_threshold`, `volume_multiplier` params +- `src/dataset_builder.py` — `_calc_signals()` accepts same params for vectorized computation +- `src/backtester.py` — `BacktestConfig` includes strategy params; `WalkForwardBacktester` propagates them to test folds + +### Files Created +- `scripts/strategy_sweep.py` — CLI tool for parameter grid sweep + +### Bug Fix +- `WalkForwardBacktester` was not passing `signal_threshold`, `adx_threshold`, `volume_multiplier`, or `use_ml` to fold `BacktestConfig`. All signal params were silently using defaults, making ADX/volume/threshold sweeps have zero effect. 
+ +## Results (XRPUSDT, Walk-Forward 3/1) + +### Top 10 Combinations + +| Rank | SL×ATR | TP×ATR | Signal | ADX | Vol | Trades | WinRate | PF | MDD | PnL | Sharpe | +|------|--------|--------|--------|-----|-----|--------|---------|-----|-----|------|--------| +| 1 | 1.5 | 4.0 | 3 | 30 | 2.5 | 19 | 52.6% | 2.39 | 7.0% | +469 | 61.0 | +| 2 | 1.5 | 2.0 | 3 | 30 | 2.5 | 19 | 68.4% | 2.23 | 6.5% | +282 | 61.2 | +| 3 | 1.0 | 2.0 | 3 | 30 | 2.5 | 19 | 57.9% | 1.98 | 5.0% | +213 | 50.8 | +| 4 | 1.0 | 4.0 | 3 | 30 | 2.5 | 19 | 36.8% | 1.80 | 7.7% | +248 | 37.1 | +| 5 | 1.5 | 3.0 | 3 | 30 | 2.5 | 19 | 52.6% | 1.76 | 10.1% | +258 | 40.9 | +| 6 | 1.5 | 4.0 | 3 | 25 | 2.5 | 28 | 42.9% | 1.75 | 13.1% | +381 | 36.8 | +| 7 | 2.0 | 4.0 | 3 | 30 | 1.5 | 39 | 48.7% | 1.67 | 16.9% | +572 | 35.3 | +| 8 | 1.0 | 2.0 | 3 | 25 | 2.5 | 28 | 50.0% | 1.64 | 5.8% | +205 | 35.7 | +| 9 | 1.5 | 2.0 | 3 | 25 | 2.5 | 28 | 57.1% | 1.62 | 10.3% | +229 | 35.7 | +| 10 | 2.0 | 2.0 | 3 | 25 | 2.5 | 27 | 66.7% | 1.57 | 12.0% | +217 | 33.3 | + +### Current Production (Rank 93/324) +| SL×ATR | TP×ATR | Signal | ADX | Vol | Trades | WinRate | PF | MDD | PnL | +|--------|--------|--------|-----|-----|--------|---------|-----|-----|------| +| 1.5 | 3.0 | 3 | 0 | 1.5 | 118 | 30.5% | 0.71 | 65.9% | -641 | + +### Key Findings + +1. **ADX filter is the single most impactful parameter.** All top 10 results use ADX >= 25, with ADX=30 dominating the top 5. This filters out sideways/ranging markets where signals are noise. + +2. **Volume multiplier 2.5 dominates.** Higher volume thresholds ensure entries only on strong conviction (genuine breakouts vs. noise). + +3. **Signal threshold 3 is optimal.** Higher thresholds (4, 5) produced too few trades or zero trades in most ADX-filtered regimes. + +4. **SL/TP ratios matter less than entry filters.** The top 10 span 7 of the 9 SL/TP combos, and all share ADX=25-30; nine of the ten also use Vol=2.5 (rank 7 uses Vol=1.5). + +5. **Trade count drops significantly with filters.** Top combos have 19-39 trades vs. 
118 for current. Fewer but higher quality entries. + +6. **41 combinations achieved PF >= 1.0** out of 324 total (12.7%). + +## Recommended Next Steps + +1. **Update production defaults**: ADX=25, volume_multiplier=2.0 as a conservative choice (more trades than ADX=30) +2. **Validate on TRXUSDT and DOGEUSDT** to confirm ADX filter is not XRP-specific +3. **Retrain ML models** with updated strategy params — the ML filter should now have a profitable base to improve upon +4. **Fine-tune sweep** around the profitable zone: ADX [25-35], Vol [2.0-3.0] diff --git a/models/dogeusdt/lgbm_filter.pkl b/models/dogeusdt/lgbm_filter.pkl index 97dc77f..12a0988 100644 Binary files a/models/dogeusdt/lgbm_filter.pkl and b/models/dogeusdt/lgbm_filter.pkl differ diff --git a/models/dogeusdt/training_log.json b/models/dogeusdt/training_log.json index 7ba1e43..9afea85 100644 --- a/models/dogeusdt/training_log.json +++ b/models/dogeusdt/training_log.json @@ -23,5 +23,80 @@ "reg_lambda": 0.000157 }, "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T02:00:56.287381", + "backend": "lgbm", + "auc": 0.9555, + "best_threshold": 0.4012, + "best_precision": 0.577, + "best_recall": 0.319, + "samples": 3330, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/dogeusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T22:37:26.751875", + "backend": "lgbm", + "auc": 0.9565, + "best_threshold": 0.4047, + "best_precision": 0.65, + "best_recall": 0.277, + "samples": 3336, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/dogeusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 
6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T23:35:19.306197", + "backend": "lgbm", + "auc": 0.9552, + "best_threshold": 0.8009, + "best_precision": 0.75, + "best_recall": 0.2, + "samples": 744, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/dogeusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 } ] \ No newline at end of file diff --git a/models/trxusdt/lgbm_filter.pkl b/models/trxusdt/lgbm_filter.pkl index c457fa1..1740b7b 100644 Binary files a/models/trxusdt/lgbm_filter.pkl and b/models/trxusdt/lgbm_filter.pkl differ diff --git a/models/trxusdt/training_log.json b/models/trxusdt/training_log.json index 4d52670..97aa2f8 100644 --- a/models/trxusdt/training_log.json +++ b/models/trxusdt/training_log.json @@ -23,5 +23,80 @@ "reg_lambda": 0.000157 }, "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T02:00:40.471987", + "backend": "lgbm", + "auc": 0.9433, + "best_threshold": 0.2433, + "best_precision": 0.439, + "best_recall": 0.947, + "samples": 2940, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/trxusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T22:37:17.762061", + "backend": "lgbm", + "auc": 0.9493, + "best_threshold": 0.2613, + "best_precision": 0.448, + "best_recall": 
0.975, + "samples": 2952, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/trxusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T23:35:11.188338", + "backend": "lgbm", + "auc": 0.96, + "best_threshold": 0.6121, + "best_precision": 0.75, + "best_recall": 0.6, + "samples": 648, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/trxusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 } ] \ No newline at end of file diff --git a/models/xrpusdt/lgbm_filter.pkl b/models/xrpusdt/lgbm_filter.pkl index c048776..8c64a16 100644 Binary files a/models/xrpusdt/lgbm_filter.pkl and b/models/xrpusdt/lgbm_filter.pkl differ diff --git a/models/xrpusdt/training_log.json b/models/xrpusdt/training_log.json index 2ce89c8..fc8e0f6 100644 --- a/models/xrpusdt/training_log.json +++ b/models/xrpusdt/training_log.json @@ -23,5 +23,80 @@ "reg_lambda": 0.000157 }, "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T02:00:24.712465", + "backend": "lgbm", + "auc": 0.9456, + "best_threshold": 0.7213, + "best_precision": 0.6, + "best_recall": 0.22, + "samples": 3222, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/xrpusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + 
"reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T22:37:08.529734", + "backend": "lgbm", + "auc": 0.9448, + "best_threshold": 0.7881, + "best_precision": 0.538, + "best_recall": 0.167, + "samples": 3234, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/xrpusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 + }, + { + "date": "2026-03-06T23:35:02.930027", + "backend": "lgbm", + "auc": 0.9598, + "best_threshold": 0.4674, + "best_precision": 1.0, + "best_recall": 0.182, + "samples": 618, + "features": 26, + "time_weight_decay": 2.0, + "model_path": "models/xrpusdt/lgbm_filter.pkl", + "tuned_params_path": null, + "lgbm_params": { + "n_estimators": 434, + "learning_rate": 0.123659, + "max_depth": 6, + "num_leaves": 14, + "min_child_samples": 10, + "subsample": 0.929062, + "colsample_bytree": 0.94633, + "reg_alpha": 0.573971, + "reg_lambda": 0.000157 + }, + "weight_scale": 1.783105 } ] \ No newline at end of file diff --git a/scripts/run_backtest.py b/scripts/run_backtest.py new file mode 100644 index 0000000..a2f1a0f --- /dev/null +++ b/scripts/run_backtest.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +백테스트 CLI 진입점. 
+ +사용법: + python scripts/run_backtest.py --symbol XRPUSDT + python scripts/run_backtest.py --symbols XRPUSDT,TRXUSDT,DOGEUSDT + python scripts/run_backtest.py --symbol XRPUSDT --no-ml + python scripts/run_backtest.py --symbol XRPUSDT --start 2025-06-01 --end 2026-03-01 + python scripts/run_backtest.py --symbol XRPUSDT --fee 0.04 --slippage 0.02 + python scripts/run_backtest.py --symbol XRPUSDT --walk-forward + python scripts/run_backtest.py --symbol XRPUSDT --walk-forward --train-months 6 --test-months 1 +""" +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import argparse +import json +from datetime import datetime + +import numpy as np + +from loguru import logger + +from src.backtester import Backtester, BacktestConfig, WalkForwardBacktester, WalkForwardConfig + + +def parse_args(): + p = argparse.ArgumentParser(description="CoinTrader Backtest Engine") + group = p.add_mutually_exclusive_group(required=True) + group.add_argument("--symbol", type=str, help="단일 심볼 (e.g. XRPUSDT)") + group.add_argument("--symbols", type=str, help="멀티심볼, 콤마 구분 (e.g. XRPUSDT,TRXUSDT,DOGEUSDT)") + + p.add_argument("--start", type=str, default=None, help="시작일 (e.g. 2025-06-01)") + p.add_argument("--end", type=str, default=None, help="종료일 (e.g. 
2026-03-01)") + p.add_argument("--balance", type=float, default=1000.0, help="초기 잔고 (기본: 1000)") + p.add_argument("--leverage", type=int, default=10, help="레버리지 (기본: 10)") + p.add_argument("--fee", type=float, default=0.04, help="taker 수수료 %% (기본: 0.04)") + p.add_argument("--slippage", type=float, default=0.01, help="슬리피지 %% (기본: 0.01)") + p.add_argument("--no-ml", action="store_true", help="ML 필터 비활성화") + p.add_argument("--ml-threshold", type=float, default=0.55, help="ML 임계값 (기본: 0.55)") + + # Strategy params + p.add_argument("--sl-atr", type=float, default=1.5, help="SL ATR 배수 (기본: 1.5)") + p.add_argument("--tp-atr", type=float, default=3.0, help="TP ATR 배수 (기본: 3.0)") + p.add_argument("--signal-threshold", type=int, default=3, help="신호 임계값 (기본: 3)") + p.add_argument("--adx-threshold", type=float, default=0, help="ADX 필터 (0=비활성화, 기본: 0)") + p.add_argument("--vol-multiplier", type=float, default=1.5, help="거래량 급증 배수 (기본: 1.5)") + + # Walk-Forward + p.add_argument("--walk-forward", action="store_true", help="Walk-Forward 백테스트 (기간별 모델 학습/검증)") + p.add_argument("--train-months", type=int, default=6, help="WF 학습 윈도우 개월 (기본: 6)") + p.add_argument("--test-months", type=int, default=1, help="WF 검증 윈도우 개월 (기본: 1)") + return p.parse_args() + + +def print_summary(summary: dict, cfg, mode: str = "standard"): + print("\n" + "=" * 60) + title = "WALK-FORWARD BACKTEST RESULT" if mode == "walk_forward" else "BACKTEST RESULT" + print(f" {title}") + print("=" * 60) + print(f" 심볼: {', '.join(cfg.symbols)}") + print(f" 기간: {cfg.start or '전체'} ~ {cfg.end or '전체'}") + print(f" 초기 잔고: {cfg.initial_balance:,.2f} USDT") + print(f" 레버리지: {cfg.leverage}x") + print(f" 수수료: {cfg.fee_pct}% | 슬리피지: {cfg.slippage_pct}%") + if mode == "walk_forward": + print(f" 학습/검증: {cfg.train_months}개월 / {cfg.test_months}개월") + else: + print(f" ML 필터: {'OFF' if not cfg.use_ml else f'ON (threshold={cfg.ml_threshold})'}") + print("-" * 60) + print(f" 총 거래: {summary['total_trades']}건") + print(f" 총 PnL: 
{summary['total_pnl']:+,.4f} USDT") + print(f" 수익률: {summary['return_pct']:+.2f}%") + print(f" 승률: {summary['win_rate']:.1f}%") + print(f" 평균 수익: {summary['avg_win']:+.4f} USDT") + print(f" 평균 손실: {summary['avg_loss']:+.4f} USDT") + pf = summary['profit_factor'] + pf_str = f"{pf:.2f}" if pf != float("inf") else "INF" + print(f" Profit Factor: {pf_str}") + print(f" 최대 낙폭: {summary['max_drawdown_pct']:.2f}%") + print(f" 샤프비율: {summary['sharpe_ratio']:.2f}") + print(f" 총 수수료: {summary['total_fees']:,.4f} USDT") + print("-" * 60) + print(" 청산 사유:") + for reason, count in summary.get("close_reasons", {}).items(): + pct = count / summary["total_trades"] * 100 if summary["total_trades"] > 0 else 0 + print(f" {reason:20s} {count:4d}건 ({pct:.1f}%)") + print("=" * 60) + + +def print_fold_table(folds: list[dict]): + print("\n" + "=" * 90) + print(" FOLD DETAILS") + print("=" * 90) + print(f" {'Fold':>4} {'Test Period':>25} {'Trades':>6} {'PnL':>10} {'WinRate':>7} {'PF':>6} {'MDD':>6}") + print("-" * 90) + for f in folds: + s = f["summary"] + pf = s["profit_factor"] + pf_str = f"{pf:.2f}" if pf != float("inf") else "INF" + print(f" {f['fold']:>4} {f['test_period']:>25} {s['total_trades']:>6} " + f"{s['total_pnl']:>+10.2f} {s['win_rate']:>6.1f}% {pf_str:>6} {s['max_drawdown_pct']:>5.1f}%") + print("=" * 90) + + +def save_result(result: dict, cfg): + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + mode = result.get("mode", "standard") + prefix = "wf_backtest" if mode == "walk_forward" else "backtest" + + for sym in cfg.symbols: + out_dir = Path(f"results/{sym.lower()}") + out_dir.mkdir(parents=True, exist_ok=True) + path = out_dir / f"{prefix}_{ts}.json" + + if len(cfg.symbols) > 1: + out_dir = Path("results/combined") + out_dir.mkdir(parents=True, exist_ok=True) + path = out_dir / f"{prefix}_{ts}.json" + + def sanitize(obj): + if isinstance(obj, bool): + return obj + if isinstance(obj, (int, float)): + if isinstance(obj, float): + if obj == float("inf"): + return "Infinity" + 
if obj == float("-inf"): + return "-Infinity" + return obj + if isinstance(obj, dict): + return {k: sanitize(v) for k, v in obj.items()} + if isinstance(obj, list): + return [sanitize(v) for v in obj] + if isinstance(obj, (np.integer,)): + return int(obj) + if isinstance(obj, (np.floating,)): + return float(obj) + if isinstance(obj, np.bool_): + return bool(obj) + return obj + + with open(path, "w") as f: + json.dump(sanitize(result), f, indent=2, ensure_ascii=False) + print(f"결과 저장: {path}") + return path + + +def main(): + args = parse_args() + + if args.symbol: + symbols = [args.symbol.upper()] + else: + symbols = [s.strip().upper() for s in args.symbols.split(",") if s.strip()] + + if args.walk_forward: + cfg = WalkForwardConfig( + symbols=symbols, + start=args.start, + end=args.end, + initial_balance=args.balance, + leverage=args.leverage, + fee_pct=args.fee, + slippage_pct=args.slippage, + use_ml=not args.no_ml, + ml_threshold=args.ml_threshold, + atr_sl_mult=args.sl_atr, + atr_tp_mult=args.tp_atr, + signal_threshold=args.signal_threshold, + adx_threshold=args.adx_threshold, + volume_multiplier=args.vol_multiplier, + train_months=args.train_months, + test_months=args.test_months, + ) + logger.info(f"Walk-Forward 백테스트 시작: {', '.join(symbols)} " + f"(학습 {cfg.train_months}개월, 검증 {cfg.test_months}개월)") + wf = WalkForwardBacktester(cfg) + result = wf.run() + print_summary(result["summary"], cfg, mode="walk_forward") + if result.get("folds"): + print_fold_table(result["folds"]) + save_result(result, cfg) + else: + cfg = BacktestConfig( + symbols=symbols, + start=args.start, + end=args.end, + initial_balance=args.balance, + leverage=args.leverage, + fee_pct=args.fee, + slippage_pct=args.slippage, + use_ml=not args.no_ml, + ml_threshold=args.ml_threshold, + atr_sl_mult=args.sl_atr, + atr_tp_mult=args.tp_atr, + signal_threshold=args.signal_threshold, + adx_threshold=args.adx_threshold, + volume_multiplier=args.vol_multiplier, + ) + logger.info(f"백테스트 시작: {', 
'.join(symbols)}") + bt = Backtester(cfg) + result = bt.run() + print_summary(result["summary"], cfg) + save_result(result, cfg) + + +if __name__ == "__main__": + main() diff --git a/scripts/strategy_sweep.py b/scripts/strategy_sweep.py new file mode 100644 index 0000000..b7f3647 --- /dev/null +++ b/scripts/strategy_sweep.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +전략 파라미터 스윕: 기존 백테스터를 활용하여 파라미터 조합별 성능을 비교한다. +ML 필터 OFF 상태에서 순수 전략 성능만 측정한다. + +사용법: + python scripts/strategy_sweep.py --symbol XRPUSDT + python scripts/strategy_sweep.py --symbol XRPUSDT --train-months 3 --test-months 1 + python scripts/strategy_sweep.py --symbols XRPUSDT,TRXUSDT,DOGEUSDT + python scripts/strategy_sweep.py --symbols XRPUSDT,TRXUSDT,DOGEUSDT --combined +""" +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import argparse +import json +import itertools +from datetime import datetime + +import numpy as np +from loguru import logger + +from src.backtester import Backtester, BacktestConfig, WalkForwardBacktester, WalkForwardConfig + + +# ── 스윕 파라미터 정의 ──────────────────────────────────────────────── +PARAM_GRID = { + "atr_sl_mult": [1.0, 1.5, 2.0], + "atr_tp_mult": [2.0, 3.0, 4.0], + "signal_threshold": [3, 4, 5], + "adx_threshold": [0, 20, 25, 30], + "volume_multiplier": [1.5, 2.0, 2.5], +} + +# 현재 프로덕션 파라미터 +CURRENT_PARAMS = { + "atr_sl_mult": 2.0, + "atr_tp_mult": 2.0, + "signal_threshold": 3, + "adx_threshold": 25, + "volume_multiplier": 2.5, +} + +EMPTY_SUMMARY = { + "total_trades": 0, "total_pnl": 0, "return_pct": 0, "win_rate": 0, + "avg_win": 0, "avg_loss": 0, "profit_factor": 0, + "max_drawdown_pct": 0, "sharpe_ratio": 0, "total_fees": 0, "close_reasons": {}, +} + + +def generate_combinations(grid: dict) -> list[dict]: + keys = list(grid.keys()) + values = list(grid.values()) + combos = [] + for combo in itertools.product(*values): + combos.append(dict(zip(keys, combo))) + return combos + + +def run_single_backtest(symbols: 
list[str], params: dict, train_months: int, test_months: int) -> dict: + """단일 파라미터 조합으로 walk-forward 백테스트 실행.""" + cfg = WalkForwardConfig( + symbols=symbols, + use_ml=False, + train_months=train_months, + test_months=test_months, + atr_sl_mult=params["atr_sl_mult"], + atr_tp_mult=params["atr_tp_mult"], + signal_threshold=params["signal_threshold"], + adx_threshold=params["adx_threshold"], + volume_multiplier=params["volume_multiplier"], + ) + wf = WalkForwardBacktester(cfg) + result = wf.run() + return result["summary"] + + +def run_combined_backtest(symbols: list[str], params: dict, train_months: int, test_months: int) -> dict: + """심볼별 독립 walk-forward 실행 후 합산 결과 반환.""" + per_symbol = {} + total_gross_profit = 0.0 + total_gross_loss = 0.0 + total_trades = 0 + total_pnl = 0.0 + + for sym in symbols: + try: + summary = run_single_backtest([sym], params, train_months, test_months) + except Exception as e: + logger.warning(f" {sym} 실패: {e}") + summary = EMPTY_SUMMARY.copy() + + per_symbol[sym] = summary + + # gross profit/loss 역산 + n = summary["total_trades"] + if n > 0: + wr = summary["win_rate"] / 100.0 + n_wins = round(wr * n) + n_losses = n - n_wins + gp = summary["avg_win"] * n_wins if n_wins > 0 else 0.0 + gl = abs(summary["avg_loss"]) * n_losses if n_losses > 0 else 0.0 + total_gross_profit += gp + total_gross_loss += gl + total_trades += n + total_pnl += summary["total_pnl"] + + combined_pf = (total_gross_profit / total_gross_loss) if total_gross_loss > 0 else float("inf") + + return { + "params": params, + "combined_pf": round(combined_pf, 2), + "combined_trades": total_trades, + "combined_pnl": round(total_pnl, 2), + "per_symbol": per_symbol, + } + + +def print_results_table(results: list[dict], symbols: list[str], train_months: int, test_months: int): + sym_str = ",".join(symbols) + print(f"\n{'=' * 100}") + print(f" Strategy Parameter Sweep Results ({sym_str}, Walk-Forward {train_months}/{test_months})") + print(f"{'=' * 100}") + print(f" {'Rank':>4} 
{'SL×ATR':>6} {'TP×ATR':>6} {'Signal':>6} {'ADX':>4} {'Vol':>4} " + f"{'Trades':>6} {'WinRate':>7} {'PF':>6} {'MDD':>5} {'PnL':>10} {'Sharpe':>6}") + print(f" {'-' * 94}") + + for i, r in enumerate(results): + p = r["params"] + s = r["summary"] + pf = s["profit_factor"] + pf_str = f"{pf:.2f}" if pf != float("inf") else "INF" + + is_current = all(p[k] == CURRENT_PARAMS[k] for k in CURRENT_PARAMS) + marker = " ← CURRENT" if is_current else "" + + print(f" {i+1:>4} {p['atr_sl_mult']:>6.1f} {p['atr_tp_mult']:>6.1f} " + f"{p['signal_threshold']:>6} {p['adx_threshold']:>4.0f} {p['volume_multiplier']:>4.1f} " + f"{s['total_trades']:>6} {s['win_rate']:>6.1f}% {pf_str:>6} {s['max_drawdown_pct']:>4.1f}% " + f"{s['total_pnl']:>+10.2f} {s['sharpe_ratio']:>6.1f}{marker}") + + print(f"{'=' * 100}") + + +def print_combined_results_table(results: list[dict], symbols: list[str], + train_months: int, test_months: int, + min_pf_count: int = 2, min_pf: float = 0.9): + sym_str = ",".join(symbols) + # 심볼 약칭 + short = {s: s.replace("USDT", "") for s in symbols} + + print(f"\n{'=' * 130}") + print(f" Combined Strategy Sweep ({sym_str}, WF {train_months}/{test_months})") + print(f" Filter: {min_pf_count}+ symbols with PF >= {min_pf}") + print(f"{'=' * 130}") + + # 헤더 + sym_headers = " ".join(f"{short[s]:>12s}" for s in symbols) + print(f" {'Rank':>4} {'SL':>4} {'TP':>4} {'Sig':>3} {'ADX':>3} {'Vol':>4} " + f"{'Tot':>4} {'CombPF':>6} {'PnL':>9} {sym_headers}") + + # 심볼별 서브헤더 + sub = " ".join(f"{'PF/WR%/Trd':>12s}" for _ in symbols) + print(f" {'':>4} {'':>4} {'':>4} {'':>3} {'':>3} {'':>4} " + f"{'':>4} {'':>6} {'':>9} {sub}") + print(f" {'-' * 124}") + + for i, r in enumerate(results): + p = r["params"] + cpf = r["combined_pf"] + cpf_str = f"{cpf:.2f}" if cpf != float("inf") else "INF" + + is_current = all(p[k] == CURRENT_PARAMS[k] for k in CURRENT_PARAMS) + marker = " ←CUR" if is_current else "" + + # 심볼별 PF/WR/Trades + sym_cols = [] + for s in symbols: + ss = r["per_symbol"][s] + spf = 
ss["profit_factor"] + spf_str = f"{spf:.1f}" if spf != float("inf") else "INF" + sym_cols.append(f"{spf_str}/{ss['win_rate']:.0f}%/{ss['total_trades']}") + + sym_detail = " ".join(f"{c:>12s}" for c in sym_cols) + + print(f" {i+1:>4} {p['atr_sl_mult']:>4.1f} {p['atr_tp_mult']:>4.1f} " + f"{p['signal_threshold']:>3} {p['adx_threshold']:>3.0f} {p['volume_multiplier']:>4.1f} " + f"{r['combined_trades']:>4} {cpf_str:>6} {r['combined_pnl']:>+9.1f} " + f"{sym_detail}{marker}") + + print(f"{'=' * 130}") + print(f" 표시된 조합: {len(results)}개 / 전체 324개") + print(f" 심볼별 칼럼: PF/승률%/거래수") + + +def save_results(results: list[dict], symbols: list[str]): + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + for sym in symbols: + out_dir = Path(f"results/{sym.lower()}") + out_dir.mkdir(parents=True, exist_ok=True) + path = out_dir / f"strategy_sweep_{ts}.json" + + if len(symbols) > 1: + out_dir = Path("results/combined") + out_dir.mkdir(parents=True, exist_ok=True) + path = out_dir / f"strategy_sweep_{ts}.json" + + def sanitize(obj): + if isinstance(obj, bool): + return obj + if isinstance(obj, (np.integer,)): + return int(obj) + if isinstance(obj, (np.floating,)): + return float(obj) + if isinstance(obj, float) and obj == float("inf"): + return "Infinity" + if isinstance(obj, dict): + return {k: sanitize(v) for k, v in obj.items()} + if isinstance(obj, list): + return [sanitize(v) for v in obj] + return obj + + with open(path, "w") as f: + json.dump(sanitize(results), f, indent=2, ensure_ascii=False) + print(f"결과 저장: {path}") + + +def main(): + p = argparse.ArgumentParser(description="Strategy Parameter Sweep") + group = p.add_mutually_exclusive_group(required=True) + group.add_argument("--symbol", type=str) + group.add_argument("--symbols", type=str) + p.add_argument("--train-months", type=int, default=3) + p.add_argument("--test-months", type=int, default=1) + p.add_argument("--combined", action="store_true", + help="심볼별 독립 실행 후 합산 PF 기준 정렬 (--symbols 필수)") + p.add_argument("--min-pf", 
type=float, default=0.9, + help="심볼별 최소 PF 필터 (기본: 0.9)") + p.add_argument("--min-pf-count", type=int, default=2, + help="최소 PF 충족 심볼 수 (기본: 2)") + args = p.parse_args() + + symbols = [args.symbol.upper()] if args.symbol else [s.strip().upper() for s in args.symbols.split(",")] + + if args.combined: + if len(symbols) < 2: + logger.error("--combined 모드는 --symbols에 2개 이상 심볼 필요") + sys.exit(1) + run_combined_sweep(symbols, args) + else: + run_single_sweep(symbols, args) + + +def run_single_sweep(symbols: list[str], args): + combos = generate_combinations(PARAM_GRID) + logger.info(f"스윕 시작: {len(combos)}개 조합, 심볼={','.join(symbols)}") + + results = [] + for i, params in enumerate(combos): + param_str = " | ".join(f"{k}={v}" for k, v in params.items()) + logger.info(f" [{i+1}/{len(combos)}] {param_str}") + + try: + summary = run_single_backtest(symbols, params, args.train_months, args.test_months) + results.append({"params": params, "summary": summary}) + except Exception as e: + logger.warning(f" 실패: {e}") + results.append({"params": params, "summary": EMPTY_SUMMARY.copy()}) + + # PF 기준 내림차순 정렬 + def sort_key(r): + pf = r["summary"]["profit_factor"] + return pf if pf != float("inf") else 999 + results.sort(key=sort_key, reverse=True) + + print_results_table(results, symbols, args.train_months, args.test_months) + save_results(results, symbols) + + +def run_combined_sweep(symbols: list[str], args): + combos = generate_combinations(PARAM_GRID) + total_runs = len(combos) * len(symbols) + logger.info(f"합산 스윕 시작: {len(combos)}개 조합 × {len(symbols)}심볼 = {total_runs}회") + + results = [] + for i, params in enumerate(combos): + param_str = " | ".join(f"{k}={v}" for k, v in params.items()) + logger.info(f" [{i+1}/{len(combos)}] {param_str}") + + r = run_combined_backtest(symbols, params, args.train_months, args.test_months) + results.append(r) + + # 필터: N개 이상 심볼에서 PF >= min_pf + filtered = [] + for r in results: + pf_pass = sum( + 1 for s in symbols + if 
# ANSI escape codes used to colour the CLI report.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RESET = "\033[0m"


@dataclass
class CheckResult:
    """Outcome of a single sanity check."""

    name: str      # machine-readable identifier of the check
    passed: bool   # True when the check found no problem
    level: str     # "FAIL" | "WARNING" -- severity reported when `passed` is False
    message: str   # human-readable detail printed in the report


def validate(trades: list[dict], summary: dict, cfg) -> dict:
    """Run every sanity check on a backtest result and print a CLI report.

    Args:
        trades: Trade dicts as produced by the backtester (keys such as
            ``entry_time``, ``exit_time``, ``side``, ``sl``, ``tp``,
            ``entry_price``, ``symbol``, ``entry_fee``, ``exit_fee``, ``net_pnl``).
        summary: Summary dict; ``win_rate``, ``max_drawdown_pct`` and
            ``profit_factor`` are read from it.
        cfg: Backtest configuration. Only ``initial_balance`` is read; when the
            attribute is missing, 1000.0 is assumed.

    Returns:
        ``{"overall": "PASS" | "FAIL", "checks": [...]}``. ``overall`` is
        "PASS" only when every check passed, warning-level checks included.
    """
    # Use the configured starting balance instead of a hard-coded 1000.0 so the
    # negative-balance invariant is evaluated against the real account size.
    initial_balance = getattr(cfg, "initial_balance", 1000.0)

    results: list[CheckResult] = []
    results.extend(_check_invariants(trades, initial_balance))  # logical invariants
    results.extend(_check_statistics(trades, summary))          # statistical anomalies

    _print_results(results)

    return {
        "overall": "PASS" if all(r.passed for r in results) else "FAIL",
        "checks": [
            {"name": r.name, "passed": r.passed, "level": r.level, "message": r.message}
            for r in results
        ],
    }


def _check_invariants(trades: list[dict], initial_balance: float = 1000.0) -> list[CheckResult]:
    """Check logical invariants; any violation is reported at FAIL level.

    Args:
        trades: Trade dicts to inspect.
        initial_balance: Starting balance used to replay the running balance.
            Defaults to 1000.0 for backward compatibility.

    Returns:
        One ``CheckResult`` per invariant, or a single ``trade_count`` entry
        when there are no trades to inspect.
    """
    if not trades:
        return [CheckResult("trade_count", True, "FAIL", "트레이드 없음 (검증 스킵)")]

    results: list[CheckResult] = []

    # 1. Exit time >= entry time (END_OF_DATA may close on the entry candle).
    bad_times = [
        i for i, t in enumerate(trades)
        if pd.Timestamp(t["exit_time"]) < pd.Timestamp(t["entry_time"])
    ]
    passed = not bad_times
    results.append(CheckResult(
        "exit_after_entry",
        passed,
        "FAIL",
        "모든 트레이드에서 청산 > 진입" if passed else f"위반 트레이드 인덱스: {bad_times}",
    ))

    # 2. SL/TP must bracket the entry price on the correct sides.
    bad_sltp = []
    for i, t in enumerate(trades):
        if t["side"] == "LONG":
            ok = t["sl"] < t["entry_price"] < t["tp"]
        else:
            ok = t["tp"] < t["entry_price"] < t["sl"]
        if not ok:
            bad_sltp.append(i)
    passed = not bad_sltp
    results.append(CheckResult(
        "sl_tp_direction",
        passed,
        "FAIL",
        "SL/TP 방향 정합" if passed else f"위반 트레이드 인덱스: {bad_sltp}",
    ))

    # 3. No overlapping positions per symbol (previous exit <= next entry).
    by_symbol: dict[str, list[dict]] = {}
    for t in trades:
        by_symbol.setdefault(t["symbol"], []).append(t)

    overlap_symbols = []
    for sym, sym_trades in by_symbol.items():
        ordered = sorted(sym_trades, key=lambda x: pd.Timestamp(x["entry_time"]))
        for prev, curr in zip(ordered, ordered[1:]):
            if pd.Timestamp(prev["exit_time"]) > pd.Timestamp(curr["entry_time"]):
                overlap_symbols.append(sym)
                break
    passed = not overlap_symbols
    results.append(CheckResult(
        "no_overlap",
        passed,
        "FAIL",
        "포지션 비중첩 확인" if passed else f"중첩 심볼: {overlap_symbols}",
    ))

    # 4. Fees are always strictly positive.
    bad_fees = [i for i, t in enumerate(trades) if t["entry_fee"] <= 0 or t["exit_fee"] <= 0]
    passed = not bad_fees
    results.append(CheckResult(
        "positive_fees",
        passed,
        "FAIL",
        "수수료 양수 확인" if passed else f"위반 트레이드 인덱스: {bad_fees}",
    ))

    # 5. Running balance never goes negative (replayed in list order).
    balance = initial_balance
    min_balance = balance
    for t in trades:
        balance += t["net_pnl"]
        min_balance = min(min_balance, balance)
    passed = min_balance >= 0
    results.append(CheckResult(
        "no_negative_balance",
        passed,
        "FAIL",
        "잔고 양수 유지" if passed else f"최저 잔고: {min_balance:.4f}",
    ))

    return results


def _check_statistics(trades: list[dict], summary: dict) -> list[CheckResult]:
    """Detect statistically suspicious results; reported at WARNING level.

    Args:
        trades: Trade dicts; only ``entry_time`` is read here.
        summary: Summary dict; missing keys default to 0.

    Returns:
        One ``CheckResult`` per heuristic, or an empty list without trades.
    """
    results: list[CheckResult] = []

    if not trades:
        return results

    win_rate = summary.get("win_rate", 0)
    mdd = summary.get("max_drawdown_pct", 0)
    pf = summary.get("profit_factor", 0)

    # Win rate above 80%.
    passed = win_rate <= 80
    results.append(CheckResult(
        "win_rate_high",
        passed,
        "WARNING",
        f"승률 정상 ({win_rate:.1f}%)" if passed else f"승률 {win_rate:.1f}% > 80% — look-ahead bias 의심",
    ))

    # Win rate below 20%.
    passed = win_rate >= 20
    results.append(CheckResult(
        "win_rate_low",
        passed,
        "WARNING",
        f"승률 정상 ({win_rate:.1f}%)" if passed else f"승률 {win_rate:.1f}% < 20% — 신호 로직 반전 의심",
    ))

    # Max drawdown of exactly 0%.
    passed = mdd > 0
    results.append(CheckResult(
        "mdd_nonzero",
        passed,
        "WARNING",
        f"MDD 정상 ({mdd:.1f}%)" if passed else "MDD 0% — SL 미작동 의심",
    ))

    # Fewer than 5 trades per month on average.
    if len(trades) >= 2:
        # Trades are appended in closing order and may be merged across symbols
        # or folds, so the span is taken from the earliest and latest entry
        # rather than from the first and last list elements.
        entry_times = [pd.Timestamp(t["entry_time"]) for t in trades]
        span_days = (max(entry_times) - min(entry_times)).days
        months = max(1, span_days / 30)
        trades_per_month = len(trades) / months
        passed = trades_per_month >= 5
        results.append(CheckResult(
            "trade_frequency",
            passed,
            "WARNING",
            f"월 평균 {trades_per_month:.1f}건" if passed else f"월 평균 {trades_per_month:.1f}건 < 5건 — 신호 생성 부족",
        ))

    # Profit factor above 5.0 (skipped when there were no losing trades).
    if pf != float("inf"):
        passed = pf <= 5.0
        results.append(CheckResult(
            "profit_factor_high",
            passed,
            "WARNING",
            f"PF 정상 ({pf:.2f})" if passed else f"PF {pf:.2f} > 5.0 — 비현실적 수익",
        ))

    return results


def _print_results(results: list[CheckResult]):
    """Print a coloured PASS/WARNING/FAIL line per check plus an overall verdict."""
    print("\n" + "=" * 60)
    print(" BACKTEST SANITY CHECK")
    print("=" * 60)

    has_fail = any(not r.passed and r.level == "FAIL" for r in results)
    has_warn = any(not r.passed and r.level == "WARNING" for r in results)

    for r in results:
        if r.passed:
            status = f"{GREEN}PASS{RESET}"
        elif r.level == "FAIL":
            status = f"{RED}FAIL{RESET}"
        else:
            status = f"{YELLOW}WARNING{RESET}"
        print(f" [{status}] {r.name}: {r.message}")

    print("-" * 60)
    if has_fail:
        print(f" {RED}RESULT: FAIL — 논리적 불변 조건 위반{RESET}")
    elif has_warn:
        print(f" {YELLOW}RESULT: WARNING — 수동 확인 필요{RESET}")
    else:
        print(f" {GREEN}RESULT: ALL PASS{RESET}")
    print("=" * 60 + "\n")
class BacktestRiskManager:
    """Synchronous risk gate for the backtest loop.

    Tracks the realised PnL of the current day, the set of open positions
    (one per symbol) and derives the margin ratio from the balance.
    """

    def __init__(self, cfg: BacktestConfig):
        starting_balance = cfg.initial_balance
        self.cfg = cfg
        self.daily_pnl: float = 0.0
        self.initial_balance: float = starting_balance
        self.base_balance: float = starting_balance
        self.open_positions: dict[str, str] = {}  # {symbol: side}
        self._current_date: str | None = None

    def new_day(self, date_str: str):
        """Reset the daily PnL the first time a new date string is seen."""
        if date_str == self._current_date:
            return
        self._current_date = date_str
        self.daily_pnl = 0.0

    def is_trading_allowed(self) -> bool:
        """Return False once today's loss reaches ``max_daily_loss_pct`` of the initial balance."""
        if self.initial_balance <= 0:
            # Without a positive reference balance the ratio is undefined.
            return True
        limit_hit = (
            self.daily_pnl < 0
            and abs(self.daily_pnl) / self.initial_balance >= self.cfg.max_daily_loss_pct
        )
        return not limit_hit

    def can_open(self, symbol: str, side: str) -> bool:
        """Return True when a new ``side`` position on ``symbol`` respects all position limits."""
        held = self.open_positions
        if len(held) >= self.cfg.max_positions or symbol in held:
            return False
        same_side_count = list(held.values()).count(side)
        return same_side_count < self.cfg.max_same_direction

    def register(self, symbol: str, side: str):
        """Record an opened position."""
        self.open_positions[symbol] = side

    def close(self, symbol: str, pnl: float):
        """Forget the position on ``symbol`` (if any) and add ``pnl`` to today's PnL."""
        self.open_positions.pop(symbol, None)
        self.daily_pnl += pnl

    def get_dynamic_margin_ratio(self, balance: float) -> float:
        """Return the margin ratio for ``balance``.

        The ratio starts at ``margin_max_ratio`` and moves linearly with the
        distance from the base balance, clamped to
        ``[margin_min_ratio, margin_max_ratio]``.
        """
        upper = self.cfg.margin_max_ratio
        lower = self.cfg.margin_min_ratio
        growth = balance - self.base_balance
        unclamped = upper - (growth * self.cfg.margin_decay_rate)
        return max(lower, min(upper, unclamped))
class Backtester:
    """Synchronous, candle-by-candle backtest engine.

    Replays the indicator -> signal -> ML filter -> entry/exit pipeline over
    historical candles for one or more symbols while tracking balance, open
    positions, closed trades and an equity curve.
    """

    def __init__(self, cfg: BacktestConfig):
        self.cfg = cfg
        self.risk = BacktestRiskManager(cfg)
        self.balance = cfg.initial_balance
        self.positions: dict[str, Position] = {}  # {symbol: Position}
        self.trades: list[dict] = []
        self.equity_curve: list[dict] = []
        self._peak_equity: float = cfg.initial_balance

        # Per-symbol ML filter; None means "no filter for this symbol".
        self.ml_filters: dict[str, MLFilter | None] = {}
        if cfg.use_ml:
            for sym in cfg.symbols:
                sym_dir = Path(f"models/{sym.lower()}")
                onnx = str(sym_dir / "mlx_filter.weights.onnx")
                lgbm = str(sym_dir / "lgbm_filter.pkl")
                if not sym_dir.exists():
                    # Fall back to the shared model files.
                    onnx = "models/mlx_filter.weights.onnx"
                    lgbm = "models/lgbm_filter.pkl"
                mf = MLFilter(onnx_path=onnx, lgbm_path=lgbm, threshold=cfg.ml_threshold)
                self.ml_filters[sym] = mf if mf.is_model_loaded() else None
        else:
            for sym in cfg.symbols:
                self.ml_filters[sym] = None

    def run(self, ml_models: dict[str, object] | None = None) -> dict:
        """Run the backtest and return ``{config, summary, trades, validation}``.

        Args:
            ml_models: Optional ``{symbol: lgbm_model}`` mapping of pre-trained
                models (used by walk-forward). When None the file-based filters
                created in ``__init__`` are used. Injected models are ignored
                when ``cfg.use_ml`` is False so that "ML off" really is off.
        """
        all_indicators: dict[str, pd.DataFrame] = {}
        all_signals: dict[str, np.ndarray] = {}
        all_features: dict[str, pd.DataFrame] = {}

        # BTC/ETH correlation data (loaded when available).
        btc_df = self._try_load_corr("BTCUSDT")
        eth_df = self._try_load_corr("ETHUSDT")

        for sym in self.cfg.symbols:
            df = _load_data(sym, self.cfg.start, self.cfg.end)
            df_ind = _calc_indicators(df)
            all_indicators[sym] = df_ind
            sig_arr = _calc_signals(
                df_ind,
                signal_threshold=self.cfg.signal_threshold,
                adx_threshold=self.cfg.adx_threshold,
                volume_multiplier=self.cfg.volume_multiplier,
            )
            all_signals[sym] = sig_arr
            # Pre-compute vectorised features (same z-scoring as training).
            all_features[sym] = _calc_features_vectorized(
                df_ind, sig_arr, btc_df=btc_df, eth_df=eth_df,
            )
            logger.info(f"[{sym}] 데이터 로드: {len(df):,}캔들 ({df.index[0]} ~ {df.index[-1]})")

        # Walk-forward model injection.
        if ml_models is not None:
            self.ml_filters = {
                sym: self._wrap_model(ml_models.get(sym)) if self.cfg.use_ml else None
                for sym in self.cfg.symbols
            }

        # Multi-symbol: one merged, timestamp-ordered event stream.
        events = self._build_events(all_indicators, all_signals)
        logger.info(f"총 이벤트: {len(events):,}개")

        for ts, sym, candle_idx in events:
            self.risk.new_day(str(ts.date()))

            df_ind = all_indicators[sym]
            signal = all_signals[sym][candle_idx]
            row = df_ind.iloc[candle_idx]

            self._record_equity(ts)

            # 1) Protective SL/TP exits are evaluated on every candle, even
            #    after the daily loss limit was hit; the limit must only stop
            #    signal-driven activity, not freeze open positions.
            if sym in self.positions and self._check_sl_tp(sym, row, ts):
                continue

            # 2) Daily loss limit: no signal processing for the rest of the day.
            if not self.risk.is_trading_allowed():
                continue

            # 3) Opposite signal: close, then try to re-enter in the new direction.
            if sym in self.positions and signal != "HOLD":
                pos = self.positions[sym]
                if (pos.side == "LONG" and signal == "SHORT") or \
                   (pos.side == "SHORT" and signal == "LONG"):
                    self._close_position(sym, row["close"], ts, "REVERSE_SIGNAL")
                    if self.risk.can_open(sym, signal):
                        self._try_enter(
                            sym, signal, df_ind, candle_idx,
                            all_features[sym], ts=ts,
                        )
                    continue

            # 4) Fresh entry.
            if sym not in self.positions and signal != "HOLD":
                if self.risk.can_open(sym, signal):
                    self._try_enter(
                        sym, signal, df_ind, candle_idx,
                        all_features[sym], ts=ts,
                    )

        # Force-close whatever is still open at the last candle of each symbol.
        for sym in list(self.positions.keys()):
            last_df = all_indicators[sym]
            self._close_position(sym, last_df["close"].iloc[-1], last_df.index[-1], "END_OF_DATA")

        return self._build_result()

    def _wrap_model(self, model: object | None) -> MLFilter | None:
        """Wrap an in-memory model in an ``MLFilter`` without touching model files."""
        if model is None:
            return None
        # __new__ bypasses MLFilter.__init__ so no model file is loaded.
        mf = MLFilter.__new__(MLFilter)
        mf._disabled = False
        mf._onnx_session = None
        mf._lgbm_model = model
        mf._threshold = self.cfg.ml_threshold
        mf._onnx_path = Path("/dev/null")
        mf._lgbm_path = Path("/dev/null")
        mf._loaded_onnx_mtime = 0.0
        mf._loaded_lgbm_mtime = 0.0
        return mf

    def _try_load_corr(self, symbol: str) -> pd.DataFrame | None:
        """Best-effort load of a correlation symbol's candles; None when unavailable."""
        path = Path(f"data/{symbol.lower()}/combined_15m.parquet")
        if not path.exists():
            alt = Path("data/combined_15m.parquet")
            if not alt.exists():
                return None
            path = alt
        try:
            df = pd.read_parquet(path)
            if "timestamp" in df.columns:
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                df = df.set_index("timestamp").sort_index()
            elif not isinstance(df.index, pd.DatetimeIndex):
                df.index = pd.to_datetime(df.index)
                df = df.sort_index()
            if df.index.tz is not None:
                df.index = df.index.tz_localize(None)
            if self.cfg.start:
                df = df[df.index >= pd.Timestamp(self.cfg.start)]
            if self.cfg.end:
                df = df[df.index <= pd.Timestamp(self.cfg.end)]
            return df
        except Exception as e:
            # Correlation data is optional, but a load failure should be visible.
            logger.warning(f"[{symbol}] 상관 데이터 로드 실패: {e}")
            return None

    def _build_events(
        self,
        all_indicators: dict[str, pd.DataFrame],
        all_signals: dict[str, np.ndarray],
    ) -> list[tuple[pd.Timestamp, str, int]]:
        """Return ``(timestamp, symbol, candle_index)`` for every post-warm-up candle, time-ordered."""
        events = []
        for sym, df_ind in all_indicators.items():
            for i in range(self.cfg.WARMUP, len(df_ind)):
                events.append((df_ind.index[i], sym, i))
        events.sort(key=lambda x: (x[0], x[1]))
        return events

    def _check_sl_tp(self, symbol: str, row: pd.Series, ts: pd.Timestamp) -> bool:
        """Check the candle's high/low against SL/TP; return True if the position was closed.

        When both levels are touched within the same candle the stop-loss
        wins (conservative assumption).
        """
        pos = self.positions[symbol]
        high = row["high"]
        low = row["low"]

        if pos.side == "LONG":
            if low <= pos.sl:
                self._close_position(symbol, pos.sl, ts, "STOP_LOSS")
                return True
            if high >= pos.tp:
                self._close_position(symbol, pos.tp, ts, "TAKE_PROFIT")
                return True
        else:  # SHORT
            if high >= pos.sl:
                self._close_position(symbol, pos.sl, ts, "STOP_LOSS")
                return True
            if low <= pos.tp:
                self._close_position(symbol, pos.tp, ts, "TAKE_PROFIT")
                return True
        return False

    def _try_enter(
        self,
        symbol: str,
        signal: str,
        df_ind: pd.DataFrame,
        candle_idx: int,
        feat_df: pd.DataFrame,
        ts: pd.Timestamp,
    ):
        """Apply the ML filter, size the position and open it at the candle close.

        Returns without any state change when the ML filter blocks the entry,
        the ATR is unusable or the order would be below ``min_notional``.
        """
        row = df_ind.iloc[candle_idx]

        # Look the row up in the pre-computed features (same z-scoring as training).
        available_cols = [c for c in FEATURE_COLS if c in feat_df.columns]
        features = feat_df.iloc[candle_idx][available_cols]

        ml_filter = self.ml_filters.get(symbol)
        ml_proba = _get_ml_proba(ml_filter, features)
        if ml_filter is not None and ml_filter.is_model_loaded():
            if ml_proba is not None and ml_proba < self.cfg.ml_threshold:
                return  # blocked by the ML filter

        # Validate the ATR before any money moves. `not atr > 0` also rejects
        # NaN, which would otherwise yield NaN SL/TP levels that never trigger.
        atr = float(row.get("atr", 0))
        if not atr > 0:
            return

        # Position sizing: the balance is split evenly across configured symbols.
        per_symbol_balance = self.balance / len(self.cfg.symbols)
        price = float(row["close"])
        margin_ratio = self.risk.get_dynamic_margin_ratio(self.balance)
        notional = per_symbol_balance * margin_ratio * self.cfg.leverage
        if notional < self.cfg.min_notional:
            notional = self.cfg.min_notional
        quantity = round(notional / price, 1)
        if quantity * price < self.cfg.min_notional:
            quantity = round(self.cfg.min_notional / price + 0.05, 1)
        if quantity <= 0 or quantity * price < self.cfg.min_notional:
            return

        # Market entry: apply slippage against us.
        order_side = "BUY" if signal == "LONG" else "SELL"
        entry_price = _apply_slippage(price, order_side, self.cfg.slippage_pct)

        # The entry fee is paid immediately; _close_position settles only the rest.
        entry_fee = _calc_fee(entry_price, quantity, self.cfg.fee_pct)
        self.balance -= entry_fee

        if signal == "LONG":
            sl = entry_price - atr * self.cfg.atr_sl_mult
            tp = entry_price + atr * self.cfg.atr_tp_mult
        else:
            sl = entry_price + atr * self.cfg.atr_sl_mult
            tp = entry_price - atr * self.cfg.atr_tp_mult

        indicators_snapshot = {
            "rsi": float(row.get("rsi", 0)),
            "macd_hist": float(row.get("macd_hist", 0)),
            "atr": float(atr),
            "adx": float(row.get("adx", 0)),
        }

        self.positions[symbol] = Position(
            symbol=symbol,
            side=signal,
            entry_price=entry_price,
            quantity=quantity,
            sl=sl,
            tp=tp,
            entry_time=ts,
            entry_fee=entry_fee,
            entry_indicators=indicators_snapshot,
            ml_proba=ml_proba,
        )
        self.risk.register(symbol, signal)

    def _close_position(
        self, symbol: str, exit_price: float, ts: pd.Timestamp, reason: str
    ):
        """Close the position on ``symbol``, settle the balance and record the trade."""
        pos = self.positions.pop(symbol)

        # SL/TP hits are treated as resting orders (no slippage); everything
        # else is a market order.
        if reason in ("REVERSE_SIGNAL", "END_OF_DATA"):
            close_side = "SELL" if pos.side == "LONG" else "BUY"
            exit_price = _apply_slippage(exit_price, close_side, self.cfg.slippage_pct)

        exit_fee = _calc_fee(exit_price, pos.quantity, self.cfg.fee_pct)

        if pos.side == "LONG":
            gross_pnl = (exit_price - pos.entry_price) * pos.quantity
        else:
            gross_pnl = (pos.entry_price - exit_price) * pos.quantity

        # Per-trade net PnL includes both fees (used for reporting and risk).
        net_pnl = gross_pnl - pos.entry_fee - exit_fee
        # The entry fee was already taken from the balance at entry, so only
        # gross PnL minus the exit fee is settled here; adding net_pnl would
        # charge the entry fee twice.
        self.balance += gross_pnl - exit_fee
        self.risk.close(symbol, net_pnl)

        self.trades.append({
            "symbol": symbol,
            "side": pos.side,
            "entry_time": str(pos.entry_time),
            "exit_time": str(ts),
            "entry_price": round(pos.entry_price, 6),
            "exit_price": round(exit_price, 6),
            "quantity": pos.quantity,
            "sl": round(pos.sl, 6),
            "tp": round(pos.tp, 6),
            "gross_pnl": round(gross_pnl, 6),
            "entry_fee": round(pos.entry_fee, 6),
            "exit_fee": round(exit_fee, 6),
            "net_pnl": round(net_pnl, 6),
            "close_reason": reason,
            "ml_proba": round(pos.ml_proba, 4) if pos.ml_proba is not None else None,
            "indicators": pos.entry_indicators,
        })

    def _record_equity(self, ts: pd.Timestamp):
        """Append the current equity to the curve and update the running peak.

        Open positions are not marked to market (no current price is available
        here), so equity equals the realised balance.
        """
        equity = self.balance
        self.equity_curve.append({"timestamp": str(ts), "equity": round(equity, 4)})
        if equity > self._peak_equity:
            self._peak_equity = equity

    def _build_result(self) -> dict:
        """Assemble the result dict, including the sanity-check report."""
        summary = self._calc_summary()
        from src.backtest_validator import validate
        validation = validate(self.trades, summary, self.cfg)
        return {
            "config": asdict(self.cfg),
            "summary": summary,
            "trades": self.trades,
            "validation": validation,
        }

    def _calc_summary(self) -> dict:
        """Aggregate closed trades into summary statistics.

        Returns a dict with trade count, PnL, return %, win rate, average
        win/loss, profit factor (``inf`` without losing trades), max drawdown
        %, Sharpe ratio, total fees and a close-reason histogram.
        """
        if not self.trades:
            return {
                "total_trades": 0,
                "total_pnl": 0.0,
                "return_pct": 0.0,
                "win_rate": 0.0,
                "avg_win": 0.0,
                "avg_loss": 0.0,
                "profit_factor": 0.0,
                "max_drawdown_pct": 0.0,
                "sharpe_ratio": 0.0,
                "total_fees": 0.0,
                "close_reasons": {},
            }

        pnls = [t["net_pnl"] for t in self.trades]
        wins = [p for p in pnls if p > 0]
        losses = [p for p in pnls if p <= 0]

        total_pnl = sum(pnls)
        total_fees = sum(t["entry_fee"] + t["exit_fee"] for t in self.trades)
        gross_profit = sum(wins)
        gross_loss = abs(sum(losses))

        # Max drawdown. The initial balance is the first point of the equity
        # series; without it a losing first trade would never count as drawdown.
        equity = self.cfg.initial_balance + np.concatenate(([0.0], np.cumsum(pnls)))
        peak = np.maximum.accumulate(equity)
        drawdown = np.divide(peak - equity, peak, out=np.zeros_like(peak), where=peak > 0)
        mdd = float(drawdown.max()) * 100

        # Sharpe ratio, annualised with 252 days * 96 candles = 24192.
        # NOTE(review): the inputs are per-trade PnLs, not per-candle returns,
        # so this annualisation factor looks too large -- confirm the intent.
        if len(pnls) > 1:
            pnl_arr = np.array(pnls)
            std = np.std(pnl_arr)
            sharpe = float(np.mean(pnl_arr) / std * np.sqrt(24192)) if std > 0 else 0.0
        else:
            sharpe = 0.0

        reasons: dict[str, int] = {}
        for t in self.trades:
            reasons[t["close_reason"]] = reasons.get(t["close_reason"], 0) + 1

        return {
            "total_trades": len(self.trades),
            "total_pnl": round(total_pnl, 4),
            "return_pct": round(total_pnl / self.cfg.initial_balance * 100, 2),
            "win_rate": round(len(wins) / len(self.trades) * 100, 2),
            "avg_win": round(np.mean(wins), 4) if wins else 0.0,
            "avg_loss": round(np.mean(losses), 4) if losses else 0.0,
            "profit_factor": round(gross_profit / gross_loss, 2) if gross_loss > 0 else float("inf"),
            "max_drawdown_pct": round(mdd, 2),
            "sharpe_ratio": round(sharpe, 2),
            "total_fees": round(total_fees, 4),
            "close_reasons": reasons,
        }
class WalkForwardBacktester:
    """Walk-forward backtest.

    For each window a model is trained on the training span and evaluated only
    on the span that follows it, so evaluation never uses data the model has
    already seen.
    """

    def __init__(self, cfg: WalkForwardConfig):
        self.cfg = cfg

    def run(self) -> dict:
        """Train and evaluate every fold, then return the aggregated result."""
        # Function-scope import: `fields` is not among the module-level imports.
        from dataclasses import fields

        # Load the full period once per symbol.
        all_raw: dict[str, pd.DataFrame] = {
            sym: _load_data(sym, self.cfg.start, self.cfg.end) for sym in self.cfg.symbols
        }

        windows = self._build_windows(all_raw)
        logger.info(f"Walk-Forward: {len(windows)}개 윈도우 "
                    f"(학습 {self.cfg.train_months}개월, 검증 {self.cfg.test_months}개월)")

        # Copy every BacktestConfig field from the walk-forward config so that
        # parameters added later propagate to the folds automatically instead
        # of silently falling back to their defaults.
        base_kwargs = {f.name: getattr(self.cfg, f.name) for f in fields(BacktestConfig)}

        all_trades: list[dict] = []
        fold_summaries: list[dict] = []

        for fold_no, (train_start, train_end, test_start, test_end) in enumerate(windows, start=1):
            logger.info(f" 폴드 {fold_no}/{len(windows)}: "
                        f"학습 {train_start.date()}~{train_end.date()}, "
                        f"검증 {test_start.date()}~{test_end.date()}")

            # Train one model per symbol, but only when ML filtering is on;
            # with use_ml=False no model is trained or injected.
            models = None
            if self.cfg.use_ml:
                models = {
                    sym: self._train_model(all_raw[sym], train_start, train_end, sym)
                    for sym in self.cfg.symbols
                }

            # Pass the exact window timestamps. Truncating them to dates would
            # pull candles from before `test_start` (already used for training)
            # into the test fold whenever the data does not start at midnight.
            test_cfg = BacktestConfig(**{
                **base_kwargs,
                "start": str(test_start),
                "end": str(test_end),
            })
            result = Backtester(test_cfg).run(ml_models=models)

            # Tag each trade with its fold number.
            for t in result["trades"]:
                t["fold"] = fold_no
            all_trades.extend(result["trades"])

            fold_summaries.append({
                "fold": fold_no,
                "train_period": f"{train_start.date()} ~ {train_end.date()}",
                "test_period": f"{test_start.date()} ~ {test_end.date()}",
                "summary": result["summary"],
            })

        return self._aggregate_results(all_trades, fold_summaries)

    def _build_windows(
        self, all_raw: dict[str, pd.DataFrame]
    ) -> list[tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
        """Return ``(train_start, train_end, test_start, test_end)`` tuples.

        Windows are built over the period common to all symbols; the test span
        starts where the training span ends.
        """
        start = max(df.index[0] for df in all_raw.values())
        end = min(df.index[-1] for df in all_raw.values())

        train_delta = pd.DateOffset(months=self.cfg.train_months)
        test_delta = pd.DateOffset(months=self.cfg.test_months)

        windows = []
        cursor = start
        while cursor + train_delta + test_delta <= end:
            train_end = cursor + train_delta
            test_end = train_end + test_delta
            windows.append((cursor, train_end, train_end, test_end))
            # NOTE(review): the cursor advances by the training length, so
            # consecutive test spans are not contiguous when train_months >
            # test_months -- confirm this is the intended schedule.
            cursor = train_end

        return windows

    def _train_model(
        self,
        raw_df: pd.DataFrame,
        train_start: pd.Timestamp,
        train_end: pd.Timestamp,
        symbol: str,
    ) -> object | None:
        """Train a LightGBM classifier on ``[train_start, train_end)``.

        Returns None when there is too little data, when dataset generation
        fails or yields nothing usable, or when fitting itself fails.
        """
        # Compare on tz-naive timestamps.
        ts_start = train_start.tz_localize(None) if train_start.tz else train_start
        ts_end = train_end.tz_localize(None) if train_end.tz else train_end
        index = raw_df.index
        if index.tz is not None:
            index = index.tz_localize(None)
        train_df = raw_df[(index >= ts_start) & (index < ts_end)]
        if len(train_df) < 200:
            logger.warning(f" [{symbol}] 학습 데이터 부족: {len(train_df)}캔들")
            return None

        base_cols = ["open", "high", "low", "close", "volume"]
        df = train_df[base_cols].copy()

        # BTC/ETH correlation columns, when present in the combined frame.
        btc_df = eth_df = None
        if "close_btc" in train_df.columns:
            btc_df = train_df[[c + "_btc" for c in base_cols]].copy()
            btc_df.columns = base_cols
        if "close_eth" in train_df.columns:
            eth_df = train_df[[c + "_eth" for c in base_cols]].copy()
            eth_df.columns = base_cols

        try:
            dataset = generate_dataset_vectorized(
                df, btc_df=btc_df, eth_df=eth_df,
                time_weight_decay=self.cfg.time_weight_decay,
                negative_ratio=self.cfg.negative_ratio,
                signal_threshold=self.cfg.signal_threshold,
                adx_threshold=self.cfg.adx_threshold,
                volume_multiplier=self.cfg.volume_multiplier,
            )
        except Exception as e:
            logger.warning(f" [{symbol}] 데이터셋 생성 실패: {e}")
            return None

        if dataset.empty or "label" not in dataset.columns:
            return None

        actual_cols = [c for c in FEATURE_COLS if c in dataset.columns]
        X = dataset[actual_cols].values
        y = dataset["label"].values
        w = dataset["sample_weight"].values
        source = dataset["source"].values if "source" in dataset.columns else np.full(len(X), "signal")

        # A failed fit must not abort the whole walk-forward run: the contract
        # of this method is to return None so the fold runs without a filter.
        try:
            keep = stratified_undersample(y, source, seed=42)
            lgbm_params = self._load_params(symbol)
            model = lgb.LGBMClassifier(**lgbm_params, random_state=42, verbose=-1)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                model.fit(X[keep], y[keep], sample_weight=w[keep])
        except Exception as e:
            logger.warning(f" [{symbol}] 모델 학습 실패: {e}")
            return None

        return model

    def _load_params(self, symbol: str) -> dict:
        """Return LightGBM parameters: built-in defaults overridden by the active params file.

        The per-symbol file ``models/<symbol>/active_lgbm_params.json`` is
        preferred; ``models/active_lgbm_params.json`` is the fallback. Without
        either file the defaults are returned unchanged.
        """
        params_path = Path(f"models/{symbol.lower()}/active_lgbm_params.json")
        if not params_path.exists():
            params_path = Path("models/active_lgbm_params.json")

        params = {
            "n_estimators": 434,
            "learning_rate": 0.123659,
            "max_depth": 6,
            "num_leaves": 14,
            "min_child_samples": 10,
            "subsample": 0.929062,
            "colsample_bytree": 0.946330,
            "reg_alpha": 0.573971,
            "reg_lambda": 0.000157,
        }

        if params_path.exists():
            with open(params_path, encoding="utf-8") as f:
                data = json.load(f)
            best = dict(data["best_trial"]["params"])
            # `weight_scale` is dropped before the params reach LGBMClassifier.
            best.pop("weight_scale", None)
            params.update(best)

        return params

    @staticmethod
    def _summarize_trades(all_trades: list[dict], initial_balance: float) -> dict:
        """Compute the summary statistics for ``all_trades`` (pure helper)."""
        if not all_trades:
            return {"total_trades": 0, "total_pnl": 0.0, "return_pct": 0.0,
                    "win_rate": 0.0, "avg_win": 0.0, "avg_loss": 0.0,
                    "profit_factor": 0.0, "max_drawdown_pct": 0.0,
                    "sharpe_ratio": 0.0, "total_fees": 0.0, "close_reasons": {}}

        pnls = [t["net_pnl"] for t in all_trades]
        wins = [p for p in pnls if p > 0]
        losses = [p for p in pnls if p <= 0]
        total_pnl = sum(pnls)
        total_fees = sum(t["entry_fee"] + t["exit_fee"] for t in all_trades)
        gross_profit = sum(wins)
        gross_loss = abs(sum(losses))

        # The initial balance is the first point of the equity series; without
        # it a losing first trade would never count as drawdown.
        equity = initial_balance + np.concatenate(([0.0], np.cumsum(pnls)))
        peak = np.maximum.accumulate(equity)
        drawdown = np.divide(peak - equity, peak, out=np.zeros_like(peak), where=peak > 0)
        mdd = float(drawdown.max()) * 100

        if len(pnls) > 1:
            pnl_arr = np.array(pnls)
            std = np.std(pnl_arr)
            sharpe = float(np.mean(pnl_arr) / std * np.sqrt(24192)) if std > 0 else 0.0
        else:
            sharpe = 0.0

        reasons: dict[str, int] = {}
        for t in all_trades:
            reasons[t["close_reason"]] = reasons.get(t["close_reason"], 0) + 1

        return {
            "total_trades": len(all_trades),
            "total_pnl": round(total_pnl, 4),
            "return_pct": round(total_pnl / initial_balance * 100, 2),
            "win_rate": round(len(wins) / len(all_trades) * 100, 2),
            "avg_win": round(np.mean(wins), 4) if wins else 0.0,
            "avg_loss": round(np.mean(losses), 4) if losses else 0.0,
            "profit_factor": round(gross_profit / gross_loss, 2) if gross_loss > 0 else float("inf"),
            "max_drawdown_pct": round(mdd, 2),
            "sharpe_ratio": round(sharpe, 2),
            "total_fees": round(total_fees, 4),
            "close_reasons": reasons,
        }

    def _aggregate_results(
        self, all_trades: list[dict], fold_summaries: list[dict]
    ) -> dict:
        """Merge all folds into the overall walk-forward result dict."""
        from src.backtest_validator import validate

        summary = self._summarize_trades(all_trades, self.cfg.initial_balance)
        validation = validate(all_trades, summary, self.cfg)

        return {
            "mode": "walk_forward",
            "config": asdict(self.cfg),
            "summary": summary,
            "folds": fold_summaries,
            "trades": all_trades,
            "validation": validation,
        }
leverage=self.config.leverage, margin_ratio=margin_ratio ) logger.info(f"[{self.symbol}] 포지션 크기: 잔고={per_symbol_balance:.2f}/{balance:.2f} USDT, 증거금비율={margin_ratio:.1%}, 수량={quantity}") - stop_loss, take_profit = Indicators(df).get_atr_stop(df, signal, price) + stop_loss, take_profit = Indicators(df).get_atr_stop( + df, signal, price, + atr_sl_mult=self.config.atr_sl_mult, + atr_tp_mult=self.config.atr_tp_mult, + ) notional = quantity * price if quantity <= 0 or notional < self.exchange.MIN_NOTIONAL: @@ -339,7 +348,7 @@ class TradingBot: return if self.ml_filter.is_model_loaded(): - features = build_features( + features = build_features_aligned( df, signal, btc_df=btc_df, eth_df=eth_df, oi_change=oi_change, funding_rate=funding_rate, diff --git a/src/config.py b/src/config.py index 3f9b477..dde160a 100644 --- a/src/config.py +++ b/src/config.py @@ -23,6 +23,11 @@ class Config: margin_min_ratio: float = 0.20 margin_decay_rate: float = 0.0006 ml_threshold: float = 0.55 + atr_sl_mult: float = 2.0 + atr_tp_mult: float = 2.0 + signal_threshold: int = 3 + adx_threshold: float = 25.0 + volume_multiplier: float = 2.5 def __post_init__(self): self.api_key = os.getenv("BINANCE_API_KEY", "") @@ -35,6 +40,11 @@ class Config: self.margin_decay_rate = float(os.getenv("MARGIN_DECAY_RATE", "0.0006")) self.ml_threshold = float(os.getenv("ML_THRESHOLD", "0.55")) self.max_same_direction = int(os.getenv("MAX_SAME_DIRECTION", "2")) + self.atr_sl_mult = float(os.getenv("ATR_SL_MULT", "2.0")) + self.atr_tp_mult = float(os.getenv("ATR_TP_MULT", "2.0")) + self.signal_threshold = int(os.getenv("SIGNAL_THRESHOLD", "3")) + self.adx_threshold = float(os.getenv("ADX_THRESHOLD", "25")) + self.volume_multiplier = float(os.getenv("VOL_MULTIPLIER", "2.5")) # symbols: SYMBOLS 환경변수 우선, 없으면 SYMBOL에서 변환 symbols_env = os.getenv("SYMBOLS", "") diff --git a/src/dataset_builder.py b/src/dataset_builder.py index 7d9f517..30344a9 100644 --- a/src/dataset_builder.py +++ b/src/dataset_builder.py @@ -54,10 
+54,19 @@ def _calc_indicators(df: pd.DataFrame) -> pd.DataFrame: return d -def _calc_signals(d: pd.DataFrame) -> np.ndarray: +def _calc_signals( + d: pd.DataFrame, + signal_threshold: int = 3, + adx_threshold: float = 25, + volume_multiplier: float = 2.5, +) -> np.ndarray: """ indicators.py get_signal() 로직을 numpy 배열 연산으로 재현한다. 반환: signal_arr — 각 행에 대해 "LONG" | "SHORT" | "HOLD" + + signal_threshold: 최소 가중치 합계 (기본 3) + adx_threshold: ADX 최소값 필터 (0=비활성화) + volume_multiplier: 거래량 급증 배수 (기본 1.5) """ n = len(d) @@ -105,10 +114,11 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray: short_score += ((stoch_k > 80) & (stoch_k < stoch_d)).astype(np.float32) # 6. 거래량 급증 - vol_surge = volume > vol_ma20 * 1.5 + vol_surge = volume > vol_ma20 * volume_multiplier - long_enter = (long_score >= 3) & (vol_surge | (long_score >= 4)) - short_enter = (short_score >= 3) & (vol_surge | (short_score >= 4)) + thr = signal_threshold + long_enter = (long_score >= thr) & (vol_surge | (long_score >= thr + 1)) + short_enter = (short_score >= thr) & (vol_surge | (short_score >= thr + 1)) signal_arr = np.full(n, "HOLD", dtype=object) signal_arr[long_enter] = "LONG" @@ -116,6 +126,12 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray: # 둘 다 해당하면 HOLD (충돌 방지) signal_arr[long_enter & short_enter] = "HOLD" + # ADX 필터 + if adx_threshold > 0 and "adx" in d.columns: + adx_vals = d["adx"].values + low_adx = adx_vals < adx_threshold + signal_arr[low_adx] = "HOLD" + return signal_arr @@ -372,6 +388,9 @@ def generate_dataset_vectorized( eth_df: pd.DataFrame | None = None, time_weight_decay: float = 0.0, negative_ratio: int = 0, + signal_threshold: int = 3, + adx_threshold: float = 25, + volume_multiplier: float = 2.5, ) -> pd.DataFrame: """ 전체 시계열을 1회 계산해 학습 데이터셋을 생성한다. 
@@ -390,7 +409,12 @@ def generate_dataset_vectorized( d = _calc_indicators(df) print(" [2/3] 신호 마스킹 및 피처 추출...") - signal_arr = _calc_signals(d) + signal_arr = _calc_signals( + d, + signal_threshold=signal_threshold, + adx_threshold=adx_threshold, + volume_multiplier=volume_multiplier, + ) feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df) # 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만 diff --git a/src/indicators.py b/src/indicators.py index c3b5c70..b67cddb 100644 --- a/src/indicators.py +++ b/src/indicators.py @@ -52,18 +52,29 @@ class Indicators: return df - def get_signal(self, df: pd.DataFrame) -> str: + def get_signal( + self, + df: pd.DataFrame, + signal_threshold: int = 3, + adx_threshold: float = 25, + volume_multiplier: float = 2.5, + ) -> str: """ 복합 지표 기반 매매 신호 생성. - 공격적 전략: 3개 이상 지표 일치 시 진입. + + signal_threshold: 최소 가중치 합계 (기본 3) + adx_threshold: ADX 최소값 필터 (0=비활성화, 25=ADX<25이면 HOLD) + volume_multiplier: 거래량 급증 배수 (기본 2.5) """ last = df.iloc[-1] prev = df.iloc[-2] - # ADX 로깅 (ML 피처로 위임, 하드필터 제거) + # ADX 필터 adx = last.get("adx", None) if adx is not None and not pd.isna(adx): logger.debug(f"ADX: {adx:.1f}") + if adx_threshold > 0 and adx < adx_threshold: + return "HOLD" long_signals = 0 short_signals = 0 @@ -99,22 +110,22 @@ class Indicators: short_signals += 1 # 6. 
거래량 확인 (신호 강화) - vol_surge = last["volume"] > last["vol_ma20"] * 1.5 + vol_surge = last["volume"] > last["vol_ma20"] * volume_multiplier - threshold = 3 - if long_signals >= threshold and (vol_surge or long_signals >= 4): + if long_signals >= signal_threshold and (vol_surge or long_signals >= signal_threshold + 1): return "LONG" - elif short_signals >= threshold and (vol_surge or short_signals >= 4): + elif short_signals >= signal_threshold and (vol_surge or short_signals >= signal_threshold + 1): return "SHORT" return "HOLD" def get_atr_stop( - self, df: pd.DataFrame, side: str, entry_price: float + self, df: pd.DataFrame, side: str, entry_price: float, + atr_sl_mult: float = 2.0, atr_tp_mult: float = 2.0, ) -> tuple[float, float]: """ATR 기반 손절/익절 가격 반환 (stop_loss, take_profit)""" atr = df["atr"].iloc[-1] - multiplier_sl = 1.5 - multiplier_tp = 3.0 + multiplier_sl = atr_sl_mult + multiplier_tp = atr_tp_mult if side == "LONG": stop_loss = entry_price - atr * multiplier_sl take_profit = entry_price + atr * multiplier_tp diff --git a/src/ml_features.py b/src/ml_features.py index a61073c..e51bf3b 100644 --- a/src/ml_features.py +++ b/src/ml_features.py @@ -15,6 +15,10 @@ FEATURE_COLS = [ "adx", ] +# rolling z-score 윈도우 (학습과 동일) +_ZSCORE_WINDOW = 288 # 일반 피처: 15분봉 × 288 = 3일 +_ZSCORE_WINDOW_OI = 96 # OI/펀딩비: 15분봉 × 96 = 1일 + def _calc_ret(closes: pd.Series, n: int) -> float: """n캔들 전 대비 수익률. 데이터 부족 시 0.0.""" @@ -31,6 +35,18 @@ def _calc_rs(xrp_ret: float, other_ret: float) -> float: return xrp_ret / other_ret +def _rolling_zscore_last(arr: np.ndarray, window: int = _ZSCORE_WINDOW) -> float: + """배열의 마지막 값에 대한 rolling z-score를 반환한다. 
+ 학습(dataset_builder._rolling_zscore)과 동일한 로직.""" + s = pd.Series(arr, dtype=np.float64) + r = s.rolling(window=window, min_periods=1) + mean = r.mean().iloc[-1] + std = r.std(ddof=0).iloc[-1] + if std < 1e-8: + std = 1e-8 + return float((s.iloc[-1] - mean) / std) + + def build_features( df: pd.DataFrame, signal: str, @@ -42,10 +58,8 @@ def build_features( oi_price_spread: float | None = None, ) -> pd.Series: """ - 기술 지표가 계산된 DataFrame의 마지막 행에서 ML 피처를 추출한다. - btc_df, eth_df가 제공되면 26개 피처를, 없으면 18개 피처를 반환한다. - signal: "LONG" | "SHORT" - oi_change, funding_rate, oi_change_ma5, oi_price_spread: 실제 값이 제공되면 사용, 없으면 0.0으로 채운다. + [Deprecated] raw 값 기반 피처. 하위 호환용으로 유지. + 신규 코드는 build_features_aligned()를 사용할 것. """ last = df.iloc[-1] close = last["close"] @@ -142,3 +156,154 @@ def build_features( base["adx"] = float(last.get("adx", 0)) return pd.Series(base) + + +def build_features_aligned( + df: pd.DataFrame, + signal: str, + btc_df: pd.DataFrame | None = None, + eth_df: pd.DataFrame | None = None, + oi_change: float | None = None, + funding_rate: float | None = None, + oi_change_ma5: float | None = None, + oi_price_spread: float | None = None, +) -> pd.Series: + """ + 학습(dataset_builder._calc_features_vectorized)과 동일한 rolling z-score를 + 적용한 피처를 반환한다. train-serve skew를 방지한다. 
+ + df: 지표가 이미 계산된 DataFrame (최소 60캔들 이상) + signal: "LONG" | "SHORT" + """ + last = df.iloc[-1] + close_series = df["close"] + close = float(close_series.iloc[-1]) + + # --- raw 값 계산 (z-score 전) --- + bb_upper = df["bb_upper"] if "bb_upper" in df.columns else pd.Series(close, index=df.index) + bb_lower = df["bb_lower"] if "bb_lower" in df.columns else pd.Series(close, index=df.index) + bb_range = bb_upper - bb_lower + bb_pct_series = (close_series - bb_lower) / (bb_range + 1e-8) + + ema9 = df.get("ema9", close_series) + ema21 = df.get("ema21", close_series) + ema50 = df.get("ema50", close_series) + + ema_align_arr = np.where( + (ema9 > ema21) & (ema21 > ema50), 1, + np.where((ema9 < ema21) & (ema21 < ema50), -1, 0) + ).astype(np.float32) + + atr_series = df["atr"] if "atr" in df.columns else pd.Series(0.0, index=df.index) + atr_pct_arr = (atr_series / (close_series + 1e-8)).values + + volume = df["volume"] + vol_ma20 = df["vol_ma20"] if "vol_ma20" in df.columns else pd.Series(1.0, index=df.index) + vol_ratio_arr = (volume / (vol_ma20 + 1e-8)).values + + ret_1_arr = close_series.pct_change(1).fillna(0).values + ret_3_arr = close_series.pct_change(3).fillna(0).values + ret_5_arr = close_series.pct_change(5).fillna(0).values + + # z-score 적용 (학습과 동일) + atr_pct_z = _rolling_zscore_last(atr_pct_arr) + vol_ratio_z = _rolling_zscore_last(vol_ratio_arr) + ret_1_z = _rolling_zscore_last(ret_1_arr) + ret_3_z = _rolling_zscore_last(ret_3_arr) + ret_5_z = _rolling_zscore_last(ret_5_arr) + + # signal_strength + rsi = float(last.get("rsi", 50)) + macd_val = float(last.get("macd", 0)) + macd_sig_val = float(last.get("macd_signal", 0)) + stoch_k = float(last.get("stoch_k", 50)) + stoch_d = float(last.get("stoch_d", 50)) + prev = df.iloc[-2] if len(df) >= 2 else last + prev_macd = float(prev.get("macd", 0)) + prev_macd_sig = float(prev.get("macd_signal", 0)) + + strength = 0 + if signal == "LONG": + if rsi < 35: strength += 1 + if prev_macd < prev_macd_sig and macd_val > 
macd_sig_val: strength += 2 + if close < float(last.get("bb_lower", close)): strength += 1 + if ema_align_arr[-1] == 1: strength += 1 + if stoch_k < 20 and stoch_k > stoch_d: strength += 1 + else: + if rsi > 65: strength += 1 + if prev_macd > prev_macd_sig and macd_val < macd_sig_val: strength += 2 + if close > float(last.get("bb_upper", close)): strength += 1 + if ema_align_arr[-1] == -1: strength += 1 + if stoch_k > 80 and stoch_k < stoch_d: strength += 1 + + # ADX z-score + adx_arr = df["adx"].values.astype(np.float64) if "adx" in df.columns else np.zeros(len(df)) + adx_z = _rolling_zscore_last(adx_arr) + + base = { + "rsi": rsi, + "macd_hist": float(last.get("macd_hist", 0)), + "bb_pct": float(bb_pct_series.iloc[-1]), + "ema_align": float(ema_align_arr[-1]), + "stoch_k": stoch_k, + "stoch_d": stoch_d, + "atr_pct": atr_pct_z, + "vol_ratio": vol_ratio_z, + "ret_1": ret_1_z, + "ret_3": ret_3_z, + "ret_5": ret_5_z, + "signal_strength": float(strength), + "side": 1.0 if signal == "LONG" else 0.0, + } + + # BTC/ETH 상관 피처 (z-score) + if btc_df is not None and eth_df is not None: + btc_r1 = btc_df["close"].pct_change(1).fillna(0).values + btc_r3 = btc_df["close"].pct_change(3).fillna(0).values + btc_r5 = btc_df["close"].pct_change(5).fillna(0).values + eth_r1 = eth_df["close"].pct_change(1).fillna(0).values + eth_r3 = eth_df["close"].pct_change(3).fillna(0).values + eth_r5 = eth_df["close"].pct_change(5).fillna(0).values + + # 길이 맞춤 (btc/eth가 더 길 수 있음) + n = len(df) + def _align(arr): + if len(arr) >= n: + return arr[-n:] + return np.concatenate([np.zeros(n - len(arr)), arr]) + + btc_r1 = _align(btc_r1) + btc_r3 = _align(btc_r3) + btc_r5 = _align(btc_r5) + eth_r1 = _align(eth_r1) + eth_r3 = _align(eth_r3) + eth_r5 = _align(eth_r5) + + # 상대강도 (raw → z-score) + xrp_r1 = ret_1_arr.astype(np.float32) + btc_r1_f = btc_r1.astype(np.float32) + eth_r1_f = eth_r1.astype(np.float32) + rs_btc = np.divide(xrp_r1, btc_r1_f, out=np.zeros_like(xrp_r1), where=(btc_r1_f != 0)) + rs_eth 
= np.divide(xrp_r1, eth_r1_f, out=np.zeros_like(xrp_r1), where=(eth_r1_f != 0)) + + base.update({ + "btc_ret_1": _rolling_zscore_last(btc_r1), + "btc_ret_3": _rolling_zscore_last(btc_r3), + "btc_ret_5": _rolling_zscore_last(btc_r5), + "eth_ret_1": _rolling_zscore_last(eth_r1), + "eth_ret_3": _rolling_zscore_last(eth_r3), + "eth_ret_5": _rolling_zscore_last(eth_r5), + "xrp_btc_rs": _rolling_zscore_last(rs_btc), + "xrp_eth_rs": _rolling_zscore_last(rs_eth), + }) + + # OI/펀딩비 피처: 실시간 값이 제공되면 전달받은 값을 그대로 사용 (여기서는 z-score 미적용) + # 서빙 시 OI/펀딩비 히스토리가 없어 단일 값으로는 z-score 계산 불가 → 값이 없으면 NaN 처리 + # LightGBM은 NaN을 자체 처리함 + base["oi_change"] = float(oi_change) if oi_change is not None else np.nan + base["funding_rate"] = float(funding_rate) if funding_rate is not None else np.nan + base["oi_change_ma5"] = float(oi_change_ma5) if oi_change_ma5 is not None else np.nan + base["oi_price_spread"] = float(oi_price_spread) if oi_price_spread is not None else np.nan + base["adx"] = adx_z + + return pd.Series(base) diff --git a/tests/test_bot.py b/tests/test_bot.py index e9846ba..07de871 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -246,7 +246,7 @@ async def test_process_candle_fetches_oi_and_funding(config, sample_df): mock_ind.get_signal.return_value = "LONG" mock_ind_cls.return_value = mock_ind - with patch("src.bot.build_features") as mock_build: + with patch("src.bot.build_features_aligned") as mock_build: from src.ml_features import FEATURE_COLS mock_build.return_value = pd.Series({col: 0.0 for col in FEATURE_COLS}) bot.ml_filter.is_model_loaded = MagicMock(return_value=False) diff --git a/tests/test_dataset_builder.py b/tests/test_dataset_builder.py index 2899f48..bc5c6ce 100644 --- a/tests/test_dataset_builder.py +++ b/tests/test_dataset_builder.py @@ -230,7 +230,7 @@ def signal_producing_df(): def test_hold_negative_labels_are_all_zero(signal_producing_df): """HOLD negative 샘플의 label은 전부 0이어야 한다.""" - result = generate_dataset_vectorized(signal_producing_df, negative_ratio=3) + 
result = generate_dataset_vectorized(signal_producing_df, negative_ratio=3, adx_threshold=0, volume_multiplier=1.5) assert len(result) > 0, "시그널이 발생하지 않아 테스트 불가" assert "source" in result.columns hold_neg = result[result["source"] == "hold_negative"] @@ -241,8 +241,8 @@ def test_hold_negative_labels_are_all_zero(signal_producing_df): def test_signal_samples_preserved_after_sampling(signal_producing_df): """계층적 샘플링 후 source='signal' 샘플이 하나도 버려지지 않아야 한다.""" - result_signal_only = generate_dataset_vectorized(signal_producing_df, negative_ratio=0) - result_with_hold = generate_dataset_vectorized(signal_producing_df, negative_ratio=3) + result_signal_only = generate_dataset_vectorized(signal_producing_df, negative_ratio=0, adx_threshold=0, volume_multiplier=1.5) + result_with_hold = generate_dataset_vectorized(signal_producing_df, negative_ratio=3, adx_threshold=0, volume_multiplier=1.5) assert len(result_signal_only) > 0, "시그널이 발생하지 않아 테스트 불가" assert "source" in result_with_hold.columns diff --git a/tests/test_indicators.py b/tests/test_indicators.py index 8dbad7e..43acc63 100644 --- a/tests/test_indicators.py +++ b/tests/test_indicators.py @@ -54,20 +54,22 @@ def test_adx_column_exists(sample_df): assert (valid >= 0).all() -def test_adx_low_does_not_block_signal(sample_df): - """ADX < 25여도 시그널이 차단되지 않는다 (ML에 위임).""" +def test_adx_filter_blocks_low_adx(sample_df): + """ADX < adx_threshold이면 HOLD 반환.""" ind = Indicators(sample_df) df = ind.calculate_all() - # 강한 LONG 신호가 나오도록 지표 조작 df.loc[df.index[-1], "rsi"] = 20 df.loc[df.index[-2], "macd"] = -1 df.loc[df.index[-2], "macd_signal"] = 0 df.loc[df.index[-1], "macd"] = 1 df.loc[df.index[-1], "macd_signal"] = 0 - df.loc[df.index[-1], "volume"] = df.loc[df.index[-1], "vol_ma20"] * 2 + df.loc[df.index[-1], "volume"] = df.loc[df.index[-1], "vol_ma20"] * 3 df["adx"] = 15.0 + # 기본 adx_threshold=25이므로 ADX=15은 HOLD signal = ind.get_signal(df) - # ADX 낮아도 지표 조건 충족 시 LONG 반환 (ML이 최종 판단) + assert signal == "HOLD" + # 
adx_threshold=0이면 ADX 필터 비활성화 → LONG + signal = ind.get_signal(df, adx_threshold=0) assert signal == "LONG"