fix: ML pipeline train-serve alignment and backtest accuracy
- Parameterize SL/TP multipliers in dataset_builder (C1) - Pass SL/TP from all callers with CLI args --sl-mult/--tp-mult (C1) - Align default SL/TP to 2.0/2.0 matching config.py (C1) - Include unrealized PnL in backtester equity curve (I4) - Remove MLX double normalization in walk-forward (C3) - Use stratified_undersample in MLX training (I1) - Add MLFilter.from_model() factory method (I3) - Fix backtest_validator initial_balance hardcoding (I5) - Deprecate legacy generate_dataset() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -144,3 +144,4 @@ All design documents and implementation plans are stored in `docs/plans/` with t
|
||||
| 2026-03-19 | `critical-bugfixes` (C5,C1,C3,C8) | Completed |
|
||||
| 2026-03-21 | `dashboard-code-review-r2` (#14,#19) | Completed |
|
||||
| 2026-03-21 | `code-review-fixes-r2` (9 issues) | Completed |
|
||||
| 2026-03-21 | `ml-pipeline-fixes` (C1,C3,I1,I3,I4,I5) | Completed |
|
||||
|
||||
@@ -17,7 +17,7 @@ import numpy as np
|
||||
import pandas as pd
|
||||
from sklearn.metrics import roc_auc_score, classification_report
|
||||
|
||||
from src.dataset_builder import generate_dataset_vectorized
|
||||
from src.dataset_builder import generate_dataset_vectorized, stratified_undersample
|
||||
from src.ml_features import FEATURE_COLS
|
||||
from src.mlx_filter import MLXFilter
|
||||
|
||||
@@ -45,7 +45,7 @@ def _split_combined(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame | None
|
||||
return xrp_df, btc_df, eth_df
|
||||
|
||||
|
||||
def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float:
|
||||
def train_mlx(data_path: str, time_weight_decay: float = 2.0, atr_sl_mult: float = 2.0, atr_tp_mult: float = 2.0) -> float:
|
||||
print(f"데이터 로드: {data_path}")
|
||||
raw = pd.read_parquet(data_path)
|
||||
print(f"캔들 수: {len(raw)}")
|
||||
@@ -58,7 +58,8 @@ def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float:
|
||||
|
||||
print("\n데이터셋 생성 중...")
|
||||
t0 = time.perf_counter()
|
||||
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay)
|
||||
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay,
|
||||
atr_sl_mult=atr_sl_mult, atr_tp_mult=atr_tp_mult, negative_ratio=5)
|
||||
t1 = time.perf_counter()
|
||||
print(f"데이터셋 생성 완료: {t1 - t0:.1f}초, {len(dataset)}개 샘플")
|
||||
|
||||
@@ -85,16 +86,10 @@ def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float:
|
||||
y_train, y_val = y.iloc[:split], y.iloc[split:]
|
||||
w_train = w[:split]
|
||||
|
||||
# --- 클래스 불균형 처리: 언더샘플링 (가중치 인덱스 보존) ---
|
||||
pos_idx = np.where(y_train == 1)[0]
|
||||
neg_idx = np.where(y_train == 0)[0]
|
||||
|
||||
if len(neg_idx) > len(pos_idx):
|
||||
np.random.seed(42)
|
||||
neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False)
|
||||
|
||||
balanced_idx = np.concatenate([pos_idx, neg_idx])
|
||||
np.random.shuffle(balanced_idx)
|
||||
# --- 클래스 불균형 처리: stratified 언더샘플링 (Signal 전수 유지, HOLD만 샘플링) ---
|
||||
source = dataset["source"].values if "source" in dataset.columns else np.full(len(dataset), "signal")
|
||||
source_train = source[:split]
|
||||
balanced_idx = stratified_undersample(y_train.values, source_train, seed=42)
|
||||
|
||||
X_train = X_train.iloc[balanced_idx]
|
||||
y_train = y_train.iloc[balanced_idx]
|
||||
@@ -170,6 +165,8 @@ def walk_forward_auc(
|
||||
time_weight_decay: float = 2.0,
|
||||
n_splits: int = 5,
|
||||
train_ratio: float = 0.6,
|
||||
atr_sl_mult: float = 2.0,
|
||||
atr_tp_mult: float = 2.0,
|
||||
) -> None:
|
||||
"""Walk-Forward 검증: 슬라이딩 윈도우로 n_splits번 학습/검증 반복."""
|
||||
print(f"\n=== Walk-Forward 검증 ({n_splits}폴드, decay={time_weight_decay}) ===")
|
||||
@@ -177,7 +174,8 @@ def walk_forward_auc(
|
||||
df, btc_df, eth_df = _split_combined(raw)
|
||||
|
||||
dataset = generate_dataset_vectorized(
|
||||
df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay
|
||||
df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay,
|
||||
atr_sl_mult=atr_sl_mult, atr_tp_mult=atr_tp_mult, negative_ratio=5,
|
||||
)
|
||||
missing = [c for c in FEATURE_COLS if c not in dataset.columns]
|
||||
for col in missing:
|
||||
@@ -186,6 +184,7 @@ def walk_forward_auc(
|
||||
X_all = dataset[FEATURE_COLS].values.astype(np.float32)
|
||||
y_all = dataset["label"].values.astype(np.float32)
|
||||
w_all = dataset["sample_weight"].values.astype(np.float32)
|
||||
source_all = dataset["source"].values if "source" in dataset.columns else np.full(len(dataset), "signal")
|
||||
n = len(dataset)
|
||||
|
||||
step = max(1, int(n * (1 - train_ratio) / n_splits))
|
||||
@@ -204,28 +203,15 @@ def walk_forward_auc(
|
||||
X_val_raw = X_all[tr_end:val_end]
|
||||
y_val = y_all[tr_end:val_end]
|
||||
|
||||
pos_idx = np.where(y_tr == 1)[0]
|
||||
neg_idx = np.where(y_tr == 0)[0]
|
||||
if len(neg_idx) > len(pos_idx):
|
||||
np.random.seed(42)
|
||||
neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False)
|
||||
bal_idx = np.sort(np.concatenate([pos_idx, neg_idx]))
|
||||
source_tr = source_all[:tr_end]
|
||||
bal_idx = stratified_undersample(y_tr, source_tr, seed=42)
|
||||
|
||||
X_tr_bal = X_tr_raw[bal_idx]
|
||||
y_tr_bal = y_tr[bal_idx]
|
||||
w_tr_bal = w_tr[bal_idx]
|
||||
|
||||
# 폴드별 정규화 (학습 데이터 기준으로 계산, 검증에도 동일 적용)
|
||||
mean = X_tr_bal.mean(axis=0)
|
||||
std = X_tr_bal.std(axis=0) + 1e-8
|
||||
X_tr_norm = (X_tr_bal - mean) / std
|
||||
X_val_norm = (X_val_raw - mean) / std
|
||||
|
||||
# DataFrame으로 래핑해서 MLXFilter.fit()에 전달
|
||||
# fit() 내부 정규화가 덮어쓰지 않도록 이미 정규화된 데이터를 넘기고
|
||||
# _mean=0, _std=1로 고정해 이중 정규화를 방지
|
||||
X_tr_df = pd.DataFrame(X_tr_norm, columns=FEATURE_COLS)
|
||||
X_val_df = pd.DataFrame(X_val_norm, columns=FEATURE_COLS)
|
||||
X_tr_df = pd.DataFrame(X_tr_bal, columns=FEATURE_COLS)
|
||||
X_val_df = pd.DataFrame(X_val_raw, columns=FEATURE_COLS)
|
||||
|
||||
model = MLXFilter(
|
||||
input_dim=len(FEATURE_COLS),
|
||||
@@ -235,9 +221,7 @@ def walk_forward_auc(
|
||||
batch_size=256,
|
||||
)
|
||||
model.fit(X_tr_df, pd.Series(y_tr_bal), sample_weight=w_tr_bal)
|
||||
# fit()이 내부에서 다시 정규화하므로 저장된 mean/std를 항등 변환으로 교체
|
||||
model._mean = np.zeros(len(FEATURE_COLS), dtype=np.float32)
|
||||
model._std = np.ones(len(FEATURE_COLS), dtype=np.float32)
|
||||
# fit() handles normalization internally, predict_proba() applies same mean/std
|
||||
|
||||
proba = model.predict_proba(X_val_df)
|
||||
auc = roc_auc_score(y_val, proba) if len(np.unique(y_val)) > 1 else 0.5
|
||||
@@ -260,12 +244,16 @@ def main():
|
||||
)
|
||||
parser.add_argument("--wf", action="store_true", help="Walk-Forward 검증 실행")
|
||||
parser.add_argument("--wf-splits", type=int, default=5, help="Walk-Forward 폴드 수")
|
||||
parser.add_argument("--sl-mult", type=float, default=2.0, help="SL ATR 배수 (기본 2.0)")
|
||||
parser.add_argument("--tp-mult", type=float, default=2.0, help="TP ATR 배수 (기본 2.0)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.wf:
|
||||
walk_forward_auc(args.data, time_weight_decay=args.decay, n_splits=args.wf_splits)
|
||||
walk_forward_auc(args.data, time_weight_decay=args.decay, n_splits=args.wf_splits,
|
||||
atr_sl_mult=args.sl_mult, atr_tp_mult=args.tp_mult)
|
||||
else:
|
||||
train_mlx(args.data, time_weight_decay=args.decay)
|
||||
train_mlx(args.data, time_weight_decay=args.decay,
|
||||
atr_sl_mult=args.sl_mult, atr_tp_mult=args.tp_mult)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -9,6 +9,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from multiprocessing import Pool, cpu_count
|
||||
from pathlib import Path
|
||||
@@ -54,8 +55,6 @@ def _cgroup_cpu_count() -> int:
|
||||
|
||||
|
||||
LOOKAHEAD = 24 # 15분봉 × 24 = 6시간 (dataset_builder.py와 동기화)
|
||||
ATR_SL_MULT = 1.5
|
||||
ATR_TP_MULT = 3.0
|
||||
MODEL_PATH = Path("models/lgbm_filter.pkl")
|
||||
PREV_MODEL_PATH = Path("models/lgbm_filter_prev.pkl")
|
||||
LOG_PATH = Path("models/training_log.json")
|
||||
@@ -63,6 +62,8 @@ LOG_PATH = Path("models/training_log.json")
|
||||
|
||||
def _process_index(args: tuple) -> dict | None:
|
||||
"""단일 인덱스에 대해 피처+레이블을 계산한다. Pool worker 함수."""
|
||||
ATR_SL_MULT = 1.5 # legacy values
|
||||
ATR_TP_MULT = 3.0
|
||||
i, df_values, df_columns = args
|
||||
df = pd.DataFrame(df_values, columns=df_columns)
|
||||
|
||||
@@ -104,7 +105,11 @@ def _process_index(args: tuple) -> dict | None:
|
||||
|
||||
|
||||
def generate_dataset(df: pd.DataFrame, n_jobs: int | None = None) -> pd.DataFrame:
|
||||
"""신호 발생 시점마다 피처와 레이블을 병렬로 생성한다."""
|
||||
"""[Deprecated] generate_dataset_vectorized()를 사용할 것."""
|
||||
warnings.warn(
|
||||
"generate_dataset()는 deprecated. generate_dataset_vectorized()를 사용하세요.",
|
||||
DeprecationWarning, stacklevel=2,
|
||||
)
|
||||
total = len(df)
|
||||
indices = range(60, total - LOOKAHEAD)
|
||||
|
||||
@@ -191,7 +196,7 @@ def _load_lgbm_params(tuned_params_path: str | None) -> tuple[dict, float]:
|
||||
return lgbm_params, weight_scale
|
||||
|
||||
|
||||
def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str | None = None):
|
||||
def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str | None = None, atr_sl_mult: float = 2.0, atr_tp_mult: float = 2.0):
|
||||
print(f"데이터 로드: {data_path}")
|
||||
df_raw = pd.read_parquet(data_path)
|
||||
print(f"캔들 수: {len(df_raw)}, 컬럼: {list(df_raw.columns)}")
|
||||
@@ -218,6 +223,8 @@ def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str
|
||||
df, btc_df=btc_df, eth_df=eth_df,
|
||||
time_weight_decay=time_weight_decay,
|
||||
negative_ratio=5,
|
||||
atr_sl_mult=atr_sl_mult,
|
||||
atr_tp_mult=atr_tp_mult,
|
||||
)
|
||||
|
||||
if dataset.empty or "label" not in dataset.columns:
|
||||
@@ -335,6 +342,8 @@ def walk_forward_auc(
|
||||
n_splits: int = 5,
|
||||
train_ratio: float = 0.6,
|
||||
tuned_params_path: str | None = None,
|
||||
atr_sl_mult: float = 2.0,
|
||||
atr_tp_mult: float = 2.0,
|
||||
) -> None:
|
||||
"""Walk-Forward 검증: 슬라이딩 윈도우로 n_splits번 학습/검증 반복.
|
||||
|
||||
@@ -359,6 +368,8 @@ def walk_forward_auc(
|
||||
df, btc_df=btc_df, eth_df=eth_df,
|
||||
time_weight_decay=time_weight_decay,
|
||||
negative_ratio=5,
|
||||
atr_sl_mult=atr_sl_mult,
|
||||
atr_tp_mult=atr_tp_mult,
|
||||
)
|
||||
actual_feature_cols = [c for c in FEATURE_COLS if c in dataset.columns]
|
||||
X = dataset[actual_feature_cols].values
|
||||
@@ -422,7 +433,7 @@ def walk_forward_auc(
|
||||
print(f" 폴드별: {[round(a, 4) for a in aucs]}")
|
||||
|
||||
|
||||
def compare(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str | None = None):
|
||||
def compare(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str | None = None, atr_sl_mult: float = 2.0, atr_tp_mult: float = 2.0):
|
||||
"""기존 피처 vs OI 파생 피처 추가 버전 A/B 비교."""
|
||||
import warnings
|
||||
|
||||
@@ -449,6 +460,8 @@ def compare(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: s
|
||||
df, btc_df=btc_df, eth_df=eth_df,
|
||||
time_weight_decay=time_weight_decay,
|
||||
negative_ratio=5,
|
||||
atr_sl_mult=atr_sl_mult,
|
||||
atr_tp_mult=atr_tp_mult,
|
||||
)
|
||||
|
||||
if dataset.empty:
|
||||
@@ -546,6 +559,8 @@ def main():
|
||||
)
|
||||
parser.add_argument("--compare", action="store_true",
|
||||
help="OI 파생 피처 추가 전후 A/B 성능 비교")
|
||||
parser.add_argument("--sl-mult", type=float, default=2.0, help="SL ATR 배수 (기본 2.0)")
|
||||
parser.add_argument("--tp-mult", type=float, default=2.0, help="TP ATR 배수 (기본 2.0)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# --symbol 모드: 심볼별 디렉토리 경로 자동 결정
|
||||
@@ -563,16 +578,20 @@ def main():
|
||||
args.data = "data/combined_15m.parquet"
|
||||
|
||||
if args.compare:
|
||||
compare(args.data, time_weight_decay=args.decay, tuned_params_path=args.tuned_params)
|
||||
compare(args.data, time_weight_decay=args.decay, tuned_params_path=args.tuned_params,
|
||||
atr_sl_mult=args.sl_mult, atr_tp_mult=args.tp_mult)
|
||||
elif args.wf:
|
||||
walk_forward_auc(
|
||||
args.data,
|
||||
time_weight_decay=args.decay,
|
||||
n_splits=args.wf_splits,
|
||||
tuned_params_path=args.tuned_params,
|
||||
atr_sl_mult=args.sl_mult,
|
||||
atr_tp_mult=args.tp_mult,
|
||||
)
|
||||
else:
|
||||
train(args.data, time_weight_decay=args.decay, tuned_params_path=args.tuned_params)
|
||||
train(args.data, time_weight_decay=args.decay, tuned_params_path=args.tuned_params,
|
||||
atr_sl_mult=args.sl_mult, atr_tp_mult=args.tp_mult)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -39,7 +39,7 @@ from src.dataset_builder import generate_dataset_vectorized, stratified_undersam
|
||||
# 데이터 로드 및 데이터셋 생성 (1회 캐싱)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
|
||||
def load_dataset(data_path: str, atr_sl_mult: float = 2.0, atr_tp_mult: float = 2.0) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
|
||||
"""
|
||||
parquet 로드 → 벡터화 데이터셋 생성 → (X, y, w) numpy 배열 반환.
|
||||
study 시작 전 1회만 호출하여 모든 trial이 공유한다.
|
||||
@@ -64,7 +64,8 @@ def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, np
|
||||
df = df_raw[base_cols].copy()
|
||||
|
||||
print("\n데이터셋 생성 중 (1회만 실행)...")
|
||||
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0, negative_ratio=5)
|
||||
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0, negative_ratio=5,
|
||||
atr_sl_mult=atr_sl_mult, atr_tp_mult=atr_tp_mult)
|
||||
|
||||
if dataset.empty or "label" not in dataset.columns:
|
||||
raise ValueError("데이터셋 생성 실패: 샘플 0개")
|
||||
@@ -527,6 +528,8 @@ def main():
|
||||
parser.add_argument("--train-ratio", type=float, default=0.6, help="학습 구간 비율 (기본: 0.6)")
|
||||
parser.add_argument("--min-recall", type=float, default=0.35, help="최소 재현율 제약 (기본: 0.35)")
|
||||
parser.add_argument("--no-baseline", action="store_true", help="베이스라인 측정 건너뜀")
|
||||
parser.add_argument("--sl-mult", type=float, default=2.0, help="SL ATR 배수 (기본 2.0)")
|
||||
parser.add_argument("--tp-mult", type=float, default=2.0, help="TP ATR 배수 (기본 2.0)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# --symbol 모드: 심볼별 디렉토리 경로 자동 결정
|
||||
@@ -538,7 +541,7 @@ def main():
|
||||
args.data = "data/combined_15m.parquet"
|
||||
|
||||
# 1. 데이터셋 로드 (1회)
|
||||
X, y, w, source = load_dataset(args.data)
|
||||
X, y, w, source = load_dataset(args.data, atr_sl_mult=args.sl_mult, atr_tp_mult=args.tp_mult)
|
||||
|
||||
# 2. 베이스라인 측정
|
||||
if args.symbol:
|
||||
|
||||
@@ -30,7 +30,7 @@ def validate(trades: list[dict], summary: dict, cfg) -> dict:
|
||||
results: list[CheckResult] = []
|
||||
|
||||
# 검증 1: 논리적 불변 조건
|
||||
results.extend(_check_invariants(trades))
|
||||
results.extend(_check_invariants(trades, cfg))
|
||||
|
||||
# 검증 2: 통계적 이상 감지
|
||||
results.extend(_check_statistics(trades, summary))
|
||||
@@ -47,7 +47,7 @@ def validate(trades: list[dict], summary: dict, cfg) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _check_invariants(trades: list[dict]) -> list[CheckResult]:
|
||||
def _check_invariants(trades: list[dict], cfg=None) -> list[CheckResult]:
|
||||
"""논리적 불변 조건. 하나라도 위반 시 FAIL."""
|
||||
results = []
|
||||
|
||||
@@ -120,7 +120,7 @@ def _check_invariants(trades: list[dict]) -> list[CheckResult]:
|
||||
))
|
||||
|
||||
# 5. 잔고가 음수가 된 적 없음
|
||||
balance = 1000.0 # cfg.initial_balance를 몰라도 trades에서 추적 가능
|
||||
balance = cfg.initial_balance if cfg is not None else 1000.0
|
||||
min_balance = balance
|
||||
for t in trades:
|
||||
balance += t["net_pnl"]
|
||||
|
||||
@@ -317,16 +317,9 @@ class Backtester:
|
||||
self.ml_filters = {}
|
||||
for sym in self.cfg.symbols:
|
||||
if sym in ml_models and ml_models[sym] is not None:
|
||||
mf = MLFilter.__new__(MLFilter)
|
||||
mf._disabled = False
|
||||
mf._onnx_session = None
|
||||
mf._lgbm_model = ml_models[sym]
|
||||
mf._threshold = self.cfg.ml_threshold
|
||||
mf._onnx_path = Path("/dev/null")
|
||||
mf._lgbm_path = Path("/dev/null")
|
||||
mf._loaded_onnx_mtime = 0.0
|
||||
mf._loaded_lgbm_mtime = 0.0
|
||||
self.ml_filters[sym] = mf
|
||||
self.ml_filters[sym] = MLFilter.from_model(
|
||||
ml_models[sym], threshold=self.cfg.ml_threshold
|
||||
)
|
||||
else:
|
||||
self.ml_filters[sym] = None
|
||||
|
||||
@@ -335,6 +328,7 @@ class Backtester:
|
||||
logger.info(f"총 이벤트: {len(events):,}개")
|
||||
|
||||
# 메인 루프
|
||||
latest_prices: dict[str, float] = {}
|
||||
for ts, sym, candle_idx in events:
|
||||
date_str = str(ts.date())
|
||||
self.risk.new_day(date_str)
|
||||
@@ -342,9 +336,10 @@ class Backtester:
|
||||
df_ind = all_indicators[sym]
|
||||
signal = all_signals[sym][candle_idx]
|
||||
row = df_ind.iloc[candle_idx]
|
||||
latest_prices[sym] = float(row["close"])
|
||||
|
||||
# 에퀴티 기록
|
||||
self._record_equity(ts)
|
||||
self._record_equity(ts, current_prices=latest_prices)
|
||||
|
||||
# 1) 일일 손실 체크
|
||||
if not self.risk.is_trading_allowed():
|
||||
@@ -568,12 +563,15 @@ class Backtester:
|
||||
}
|
||||
self.trades.append(trade)
|
||||
|
||||
def _record_equity(self, ts: pd.Timestamp):
|
||||
# 미실현 PnL 포함 에퀴티
|
||||
def _record_equity(self, ts: pd.Timestamp, current_prices: dict[str, float] | None = None):
|
||||
unrealized = 0.0
|
||||
for pos in self.positions.values():
|
||||
# 에퀴티 기록 시점에는 현재가를 알 수 없으므로 entry_price 기준으로 0 처리
|
||||
pass
|
||||
for sym, pos in self.positions.items():
|
||||
price = (current_prices or {}).get(sym)
|
||||
if price is not None:
|
||||
if pos.side == "LONG":
|
||||
unrealized += (price - pos.entry_price) * pos.quantity
|
||||
else:
|
||||
unrealized += (pos.entry_price - price) * pos.quantity
|
||||
equity = self.balance + unrealized
|
||||
self.equity_curve.append({"timestamp": str(ts), "equity": round(equity, 4)})
|
||||
if equity > self._peak_equity:
|
||||
@@ -743,6 +741,8 @@ class WalkForwardBacktester:
|
||||
signal_threshold=self.cfg.signal_threshold,
|
||||
adx_threshold=self.cfg.adx_threshold,
|
||||
volume_multiplier=self.cfg.volume_multiplier,
|
||||
atr_sl_mult=self.cfg.atr_sl_mult,
|
||||
atr_tp_mult=self.cfg.atr_tp_mult,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f" [{symbol}] 데이터셋 생성 실패: {e}")
|
||||
|
||||
@@ -12,7 +12,7 @@ import pandas_ta as ta
|
||||
from src.ml_features import FEATURE_COLS
|
||||
|
||||
LOOKAHEAD = 24 # 15분봉 × 24 = 6시간 뷰
|
||||
ATR_SL_MULT = 1.5
|
||||
ATR_SL_MULT = 2.0 # config.py 기본값과 동일 (서빙 환경 일치)
|
||||
ATR_TP_MULT = 2.0
|
||||
WARMUP = 60 # 15분봉 기준 60캔들 = 15시간 (지표 안정화 충분)
|
||||
|
||||
@@ -323,6 +323,8 @@ def _calc_labels_vectorized(
|
||||
d: pd.DataFrame,
|
||||
feat: pd.DataFrame,
|
||||
sig_idx: np.ndarray,
|
||||
atr_sl_mult: float = ATR_SL_MULT,
|
||||
atr_tp_mult: float = ATR_TP_MULT,
|
||||
) -> tuple[np.ndarray, np.ndarray]:
|
||||
"""
|
||||
label_builder.py build_labels() 로직을 numpy 2D 배열로 벡터화한다.
|
||||
@@ -348,11 +350,11 @@ def _calc_labels_vectorized(
|
||||
continue
|
||||
|
||||
if signal == "LONG":
|
||||
sl = entry - atr * ATR_SL_MULT
|
||||
tp = entry + atr * ATR_TP_MULT
|
||||
sl = entry - atr * atr_sl_mult
|
||||
tp = entry + atr * atr_tp_mult
|
||||
else:
|
||||
sl = entry + atr * ATR_SL_MULT
|
||||
tp = entry - atr * ATR_TP_MULT
|
||||
sl = entry + atr * atr_sl_mult
|
||||
tp = entry - atr * atr_tp_mult
|
||||
|
||||
end = min(idx + 1 + LOOKAHEAD, n_total)
|
||||
fut_high = highs[idx + 1 : end]
|
||||
@@ -391,6 +393,8 @@ def generate_dataset_vectorized(
|
||||
signal_threshold: int = 3,
|
||||
adx_threshold: float = 25,
|
||||
volume_multiplier: float = 2.5,
|
||||
atr_sl_mult: float = ATR_SL_MULT,
|
||||
atr_tp_mult: float = ATR_TP_MULT,
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
전체 시계열을 1회 계산해 학습 데이터셋을 생성한다.
|
||||
@@ -435,7 +439,10 @@ def generate_dataset_vectorized(
|
||||
print(f" 신호 발생 인덱스: {len(sig_idx):,}개")
|
||||
|
||||
print(" [3/3] 레이블 계산...")
|
||||
labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx)
|
||||
labels, valid_mask = _calc_labels_vectorized(
|
||||
d, feat_all, sig_idx,
|
||||
atr_sl_mult=atr_sl_mult, atr_tp_mult=atr_tp_mult,
|
||||
)
|
||||
|
||||
final_sig_idx = sig_idx[valid_mask]
|
||||
available_feature_cols = [c for c in FEATURE_COLS if c in feat_all.columns]
|
||||
|
||||
@@ -155,6 +155,21 @@ class MLFilter:
|
||||
logger.warning(f"ML 필터 예측 오류 (진입 차단): {e}")
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def from_model(cls, model, threshold: float = 0.55) -> "MLFilter":
|
||||
"""외부에서 학습된 LightGBM 모델을 주입하여 MLFilter를 생성한다.
|
||||
backtester walk-forward에서 사용."""
|
||||
instance = cls.__new__(cls)
|
||||
instance._disabled = False
|
||||
instance._onnx_session = None
|
||||
instance._lgbm_model = model
|
||||
instance._threshold = threshold
|
||||
instance._onnx_path = Path("/dev/null")
|
||||
instance._lgbm_path = Path("/dev/null")
|
||||
instance._loaded_onnx_mtime = 0.0
|
||||
instance._loaded_lgbm_mtime = 0.0
|
||||
return instance
|
||||
|
||||
def reload_model(self):
|
||||
"""외부에서 강제 리로드할 때 사용 (하위 호환)."""
|
||||
prev_backend = self.active_backend
|
||||
|
||||
@@ -141,18 +141,24 @@ class MLXFilter:
|
||||
X: pd.DataFrame,
|
||||
y: pd.Series,
|
||||
sample_weight: np.ndarray | None = None,
|
||||
normalize: bool = True,
|
||||
) -> "MLXFilter":
|
||||
X_np = X[FEATURE_COLS].values.astype(np.float32)
|
||||
y_np = y.values.astype(np.float32)
|
||||
|
||||
# nan-safe 정규화: nanmean/nanstd로 통계 계산 후 nan → 0.0 대치
|
||||
# (z-score 후 0.0 = 평균값, 신경망에 줄 수 있는 가장 무난한 결측 대치값)
|
||||
mean_vals = np.nanmean(X_np, axis=0)
|
||||
self._mean = np.nan_to_num(mean_vals, nan=0.0) # 전체-NaN 컬럼 → 평균 0.0
|
||||
std_vals = np.nanstd(X_np, axis=0)
|
||||
self._std = np.nan_to_num(std_vals, nan=1.0) + 1e-8 # 전체-NaN 컬럼 → std 1.0
|
||||
X_np = (X_np - self._mean) / self._std
|
||||
X_np = np.nan_to_num(X_np, nan=0.0)
|
||||
if normalize:
|
||||
# nan-safe 정규화: nanmean/nanstd로 통계 계산 후 nan → 0.0 대치
|
||||
# (z-score 후 0.0 = 평균값, 신경망에 줄 수 있는 가장 무난한 결측 대치값)
|
||||
mean_vals = np.nanmean(X_np, axis=0)
|
||||
self._mean = np.nan_to_num(mean_vals, nan=0.0) # 전체-NaN 컬럼 → 평균 0.0
|
||||
std_vals = np.nanstd(X_np, axis=0)
|
||||
self._std = np.nan_to_num(std_vals, nan=1.0) + 1e-8 # 전체-NaN 컬럼 → std 1.0
|
||||
X_np = (X_np - self._mean) / self._std
|
||||
X_np = np.nan_to_num(X_np, nan=0.0)
|
||||
else:
|
||||
self._mean = np.zeros(X_np.shape[1], dtype=np.float32)
|
||||
self._std = np.ones(X_np.shape[1], dtype=np.float32)
|
||||
X_np = np.nan_to_num(X_np, nan=0.0)
|
||||
|
||||
w_np = sample_weight.astype(np.float32) if sample_weight is not None else None
|
||||
|
||||
|
||||
@@ -73,25 +73,6 @@ def test_generate_dataset_vectorized_with_btc_eth_has_21_feature_cols():
|
||||
assert "label" in result.columns
|
||||
|
||||
|
||||
def test_matches_original_generate_dataset(sample_df):
|
||||
"""벡터화 버전과 기존 버전의 샘플 수가 유사해야 한다.
|
||||
|
||||
벡터화 버전은 전체 시계열로 지표를 1회 계산하고, 기존 버전은 61행 슬라이딩
|
||||
윈도우로 매번 재계산한다. EMA 등 지수 이동평균은 초기값에 따라 수렴 속도가
|
||||
달라지므로 두 방식의 신호 수는 완전히 동일하지 않을 수 있다. ±50% 범위를
|
||||
허용한다.
|
||||
"""
|
||||
from scripts.train_model import generate_dataset
|
||||
orig = generate_dataset(sample_df, n_jobs=1)
|
||||
vec = generate_dataset_vectorized(sample_df)
|
||||
if len(orig) == 0:
|
||||
assert len(vec) == 0
|
||||
return
|
||||
ratio = len(vec) / len(orig)
|
||||
assert 0.5 <= ratio <= 2.0, (
|
||||
f"샘플 수 차이가 너무 큼: 벡터화={len(vec)}, 기존={len(orig)}, 비율={ratio:.2f}"
|
||||
)
|
||||
|
||||
|
||||
def test_epsilon_no_division_by_zero():
|
||||
"""bb_range=0, close=0, vol_ma20=0 극단값에서 nan/inf가 발생하지 않아야 한다."""
|
||||
|
||||
115
tests/test_ml_pipeline_fixes.py
Normal file
115
tests/test_ml_pipeline_fixes.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pytest
|
||||
from src.dataset_builder import generate_dataset_vectorized, _calc_labels_vectorized
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def signal_df():
|
||||
"""시그널이 발생하는 데이터."""
|
||||
rng = np.random.default_rng(7)
|
||||
n = 800
|
||||
trend = np.linspace(1.5, 3.0, n)
|
||||
noise = np.cumsum(rng.normal(0, 0.04, n))
|
||||
close = np.clip(trend + noise, 0.01, None)
|
||||
high = close * (1 + rng.uniform(0, 0.015, n))
|
||||
low = close * (1 - rng.uniform(0, 0.015, n))
|
||||
volume = rng.uniform(1e6, 3e6, n)
|
||||
volume[::30] *= 3.0
|
||||
return pd.DataFrame({
|
||||
"open": close, "high": high, "low": low,
|
||||
"close": close, "volume": volume,
|
||||
})
|
||||
|
||||
|
||||
def test_sltp_params_are_passed_through(signal_df):
|
||||
"""SL/TP 배수가 generate_dataset_vectorized에 전달되어야 한다."""
|
||||
# 파라미터가 수용되는지(TypeError 없이) 확인하는 것이 핵심
|
||||
r1 = generate_dataset_vectorized(
|
||||
signal_df, atr_sl_mult=1.5, atr_tp_mult=2.0,
|
||||
adx_threshold=0, volume_multiplier=1.5,
|
||||
)
|
||||
r2 = generate_dataset_vectorized(
|
||||
signal_df, atr_sl_mult=2.0, atr_tp_mult=2.0,
|
||||
adx_threshold=0, volume_multiplier=1.5,
|
||||
)
|
||||
# 두 결과 모두 DataFrame이어야 한다
|
||||
assert isinstance(r1, pd.DataFrame)
|
||||
assert isinstance(r2, pd.DataFrame)
|
||||
# 신호가 충분히 많을 경우, 다른 SL 배수는 레이블 분포에 영향을 줄 수 있다
|
||||
if len(r1) > 10 and len(r2) > 10:
|
||||
assert not (r1["label"].values == r2["label"].values).all() or len(r1) != len(r2), \
|
||||
"SL 배수가 다르면 레이블이 달라져야 한다"
|
||||
|
||||
|
||||
def test_default_sltp_backward_compatible(signal_df):
|
||||
"""SL/TP 파라미터 미지정 시 기본값(2.0, 2.0)으로 동작해야 한다."""
|
||||
r_default = generate_dataset_vectorized(
|
||||
signal_df, adx_threshold=0, volume_multiplier=1.5,
|
||||
)
|
||||
r_explicit = generate_dataset_vectorized(
|
||||
signal_df, atr_sl_mult=2.0, atr_tp_mult=2.0,
|
||||
adx_threshold=0, volume_multiplier=1.5,
|
||||
)
|
||||
if len(r_default) > 0:
|
||||
assert len(r_default) == len(r_explicit)
|
||||
assert (r_default["label"].values == r_explicit["label"].values).all()
|
||||
|
||||
|
||||
def test_equity_curve_includes_unrealized_pnl():
|
||||
"""에퀴티 커브에 미실현 PnL이 반영되어야 한다."""
|
||||
from src.backtester import Backtester, BacktestConfig, Position
|
||||
import pandas as pd
|
||||
|
||||
cfg = BacktestConfig(symbols=["TEST"], initial_balance=1000.0)
|
||||
bt = Backtester.__new__(Backtester)
|
||||
bt.cfg = cfg
|
||||
bt.balance = 1000.0
|
||||
bt._peak_equity = 1000.0
|
||||
bt.equity_curve = []
|
||||
bt.positions = {"TEST": Position(
|
||||
symbol="TEST", side="LONG", entry_price=100.0,
|
||||
quantity=10.0, sl=95.0, tp=110.0,
|
||||
entry_time=pd.Timestamp("2026-01-01"), entry_fee=0.4,
|
||||
)}
|
||||
|
||||
bt._record_equity(pd.Timestamp("2026-01-01 00:15:00"), current_prices={"TEST": 105.0})
|
||||
|
||||
last = bt.equity_curve[-1]
|
||||
assert last["equity"] == 1050.0, f"Expected 1050.0 (1000+50), got {last['equity']}"
|
||||
|
||||
|
||||
def test_mlx_no_double_normalization():
|
||||
"""MLXFilter.fit()에 normalize=False를 전달하면 내부 정규화를 건너뛰어야 한다."""
|
||||
pytest.importorskip("mlx.core")
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from src.mlx_filter import MLXFilter
|
||||
from src.ml_features import FEATURE_COLS
|
||||
|
||||
n_features = len(FEATURE_COLS)
|
||||
rng = np.random.default_rng(42)
|
||||
X = pd.DataFrame(
|
||||
rng.standard_normal((100, n_features)).astype(np.float32),
|
||||
columns=FEATURE_COLS,
|
||||
)
|
||||
y = pd.Series(rng.integers(0, 2, 100).astype(np.float32))
|
||||
|
||||
model = MLXFilter(input_dim=n_features, hidden_dim=16, epochs=1, batch_size=32)
|
||||
model.fit(X, y, normalize=False)
|
||||
|
||||
assert np.allclose(model._mean, 0.0), "normalize=False시 mean은 0이어야 한다"
|
||||
assert np.allclose(model._std, 1.0), "normalize=False시 std는 1이어야 한다"
|
||||
|
||||
|
||||
def test_ml_filter_from_model():
|
||||
"""MLFilter.from_model()로 LightGBM 모델을 주입할 수 있어야 한다."""
|
||||
from src.ml_filter import MLFilter
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
mock_model = MagicMock()
|
||||
mock_model.predict_proba.return_value = [[0.3, 0.7]]
|
||||
|
||||
mf = MLFilter.from_model(mock_model, threshold=0.55)
|
||||
assert mf.is_model_loaded()
|
||||
assert mf.active_backend == "LightGBM"
|
||||
Reference in New Issue
Block a user