8 Commits

Author SHA1 Message Date
21in7
6fe2158511 feat: enhance precision optimization in model training
- Introduced a new plan to modify the Optuna objective function to prioritize precision under a recall constraint of 0.35, improving model performance in scenarios where false positives are costly.
- Updated training scripts to implement precision-based metrics and adjusted the walk-forward cross-validation process to incorporate precision and recall calculations.
- Enhanced the active LGBM parameters and training log to reflect the new metrics and model configurations.
- Added a new design document outlining the implementation steps for the precision-focused optimization.

This update aims to refine the model's decision-making process by emphasizing precision, thereby reducing potential losses from false positives.
2026-03-03 00:57:19 +09:00
21in7
3613e3bf18 feat: update active LGBM parameters and training log with new metrics
- Updated active LGBM parameters with new timestamp, trial results, and model configurations to reflect recent training outcomes.
- Added new entries to the training log, capturing detailed metrics including AUC, precision, recall, and tuned parameters for the latest model iterations.

This update enhances the tracking of model performance and parameter tuning in the ML pipeline.
2026-03-03 00:21:43 +09:00
21in7
fce4d536ea feat: implement HOLD negative sampling and stratified undersampling in ML pipeline
Added HOLD candles as negative samples to increase training data from ~535 to ~3,200 samples. Introduced a negative_ratio parameter in generate_dataset_vectorized() for sampling HOLD candles alongside signal candles. Implemented stratified undersampling to ensure signal samples are preserved during training. Updated relevant tests to validate new functionality and maintain compatibility with existing tests.

- Modified dataset_builder.py to include HOLD negative sampling logic
- Updated train_model.py to apply stratified undersampling
- Added tests for new sampling methods

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 00:13:42 +09:00
21in7
74966590b5 feat: apply stratified undersampling to hyperparameter tuning
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 00:09:43 +09:00
21in7
6cd54b46d9 feat: apply stratified undersampling to training pipeline
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 00:03:09 +09:00
21in7
0af138d8ee feat: add stratified_undersample helper function
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 23:58:15 +09:00
21in7
b7ad358a0a fix: make HOLD negative sampling tests non-vacuous
The two HOLD negative tests (test_hold_negative_labels_are_all_zero,
test_signal_samples_preserved_after_sampling) were passing vacuously
because sample_df produces 0 signal candles (ADX ~18, below threshold
25). Added signal_producing_df fixture with higher volatility and volume
surges to reliably generate signals. Removed if-guards so assertions
are mandatory. Also restored the full docstring for
generate_dataset_vectorized() documenting btc_df/eth_df,
time_weight_decay, and negative_ratio parameters.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 23:45:10 +09:00
21in7
8e56301d52 feat: add HOLD negative sampling to dataset_builder
Add negative_ratio parameter to generate_dataset_vectorized() that
samples HOLD candles as label=0 negatives alongside signal candles.
This increases training data from ~535 to ~3,200 samples when enabled.

- Split valid_rows into base_valid (shared) and sig_valid (signal-only)
- Add 'source' column ("signal" vs "hold_negative") for traceability
- HOLD samples get label=0 and random 50/50 side assignment
- Default negative_ratio=0 preserves backward compatibility
- Fix incorrect column count assertion in existing test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 23:34:45 +09:00
14 changed files with 2418 additions and 904 deletions

View File

@@ -24,29 +24,29 @@ CoinTrader는 **Binance Futures 자동매매 봇**입니다. 기술 지표 신
```mermaid
flowchart TD
subgraph 외부["외부 데이터 소스 (Binance)"]
WS1["Combined WebSocket\nXRP/BTC/ETH 15분봉 캔들"]
WS2["User Data Stream WebSocket\nORDER_TRADE_UPDATE 이벤트"]
REST["REST API\nOI·펀딩비·잔고·포지션 조회"]
WS1["Combined WebSocket<br/>XRP/BTC/ETH 15분봉 캔들"]
WS2["User Data Stream WebSocket<br/>ORDER_TRADE_UPDATE 이벤트"]
REST["REST API<br/>OI·펀딩비·잔고·포지션 조회"]
end
subgraph 실시간봇["실시간 봇 (bot.py — asyncio)"]
DS["data_stream.py\nMultiSymbolStream\n캔들 버퍼 (deque 200개)"]
IND["indicators.py\n기술 지표 계산\nRSI·MACD·BB·EMA·StochRSI·ATR"]
MF["ml_features.py\n23개 피처 추출\n(XRP 13 + BTC/ETH 8 + OI/FR 2)"]
ML["ml_filter.py\nMLFilter\nONNX 우선 / LightGBM 폴백\n확률 ≥ 0.60 시 진입 허용"]
RM["risk_manager.py\nRiskManager\n일일 손실 5% 한도\n동적 증거금 비율"]
EX["exchange.py\nBinanceFuturesClient\n주문·레버리지·잔고 API"]
UDS["user_data_stream.py\nUserDataStream\nTP/SL 즉시 감지"]
NT["notifier.py\nDiscordNotifier\n진입·청산·오류 알림"]
DS["data_stream.py<br/>MultiSymbolStream<br/>캔들 버퍼 (deque 200개)"]
IND["indicators.py<br/>기술 지표 계산<br/>RSI·MACD·BB·EMA·StochRSI·ATR·ADX"]
MF["ml_features.py<br/>23개 피처 추출<br/>(XRP 13 + BTC/ETH 8 + OI/FR 2)"]
ML["ml_filter.py<br/>MLFilter<br/>ONNX 우선 / LightGBM 폴백<br/>확률 ≥ 0.60 시 진입 허용"]
RM["risk_manager.py<br/>RiskManager<br/>일일 손실 5% 한도<br/>동적 증거금 비율"]
EX["exchange.py<br/>BinanceFuturesClient<br/>주문·레버리지·잔고 API"]
UDS["user_data_stream.py<br/>UserDataStream<br/>TP/SL 즉시 감지"]
NT["notifier.py<br/>DiscordNotifier<br/>진입·청산·오류 알림"]
end
subgraph mlops["MLOps 파이프라인 (맥미니 — 수동/크론)"]
FH["fetch_history.py\n과거 캔들 + OI/펀딩비\nParquet Upsert"]
DB["dataset_builder.py\n벡터화 데이터셋 생성\n레이블: ATR SL/TP 6시간 룩어헤드"]
TM["train_model.py\nLightGBM 학습\nWalk-Forward 5폴드 검증"]
TN["tune_hyperparams.py\nOptuna 50 trials\nTPE + MedianPruner"]
AP["active_lgbm_params.json\nActive Config 패턴\n승인된 파라미터 저장"]
DM["deploy_model.sh\nrsync → LXC 서버\n봇 핫리로드 트리거"]
FH["fetch_history.py<br/>과거 캔들 + OI/펀딩비<br/>Parquet Upsert"]
DB["dataset_builder.py<br/>벡터화 데이터셋 생성<br/>레이블: ATR SL/TP 6시간 룩어헤드"]
TM["train_model.py<br/>LightGBM 학습<br/>Walk-Forward 5폴드 검증"]
TN["tune_hyperparams.py<br/>Optuna 50 trials<br/>TPE + MedianPruner"]
AP["active_lgbm_params.json<br/>Active Config 패턴<br/>승인된 파라미터 저장"]
DM["deploy_model.sh<br/>rsync → LXC 서버<br/>봇 핫리로드 트리거"]
end
WS1 -->|캔들 마감 이벤트| DS
@@ -152,12 +152,16 @@ Combined WebSocket
| EMA | (9, 21, 50) | 추세 방향 (정배열/역배열) |
| Stochastic RSI | (14, 14, 3, 3) | 단기 과매수/과매도 |
| ATR | length=14 | 변동성 측정 → SL/TP 계산에 사용 |
| ADX | length=14 | 추세 강도 측정 → 횡보장 필터 (ADX < 25 시 진입 차단) |
| Volume MA | length=20 | 거래량 급증 감지 |
**신호 생성 로직 (가중치 합산):**
**신호 생성 로직 (ADX 필터 + 가중치 합산):**
```
롱 신호 점수:
[1단계] ADX 횡보장 필터:
ADX < 25 → 즉시 HOLD 반환 (추세 부재로 진입 차단)
[2단계] 롱 신호 점수:
RSI < 35 → +1
MACD 골든크로스 (전봉→현봉) → +2 ← 강한 신호
종가 < 볼린저 하단 → +1
@@ -313,13 +317,13 @@ RSI: 32.50 | MACD Hist: -0.000123 | ATR: 0.023400
```mermaid
flowchart LR
A["주말 수동 트리거\ntune_hyperparams.py\n(Optuna 50 trials, ~30분)"]
B["결과 검토\ntune_results_YYYYMMDD.json\nBest AUC vs Baseline 비교"]
C{"개선폭 충분?\n(AUC +0.01 이상\n폴드 분산 낮음)"}
D["active_lgbm_params.json\n업데이트\n(Active Config 패턴)"]
E["새벽 2시 크론탭\ntrain_and_deploy.sh\n(데이터 수집 → 학습 → 배포)"]
F["LXC 서버\nlgbm_filter.pkl 교체"]
G["봇 핫리로드\n다음 캔들 mtime 감지\n→ 자동 리로드"]
A["주말 수동 트리거<br/>tune_hyperparams.py<br/>(Optuna 50 trials, ~30분)"]
B["결과 검토<br/>tune_results_YYYYMMDD.json<br/>Best AUC vs Baseline 비교"]
C{"개선폭 충분?<br/>(AUC +0.01 이상<br/>폴드 분산 낮음)"}
D["active_lgbm_params.json<br/>업데이트<br/>(Active Config 패턴)"]
E["새벽 2시 크론탭<br/>train_and_deploy.sh<br/>(데이터 수집 → 학습 → 배포)"]
F["LXC 서버<br/>lgbm_filter.pkl 교체"]
G["봇 핫리로드<br/>다음 캔들 mtime 감지<br/>→ 자동 리로드"]
A --> B
B --> C
@@ -465,7 +469,7 @@ sequenceDiagram
BOT->>RM: is_trading_allowed() [일일 손실 한도 확인]
BOT->>IND: calculate_all(xrp_df) [지표 계산]
IND-->>BOT: df_with_indicators (RSI, MACD, BB, EMA, StochRSI, ATR)
IND-->>BOT: df_with_indicators (RSI, MACD, BB, EMA, StochRSI, ATR, ADX)
BOT->>IND: get_signal(df) [신호 생성]
IND-->>BOT: "LONG" | "SHORT" | "HOLD"
@@ -543,7 +547,7 @@ sequenceDiagram
### 5.1 테스트 파일 구성
`tests/` 폴더에 14개 테스트 파일, 총 **80개 이상의 테스트 케이스**가 작성되어 있습니다.
`tests/` 폴더에 12개 테스트 파일, 총 **81개의 테스트 케이스**가 작성되어 있습니다.
```bash
pytest tests/ -v # 전체 실행
@@ -554,22 +558,20 @@ bash scripts/run_tests.sh # 래퍼 스크립트 실행
| 테스트 파일 | 대상 모듈 | 테스트 케이스 | 주요 검증 항목 |
|------------|----------|:------------:|--------------|
| `test_bot.py` | `src/bot.py` | 10 | 반대 시그널 재진입 흐름, ML 차단 시 재진입 스킵, OI/펀딩비 피처 전달, OI 변화율 계산 |
| `test_indicators.py` | `src/indicators.py` | 4 | RSI 범위(0~100), MACD 컬럼 존재, 볼린저 밴드 상하단 대소관계, 신호 반환값 유효성 |
| `test_bot.py` | `src/bot.py` | 11 | 반대 시그널 재진입 흐름, ML 차단 시 재진입 스킵, OI/펀딩비 피처 전달, OI 변화율 계산 |
| `test_indicators.py` | `src/indicators.py` | 7 | RSI 범위(0~100), MACD 컬럼 존재, 볼린저 밴드 상하단 대소관계, 신호 반환값 유효성, ADX 컬럼 존재, ADX<25 횡보장 차단, ADX NaN 폴스루 |
| `test_ml_features.py` | `src/ml_features.py` | 11 | 23개 피처 수, BTC/ETH 포함 시 피처 수, RS 분모 0 처리, NaN 없음, side 인코딩, OI/펀딩비 파라미터 반영 |
| `test_ml_filter.py` | `src/ml_filter.py` | 5 | 모델 없을 때 폴백 허용, 임계값 이상/미만 판단, 핫리로드 후 상태 변화 |
| `test_risk_manager.py` | `src/risk_manager.py` | 8 | 일일 손실 한도 초과 차단, 최대 포지션 수 제한, 동적 증거금 비율 상한/하한 클램핑 |
| `test_exchange.py` | `src/exchange.py` | 8 | 수량 계산(기본/최소명목금액/잔고0), OI·펀딩비 조회 정상/오류 시 반환값 |
| `test_data_stream.py` | `src/data_stream.py` | 5 | 3심볼 버퍼 존재, 빈 버퍼 None 반환, 캔들 파싱, 마감 캔들 콜백 호출, 프리로드 200개 |
| `test_data_stream.py` | `src/data_stream.py` | 6 | 3심볼 버퍼 존재, 빈 버퍼 None 반환, 캔들 파싱, 마감 캔들 콜백 호출, 프리로드 200개 |
| `test_label_builder.py` | `src/label_builder.py` | 4 | LONG TP 도달 → 1, LONG SL 도달 → 0, 미결 → None, SHORT TP 도달 → 1 |
| `test_dataset_builder.py` | `src/dataset_builder.py` | 9 | DataFrame 반환, 필수 컬럼 존재, 레이블 이진값, BTC/ETH 포함 시 23개 피처, inf/NaN 없음, OI nan 마스킹, RS 분모 0 처리 |
| `test_mlx_filter.py` | `src/mlx_filter.py` | 5 | GPU 디바이스 확인, 학습 전 예측 형태, 학습 후 유효 확률, NaN 피처 처리, 저장/로드 후 동일 예측 |
| `test_fetch_history.py` | `scripts/fetch_history.py` | 5 | OI=0 구간 Upsert, 신규 행 추가, 기존 비0값 보존, 파일 없을 때 신규 반환, 타임스탬프 오름차순 정렬 |
| `test_config.py` | `src/config.py` | 2 | 환경변수 로드, 동적 증거금 파라미터 로드 |
| `test_database.py` | `src/database.py` | 2 | 거래 저장, 거래 청산 업데이트 (Notion API Mock) |
> `test_mlx_filter.py`는 Apple Silicon(`mlx` 패키지)이 없는 환경에서 자동 스킵됩니다.
> `test_database.py`는 현재 미사용 모듈(`src/database.py`)을 대상으로 하며, 실제 운영 경로와 무관합니다.
> `test_mlx_filter.py`는 Apple Silicon(`mlx` 패키지)이 없는 환경에서 자동 스킵됩니다.
### 5.3 커버리지 매트릭스
@@ -577,20 +579,21 @@ bash scripts/run_tests.sh # 래퍼 스크립트 실행
| 기능 | 단위 테스트 | 통합 수준 테스트 | 비고 |
|------|:----------:|:--------------:|------|
| 기술 지표 계산 (RSI/MACD/BB/EMA/StochRSI) | ✅ | ✅ | `test_indicators` + `test_ml_features` |
| 기술 지표 계산 (RSI/MACD/BB/EMA/StochRSI/ADX) | ✅ | ✅ | `test_indicators` + `test_ml_features` + `test_dataset_builder` |
| 신호 생성 (가중치 합산) | ✅ | ✅ | `test_indicators` + `test_dataset_builder` |
| ML 피처 추출 (23개) | ✅ | | `test_ml_features` |
| ADX 횡보장 필터 (ADX < 25 차단) | ✅ | | `test_indicators` + `test_dataset_builder` (`_calc_signals` 실제 호출) |
| ML 피처 추출 (23개) | ✅ | ✅ | `test_ml_features` + `test_dataset_builder` (`_calc_features_vectorized` 실제 호출) |
| ML 필터 추론 (임계값 판단) | ✅ | — | `test_ml_filter` |
| MLX 신경망 학습/저장/로드 | ✅ | — | `test_mlx_filter` (Apple Silicon 전용) |
| 레이블 생성 (SL/TP 룩어헤드) | ✅ | | `test_label_builder` |
| 레이블 생성 (SL/TP 룩어헤드) | ✅ | | `test_label_builder` + `test_dataset_builder` (전체 파이프라인 실제 호출) |
| 벡터화 데이터셋 빌더 | ✅ | ✅ | `test_dataset_builder` |
| 동적 증거금 비율 계산 | ✅ | — | `test_risk_manager` |
| 일일 손실 한도 제어 | ✅ | — | `test_risk_manager` |
| 포지션 수량 계산 | ✅ | — | `test_exchange` |
| OI/펀딩비 API 조회 (정상/오류) | ✅ | | `test_exchange` |
| OI/펀딩비 API 조회 (정상/오류) | ✅ | | `test_exchange` + `test_bot` (`process_candle` → OI/펀딩비 → `build_features` 전달) |
| 반대 시그널 재진입 흐름 | ✅ | ✅ | `test_bot` |
| ML 차단 시 재진입 스킵 | ✅ | | `test_bot` |
| OI 변화율 계산 (API 실패 폴백) | ✅ | | `test_bot` |
| ML 차단 시 재진입 스킵 | ✅ | | `test_bot` (`_close_and_reenter` → ML 판단 → 스킵 전체 흐름) |
| OI 변화율 계산 (API 실패 폴백) | ✅ | | `test_bot` (`process_candle` → OI 조회 → `_calc_oi_change` 흐름) |
| 캔들 버퍼 관리 및 프리로드 | ✅ | — | `test_data_stream` |
| Parquet Upsert (OI=0 보충) | ✅ | — | `test_fetch_history` |
| User Data Stream TP/SL 감지 | ❌ | — | 미작성 (실제 WebSocket 의존) |

View File

@@ -81,3 +81,36 @@ Environment variables via `.env` file (see `.env.example`). Key vars: `BINANCE_A
- **Docker**: `Dockerfile` (Python 3.12-slim) + `docker-compose.yml`
- **CI/CD**: Jenkins pipeline (Gitea → Docker registry → LXC production server)
- Models stored in `models/`, data cache in `data/`, logs in `logs/`
## Design & Implementation Plans
All design documents and implementation plans are stored in `docs/plans/` with the naming convention `YYYY-MM-DD-feature-name.md`. Design docs (`-design.md`) describe architecture decisions; implementation plans (`-plan.md`) contain step-by-step tasks for Claude to execute.
**Chronological plan history:**
| Date | Plan | Status |
|------|------|--------|
| 2026-03-01 | `xrp-futures-autotrader` | Completed |
| 2026-03-01 | `discord-notifier-and-position-recovery` | Completed |
| 2026-03-01 | `upload-to-gitea` | Completed |
| 2026-03-01 | `dockerfile-and-docker-compose` | Completed |
| 2026-03-01 | `fix-pandas-ta-python312` | Completed |
| 2026-03-01 | `jenkins-gitea-registry-cicd` | Completed |
| 2026-03-01 | `ml-filter-design` / `ml-filter-implementation` | Completed |
| 2026-03-01 | `train-on-mac-deploy-to-lxc` | Completed |
| 2026-03-01 | `m4-accelerated-training` | Completed |
| 2026-03-01 | `vectorized-dataset-builder` | Completed |
| 2026-03-01 | `btc-eth-correlation-features` (design + plan) | Completed |
| 2026-03-01 | `dynamic-margin-ratio` (design + plan) | Completed |
| 2026-03-01 | `lgbm-improvement` | Completed |
| 2026-03-01 | `15m-timeframe-upgrade` | Completed |
| 2026-03-01 | `oi-nan-epsilon-precision-threshold` | Completed |
| 2026-03-02 | `rs-divide-mlx-nan-fix` | Completed |
| 2026-03-02 | `reverse-signal-reenter` (design + plan) | Completed |
| 2026-03-02 | `realtime-oi-funding-features` | Completed |
| 2026-03-02 | `oi-funding-accumulation` | Completed |
| 2026-03-02 | `optuna-hyperparam-tuning` (design + plan) | Completed |
| 2026-03-02 | `user-data-stream-tp-sl-detection` (design + plan) | Completed |
| 2026-03-02 | `adx-filter-design` | Completed |
| 2026-03-02 | `hold-negative-sampling` (design + plan) | Completed |
| 2026-03-03 | `optuna-precision-objective-plan` | Pending |

View File

@@ -0,0 +1,91 @@
# HOLD Negative Sampling + Stratified Undersampling Design
## Problem
현재 ML 파이프라인의 학습 데이터가 535개로 매우 적음.
`dataset_builder.py`에서 시그널(LONG/SHORT) 발생 캔들만 라벨링하기 때문.
전체 ~35,000개 캔들 중 98.5%가 HOLD로 버려짐.
## Goal
- HOLD 캔들을 negative sample로 활용하여 학습 데이터 증가
- Train-Serve Skew 방지 (학습/추론 데이터 분포 일치)
- 기존 signal 샘플은 하나도 버리지 않는 계층적 샘플링
## Design
### 1. dataset_builder.py — HOLD Negative Sampling
**변경 위치**: `generate_dataset_vectorized()` (line 360-421)
**현재 로직**:
```python
valid_rows = (
(signal_arr != "HOLD") & # ← 시그널 캔들만 선택
...
)
```
**변경 로직**:
1. 기존 시그널 캔들(LONG/SHORT) 라벨링은 그대로 유지
2. HOLD 캔들 중 랜덤 샘플링 (시그널 수의 NEGATIVE_RATIO배)
3. HOLD 캔들: label=0, side=랜덤(50% LONG / 50% SHORT), signal_strength=0
4. `source` 컬럼 추가: "signal" | "hold_negative" (계층적 샘플링에 사용)
**파라미터**:
```python
NEGATIVE_RATIO = 5 # 시그널 대비 HOLD 샘플 비율
RANDOM_SEED = 42 # 재현성
```
**예상 데이터량**:
- 시그널: ~535개 (Win ~200, Loss ~335)
- HOLD negative: ~2,675개
- 총 학습 데이터: ~3,210개
### 2. train_model.py — Stratified Undersampling
**변경 위치**: `train()` 함수 내 언더샘플링 블록 (line 241-257)
**현재 로직**: 양성:음성 = 1:1 블라인드 언더샘플링
```python
if len(neg_idx) > len(pos_idx):
neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False)
```
**변경 로직**: 계층적 3-class 샘플링
```python
# 1. Signal 샘플(source="signal") 전수 유지 (Win + Loss 모두)
# 2. HOLD negative(source="hold_negative")에서만 샘플링
# → 양성(Win) 수와 동일한 수만큼 샘플링
# 최종: Win ~200 + Signal Loss ~335 + HOLD ~200 = ~735개
```
**효과**:
- Signal 샘플 보존율: 100% (Win/Loss 모두)
- HOLD negative: 적절한 양만 추가
- Train-Serve Skew 없음 (추론 시 signal_strength ≥ 3에서만 호출)
### 3. 런타임 (변경 없음)
- `bot.py`: 시그널 발생 시에만 ML 필터 호출 (기존 동일)
- `ml_filter.py`: `should_enter()` 그대로
- `ml_features.py`: `FEATURE_COLS` 그대로
- `label_builder.py`: 기존 SL/TP 룩어헤드 로직 그대로
## Test Cases
### 필수 테스트
1. **HOLD negative label 검증**: HOLD negative 샘플의 label이 전부 0인지 확인
2. **Signal 보존 검증**: 계층적 샘플링 후 source="signal" 샘플이 하나도 버려지지 않았는지 확인
### 기존 테스트 호환성
- 기존 dataset_builder 관련 테스트가 깨지지 않도록 보장
## File Changes
| File | Change |
|------|--------|
| `src/dataset_builder.py` | HOLD negative sampling, source 컬럼 추가 |
| `scripts/train_model.py` | 계층적 샘플링으로 교체 |
| `tests/test_dataset_builder.py` (or equivalent) | 2개 테스트 케이스 추가 |

View File

@@ -0,0 +1,432 @@
# HOLD Negative Sampling Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** HOLD 캔들을 negative sample로 추가하고 계층적 언더샘플링을 도입하여 ML 학습 데이터를 535 → ~3,200개로 증가시킨다.
**Architecture:** `dataset_builder.py`에서 시그널 캔들 외에 HOLD 캔들을 label=0으로 추가 샘플링하고, `source` 컬럼("signal"/"hold_negative")으로 구분한다. 학습 시 signal 샘플은 전수 유지, HOLD negative에서만 양성 수 만큼 샘플링하는 계층적 언더샘플링을 적용한다.
**Tech Stack:** Python, NumPy, pandas, LightGBM, pytest
---
### Task 1: dataset_builder.py — HOLD Negative Sampling 추가
**Files:**
- Modify: `src/dataset_builder.py:360-421` (generate_dataset_vectorized 함수)
- Test: `tests/test_dataset_builder.py`
**Step 1: Write the failing tests**
`tests/test_dataset_builder.py` 끝에 2개 테스트 추가:
```python
def test_hold_negative_labels_are_all_zero(sample_df):
"""HOLD negative 샘플의 label은 전부 0이어야 한다."""
result = generate_dataset_vectorized(sample_df, negative_ratio=3)
if len(result) > 0 and "source" in result.columns:
hold_neg = result[result["source"] == "hold_negative"]
if len(hold_neg) > 0:
assert (hold_neg["label"] == 0).all(), \
f"HOLD negative 중 label != 0인 샘플 존재: {hold_neg['label'].value_counts().to_dict()}"
def test_signal_samples_preserved_after_sampling(sample_df):
"""계층적 샘플링 후 source='signal' 샘플이 하나도 버려지지 않아야 한다."""
# negative_ratio=0이면 기존 동작 (signal만), >0이면 HOLD 추가
result_signal_only = generate_dataset_vectorized(sample_df, negative_ratio=0)
result_with_hold = generate_dataset_vectorized(sample_df, negative_ratio=3)
if len(result_with_hold) > 0 and "source" in result_with_hold.columns:
signal_count = (result_with_hold["source"] == "signal").sum()
assert signal_count == len(result_signal_only), \
f"Signal 샘플 손실: 원본={len(result_signal_only)}, 유지={signal_count}"
```
**Step 2: Run tests to verify they fail**
Run: `pytest tests/test_dataset_builder.py::test_hold_negative_labels_are_all_zero tests/test_dataset_builder.py::test_signal_samples_preserved_after_sampling -v`
Expected: FAIL — `generate_dataset_vectorized()` does not accept `negative_ratio` parameter
**Step 3: Implement HOLD negative sampling in generate_dataset_vectorized**
`src/dataset_builder.py``generate_dataset_vectorized()` 함수를 수정한다.
시그니처에 `negative_ratio: int = 0` 파라미터를 추가하고, HOLD 캔들 샘플링 로직을 삽입한다.
수정 대상: `generate_dataset_vectorized` 함수 전체.
```python
def generate_dataset_vectorized(
df: pd.DataFrame,
btc_df: pd.DataFrame | None = None,
eth_df: pd.DataFrame | None = None,
time_weight_decay: float = 0.0,
negative_ratio: int = 0,
) -> pd.DataFrame:
"""
전체 시계열을 1회 계산해 학습 데이터셋을 생성한다.
negative_ratio: 시그널 샘플 대비 HOLD negative 샘플 비율.
0이면 기존 동작 (시그널만). 5면 시그널의 5배만큼 HOLD 샘플 추가.
"""
print(" [1/3] 전체 시계열 지표 계산 (1회)...")
d = _calc_indicators(df)
print(" [2/3] 신호 마스킹 및 피처 추출...")
signal_arr = _calc_signals(d)
feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
# 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만
OPTIONAL_COLS = {"oi_change", "funding_rate"}
available_cols_for_nan_check = [
c for c in FEATURE_COLS
if c in feat_all.columns and c not in OPTIONAL_COLS
]
base_valid = (
(~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) &
(np.arange(len(d)) >= WARMUP) &
(np.arange(len(d)) < len(d) - LOOKAHEAD)
)
# --- 시그널 캔들 (기존 로직) ---
sig_valid = base_valid & (signal_arr != "HOLD")
sig_idx = np.where(sig_valid)[0]
print(f" 신호 발생 인덱스: {len(sig_idx):,}")
print(" [3/3] 레이블 계산...")
labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx)
final_sig_idx = sig_idx[valid_mask]
available_feature_cols = [c for c in FEATURE_COLS if c in feat_all.columns]
feat_signal = feat_all.iloc[final_sig_idx][available_feature_cols].copy()
feat_signal["label"] = labels
feat_signal["source"] = "signal"
# --- HOLD negative 캔들 ---
if negative_ratio > 0 and len(final_sig_idx) > 0:
hold_valid = base_valid & (signal_arr == "HOLD")
hold_candidates = np.where(hold_valid)[0]
n_neg = min(len(hold_candidates), len(final_sig_idx) * negative_ratio)
if n_neg > 0:
rng = np.random.default_rng(42)
hold_idx = rng.choice(hold_candidates, size=n_neg, replace=False)
hold_idx = np.sort(hold_idx)
feat_hold = feat_all.iloc[hold_idx][available_feature_cols].copy()
feat_hold["label"] = 0
feat_hold["source"] = "hold_negative"
# HOLD 캔들은 시그널이 없으므로 side를 랜덤 할당 (50:50)
sides = rng.integers(0, 2, size=len(feat_hold)).astype(np.float32)
feat_hold["side"] = sides
# signal_strength는 이미 0 (시그널 미발생이므로)
print(f" HOLD negative 추가: {len(feat_hold):,}"
f"(비율 1:{negative_ratio})")
feat_final = pd.concat([feat_signal, feat_hold], ignore_index=True)
# 시간 순서 복원 (원본 인덱스 기반 정렬)
original_order = np.concatenate([final_sig_idx, hold_idx])
sort_order = np.argsort(original_order)
feat_final = feat_final.iloc[sort_order].reset_index(drop=True)
else:
feat_final = feat_signal.reset_index(drop=True)
else:
feat_final = feat_signal.reset_index(drop=True)
# 시간 가중치
n = len(feat_final)
if time_weight_decay > 0 and n > 1:
weights = np.exp(time_weight_decay * np.linspace(0.0, 1.0, n)).astype(np.float32)
weights /= weights.mean()
print(f" 시간 가중치 적용 (decay={time_weight_decay}): "
f"min={weights.min():.3f}, max={weights.max():.3f}")
else:
weights = np.ones(n, dtype=np.float32)
feat_final["sample_weight"] = weights
total_sig = (feat_final["source"] == "signal").sum() if "source" in feat_final.columns else len(feat_final)
total_hold = (feat_final["source"] == "hold_negative").sum() if "source" in feat_final.columns else 0
print(f" 최종 데이터셋: {n:,}개 (시그널={total_sig:,}, HOLD={total_hold:,})")
return feat_final
```
**Step 4: Run the new tests to verify they pass**
Run: `pytest tests/test_dataset_builder.py::test_hold_negative_labels_are_all_zero tests/test_dataset_builder.py::test_signal_samples_preserved_after_sampling -v`
Expected: PASS
**Step 5: Run all existing dataset_builder tests to verify no regressions**
Run: `pytest tests/test_dataset_builder.py -v`
Expected: All existing tests PASS (기존 동작은 negative_ratio=0 기본값으로 유지)
**Step 6: Commit**
```bash
git add src/dataset_builder.py tests/test_dataset_builder.py
git commit -m "feat: add HOLD negative sampling to dataset builder"
```
---
### Task 2: 계층적 언더샘플링 헬퍼 함수
**Files:**
- Modify: `src/dataset_builder.py` (파일 끝에 헬퍼 추가)
- Test: `tests/test_dataset_builder.py`
**Step 1: Write the failing test**
```python
def test_stratified_undersample_preserves_signal():
"""stratified_undersample은 signal 샘플을 전수 유지해야 한다."""
from src.dataset_builder import stratified_undersample
y = np.array([1, 0, 0, 0, 0, 0, 0, 0, 1, 0])
source = np.array(["signal", "signal", "signal", "hold_negative",
"hold_negative", "hold_negative", "hold_negative",
"hold_negative", "signal", "signal"])
idx = stratified_undersample(y, source, seed=42)
# signal 인덱스: 0, 1, 2, 8, 9 → 전부 포함
signal_indices = np.where(source == "signal")[0]
for si in signal_indices:
assert si in idx, f"signal 인덱스 {si}가 누락됨"
```
**Step 2: Run test to verify it fails**
Run: `pytest tests/test_dataset_builder.py::test_stratified_undersample_preserves_signal -v`
Expected: FAIL — `stratified_undersample` 함수 미존재
**Step 3: Implement stratified_undersample**
`src/dataset_builder.py` 끝에 추가:
```python
def stratified_undersample(
y: np.ndarray,
source: np.ndarray,
seed: int = 42,
) -> np.ndarray:
"""Signal 샘플 전수 유지 + HOLD negative만 양성 수 만큼 샘플링.
Args:
y: 라벨 배열 (0 or 1)
source: 소스 배열 ("signal" or "hold_negative")
seed: 랜덤 시드
Returns:
정렬된 인덱스 배열 (학습에 사용할 행 인덱스)
"""
pos_idx = np.where(y == 1)[0] # Signal Win
sig_neg_idx = np.where((y == 0) & (source == "signal"))[0] # Signal Loss
hold_neg_idx = np.where(source == "hold_negative")[0] # HOLD negative
# HOLD negative에서 양성 수 만큼만 샘플링
n_hold = min(len(hold_neg_idx), len(pos_idx))
rng = np.random.default_rng(seed)
if n_hold > 0:
hold_sampled = rng.choice(hold_neg_idx, size=n_hold, replace=False)
else:
hold_sampled = np.array([], dtype=np.intp)
return np.sort(np.concatenate([pos_idx, sig_neg_idx, hold_sampled]))
```
**Step 4: Run tests**
Run: `pytest tests/test_dataset_builder.py::test_stratified_undersample_preserves_signal -v`
Expected: PASS
**Step 5: Commit**
```bash
git add src/dataset_builder.py tests/test_dataset_builder.py
git commit -m "feat: add stratified_undersample helper function"
```
---
### Task 3: train_model.py — 계층적 언더샘플링 적용
**Files:**
- Modify: `scripts/train_model.py:229-257` (train 함수)
- Modify: `scripts/train_model.py:356-391` (walk_forward_auc 함수)
**Step 1: Update train() function**
`scripts/train_model.py`에서 `dataset_builder`에서 `stratified_undersample`을 import하고,
`train()` 함수의 언더샘플링 블록을 교체한다.
import 수정 (line 25):
```python
from src.dataset_builder import generate_dataset_vectorized, stratified_undersample
```
`train()` 함수에서 데이터셋 생성 호출에 `negative_ratio=5` 추가 (line 217):
```python
dataset = generate_dataset_vectorized(
df, btc_df=btc_df, eth_df=eth_df,
time_weight_decay=time_weight_decay,
negative_ratio=5,
)
```
source 배열 추출 추가 (line 231 부근, w 다음):
```python
source = dataset["source"].values if "source" in dataset.columns else np.full(len(X), "signal")
```
언더샘플링 블록 교체 (line 241-257):
```python
# --- 계층적 샘플링: signal 전수 유지, HOLD negative만 양성 수 만큼 ---
source_train = source[:split]
balanced_idx = stratified_undersample(y_train.values, source_train, seed=42)
X_train = X_train.iloc[balanced_idx]
y_train = y_train.iloc[balanced_idx]
w_train = w_train[balanced_idx]
sig_count = (source_train[balanced_idx] == "signal").sum()
hold_count = (source_train[balanced_idx] == "hold_negative").sum()
print(f"\n계층적 샘플링 후 학습 데이터: {len(X_train)}"
f"(Signal={sig_count}, HOLD={hold_count}, "
f"양성={int(y_train.sum())}, 음성={int((y_train==0).sum())})")
print(f"검증 데이터: {len(X_val)}개 (양성={int(y_val.sum())}, 음성={int((y_val==0).sum())})")
```
**Step 2: Update walk_forward_auc() function**
`walk_forward_auc()` 함수에서도 동일하게 적용.
dataset 생성 (line 356-358)에 `negative_ratio=5` 추가:
```python
dataset = generate_dataset_vectorized(
df, btc_df=btc_df, eth_df=eth_df,
time_weight_decay=time_weight_decay,
negative_ratio=5,
)
```
source 배열 추출 (line 362 부근):
```python
source = dataset["source"].values if "source" in dataset.columns else np.full(n, "signal")
```
폴드 내 언더샘플링 교체 (line 381-386):
```python
source_tr = source[:tr_end]
bal_idx = stratified_undersample(y_tr, source_tr, seed=42)
```
**Step 3: Run training to verify**
Run: `python scripts/train_model.py --data data/combined_15m.parquet --decay 2.0`
Expected: 학습 샘플 수 대폭 증가 확인 (기존 ~535 → ~3,200)
**Step 4: Commit**
```bash
git add scripts/train_model.py
git commit -m "feat: apply stratified undersampling to training pipeline"
```
---
### Task 4: tune_hyperparams.py — 계층적 언더샘플링 적용
**Files:**
- Modify: `scripts/tune_hyperparams.py:41-81` (load_dataset)
- Modify: `scripts/tune_hyperparams.py:88-144` (_walk_forward_cv)
- Modify: `scripts/tune_hyperparams.py:151-206` (make_objective)
- Modify: `scripts/tune_hyperparams.py:213-244` (measure_baseline)
- Modify: `scripts/tune_hyperparams.py:370-449` (main)
**Step 1: Update load_dataset to return source**
import 수정 (line 34):
```python
from src.dataset_builder import generate_dataset_vectorized, stratified_undersample
```
`load_dataset()` 시그니처와 반환값 수정:
```python
def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
```
dataset 생성에 `negative_ratio=5` 추가 (line 66):
```python
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0, negative_ratio=5)
```
source 추출 추가 (line 74 부근, w 다음):
```python
source = dataset["source"].values if "source" in dataset.columns else np.full(len(dataset), "signal")
```
return 수정:
```python
return X, y, w, source
```
**Step 2: Update _walk_forward_cv to accept and use source**
시그니처에 source 추가:
```python
def _walk_forward_cv(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
source: np.ndarray,
params: dict,
...
```
폴드 내 언더샘플링 교체 (line 117-122):
```python
source_tr = source[:tr_end]
bal_idx = stratified_undersample(y_tr, source_tr, seed=42)
```
**Step 3: Update make_objective, measure_baseline, main**
`make_objective()`: 클로저에 source 캡처, `_walk_forward_cv` 호출에 source 전달
`measure_baseline()`: source 파라미터 추가, `_walk_forward_cv` 호출에 전달
`main()`: `load_dataset` 반환값 4개로 변경, 하위 함수에 source 전달
**Step 4: Commit**
```bash
git add scripts/tune_hyperparams.py
git commit -m "feat: apply stratified undersampling to hyperparameter tuning"
```
---
### Task 5: 전체 테스트 실행 및 검증
**Step 1: Run full test suite**
Run: `bash scripts/run_tests.sh`
Expected: All tests PASS
**Step 2: Run training pipeline end-to-end**
Run: `python scripts/train_model.py --data data/combined_15m.parquet --decay 2.0`
Expected:
- 학습 샘플 ~3,200개 (기존 535)
- "계층적 샘플링 후" 로그에 Signal/HOLD 카운트 표시
- AUC 출력 (값 자체보다 실행 완료가 중요)
**Step 3: Commit final state**
```bash
git add -A
git commit -m "chore: verify HOLD negative sampling pipeline end-to-end"
```

View File

@@ -0,0 +1,80 @@
# Optuna 목적함수를 Precision 중심으로 변경
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 현재 ROC-AUC만 최적화하는 Optuna objective를 **recall >= 0.35 제약 하에서 precision을 최대화**하는 방향으로 변경한다. AUC는 threshold-independent 지표라 실제 운용 시점의 성능(precision)을 반영하지 못하며, 오탐(false positive = 잘못된 진입)이 실제 손실을 발생시키므로 precision 우선 최적화가 필요하다.
**Tech Stack:** Python, LightGBM, Optuna, scikit-learn
---
## 변경 파일
- `scripts/tune_hyperparams.py` (유일한 변경 대상)
---
## 구현 단계
### 1. `_find_best_precision_at_recall` 헬퍼 함수 추가
- `sklearn.metrics.precision_recall_curve`로 recall >= min_recall 조건의 최대 precision과 threshold 반환
- 조건 불만족 시 `(0.0, 0.0, 0.50)` fallback
- train_model.py:277-292와 동일한 로직
### 2. `_walk_forward_cv` 수정
- 기존 반환: `(mean_auc, fold_aucs)` → 신규: `(mean_score, details_dict)`
- `details_dict` 키: `fold_aucs`, `fold_precisions`, `fold_recalls`, `fold_thresholds`, `fold_n_pos`, `mean_auc`, `mean_precision`, `mean_recall`
- **Score 공식**: `precision + auc * 0.001` (AUC는 precision 동률 시 tiebreaker)
- fold 내 양성 < 3개면 해당 fold precision=0.0으로 처리, 평균 계산에서 제외
- 인자 추가: `min_recall: float = 0.35`
- import 추가: `from sklearn.metrics import precision_recall_curve`
- Pruning: 양성 충분한 fold만 report하여 false pruning 방지
### 3. `make_objective` 수정
- `min_recall` 인자 추가 → `_walk_forward_cv`에 전달
- `trial.set_user_attr`로 precision/recall/threshold/n_pos 등 저장
- 반환값: `mean_score` (precision + auc * 0.001)
### 4. `measure_baseline` 수정
- `min_recall` 인자 추가
- 반환값을 `(mean_score, details_dict)` 형태로 변경
### 5. `--min-recall` CLI 인자 추가
- `parser.add_argument("--min-recall", type=float, default=0.35)`
- `make_objective``measure_baseline`에 전달
### 6. `print_report` 수정
- Best Score, Precision, AUC 모두 표시
- 폴드별 AUC + Precision + Recall + Threshold + 양성수 표시
- Baseline과 비교 시 precision 기준 개선폭 표시
### 7. `save_results` 수정
- JSON에 `min_recall_constraint`, precision/recall/threshold 필드 추가
- `best_trial``score`, `precision`, `recall`, `threshold`, `fold_precisions`, `fold_recalls`, `fold_thresholds`, `fold_n_pos` 추가
- `best_trial.params` 구조는 그대로 유지 (하위호환)
### 8. 비교 로직 및 기타 수정
- line 440: `study.best_value > baseline_auc``study.best_value > baseline_score`
- `study_name`: `"lgbm_wf_auc"``"lgbm_wf_precision"`
- progress callback: Precision과 AUC 동시 표시
- `n_warmup_steps` 2 → 3 (precision이 AUC보다 노이즈가 크므로)
---
## 검증 방법
```bash
# 기본 실행 (min_recall=0.35)
python scripts/tune_hyperparams.py --trials 10 --folds 3
# min_recall 조절
python scripts/tune_hyperparams.py --trials 10 --min-recall 0.4
# 기존 테스트 통과 확인
bash scripts/run_tests.sh
```
확인 포인트:
- 폴드별 precision/recall/threshold가 리포트에 표시되는지
- recall >= min_recall 제약이 올바르게 동작하는지
- active_lgbm_params.json이 precision 기준으로 갱신되는지
- train_model.py가 새 JSON 포맷을 기존과 동일하게 읽는지

File diff suppressed because it is too large Load Diff

View File

@@ -326,5 +326,105 @@
"reg_lambda": 0.000157
},
"weight_scale": 1.783105
},
{
"date": "2026-03-03T00:12:17.351458",
"backend": "lgbm",
"auc": 0.949,
"best_threshold": 0.42,
"best_precision": 0.56,
"best_recall": 0.538,
"samples": 1524,
"features": 23,
"time_weight_decay": 0.5,
"model_path": "models/lgbm_filter.pkl",
"tuned_params_path": null,
"lgbm_params": {
"n_estimators": 434,
"learning_rate": 0.123659,
"max_depth": 6,
"num_leaves": 14,
"min_child_samples": 10,
"subsample": 0.929062,
"colsample_bytree": 0.94633,
"reg_alpha": 0.573971,
"reg_lambda": 0.000157
},
"weight_scale": 1.783105
},
{
"date": "2026-03-03T00:13:56.456518",
"backend": "lgbm",
"auc": 0.9439,
"best_threshold": 0.6558,
"best_precision": 0.667,
"best_recall": 0.154,
"samples": 1524,
"features": 23,
"time_weight_decay": 2.0,
"model_path": "models/lgbm_filter.pkl",
"tuned_params_path": null,
"lgbm_params": {
"n_estimators": 434,
"learning_rate": 0.123659,
"max_depth": 6,
"num_leaves": 14,
"min_child_samples": 10,
"subsample": 0.929062,
"colsample_bytree": 0.94633,
"reg_alpha": 0.573971,
"reg_lambda": 0.000157
},
"weight_scale": 1.783105
},
{
"date": "2026-03-03T00:20:43.712971",
"backend": "lgbm",
"auc": 0.9473,
"best_threshold": 0.3015,
"best_precision": 0.465,
"best_recall": 0.769,
"samples": 1524,
"features": 23,
"time_weight_decay": 0.5,
"model_path": "models/lgbm_filter.pkl",
"tuned_params_path": "models/active_lgbm_params.json",
"lgbm_params": {
"n_estimators": 195,
"learning_rate": 0.033934,
"max_depth": 3,
"num_leaves": 7,
"min_child_samples": 11,
"subsample": 0.998659,
"colsample_bytree": 0.837233,
"reg_alpha": 0.007008,
"reg_lambda": 0.80039
},
"weight_scale": 0.718348
},
{
"date": "2026-03-03T00:39:05.427160",
"backend": "lgbm",
"auc": 0.9436,
"best_threshold": 0.3041,
"best_precision": 0.467,
"best_recall": 0.269,
"samples": 1524,
"features": 23,
"time_weight_decay": 0.5,
"model_path": "models/lgbm_filter.pkl",
"tuned_params_path": "models/active_lgbm_params.json",
"lgbm_params": {
"n_estimators": 221,
"learning_rate": 0.031072,
"max_depth": 5,
"num_leaves": 20,
"min_child_samples": 39,
"subsample": 0.83244,
"colsample_bytree": 0.526349,
"reg_alpha": 0.062177,
"reg_lambda": 0.082872
},
"weight_scale": 1.431662
}
]

View File

@@ -21,6 +21,5 @@ fi
cd "$PROJECT_ROOT"
python -m pytest tests/ \
--ignore=tests/test_database.py \
-v \
"$@"

View File

@@ -17,12 +17,12 @@ import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.metrics import roc_auc_score, classification_report, precision_recall_curve
from src.indicators import Indicators
from src.ml_features import build_features, FEATURE_COLS
from src.label_builder import build_labels
from src.dataset_builder import generate_dataset_vectorized
from src.dataset_builder import generate_dataset_vectorized, stratified_undersample
def _cgroup_cpu_count() -> int:
"""cgroup v1/v2 쿼터를 읽어 실제 할당된 CPU 수를 반환한다.
@@ -214,7 +214,11 @@ def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str
df = df_raw[base_cols].copy()
print("데이터셋 생성 중...")
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay)
dataset = generate_dataset_vectorized(
df, btc_df=btc_df, eth_df=eth_df,
time_weight_decay=time_weight_decay,
negative_ratio=5,
)
if dataset.empty or "label" not in dataset.columns:
raise ValueError(f"데이터셋 생성 실패: 샘플 0개. 위 오류 메시지를 확인하세요.")
@@ -229,6 +233,7 @@ def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str
X = dataset[actual_feature_cols]
y = dataset["label"]
w = dataset["sample_weight"].values
source = dataset["source"].values if "source" in dataset.columns else np.full(len(X), "signal")
split = int(len(X) * 0.8)
X_train, X_val = X.iloc[:split], X.iloc[split:]
@@ -238,21 +243,19 @@ def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str
lgbm_params, weight_scale = _load_lgbm_params(tuned_params_path)
w_train = (w[:split] * weight_scale).astype(np.float32)
# --- 클래스 불균형 처리: 언더샘플링 (시간 가중치 인덱스 보존) ---
pos_idx = np.where(y_train == 1)[0]
neg_idx = np.where(y_train == 0)[0]
if len(neg_idx) > len(pos_idx):
np.random.seed(42)
neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False)
balanced_idx = np.sort(np.concatenate([pos_idx, neg_idx])) # 시간 순서 유지
# --- 계층적 샘플링: signal 전수 유지, HOLD negative만 양성 수 만큼 ---
source_train = source[:split]
balanced_idx = stratified_undersample(y_train.values, source_train, seed=42)
X_train = X_train.iloc[balanced_idx]
y_train = y_train.iloc[balanced_idx]
w_train = w_train[balanced_idx]
print(f"\n언더샘플링 후 학습 데이터: {len(X_train)}개 (양성={y_train.sum()}, 음성={(y_train==0).sum()})")
sig_count = (source_train[balanced_idx] == "signal").sum()
hold_count = (source_train[balanced_idx] == "hold_negative").sum()
print(f"\n계층적 샘플링 후 학습 데이터: {len(X_train)}"
f"(Signal={sig_count}, HOLD={hold_count}, "
f"양성={int(y_train.sum())}, 음성={int((y_train==0).sum())})")
print(f"검증 데이터: {len(X_val)}개 (양성={int(y_val.sum())}, 음성={int((y_val==0).sum())})")
# ---------------------------------------------------------------
@@ -272,7 +275,6 @@ def train(data_path: str, time_weight_decay: float = 2.0, tuned_params_path: str
auc = roc_auc_score(y_val, val_proba)
# 최적 임계값 탐색: 최소 재현율(0.15) 조건부 정밀도 최대화
from sklearn.metrics import precision_recall_curve
precisions, recalls, thresholds = precision_recall_curve(y_val, val_proba)
# precision_recall_curve의 마지막 원소는 (1.0, 0.0)이므로 제외
precisions, recalls = precisions[:-1], recalls[:-1]
@@ -354,13 +356,16 @@ def walk_forward_auc(
df = df_raw[base_cols].copy()
dataset = generate_dataset_vectorized(
df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay
df, btc_df=btc_df, eth_df=eth_df,
time_weight_decay=time_weight_decay,
negative_ratio=5,
)
actual_feature_cols = [c for c in FEATURE_COLS if c in dataset.columns]
X = dataset[actual_feature_cols].values
y = dataset["label"].values
w = dataset["sample_weight"].values
n = len(dataset)
source = dataset["source"].values if "source" in dataset.columns else np.full(n, "signal")
lgbm_params, weight_scale = _load_lgbm_params(tuned_params_path)
w = (w * weight_scale).astype(np.float32)
@@ -369,6 +374,7 @@ def walk_forward_auc(
train_end_start = int(n * train_ratio)
aucs = []
fold_metrics = []
for i in range(n_splits):
tr_end = train_end_start + i * step
val_end = tr_end + step
@@ -378,12 +384,8 @@ def walk_forward_auc(
X_tr, y_tr, w_tr = X[:tr_end], y[:tr_end], w[:tr_end]
X_val, y_val = X[tr_end:val_end], y[tr_end:val_end]
pos_idx = np.where(y_tr == 1)[0]
neg_idx = np.where(y_tr == 0)[0]
if len(neg_idx) > len(pos_idx):
np.random.seed(42)
neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False)
idx = np.sort(np.concatenate([pos_idx, neg_idx]))
source_tr = source[:tr_end]
idx = stratified_undersample(y_tr, source_tr, seed=42)
model = lgb.LGBMClassifier(**lgbm_params, random_state=42, verbose=-1)
with warnings.catch_warnings():
@@ -393,12 +395,30 @@ def walk_forward_auc(
proba = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, proba) if len(np.unique(y_val)) > 1 else 0.5
aucs.append(auc)
# 폴드별 최적 임계값 (recall >= 0.15 조건부 precision 최대화)
MIN_RECALL = 0.15
precs, recs, thrs = precision_recall_curve(y_val, proba)
precs, recs = precs[:-1], recs[:-1]
valid_idx = np.where(recs >= MIN_RECALL)[0]
if len(valid_idx) > 0:
best_i = valid_idx[np.argmax(precs[valid_idx])]
f_thr, f_prec, f_rec = float(thrs[best_i]), float(precs[best_i]), float(recs[best_i])
else:
f_thr, f_prec, f_rec = 0.50, 0.0, 0.0
fold_metrics.append({"auc": auc, "precision": f_prec, "recall": f_rec, "threshold": f_thr})
print(
f" 폴드 {i+1}/{n_splits}: 학습={tr_end}개, "
f"검증={tr_end}~{val_end} ({step}개), AUC={auc:.4f}"
f"검증={tr_end}~{val_end} ({step}개), AUC={auc:.4f} | "
f"Thr={f_thr:.4f} Prec={f_prec:.3f} Rec={f_rec:.3f}"
)
mean_prec = np.mean([m["precision"] for m in fold_metrics])
mean_rec = np.mean([m["recall"] for m in fold_metrics])
mean_thr = np.mean([m["threshold"] for m in fold_metrics])
print(f"\n Walk-Forward 평균 AUC: {np.mean(aucs):.4f} ± {np.std(aucs):.4f}")
print(f" 평균 Precision: {mean_prec:.3f} | 평균 Recall: {mean_rec:.3f} | 평균 Threshold: {mean_thr:.4f}")
print(f" 폴드별: {[round(a, 4) for a in aucs]}")

View File

@@ -7,6 +7,7 @@ Optuna를 사용한 LightGBM 하이퍼파라미터 자동 탐색.
python scripts/tune_hyperparams.py --trials 10 --folds 3 # 빠른 테스트
python scripts/tune_hyperparams.py --data data/combined_15m.parquet --trials 100
python scripts/tune_hyperparams.py --no-baseline # 베이스라인 측정 건너뜀
python scripts/tune_hyperparams.py --min-recall 0.4 # 최소 재현율 제약 조정
결과:
- 콘솔: Best Params + Walk-Forward 리포트
@@ -28,17 +29,17 @@ import lightgbm as lgb
import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_auc_score, precision_recall_curve
from src.ml_features import FEATURE_COLS
from src.dataset_builder import generate_dataset_vectorized
from src.dataset_builder import generate_dataset_vectorized, stratified_undersample
# ──────────────────────────────────────────────
# 데이터 로드 및 데이터셋 생성 (1회 캐싱)
# ──────────────────────────────────────────────
def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
parquet 로드 → 벡터화 데이터셋 생성 → (X, y, w) numpy 배열 반환.
study 시작 전 1회만 호출하여 모든 trial이 공유한다.
@@ -63,7 +64,7 @@ def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
df = df_raw[base_cols].copy()
print("\n데이터셋 생성 중 (1회만 실행)...")
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0)
dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=0.0, negative_ratio=5)
if dataset.empty or "label" not in dataset.columns:
raise ValueError("데이터셋 생성 실패: 샘플 0개")
@@ -72,13 +73,45 @@ def load_dataset(data_path: str) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
X = dataset[actual_feature_cols].values.astype(np.float32)
y = dataset["label"].values.astype(np.int8)
w = dataset["sample_weight"].values.astype(np.float32)
source = dataset["source"].values if "source" in dataset.columns else np.full(len(dataset), "signal")
pos = int(y.sum())
neg = int((y == 0).sum())
print(f"데이터셋 완성: {len(dataset):,}개 샘플 (양성={pos}, 음성={neg})")
print(f"사용 피처: {len(actual_feature_cols)}\n")
return X, y, w
return X, y, w, source
# ──────────────────────────────────────────────
# Precision 헬퍼
# ──────────────────────────────────────────────
def _find_best_precision_at_recall(
y_true: np.ndarray,
proba: np.ndarray,
min_recall: float = 0.35,
) -> tuple[float, float, float]:
"""
precision_recall_curve에서 recall >= min_recall 조건을 만족하는
최대 precision과 해당 threshold를 반환한다.
Returns:
(best_precision, best_recall, best_threshold)
조건 불만족 시 (0.0, 0.0, 0.50)
"""
precisions, recalls, thresholds = precision_recall_curve(y_true, proba)
precisions, recalls = precisions[:-1], recalls[:-1]
valid_idx = np.where(recalls >= min_recall)[0]
if len(valid_idx) > 0:
best_idx = valid_idx[np.argmax(precisions[valid_idx])]
return (
float(precisions[best_idx]),
float(recalls[best_idx]),
float(thresholds[best_idx]),
)
return (0.0, 0.0, 0.50)
# ──────────────────────────────────────────────
@@ -89,20 +122,32 @@ def _walk_forward_cv(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
source: np.ndarray,
params: dict,
n_splits: int,
train_ratio: float,
min_recall: float = 0.35,
trial: "optuna.Trial | None" = None,
) -> tuple[float, list[float]]:
) -> tuple[float, dict]:
"""
Walk-Forward 교차검증으로 평균 AUC를 반환한다.
Walk-Forward 교차검증으로 precision 기반 복합 점수를 반환한다.
Score = mean_precision + mean_auc * 0.001 (AUC는 tiebreaker)
trial이 제공되면 각 폴드 후 Optuna에 중간 값을 보고하여 Pruning을 활성화한다.
Returns:
(mean_score, details) where details contains per-fold metrics.
"""
n = len(X)
step = max(1, int(n * (1 - train_ratio) / n_splits))
train_end_start = int(n * train_ratio)
fold_aucs: list[float] = []
fold_precisions: list[float] = []
fold_recalls: list[float] = []
fold_thresholds: list[float] = []
fold_n_pos: list[int] = []
scores_so_far: list[float] = []
for fold_idx in range(n_splits):
tr_end = train_end_start + fold_idx * step
@@ -113,16 +158,18 @@ def _walk_forward_cv(
X_tr, y_tr, w_tr = X[:tr_end], y[:tr_end], w[:tr_end]
X_val, y_val = X[tr_end:val_end], y[tr_end:val_end]
# 클래스 불균형 처리: 언더샘플링 (시간 순서 유지)
pos_idx = np.where(y_tr == 1)[0]
neg_idx = np.where(y_tr == 0)[0]
if len(neg_idx) > len(pos_idx) and len(pos_idx) > 0:
rng = np.random.default_rng(42)
neg_idx = rng.choice(neg_idx, size=len(pos_idx), replace=False)
bal_idx = np.sort(np.concatenate([pos_idx, neg_idx]))
# 계층적 샘플링: signal 전수 유지, HOLD negative만 양성 수 만큼
source_tr = source[:tr_end]
bal_idx = stratified_undersample(y_tr, source_tr, seed=42)
n_pos = int(y_val.sum())
if len(bal_idx) < 20 or len(np.unique(y_val)) < 2:
fold_aucs.append(0.5)
fold_precisions.append(0.0)
fold_recalls.append(0.0)
fold_thresholds.append(0.50)
fold_n_pos.append(n_pos)
continue
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
@@ -134,14 +181,47 @@ def _walk_forward_cv(
auc = roc_auc_score(y_val, proba) if len(np.unique(y_val)) > 1 else 0.5
fold_aucs.append(float(auc))
# Optuna Pruning: 중간 값 보고
if trial is not None:
trial.report(float(np.mean(fold_aucs)), step=fold_idx)
if trial.should_prune():
raise optuna.TrialPruned()
# Precision at recall-constrained threshold
if n_pos >= 3:
prec, rec, thr = _find_best_precision_at_recall(y_val, proba, min_recall)
else:
prec, rec, thr = 0.0, 0.0, 0.50
fold_precisions.append(prec)
fold_recalls.append(rec)
fold_thresholds.append(thr)
fold_n_pos.append(n_pos)
# Pruning: 양성 충분한 fold의 score만 보고
score = prec + auc * 0.001
scores_so_far.append(score)
if trial is not None and n_pos >= 3:
valid_scores = [s for s, np_ in zip(scores_so_far, fold_n_pos) if np_ >= 3]
if valid_scores:
trial.report(float(np.mean(valid_scores)), step=fold_idx)
if trial.should_prune():
raise optuna.TrialPruned()
# 양성 충분한 fold만으로 precision 평균 계산
valid_precs = [p for p, np_ in zip(fold_precisions, fold_n_pos) if np_ >= 3]
mean_auc = float(np.mean(fold_aucs)) if fold_aucs else 0.5
return mean_auc, fold_aucs
mean_prec = float(np.mean(valid_precs)) if valid_precs else 0.0
valid_recs = [r for r, np_ in zip(fold_recalls, fold_n_pos) if np_ >= 3]
mean_rec = float(np.mean(valid_recs)) if valid_recs else 0.0
mean_score = mean_prec + mean_auc * 0.001
details = {
"fold_aucs": fold_aucs,
"fold_precisions": fold_precisions,
"fold_recalls": fold_recalls,
"fold_thresholds": fold_thresholds,
"fold_n_pos": fold_n_pos,
"mean_auc": mean_auc,
"mean_precision": mean_prec,
"mean_recall": mean_rec,
}
return mean_score, details
# ──────────────────────────────────────────────
@@ -152,8 +232,10 @@ def make_objective(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
source: np.ndarray,
n_splits: int,
train_ratio: float,
min_recall: float = 0.35,
):
"""클로저로 데이터셋을 캡처한 목적 함수를 반환한다."""
@@ -191,33 +273,43 @@ def make_objective(
"reg_lambda": reg_lambda,
}
mean_auc, fold_aucs = _walk_forward_cv(
X, y, w_scaled, params,
mean_score, details = _walk_forward_cv(
X, y, w_scaled, source, params,
n_splits=n_splits,
train_ratio=train_ratio,
min_recall=min_recall,
trial=trial,
)
# 폴드별 AUC를 user_attrs에 저장 (결과 리포트용)
trial.set_user_attr("fold_aucs", fold_aucs)
# 폴드별 상세 메트릭을 user_attrs에 저장 (결과 리포트용)
trial.set_user_attr("fold_aucs", details["fold_aucs"])
trial.set_user_attr("fold_precisions", details["fold_precisions"])
trial.set_user_attr("fold_recalls", details["fold_recalls"])
trial.set_user_attr("fold_thresholds", details["fold_thresholds"])
trial.set_user_attr("fold_n_pos", details["fold_n_pos"])
trial.set_user_attr("mean_auc", details["mean_auc"])
trial.set_user_attr("mean_precision", details["mean_precision"])
trial.set_user_attr("mean_recall", details["mean_recall"])
return mean_auc
return mean_score
return objective
# ──────────────────────────────────────────────
# 베이스라인 AUC 측정 (현재 고정 파라미터)
# 베이스라인 측정 (현재 고정 파라미터)
# ──────────────────────────────────────────────
def measure_baseline(
X: np.ndarray,
y: np.ndarray,
w: np.ndarray,
source: np.ndarray,
n_splits: int,
train_ratio: float,
) -> tuple[float, list[float]]:
"""현재 실전 파라미터(active 파일 또는 하드코딩 기본값)로 베이스라인 AUC를 측정한다."""
min_recall: float = 0.35,
) -> tuple[float, dict]:
"""현재 실전 파라미터(active 파일 또는 하드코딩 기본값)로 베이스라인을 측정한다."""
active_path = Path("models/active_lgbm_params.json")
if active_path.exists():
@@ -241,7 +333,11 @@ def measure_baseline(
}
print("베이스라인 측정 중 (active 파일 없음 → 코드 내 기본 파라미터)...")
return _walk_forward_cv(X, y, w, baseline_params, n_splits=n_splits, train_ratio=train_ratio)
return _walk_forward_cv(
X, y, w, source, baseline_params,
n_splits=n_splits, train_ratio=train_ratio,
min_recall=min_recall,
)
# ──────────────────────────────────────────────
@@ -250,17 +346,24 @@ def measure_baseline(
def print_report(
study: optuna.Study,
baseline_auc: float,
baseline_folds: list[float],
baseline_score: float,
baseline_details: dict,
elapsed_sec: float,
output_path: Path,
min_recall: float,
) -> None:
"""콘솔에 최종 리포트를 출력한다."""
best = study.best_trial
best_auc = best.value
best_folds = best.user_attrs.get("fold_aucs", [])
improvement = best_auc - baseline_auc
improvement_pct = (improvement / baseline_auc * 100) if baseline_auc > 0 else 0.0
best_score = best.value
best_prec = best.user_attrs.get("mean_precision", 0.0)
best_auc = best.user_attrs.get("mean_auc", 0.0)
best_rec = best.user_attrs.get("mean_recall", 0.0)
baseline_prec = baseline_details.get("mean_precision", 0.0)
baseline_auc = baseline_details.get("mean_auc", 0.0)
prec_improvement = best_prec - baseline_prec
prec_improvement_pct = (prec_improvement / baseline_prec * 100) if baseline_prec > 0 else 0.0
elapsed_min = int(elapsed_sec // 60)
elapsed_s = int(elapsed_sec % 60)
@@ -276,11 +379,15 @@ def print_report(
f"(완료={len(completed)}, 조기종료={len(pruned)}) | "
f"소요: {elapsed_min}{elapsed_s}")
print(sep)
print(f" Best AUC : {best_auc:.4f} (Trial #{best.number})")
if baseline_auc > 0:
sign = "+" if improvement >= 0 else ""
print(f" Baseline : {baseline_auc:.4f} (현재 train_model.py 고정값)")
print(f" 개선폭 : {sign}{improvement:.4f} ({sign}{improvement_pct:.1f}%)")
print(f" 최적화 지표: Precision (recall >= {min_recall} 제약)")
print(f" Best Prec : {best_prec:.4f} (Trial #{best.number})")
print(f" Best AUC : {best_auc:.4f}")
print(f" Best Recall: {best_rec:.4f}")
if baseline_score > 0:
sign = "+" if prec_improvement >= 0 else ""
print(dash)
print(f" Baseline : Prec={baseline_prec:.4f}, AUC={baseline_auc:.4f}")
print(f" 개선폭 : Precision {sign}{prec_improvement:.4f} ({sign}{prec_improvement_pct:.1f}%)")
print(dash)
print(" Best Parameters:")
for k, v in best.params.items():
@@ -289,19 +396,42 @@ def print_report(
else:
print(f" {k:<22}: {v}")
print(dash)
print(" Walk-Forward 폴드별 AUC (Best Trial):")
for i, auc in enumerate(best_folds, 1):
print(f" 폴드 {i}: {auc:.4f}")
if best_folds:
arr = np.array(best_folds)
print(f" 평균: {arr.mean():.4f} ± {arr.std():.4f}")
if baseline_folds:
# 폴드별 상세
fold_aucs = best.user_attrs.get("fold_aucs", [])
fold_precs = best.user_attrs.get("fold_precisions", [])
fold_recs = best.user_attrs.get("fold_recalls", [])
fold_thrs = best.user_attrs.get("fold_thresholds", [])
fold_npos = best.user_attrs.get("fold_n_pos", [])
print(" Walk-Forward 폴드별 상세 (Best Trial):")
for i, (auc, prec, rec, thr, npos) in enumerate(
zip(fold_aucs, fold_precs, fold_recs, fold_thrs, fold_npos), 1
):
print(f" 폴드 {i}: AUC={auc:.4f} Prec={prec:.3f} Rec={rec:.3f} Thr={thr:.3f} (양성={npos})")
if fold_precs:
valid_precs = [p for p, np_ in zip(fold_precs, fold_npos) if np_ >= 3]
if valid_precs:
arr_p = np.array(valid_precs)
print(f" 평균 Precision: {arr_p.mean():.4f} ± {arr_p.std():.4f}")
if fold_aucs:
arr_a = np.array(fold_aucs)
print(f" 평균 AUC: {arr_a.mean():.4f} ± {arr_a.std():.4f}")
# 베이스라인 폴드별
bl_folds = baseline_details.get("fold_aucs", [])
bl_precs = baseline_details.get("fold_precisions", [])
bl_recs = baseline_details.get("fold_recalls", [])
bl_thrs = baseline_details.get("fold_thresholds", [])
bl_npos = baseline_details.get("fold_n_pos", [])
if bl_folds:
print(dash)
print(" Baseline 폴드별 AUC:")
for i, auc in enumerate(baseline_folds, 1):
print(f" 폴드 {i}: {auc:.4f}")
arr = np.array(baseline_folds)
print(f" 평균: {arr.mean():.4f} ± {arr.std():.4f}")
print(" Baseline 폴드별 상세:")
for i, (auc, prec, rec, thr, npos) in enumerate(
zip(bl_folds, bl_precs, bl_recs, bl_thrs, bl_npos), 1
):
print(f" 폴드 {i}: AUC={auc:.4f} Prec={prec:.3f} Rec={rec:.3f} Thr={thr:.3f} (양성={npos})")
print(dash)
print(f" 결과 저장: {output_path}")
print(f" 다음 단계: python scripts/train_model.py (파라미터 수동 반영 후)")
@@ -310,10 +440,11 @@ def print_report(
def save_results(
study: optuna.Study,
baseline_auc: float,
baseline_folds: list[float],
baseline_score: float,
baseline_details: dict,
elapsed_sec: float,
data_path: str,
min_recall: float,
) -> Path:
"""결과를 JSON 파일로 저장하고 경로를 반환한다."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -327,8 +458,12 @@ def save_results(
if t.state == optuna.trial.TrialState.COMPLETE:
all_trials.append({
"number": t.number,
"auc": round(t.value, 6),
"score": round(t.value, 6),
"auc": round(t.user_attrs.get("mean_auc", 0.0), 6),
"precision": round(t.user_attrs.get("mean_precision", 0.0), 6),
"recall": round(t.user_attrs.get("mean_recall", 0.0), 6),
"fold_aucs": [round(a, 6) for a in t.user_attrs.get("fold_aucs", [])],
"fold_precisions": [round(p, 6) for p in t.user_attrs.get("fold_precisions", [])],
"params": {
k: (round(v, 6) if isinstance(v, float) else v)
for k, v in t.params.items()
@@ -336,19 +471,33 @@ def save_results(
})
result = {
"timestamp": datetime.now().isoformat(),
"data_path": data_path,
"n_trials_total": len(study.trials),
"n_trials_complete": len(all_trials),
"elapsed_sec": round(elapsed_sec, 1),
"timestamp": datetime.now().isoformat(),
"data_path": data_path,
"min_recall_constraint": min_recall,
"n_trials_total": len(study.trials),
"n_trials_complete": len(all_trials),
"elapsed_sec": round(elapsed_sec, 1),
"baseline": {
"auc": round(baseline_auc, 6),
"fold_aucs": [round(a, 6) for a in baseline_folds],
"score": round(baseline_score, 6),
"auc": round(baseline_details.get("mean_auc", 0.0), 6),
"precision": round(baseline_details.get("mean_precision", 0.0), 6),
"recall": round(baseline_details.get("mean_recall", 0.0), 6),
"fold_aucs": [round(a, 6) for a in baseline_details.get("fold_aucs", [])],
"fold_precisions": [round(p, 6) for p in baseline_details.get("fold_precisions", [])],
"fold_recalls": [round(r, 6) for r in baseline_details.get("fold_recalls", [])],
"fold_thresholds": [round(t, 6) for t in baseline_details.get("fold_thresholds", [])],
},
"best_trial": {
"number": best.number,
"auc": round(best.value, 6),
"fold_aucs": [round(a, 6) for a in best.user_attrs.get("fold_aucs", [])],
"number": best.number,
"score": round(best.value, 6),
"auc": round(best.user_attrs.get("mean_auc", 0.0), 6),
"precision": round(best.user_attrs.get("mean_precision", 0.0), 6),
"recall": round(best.user_attrs.get("mean_recall", 0.0), 6),
"fold_aucs": [round(a, 6) for a in best.user_attrs.get("fold_aucs", [])],
"fold_precisions": [round(p, 6) for p in best.user_attrs.get("fold_precisions", [])],
"fold_recalls": [round(r, 6) for r in best.user_attrs.get("fold_recalls", [])],
"fold_thresholds": [round(t, 6) for t in best.user_attrs.get("fold_thresholds", [])],
"fold_n_pos": best.user_attrs.get("fold_n_pos", []),
"params": {
k: (round(v, 6) if isinstance(v, float) else v)
for k, v in best.params.items()
@@ -373,37 +522,49 @@ def main():
parser.add_argument("--trials", type=int, default=50, help="Optuna trial 수 (기본: 50)")
parser.add_argument("--folds", type=int, default=5, help="Walk-Forward 폴드 수 (기본: 5)")
parser.add_argument("--train-ratio", type=float, default=0.6, help="학습 구간 비율 (기본: 0.6)")
parser.add_argument("--min-recall", type=float, default=0.35, help="최소 재현율 제약 (기본: 0.35)")
parser.add_argument("--no-baseline", action="store_true", help="베이스라인 측정 건너뜀")
args = parser.parse_args()
# 1. 데이터셋 로드 (1회)
X, y, w = load_dataset(args.data)
X, y, w, source = load_dataset(args.data)
# 2. 베이스라인 측정
if args.no_baseline:
baseline_auc, baseline_folds = 0.0, []
baseline_score, baseline_details = 0.0, {}
print("베이스라인 측정 건너뜀 (--no-baseline)\n")
else:
baseline_auc, baseline_folds = measure_baseline(X, y, w, args.folds, args.train_ratio)
baseline_score, baseline_details = measure_baseline(
X, y, w, source, args.folds, args.train_ratio, args.min_recall,
)
bl_prec = baseline_details.get("mean_precision", 0.0)
bl_auc = baseline_details.get("mean_auc", 0.0)
bl_rec = baseline_details.get("mean_recall", 0.0)
print(
f"베이스라인 AUC: {baseline_auc:.4f} "
f"(폴드별: {[round(a, 4) for a in baseline_folds]})\n"
f"베이스라인: Prec={bl_prec:.4f}, AUC={bl_auc:.4f}, Recall={bl_rec:.4f} "
f"(recall >= {args.min_recall} 제약)\n"
)
# 3. Optuna study 실행
optuna.logging.set_verbosity(optuna.logging.WARNING)
sampler = TPESampler(seed=42)
pruner = MedianPruner(n_startup_trials=5, n_warmup_steps=2)
pruner = MedianPruner(n_startup_trials=5, n_warmup_steps=3)
study = optuna.create_study(
direction="maximize",
sampler=sampler,
pruner=pruner,
study_name="lgbm_wf_auc",
study_name="lgbm_wf_precision",
)
objective = make_objective(X, y, w, n_splits=args.folds, train_ratio=args.train_ratio)
objective = make_objective(
X, y, w, source,
n_splits=args.folds,
train_ratio=args.train_ratio,
min_recall=args.min_recall,
)
print(f"Optuna 탐색 시작: {args.trials} trials, {args.folds}폴드 Walk-Forward")
print(f"최적화 지표: Precision (recall >= {args.min_recall} 제약)")
print("(trial 완료마다 진행 상황 출력)\n")
start_time = time.time()
@@ -411,12 +572,13 @@ def main():
def _progress_callback(study: optuna.Study, trial: optuna.trial.FrozenTrial) -> None:
if trial.state == optuna.trial.TrialState.COMPLETE:
best_so_far = study.best_value
leaves = trial.params.get("num_leaves", "?")
depth = trial.params.get("max_depth", "?")
prec = trial.user_attrs.get("mean_precision", 0.0)
auc = trial.user_attrs.get("mean_auc", 0.0)
print(
f" Trial #{trial.number:3d} | AUC={trial.value:.4f} "
f" Trial #{trial.number:3d} | Prec={prec:.4f} AUC={auc:.4f} "
f"| Best={best_so_far:.4f} "
f"| leaves={leaves} depth={depth}"
f"| leaves={trial.params.get('num_leaves', '?')} "
f"depth={trial.params.get('max_depth', '?')}"
)
elif trial.state == optuna.trial.TrialState.PRUNED:
print(f" Trial #{trial.number:3d} | PRUNED (조기 종료)")
@@ -431,21 +593,32 @@ def main():
elapsed = time.time() - start_time
# 4. 결과 저장 및 출력
output_path = save_results(study, baseline_auc, baseline_folds, elapsed, args.data)
print_report(study, baseline_auc, baseline_folds, elapsed, output_path)
output_path = save_results(
study, baseline_score, baseline_details, elapsed, args.data, args.min_recall,
)
print_report(
study, baseline_score, baseline_details, elapsed, output_path, args.min_recall,
)
# 5. 성능 개선 시 active 파일 자동 갱신
import shutil
active_path = Path("models/active_lgbm_params.json")
if not args.no_baseline and study.best_value > baseline_auc:
if not args.no_baseline and study.best_value > baseline_score:
shutil.copy(output_path, active_path)
improvement = study.best_value - baseline_auc
print(f"[MLOps] AUC +{improvement:.4f} 개선 → {active_path} 자동 갱신 완료")
best_prec = study.best_trial.user_attrs.get("mean_precision", 0.0)
bl_prec = baseline_details.get("mean_precision", 0.0)
improvement = best_prec - bl_prec
print(f"[MLOps] Precision +{improvement:.4f} 개선 → {active_path} 자동 갱신 완료")
print(f"[MLOps] 다음 train_model.py 실행 시 새 파라미터가 자동 적용됩니다.\n")
elif args.no_baseline:
print("[MLOps] --no-baseline 모드: 성능 비교 없이 active 파일 유지\n")
else:
print(f"[MLOps] 성능 개선 없음 (Best={study.best_value:.4f} ≤ Baseline={baseline_auc:.4f}) → active 파일 유지\n")
best_prec = study.best_trial.user_attrs.get("mean_precision", 0.0)
bl_prec = baseline_details.get("mean_precision", 0.0)
print(
f"[MLOps] 성능 개선 없음 (Prec={best_prec:.4f} ≤ Baseline={bl_prec:.4f}) "
f"→ active 파일 유지\n"
)
if __name__ == "__main__":

View File

@@ -362,6 +362,7 @@ def generate_dataset_vectorized(
btc_df: pd.DataFrame | None = None,
eth_df: pd.DataFrame | None = None,
time_weight_decay: float = 0.0,
negative_ratio: int = 0,
) -> pd.DataFrame:
"""
전체 시계열을 1회 계산해 학습 데이터셋을 생성한다.
@@ -372,6 +373,9 @@ def generate_dataset_vectorized(
양수일수록 최신 샘플에 더 높은 가중치를 부여한다.
예) 2.0 → 최신 샘플이 가장 오래된 샘플보다 e^2 ≈ 7.4배 높은 가중치.
결과 DataFrame에 'sample_weight' 컬럼으로 포함된다.
negative_ratio: 시그널 샘플 대비 HOLD negative 샘플 비율.
0이면 기존 동작 (시그널만). 5면 시그널의 5배만큼 HOLD 샘플 추가.
"""
print(" [1/3] 전체 시계열 지표 계산 (1회)...")
d = _calc_indicators(df)
@@ -381,41 +385,107 @@ def generate_dataset_vectorized(
feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
# 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만
# oi_change/funding_rate는 선택적 피처(컬럼 없으면 전체 nan)이므로 NaN 체크에서 제외
OPTIONAL_COLS = {"oi_change", "funding_rate"}
available_cols_for_nan_check = [
c for c in FEATURE_COLS
if c in feat_all.columns and c not in OPTIONAL_COLS
]
valid_rows = (
(signal_arr != "HOLD") &
base_valid = (
(~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) &
(np.arange(len(d)) >= WARMUP) &
(np.arange(len(d)) < len(d) - LOOKAHEAD)
)
sig_idx = np.where(valid_rows)[0]
# --- 시그널 캔들 (기존 로직) ---
sig_valid = base_valid & (signal_arr != "HOLD")
sig_idx = np.where(sig_valid)[0]
print(f" 신호 발생 인덱스: {len(sig_idx):,}")
print(" [3/3] 레이블 계산...")
labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx)
final_idx = sig_idx[valid_mask]
# btc_df/eth_df 제공 여부에 따라 실제 존재하는 피처 컬럼만 선택
final_sig_idx = sig_idx[valid_mask]
available_feature_cols = [c for c in FEATURE_COLS if c in feat_all.columns]
feat_final = feat_all.iloc[final_idx][available_feature_cols].copy()
feat_final["label"] = labels
feat_signal = feat_all.iloc[final_sig_idx][available_feature_cols].copy()
feat_signal["label"] = labels
feat_signal["source"] = "signal"
# 시간 가중치: 오래된 샘플 → 낮은 가중치, 최신 샘플 → 높은 가중치
# --- HOLD negative 캔들 ---
if negative_ratio > 0 and len(final_sig_idx) > 0:
hold_valid = base_valid & (signal_arr == "HOLD")
hold_candidates = np.where(hold_valid)[0]
n_neg = min(len(hold_candidates), len(final_sig_idx) * negative_ratio)
if n_neg > 0:
rng = np.random.default_rng(42)
hold_idx = rng.choice(hold_candidates, size=n_neg, replace=False)
hold_idx = np.sort(hold_idx)
feat_hold = feat_all.iloc[hold_idx][available_feature_cols].copy()
feat_hold["label"] = 0
feat_hold["source"] = "hold_negative"
# HOLD 캔들은 시그널이 없으므로 side를 랜덤 할당 (50:50)
sides = rng.integers(0, 2, size=len(feat_hold)).astype(np.float32)
feat_hold["side"] = sides
print(f" HOLD negative 추가: {len(feat_hold):,}"
f"(비율 1:{negative_ratio})")
feat_final = pd.concat([feat_signal, feat_hold], ignore_index=True)
# 시간 순서 복원 (원본 인덱스 기반 정렬)
original_order = np.concatenate([final_sig_idx, hold_idx])
sort_order = np.argsort(original_order)
feat_final = feat_final.iloc[sort_order].reset_index(drop=True)
else:
feat_final = feat_signal.reset_index(drop=True)
else:
feat_final = feat_signal.reset_index(drop=True)
# 시간 가중치
n = len(feat_final)
if time_weight_decay > 0 and n > 1:
weights = np.exp(time_weight_decay * np.linspace(0.0, 1.0, n)).astype(np.float32)
weights /= weights.mean() # 평균 1로 정규화해 학습률 스케일 유지
weights /= weights.mean()
print(f" 시간 가중치 적용 (decay={time_weight_decay}): "
f"min={weights.min():.3f}, max={weights.max():.3f}")
else:
weights = np.ones(n, dtype=np.float32)
feat_final = feat_final.reset_index(drop=True)
feat_final["sample_weight"] = weights
total_sig = (feat_final["source"] == "signal").sum() if "source" in feat_final.columns else len(feat_final)
total_hold = (feat_final["source"] == "hold_negative").sum() if "source" in feat_final.columns else 0
print(f" 최종 데이터셋: {n:,}개 (시그널={total_sig:,}, HOLD={total_hold:,})")
return feat_final
def stratified_undersample(
y: np.ndarray,
source: np.ndarray,
seed: int = 42,
) -> np.ndarray:
"""Signal 샘플 전수 유지 + HOLD negative만 양성 수 만큼 샘플링.
Args:
y: 라벨 배열 (0 or 1)
source: 소스 배열 ("signal" or "hold_negative")
seed: 랜덤 시드
Returns:
정렬된 인덱스 배열 (학습에 사용할 행 인덱스)
"""
pos_idx = np.where(y == 1)[0] # Signal Win
sig_neg_idx = np.where((y == 0) & (source == "signal"))[0] # Signal Loss
hold_neg_idx = np.where(source == "hold_negative")[0] # HOLD negative
# HOLD negative에서 양성 수 만큼만 샘플링
n_hold = min(len(hold_neg_idx), len(pos_idx))
rng = np.random.default_rng(seed)
if n_hold > 0:
hold_sampled = rng.choice(hold_neg_idx, size=n_hold, replace=False)
else:
hold_sampled = np.array([], dtype=np.intp)
return np.sort(np.concatenate([pos_idx, sig_neg_idx, hold_sampled]))

View File

@@ -17,6 +17,7 @@ def config():
"RISK_PER_TRADE": "0.02",
"NOTION_TOKEN": "secret_test",
"NOTION_DATABASE_ID": "db_test",
"DISCORD_WEBHOOK_URL": "",
})
return Config()

View File

@@ -1,42 +0,0 @@
import pytest
from unittest.mock import MagicMock, patch
from src.database import TradeRepository
@pytest.fixture
def mock_repo():
with patch("src.database.Client") as mock_client_cls:
mock_client = MagicMock()
mock_client_cls.return_value = mock_client
repo = TradeRepository(token="secret_test", database_id="db_test")
repo.client = mock_client
yield repo
def test_save_trade(mock_repo):
mock_repo.client.pages.create.return_value = {
"id": "abc123",
"properties": {},
}
result = mock_repo.save_trade(
symbol="XRPUSDT",
side="LONG",
entry_price=0.5,
quantity=400.0,
leverage=10,
signal_data={"rsi": 32, "macd_hist": 0.001},
)
assert result["id"] == "abc123"
def test_close_trade(mock_repo):
mock_repo.client.pages.update.return_value = {
"id": "abc123",
"properties": {
"Status": {"select": {"name": "CLOSED"}},
},
}
result = mock_repo.close_trade(
trade_id="abc123", exit_price=0.55, pnl=20.0
)
assert result["id"] == "abc123"

View File

@@ -70,7 +70,7 @@ def test_generate_dataset_vectorized_with_btc_eth_has_21_feature_cols():
result = generate_dataset_vectorized(xrp_df, btc_df=btc_df, eth_df=eth_df)
if not result.empty:
assert set(FEATURE_COLS).issubset(set(result.columns))
assert len(result.columns) == len(FEATURE_COLS) + 1 # +1 for label
assert "label" in result.columns
def test_matches_original_generate_dataset(sample_df):
@@ -208,3 +208,61 @@ def test_rs_zero_denominator():
"xrp_btc_rs에 inf가 있으면 안 됨"
assert not feat["xrp_btc_rs"].isna().all(), \
"xrp_btc_rs가 전부 nan이면 안 됨"
@pytest.fixture
def signal_producing_df():
"""시그널이 반드시 발생하는 더미 데이터. 높은 변동성 + 거래량 급증."""
rng = np.random.default_rng(7)
n = 800
trend = np.linspace(1.5, 3.0, n)
noise = np.cumsum(rng.normal(0, 0.04, n))
close = np.clip(trend + noise, 0.01, None)
high = close * (1 + rng.uniform(0, 0.015, n))
low = close * (1 - rng.uniform(0, 0.015, n))
volume = rng.uniform(1e6, 3e6, n)
volume[::30] *= 3.0 # 30봉마다 거래량 급증
return pd.DataFrame({
"open": close, "high": high, "low": low,
"close": close, "volume": volume,
})
def test_hold_negative_labels_are_all_zero(signal_producing_df):
"""HOLD negative 샘플의 label은 전부 0이어야 한다."""
result = generate_dataset_vectorized(signal_producing_df, negative_ratio=3)
assert len(result) > 0, "시그널이 발생하지 않아 테스트 불가"
assert "source" in result.columns
hold_neg = result[result["source"] == "hold_negative"]
assert len(hold_neg) > 0, "HOLD negative 샘플이 0개"
assert (hold_neg["label"] == 0).all(), \
f"HOLD negative 중 label != 0인 샘플 존재: {hold_neg['label'].value_counts().to_dict()}"
def test_signal_samples_preserved_after_sampling(signal_producing_df):
"""계층적 샘플링 후 source='signal' 샘플이 하나도 버려지지 않아야 한다."""
result_signal_only = generate_dataset_vectorized(signal_producing_df, negative_ratio=0)
result_with_hold = generate_dataset_vectorized(signal_producing_df, negative_ratio=3)
assert len(result_signal_only) > 0, "시그널이 발생하지 않아 테스트 불가"
assert "source" in result_with_hold.columns
signal_count = (result_with_hold["source"] == "signal").sum()
assert signal_count == len(result_signal_only), \
f"Signal 샘플 손실: 원본={len(result_signal_only)}, 유지={signal_count}"
def test_stratified_undersample_preserves_signal():
"""stratified_undersample은 signal 샘플을 전수 유지해야 한다."""
from src.dataset_builder import stratified_undersample
y = np.array([1, 0, 0, 0, 0, 0, 0, 0, 1, 0])
source = np.array(["signal", "signal", "signal", "hold_negative",
"hold_negative", "hold_negative", "hold_negative",
"hold_negative", "signal", "signal"])
idx = stratified_undersample(y, source, seed=42)
# signal 인덱스: 0, 1, 2, 8, 9 → 전부 포함
signal_indices = np.where(source == "signal")[0]
for si in signal_indices:
assert si in idx, f"signal 인덱스 {si}가 누락됨"