Files
cointrader/docs/plans/2026-03-01-ml-filter-implementation.md
21in7 7e4e9315c2 feat: implement ML filter with LightGBM for trading signal validation
- Added MLFilter class to load and evaluate LightGBM model for trading signals.
- Introduced retraining mechanism to update the model daily based on new data.
- Created feature engineering and label building utilities for model training.
- Updated bot logic to incorporate ML filter for signal validation.
- Added scripts for data fetching and model training.

Made-with: Cursor
2026-03-01 17:07:18 +09:00

32 KiB

ML 필터 (LightGBM) 구현 계획

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: 기존 규칙 기반 신호(LONG/SHORT/HOLD)가 발생했을 때 LightGBM 모델이 수익 확률을 계산해 낮은 확률의 진입을 차단하는 보조 필터를 구현한다.

Architecture: 과거 캔들 데이터로 LightGBM을 오프라인 학습시키고, 봇의 process_candle()에서 규칙 기반 신호가 나오면 ML 필터가 확률을 계산해 0.60 미만이면 진입을 차단한다. 매일 새벽 3시에 자동 재학습하며 성능이 나빠지면 이전 모델로 롤백한다.

Tech Stack: Python 3.11+, lightgbm, scikit-learn, joblib, pandas, asyncio (기존 스택 유지)


Task 1: 의존성 추가

Files:

  • Modify: requirements.txt

Step 1: requirements.txt에 ML 패키지 추가

lightgbm>=4.3.0
scikit-learn>=1.4.0
joblib>=1.3.0
pyarrow>=15.0.0

Step 2: 패키지 설치

pip install lightgbm scikit-learn joblib pyarrow

Expected: 설치 완료 메시지 출력

Step 3: models/ 디렉토리 생성

mkdir -p models data scripts
touch models/.gitkeep data/.gitkeep

Step 4: .gitignore에 모델/데이터 파일 추가

기존 .gitignore에 추가:

models/*.pkl
data/*.parquet

Step 5: Commit

git add requirements.txt .gitignore models/.gitkeep data/.gitkeep
git commit -m "feat: add ML dependencies and directory structure"

Task 2: 피처 엔지니어링 모듈 (src/ml_features.py)

Files:

  • Create: src/ml_features.py
  • Create: tests/test_ml_features.py

Step 1: 실패하는 테스트 작성

# tests/test_ml_features.py
import pandas as pd
import numpy as np
import pytest
from src.ml_features import build_features, FEATURE_COLS

def make_df(n=100):
    """테스트용 최소 DataFrame 생성"""
    np.random.seed(42)
    close = 100 + np.cumsum(np.random.randn(n) * 0.5)
    df = pd.DataFrame({
        "open":   close * 0.999,
        "high":   close * 1.002,
        "low":    close * 0.998,
        "close":  close,
        "volume": np.random.uniform(1000, 5000, n),
    })
    return df

def test_build_features_returns_series():
    from src.indicators import Indicators
    df = make_df(100)
    ind = Indicators(df)
    df_ind = ind.calculate_all()
    features = build_features(df_ind, signal="LONG")
    assert isinstance(features, pd.Series)

def test_build_features_has_all_cols():
    from src.indicators import Indicators
    df = make_df(100)
    ind = Indicators(df)
    df_ind = ind.calculate_all()
    features = build_features(df_ind, signal="LONG")
    for col in FEATURE_COLS:
        assert col in features.index, f"피처 누락: {col}"

def test_build_features_no_nan():
    from src.indicators import Indicators
    df = make_df(100)
    ind = Indicators(df)
    df_ind = ind.calculate_all()
    features = build_features(df_ind, signal="LONG")
    assert not features.isna().any(), f"NaN 존재: {features[features.isna()]}"

def test_side_encoding():
    from src.indicators import Indicators
    df = make_df(100)
    ind = Indicators(df)
    df_ind = ind.calculate_all()
    long_feat  = build_features(df_ind, signal="LONG")
    short_feat = build_features(df_ind, signal="SHORT")
    assert long_feat["side"] == 1
    assert short_feat["side"] == 0

Step 2: 테스트 실패 확인

pytest tests/test_ml_features.py -v

Expected: FAIL with "cannot import name 'build_features'"

Step 3: src/ml_features.py 구현

import pandas as pd
import numpy as np

FEATURE_COLS = [
    "rsi", "macd_hist", "bb_pct", "ema_align",
    "stoch_k", "stoch_d", "atr_pct", "vol_ratio",
    "ret_1", "ret_3", "ret_5", "signal_strength", "side",
]


def build_features(df: pd.DataFrame, signal: str) -> pd.Series:
    """
    기술 지표가 계산된 DataFrame의 마지막 행에서 ML 피처를 추출한다.
    signal: "LONG" | "SHORT"
    """
    last = df.iloc[-1]
    close = last["close"]

    bb_upper = last.get("bb_upper", close)
    bb_lower = last.get("bb_lower", close)
    bb_range = bb_upper - bb_lower
    bb_pct = (close - bb_lower) / bb_range if bb_range > 0 else 0.5

    ema9  = last.get("ema9",  close)
    ema21 = last.get("ema21", close)
    ema50 = last.get("ema50", close)
    if ema9 > ema21 > ema50:
        ema_align = 1
    elif ema9 < ema21 < ema50:
        ema_align = -1
    else:
        ema_align = 0

    atr = last.get("atr", 0)
    atr_pct = atr / close if close > 0 else 0

    vol_ma20 = last.get("vol_ma20", last.get("volume", 1))
    vol_ratio = last["volume"] / vol_ma20 if vol_ma20 > 0 else 1.0

    closes = df["close"]
    ret_1 = (close - closes.iloc[-2]) / closes.iloc[-2] if len(closes) >= 2 else 0.0
    ret_3 = (close - closes.iloc[-4]) / closes.iloc[-4] if len(closes) >= 4 else 0.0
    ret_5 = (close - closes.iloc[-6]) / closes.iloc[-6] if len(closes) >= 6 else 0.0

    # 규칙 기반 신호 강도 재계산 (indicators.py get_signal 로직 참조)
    prev = df.iloc[-2] if len(df) >= 2 else last
    strength = 0
    rsi = last.get("rsi", 50)
    macd = last.get("macd", 0)
    macd_sig = last.get("macd_signal", 0)
    prev_macd = prev.get("macd", 0)
    prev_macd_sig = prev.get("macd_signal", 0)
    stoch_k = last.get("stoch_k", 50)
    stoch_d = last.get("stoch_d", 50)

    if signal == "LONG":
        if rsi < 35: strength += 1
        if prev_macd < prev_macd_sig and macd > macd_sig: strength += 2
        if close < last.get("bb_lower", close): strength += 1
        if ema_align == 1: strength += 1
        if stoch_k < 20 and stoch_k > stoch_d: strength += 1
    else:
        if rsi > 65: strength += 1
        if prev_macd > prev_macd_sig and macd < macd_sig: strength += 2
        if close > last.get("bb_upper", close): strength += 1
        if ema_align == -1: strength += 1
        if stoch_k > 80 and stoch_k < stoch_d: strength += 1

    return pd.Series({
        "rsi":            float(rsi),
        "macd_hist":      float(last.get("macd_hist", 0)),
        "bb_pct":         float(bb_pct),
        "ema_align":      float(ema_align),
        "stoch_k":        float(stoch_k),
        "stoch_d":        float(last.get("stoch_d", 50)),
        "atr_pct":        float(atr_pct),
        "vol_ratio":      float(vol_ratio),
        "ret_1":          float(ret_1),
        "ret_3":          float(ret_3),
        "ret_5":          float(ret_5),
        "signal_strength": float(strength),
        "side":           1.0 if signal == "LONG" else 0.0,
    })

Step 4: 테스트 통과 확인

pytest tests/test_ml_features.py -v

Expected: 4개 PASS

Step 5: Commit

git add src/ml_features.py tests/test_ml_features.py
git commit -m "feat: add ML feature engineering module"

Task 3: 레이블 생성 유틸리티 (src/label_builder.py)

Files:

  • Create: src/label_builder.py
  • Create: tests/test_label_builder.py

Step 1: 실패하는 테스트 작성

# tests/test_label_builder.py
import pandas as pd
import numpy as np
import pytest
from src.label_builder import build_labels


def make_signal_df():
    """
    신호 발생 시점 이후 가격이 TP에 도달하는 시나리오
    entry=100, TP=103, SL=98.5
    """
    # 신호 시점 이후 캔들: 점진적으로 상승해서 103 돌파
    future_closes = [100.5, 101.0, 101.8, 102.5, 103.1, 103.5]
    future_highs  = [c + 0.3 for c in future_closes]
    future_lows   = [c - 0.3 for c in future_closes]
    return future_closes, future_highs, future_lows


def test_label_tp_reached():
    closes, highs, lows = make_signal_df()
    label = build_labels(
        future_closes=closes,
        future_highs=highs,
        future_lows=lows,
        take_profit=103.0,
        stop_loss=98.5,
        side="LONG",
    )
    assert label == 1, "TP 먼저 도달해야 레이블 1"


def test_label_sl_reached():
    # 하락해서 SL 먼저 도달
    future_closes = [99.5, 99.0, 98.8, 98.4, 98.0]
    future_highs  = [c + 0.3 for c in future_closes]
    future_lows   = [c - 0.3 for c in future_closes]
    label = build_labels(
        future_closes=future_closes,
        future_highs=future_highs,
        future_lows=future_lows,
        take_profit=103.0,
        stop_loss=98.5,
        side="LONG",
    )
    assert label == 0, "SL 먼저 도달해야 레이블 0"


def test_label_neither_reached_returns_none():
    # 아무것도 도달 못함
    future_closes = [100.1, 100.2, 100.3]
    future_highs  = [c + 0.1 for c in future_closes]
    future_lows   = [c - 0.1 for c in future_closes]
    label = build_labels(
        future_closes=future_closes,
        future_highs=future_highs,
        future_lows=future_lows,
        take_profit=103.0,
        stop_loss=98.5,
        side="LONG",
    )
    assert label is None, "미결 시 None 반환"


def test_label_short_tp():
    # SHORT: 가격 하락 → TP 도달
    future_closes = [99.5, 99.0, 98.0, 97.0]
    future_highs  = [c + 0.3 for c in future_closes]
    future_lows   = [c - 0.3 for c in future_closes]
    label = build_labels(
        future_closes=future_closes,
        future_highs=future_highs,
        future_lows=future_lows,
        take_profit=97.0,
        stop_loss=101.5,
        side="SHORT",
    )
    assert label == 1

Step 2: 테스트 실패 확인

pytest tests/test_label_builder.py -v

Expected: FAIL with "cannot import name 'build_labels'"

Step 3: src/label_builder.py 구현

from typing import Optional


def build_labels(
    future_closes: list[float],
    future_highs: list[float],
    future_lows: list[float],
    take_profit: float,
    stop_loss: float,
    side: str,
) -> Optional[int]:
    """
    진입 이후 미래 캔들을 순서대로 확인해 TP/SL 도달 여부를 판단한다.
    LONG: high >= TP → 1, low <= SL → 0
    SHORT: low <= TP → 1, high >= SL → 0
    둘 다 미도달 → None (학습 데이터에서 제외)
    """
    for high, low in zip(future_highs, future_lows):
        if side == "LONG":
            if high >= take_profit:
                return 1
            if low <= stop_loss:
                return 0
        else:  # SHORT
            if low <= take_profit:
                return 1
            if high >= stop_loss:
                return 0
    return None

Step 4: 테스트 통과 확인

pytest tests/test_label_builder.py -v

Expected: 4개 PASS

Step 5: Commit

git add src/label_builder.py tests/test_label_builder.py
git commit -m "feat: add label builder for TP/SL simulation"

Task 4: 과거 데이터 수집 스크립트 (scripts/fetch_history.py)

Files:

  • Create: scripts/fetch_history.py
  • Create: scripts/__init__.py

Step 1: scripts/fetch_history.py 작성

"""
바이낸스 선물 REST API로 과거 캔들 데이터를 수집해 parquet으로 저장한다.
사용법: python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90
"""
import asyncio
import argparse
from datetime import datetime, timedelta
import pandas as pd
from binance import AsyncClient
from dotenv import load_dotenv
import os

load_dotenv()


async def fetch_klines(symbol: str, interval: str, days: int) -> pd.DataFrame:
    client = await AsyncClient.create(
        api_key=os.getenv("BINANCE_API_KEY", ""),
        api_secret=os.getenv("BINANCE_API_SECRET", ""),
    )
    try:
        start_ts = int((datetime.utcnow() - timedelta(days=days)).timestamp() * 1000)
        all_klines = []
        while True:
            klines = await client.futures_klines(
                symbol=symbol,
                interval=interval,
                startTime=start_ts,
                limit=1500,
            )
            if not klines:
                break
            all_klines.extend(klines)
            last_ts = klines[-1][0]
            if last_ts >= int(datetime.utcnow().timestamp() * 1000):
                break
            start_ts = last_ts + 1
            print(f"수집 중... {len(all_klines)}개")
    finally:
        await client.close_connection()

    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df = df[["timestamp", "open", "high", "low", "close", "volume"]].copy()
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = df[col].astype(float)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    df.set_index("timestamp", inplace=True)
    return df


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--symbol",   default="XRPUSDT")
    parser.add_argument("--interval", default="1m")
    parser.add_argument("--days",     type=int, default=90)
    parser.add_argument("--output",   default="data/xrpusdt_1m.parquet")
    args = parser.parse_args()

    df = asyncio.run(fetch_klines(args.symbol, args.interval, args.days))
    df.to_parquet(args.output)
    print(f"저장 완료: {args.output} ({len(df)}행)")


if __name__ == "__main__":
    main()

Step 2: 실행 테스트 (실제 API 호출)

python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90

Expected: 저장 완료: data/xrpusdt_1m.parquet (약 129600행)

Step 3: Commit

git add scripts/fetch_history.py scripts/__init__.py
git commit -m "feat: add historical data fetcher script"

Task 5: 모델 학습 스크립트 (scripts/train_model.py)

Files:

  • Create: scripts/train_model.py

Step 1: scripts/train_model.py 작성

"""
과거 캔들 데이터로 LightGBM 필터 모델을 학습하고 저장한다.
사용법: python scripts/train_model.py --data data/xrpusdt_1m.parquet
"""
import argparse
import json
from datetime import datetime
from pathlib import Path

import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.model_selection import TimeSeriesSplit

from src.indicators import Indicators
from src.ml_features import build_features, FEATURE_COLS
from src.label_builder import build_labels

LOOKAHEAD = 60       # 최대 60캔들(1시간) 이내 TP/SL 도달 확인
ATR_SL_MULT = 1.5
ATR_TP_MULT = 3.0
MODEL_PATH = Path("models/lgbm_filter.pkl")
PREV_MODEL_PATH = Path("models/lgbm_filter_prev.pkl")
LOG_PATH = Path("models/training_log.json")


def generate_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """신호 발생 시점마다 피처와 레이블을 생성한다."""
    rows = []
    total = len(df)

    for i in range(60, total - LOOKAHEAD):
        window = df.iloc[i - 60: i + 1].copy()
        ind = Indicators(window)
        df_ind = ind.calculate_all()

        if df_ind.isna().any().any():
            continue

        signal = ind.get_signal(df_ind)
        if signal == "HOLD":
            continue

        entry_price = float(df_ind["close"].iloc[-1])
        atr = float(df_ind["atr"].iloc[-1])
        if atr <= 0:
            continue

        stop_loss   = entry_price - atr * ATR_SL_MULT if signal == "LONG" else entry_price + atr * ATR_SL_MULT
        take_profit = entry_price + atr * ATR_TP_MULT if signal == "LONG" else entry_price - atr * ATR_TP_MULT

        future = df.iloc[i + 1: i + 1 + LOOKAHEAD]
        label = build_labels(
            future_closes=future["close"].tolist(),
            future_highs=future["high"].tolist(),
            future_lows=future["low"].tolist(),
            take_profit=take_profit,
            stop_loss=stop_loss,
            side=signal,
        )
        if label is None:
            continue

        features = build_features(df_ind, signal)
        row = features.to_dict()
        row["label"] = label
        rows.append(row)

        if len(rows) % 500 == 0:
            print(f"  샘플 생성 중: {len(rows)}개 (인덱스 {i}/{total})")

    return pd.DataFrame(rows)


def train(data_path: str):
    print(f"데이터 로드: {data_path}")
    df = pd.read_parquet(data_path)
    print(f"캔들 수: {len(df)}")

    print("데이터셋 생성 중...")
    dataset = generate_dataset(df)
    print(f"학습 샘플: {len(dataset)}개 (양성={dataset['label'].sum():.0f}, 음성={(dataset['label']==0).sum():.0f})")

    if len(dataset) < 200:
        raise ValueError(f"학습 샘플 부족: {len(dataset)}개 (최소 200 필요)")

    X = dataset[FEATURE_COLS]
    y = dataset["label"]

    # 시계열 분할: 앞 80% 학습, 뒤 20% 검증
    split = int(len(X) * 0.8)
    X_train, X_val = X.iloc[:split], X.iloc[split:]
    y_train, y_val = y.iloc[:split], y.iloc[split:]

    model = lgb.LGBMClassifier(
        n_estimators=300,
        learning_rate=0.05,
        num_leaves=31,
        min_child_samples=20,
        subsample=0.8,
        colsample_bytree=0.8,
        class_weight="balanced",
        random_state=42,
        verbose=-1,
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(30, verbose=False), lgb.log_evaluation(50)],
    )

    val_proba = model.predict_proba(X_val)[:, 1]
    auc = roc_auc_score(y_val, val_proba)
    print(f"\n검증 AUC: {auc:.4f}")
    print(classification_report(y_val, (val_proba >= 0.60).astype(int)))

    # 기존 모델이 있으면 백업
    if MODEL_PATH.exists():
        import shutil
        shutil.copy(MODEL_PATH, PREV_MODEL_PATH)
        print(f"기존 모델 백업: {PREV_MODEL_PATH}")

    MODEL_PATH.parent.mkdir(exist_ok=True)
    joblib.dump(model, MODEL_PATH)
    print(f"모델 저장: {MODEL_PATH}")

    # 학습 이력 기록
    log = []
    if LOG_PATH.exists():
        with open(LOG_PATH) as f:
            log = json.load(f)
    log.append({
        "date": datetime.now().isoformat(),
        "auc": round(auc, 4),
        "samples": len(dataset),
        "model_path": str(MODEL_PATH),
    })
    with open(LOG_PATH, "w") as f:
        json.dump(log, f, indent=2)

    return auc


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", default="data/xrpusdt_1m.parquet")
    args = parser.parse_args()
    train(args.data)


if __name__ == "__main__":
    main()

Step 2: 학습 실행 테스트

python scripts/train_model.py --data data/xrpusdt_1m.parquet

Expected: 학습 완료 후 models/lgbm_filter.pkl 생성, AUC 출력

Step 3: Commit

git add scripts/train_model.py
git commit -m "feat: add LightGBM training script with TP/SL label generation"

Task 6: ML 필터 클래스 (src/ml_filter.py)

Files:

  • Create: src/ml_filter.py
  • Create: tests/test_ml_filter.py

Step 1: 실패하는 테스트 작성

# tests/test_ml_filter.py
import pandas as pd
import numpy as np
import pytest
from unittest.mock import MagicMock, patch
from pathlib import Path
from src.ml_filter import MLFilter
from src.ml_features import FEATURE_COLS


def make_features(side="LONG") -> pd.Series:
    return pd.Series({col: 0.5 for col in FEATURE_COLS} | {"side": 1.0 if side == "LONG" else 0.0})


def test_no_model_file_is_not_loaded(tmp_path):
    f = MLFilter(model_path=str(tmp_path / "nonexistent.pkl"))
    assert not f.is_model_loaded()


def test_no_model_should_enter_returns_true(tmp_path):
    """모델 없으면 항상 진입 허용 (폴백)"""
    f = MLFilter(model_path=str(tmp_path / "nonexistent.pkl"))
    features = make_features()
    assert f.should_enter(features) is True


def test_should_enter_above_threshold():
    """확률 >= 0.60 이면 True"""
    f = MLFilter(threshold=0.60)
    mock_model = MagicMock()
    mock_model.predict_proba.return_value = np.array([[0.35, 0.65]])
    f._model = mock_model
    features = make_features()
    assert f.should_enter(features) is True


def test_should_enter_below_threshold():
    """확률 < 0.60 이면 False"""
    f = MLFilter(threshold=0.60)
    mock_model = MagicMock()
    mock_model.predict_proba.return_value = np.array([[0.55, 0.45]])
    f._model = mock_model
    features = make_features()
    assert f.should_enter(features) is False


def test_reload_model(tmp_path):
    """reload_model 호출 후 모델 로드 상태 변경"""
    import joblib
    import lightgbm as lgb
    # 더미 모델 저장
    dummy = MagicMock()
    model_path = tmp_path / "lgbm_filter.pkl"
    joblib.dump(dummy, model_path)
    f = MLFilter(model_path=str(model_path))
    f.reload_model()
    assert f.is_model_loaded()

Step 2: 테스트 실패 확인

pytest tests/test_ml_filter.py -v

Expected: FAIL with "cannot import name 'MLFilter'"

Step 3: src/ml_filter.py 구현

from pathlib import Path
import joblib
import pandas as pd
from loguru import logger


class MLFilter:
    """
    LightGBM 모델을 로드하고 진입 여부를 판단한다.
    모델 파일이 없으면 항상 진입을 허용한다 (폴백).
    """

    def __init__(self, model_path: str = "models/lgbm_filter.pkl", threshold: float = 0.60):
        self._model_path = Path(model_path)
        self._threshold = threshold
        self._model = None
        self._try_load()

    def _try_load(self):
        if self._model_path.exists():
            try:
                self._model = joblib.load(self._model_path)
                logger.info(f"ML 필터 모델 로드 완료: {self._model_path}")
            except Exception as e:
                logger.warning(f"ML 필터 모델 로드 실패: {e}")
                self._model = None

    def is_model_loaded(self) -> bool:
        return self._model is not None

    def should_enter(self, features: pd.Series) -> bool:
        """
        확률 >= threshold 이면 True (진입 허용).
        모델 없으면 True 반환 (폴백).
        """
        if not self.is_model_loaded():
            return True
        try:
            X = features.to_frame().T
            proba = self._model.predict_proba(X)[0][1]
            logger.debug(f"ML 필터 확률: {proba:.3f} (임계값: {self._threshold})")
            return proba >= self._threshold
        except Exception as e:
            logger.warning(f"ML 필터 예측 오류 (폴백 허용): {e}")
            return True

    def reload_model(self):
        """재학습 후 모델을 핫 리로드한다."""
        self._try_load()
        logger.info("ML 필터 모델 리로드 완료")

Step 4: 테스트 통과 확인

pytest tests/test_ml_filter.py -v

Expected: 5개 PASS

Step 5: Commit

git add src/ml_filter.py tests/test_ml_filter.py
git commit -m "feat: add MLFilter class with fallback support"

Task 7: 자동 재학습 스케줄러 (src/retrainer.py)

Files:

  • Create: src/retrainer.py
  • Create: tests/test_retrainer.py

Step 1: 실패하는 테스트 작성

# tests/test_retrainer.py
import pytest
import json
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from src.retrainer import Retrainer


@pytest.mark.asyncio
async def test_retrain_calls_train(tmp_path):
    """재학습 시 train 함수가 호출되는지 확인"""
    ml_filter = MagicMock()
    r = Retrainer(ml_filter=ml_filter, data_path=str(tmp_path / "data.parquet"))

    with patch("src.retrainer.fetch_and_save", new_callable=AsyncMock) as mock_fetch, \
         patch("src.retrainer.run_training", return_value=0.72) as mock_train, \
         patch("src.retrainer.get_current_auc", return_value=0.65):
        await r.retrain()

    mock_fetch.assert_called_once()
    mock_train.assert_called_once()


@pytest.mark.asyncio
async def test_retrain_rollback_when_worse(tmp_path):
    """새 모델이 기존보다 나쁘면 롤백"""
    ml_filter = MagicMock()
    r = Retrainer(ml_filter=ml_filter, data_path=str(tmp_path / "data.parquet"))

    with patch("src.retrainer.fetch_and_save", new_callable=AsyncMock), \
         patch("src.retrainer.run_training", return_value=0.55), \
         patch("src.retrainer.get_current_auc", return_value=0.70), \
         patch("src.retrainer.rollback_model") as mock_rollback:
        await r.retrain()

    mock_rollback.assert_called_once()

Step 2: 테스트 실패 확인

pytest tests/test_retrainer.py -v

Expected: FAIL with "cannot import name 'Retrainer'"

Step 3: src/retrainer.py 구현

import asyncio
import json
from datetime import datetime
from pathlib import Path

from loguru import logger

from src.ml_filter import MLFilter

MODEL_PATH      = Path("models/lgbm_filter.pkl")
PREV_MODEL_PATH = Path("models/lgbm_filter_prev.pkl")
LOG_PATH        = Path("models/training_log.json")


def get_current_auc() -> float:
    """training_log.json에서 가장 최근 AUC를 읽는다."""
    if not LOG_PATH.exists():
        return 0.0
    with open(LOG_PATH) as f:
        log = json.load(f)
    return log[-1]["auc"] if log else 0.0


def rollback_model():
    """이전 모델로 롤백한다."""
    if PREV_MODEL_PATH.exists():
        import shutil
        shutil.copy(PREV_MODEL_PATH, MODEL_PATH)
        logger.warning("ML 모델 롤백 완료")
    else:
        logger.warning("롤백할 이전 모델 없음")


async def fetch_and_save(data_path: str):
    """증분 데이터 수집 (fetch_history.py 로직 재사용)."""
    import subprocess
    result = subprocess.run(
        ["python", "scripts/fetch_history.py", "--output", data_path, "--days", "90"],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"데이터 수집 실패: {result.stderr}")
    logger.info(f"데이터 수집 완료: {data_path}")


def run_training(data_path: str) -> float:
    """train_model.py를 실행하고 새 AUC를 반환한다."""
    import subprocess
    result = subprocess.run(
        ["python", "scripts/train_model.py", "--data", data_path],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"학습 실패: {result.stderr}")
    new_auc = get_current_auc()
    return new_auc


class Retrainer:
    def __init__(self, ml_filter: MLFilter, data_path: str = "data/xrpusdt_1m.parquet"):
        self._ml_filter = ml_filter
        self._data_path = data_path

    async def retrain(self):
        logger.info("자동 재학습 시작")
        old_auc = get_current_auc()
        try:
            await fetch_and_save(self._data_path)
            new_auc = run_training(self._data_path)
            logger.info(f"재학습 완료: 이전 AUC={old_auc:.4f} → 새 AUC={new_auc:.4f}")

            if new_auc < old_auc - 0.01:
                logger.warning(f"새 모델 성능 저하 ({new_auc:.4f} < {old_auc:.4f}), 롤백")
                rollback_model()
            else:
                self._ml_filter.reload_model()
                logger.success("새 ML 모델 적용 완료")
        except Exception as e:
            logger.error(f"재학습 실패: {e}")

    async def schedule_daily(self, hour: int = 3):
        """매일 지정 시각(UTC 기준)에 재학습을 실행한다."""
        while True:
            now = datetime.utcnow()
            next_run = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if next_run <= now:
                from datetime import timedelta
                next_run += timedelta(days=1)
            wait_secs = (next_run - now).total_seconds()
            logger.info(f"다음 재학습까지 {wait_secs/3600:.1f}시간 대기")
            await asyncio.sleep(wait_secs)
            await self.retrain()

Step 4: 테스트 통과 확인

pytest tests/test_retrainer.py -v

Expected: 2개 PASS

Step 5: Commit

git add src/retrainer.py tests/test_retrainer.py
git commit -m "feat: add daily retrainer with rollback support"

Task 8: bot.py에 ML 필터 통합

Files:

  • Modify: src/bot.py:1-10 (import 추가)
  • Modify: src/bot.py:11-22 (__init__ 수정)
  • Modify: src/bot.py:47-65 (process_candle 수정)
  • Modify: src/bot.py:153-160 (run 수정)

Step 1: src/bot.py import 추가

기존 import 블록 끝에 추가:

from src.ml_filter import MLFilter
from src.ml_features import build_features
from src.retrainer import Retrainer

Step 2: __init__에 MLFilter, Retrainer 추가

def __init__(self, config: Config):
    self.config = config
    self.exchange = BinanceFuturesClient(config)
    self.notifier = DiscordNotifier(config.discord_webhook_url)
    self.risk = RiskManager(config)
    self.ml_filter = MLFilter()                          # 추가
    self.retrainer = Retrainer(ml_filter=self.ml_filter) # 추가
    self.current_trade_side: str | None = None
    self.stream = KlineStream(
        symbol=config.symbol,
        interval="1m",
        on_candle=self._on_candle_closed,
    )

Step 3: process_candle에 ML 필터 적용

signal = ind.get_signal(df_with_indicators) 바로 아래에 추가:

        # ML 필터: 모델이 있을 때만 적용, 없으면 폴백(통과)
        if signal != "HOLD" and self.ml_filter.is_model_loaded():
            features = build_features(df_with_indicators, signal)
            if not self.ml_filter.should_enter(features):
                logger.info(f"ML 필터 차단: {signal} 신호 무시")
                signal = "HOLD"

Step 4: run에 재학습 스케줄러 추가

    async def run(self):
        logger.info(f"봇 시작: {self.config.symbol}, 레버리지 {self.config.leverage}x")
        await self._recover_position()
        asyncio.create_task(self.retrainer.schedule_daily(hour=3))  # 추가
        await self.stream.start(
            api_key=self.config.api_key,
            api_secret=self.config.api_secret,
        )

Step 5: 기존 bot 테스트 통과 확인

pytest tests/test_bot.py -v

Expected: 기존 테스트 모두 PASS (ML 필터는 모델 없으면 폴백)

Step 6: Commit

git add src/bot.py
git commit -m "feat: integrate ML filter into trading bot with fallback"

Task 9: 전체 테스트 실행 및 검증

Step 1: 전체 테스트 실행

pytest tests/ -v --tb=short

Expected: 모든 테스트 PASS

Step 2: 린트 확인

python -m py_compile src/ml_features.py src/ml_filter.py src/label_builder.py src/retrainer.py scripts/train_model.py scripts/fetch_history.py

Expected: 오류 없음

Step 3: 초기 학습 실행 (실제 데이터)

python scripts/fetch_history.py --days 90
python scripts/train_model.py

Expected: models/lgbm_filter.pkl 생성, AUC 출력

Step 4: 봇 시작 후 ML 필터 로그 확인

python main.py

Expected: 로그에 ML 필터 모델 로드 완료 메시지 출력

Step 5: Final Commit

git add -A
git commit -m "feat: complete ML filter integration with LightGBM"

파일 구조 최종 요약

cointrader/
├── src/
│   ├── ml_features.py     ← 피처 엔지니어링 (신규)
│   ├── ml_filter.py       ← LightGBM 필터 클래스 (신규)
│   ├── label_builder.py   ← TP/SL 레이블 생성 (신규)
│   ├── retrainer.py       ← 자동 재학습 스케줄러 (신규)
│   └── bot.py             ← ML 필터 통합 (수정)
├── scripts/
│   ├── fetch_history.py   ← 과거 데이터 수집 (신규)
│   └── train_model.py     ← LightGBM 학습 (신규)
├── tests/
│   ├── test_ml_features.py  (신규)
│   ├── test_ml_filter.py    (신규)
│   ├── test_label_builder.py (신규)
│   └── test_retrainer.py    (신규)
├── models/
│   ├── lgbm_filter.pkl      ← 현재 모델 (학습 후 생성)
│   ├── lgbm_filter_prev.pkl ← 롤백용
│   └── training_log.json    ← 재학습 이력
└── data/
    └── xrpusdt_1m.parquet   ← 과거 캔들 데이터