Compare commits

...

8 Commits

13 changed files with 323 additions and 37 deletions

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ venv/
models/*.pkl
data/*.parquet
.worktrees/
.DS_Store

View File

@@ -0,0 +1,203 @@
# RS np.divide 복구 / MLX NaN-Safe 통계 저장 구현 계획
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** RS(상대강도) 계산의 epsilon 폭발 이상치를 `np.divide` 방식으로 제거하고, MLXFilter의 `self._mean`/`self._std`에 NaN이 잔류하는 근본 허점을 차단한다.
**Architecture:**
- `src/dataset_builder.py`: `xrp_btc_rs_raw` / `xrp_eth_rs_raw` 계산을 `np.divide(..., where=...)` 방식으로 교체. 분모(btc_r1, eth_r1)가 0이면 결과를 0.0으로 채워 rolling zscore 윈도우 오염을 방지한다.
- `src/mlx_filter.py`: `fit()` 내부에서 `self._mean`/`self._std`를 저장하기 전에 `nan_to_num`을 적용해 전체-NaN 컬럼(OI 초반 구간 등)이 `predict_proba` 시점까지 NaN을 전파하지 않도록 한다.
**Tech Stack:** numpy, pandas, pytest, mlx(Apple Silicon 전용 — MLX 테스트는 Mac에서만 실행)
---
### Task 1: `dataset_builder.py` — RS 계산을 `np.divide` 방식으로 교체
**Files:**
- Modify: `src/dataset_builder.py:245-246`
- Test: `tests/test_dataset_builder.py`
**배경:**
`btc_r1 = 0.0`(15분 동안 BTC 가격 변동 없음)일 때 `xrp_r1 / (btc_r1 + 1e-8)`는 최대 수백만의 이상치를 만든다. 이 이상치가 288캔들 rolling zscore 윈도우에 들어가면 나머지 287개 값이 전부 0에 가깝게 압사된다.
**Step 1: 기존 테스트 실행 (기준선 확인)**
```bash
python -m pytest tests/test_dataset_builder.py -v
```
Expected: 모든 테스트 PASS (변경 전 기준선)
**Step 2: RS 제로-분모 테스트 작성**
`tests/test_dataset_builder.py` 파일 끝에 추가:
```python
def test_rs_zero_denominator():
"""btc_r1=0일 때 RS가 inf/nan이 아닌 0.0이어야 한다 (np.divide 방식 검증)."""
import numpy as np
import pandas as pd
from src.dataset_builder import _calc_features_vectorized, _calc_signals, _calc_indicators
n = 500
np.random.seed(7)
# XRP close: 약간의 변동
xrp_close = np.cumprod(1 + np.random.randn(n) * 0.001) * 1.0
xrp_df = pd.DataFrame({
"open": xrp_close * 0.999,
"high": xrp_close * 1.005,
"low": xrp_close * 0.995,
"close": xrp_close,
"volume": np.random.rand(n) * 1000 + 500,
})
# BTC close: 완전히 고정 → btc_r1 = 0.0
btc_close = np.ones(n) * 50000.0
btc_df = pd.DataFrame({
"open": btc_close,
"high": btc_close,
"low": btc_close,
"close": btc_close,
"volume": np.random.rand(n) * 1000 + 500,
})
from src.dataset_builder import generate_dataset_vectorized
result = generate_dataset_vectorized(xrp_df, btc_df=btc_df)
if result.empty:
pytest.skip("신호 없음")
assert "xrp_btc_rs" in result.columns, "xrp_btc_rs 컬럼이 있어야 함"
assert not result["xrp_btc_rs"].isin([np.inf, -np.inf]).any(), \
"xrp_btc_rs에 inf가 있으면 안 됨"
assert not result["xrp_btc_rs"].isna().all(), \
"xrp_btc_rs가 전부 nan이면 안 됨"
```
**Step 3: 테스트 실행 (FAIL 확인)**
```bash
python -m pytest tests/test_dataset_builder.py::test_rs_zero_denominator -v
```
Expected: FAIL — `xrp_btc_rs에 inf가 있으면 안 됨` (현재 epsilon 방식은 inf 대신 수백만 이상치를 만들어 rolling zscore 후 nan이 될 수 있음)
> 참고: 현재 코드는 inf를 직접 만들지 않을 수도 있다. 하지만 rolling zscore 후 nan이 생기거나 이상치가 남아있는지 확인하는 것이 목적이다. PASS가 나오더라도 Step 4를 진행한다.
**Step 4: `dataset_builder.py` 245~246줄 수정**
`src/dataset_builder.py`의 아래 두 줄을:
```python
xrp_btc_rs_raw = (xrp_r1 / (btc_r1 + 1e-8)).astype(np.float32)
xrp_eth_rs_raw = (xrp_r1 / (eth_r1 + 1e-8)).astype(np.float32)
```
다음으로 교체:
```python
xrp_btc_rs_raw = np.divide(
xrp_r1, btc_r1,
out=np.zeros_like(xrp_r1),
where=(btc_r1 != 0),
).astype(np.float32)
xrp_eth_rs_raw = np.divide(
xrp_r1, eth_r1,
out=np.zeros_like(xrp_r1),
where=(eth_r1 != 0),
).astype(np.float32)
```
**Step 5: 전체 테스트 실행 (PASS 확인)**
```bash
python -m pytest tests/test_dataset_builder.py -v
```
Expected: 모든 테스트 PASS
**Step 6: 커밋**
```bash
git add src/dataset_builder.py tests/test_dataset_builder.py
git commit -m "fix: RS 계산을 np.divide(where=) 방식으로 교체 — epsilon 이상치 폭발 차단"
```
---
### Task 2: `mlx_filter.py` — `self._mean`/`self._std` 저장 전 `nan_to_num` 적용
**Files:**
- Modify: `src/mlx_filter.py:145-146`
- Test: `tests/test_mlx_filter.py` (기존 `test_fit_with_nan_features` 활용)
**배경:**
현재 코드는 `self._mean = np.nanmean(X_np, axis=0)`으로 저장한다. 전체가 NaN인 컬럼(Walk-Forward 초반 11개월의 OI 데이터)이 있으면 `np.nanmean`은 해당 컬럼의 평균으로 NaN을 반환한다. 이 NaN이 `self._mean`에 저장되면 `predict_proba` 시점에 `(X_np - self._mean)`이 NaN이 되어 OI 데이터를 영원히 활용하지 못한다.
**Step 1: 기존 테스트 실행 (기준선 확인)**
```bash
python -m pytest tests/test_mlx_filter.py -v
```
Expected: 모든 테스트 PASS (MLX 없는 환경에서는 전체 SKIP)
**Step 2: `mlx_filter.py` 145~146줄 수정**
`src/mlx_filter.py`의 아래 두 줄을:
```python
self._mean = np.nanmean(X_np, axis=0)
self._std = np.nanstd(X_np, axis=0) + 1e-8
```
다음으로 교체:
```python
mean_vals = np.nanmean(X_np, axis=0)
self._mean = np.nan_to_num(mean_vals, nan=0.0) # 전체-NaN 컬럼 → 평균 0.0
std_vals = np.nanstd(X_np, axis=0)
self._std = np.nan_to_num(std_vals, nan=1.0) + 1e-8 # 전체-NaN 컬럼 → std 1.0
```
**Step 3: 테스트 실행 (PASS 확인)**
```bash
python -m pytest tests/test_mlx_filter.py::test_fit_with_nan_features -v
```
Expected: PASS (MLX 없는 환경에서는 SKIP)
**Step 4: 전체 테스트 실행**
```bash
python -m pytest tests/test_mlx_filter.py -v
```
Expected: 모든 테스트 PASS (또는 SKIP)
**Step 5: 커밋**
```bash
git add src/mlx_filter.py
git commit -m "fix: MLXFilter self._mean/std 저장 전 nan_to_num 적용 — 전체-NaN 컬럼 predict_proba 오염 차단"
```
---
### Task 3: 전체 테스트 통과 확인
**Step 1: 전체 테스트 실행**
```bash
python -m pytest tests/ -v --tb=short 2>&1 | tail -40
```
Expected: 모든 테스트 PASS (MLX 관련은 SKIP 허용)
**Step 2: 최종 커밋 (필요 시)**
```bash
git add -A
git commit -m "chore: RS epsilon 폭발 차단 + MLX NaN-Safe 통계 저장 통합"
```

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -216,5 +216,29 @@
"train_sec": 0.1,
"time_weight_decay": 2.0,
"model_path": "models/mlx_filter.weights"
},
{
"date": "2026-03-01T23:59:27.956019",
"backend": "mlx",
"auc": 0.5595,
"best_threshold": 0.9538,
"best_precision": 0.462,
"best_recall": 0.171,
"samples": 533,
"train_sec": 0.2,
"time_weight_decay": 2.0,
"model_path": "models/mlx_filter.weights"
},
{
"date": "2026-03-02T00:40:15.931055",
"backend": "mlx",
"auc": 0.5829,
"best_threshold": 0.9609,
"best_precision": 0.6,
"best_recall": 0.171,
"samples": 534,
"train_sec": 0.2,
"time_weight_decay": 2.0,
"model_path": "models/mlx_filter.weights"
}
]

View File

@@ -242,8 +242,16 @@ def _calc_features_vectorized(
eth_r5 = _align(eth_ret_5, n).astype(np.float32)
xrp_r1 = ret_1.astype(np.float32)
xrp_btc_rs_raw = (xrp_r1 / (btc_r1 + 1e-8)).astype(np.float32)
xrp_eth_rs_raw = (xrp_r1 / (eth_r1 + 1e-8)).astype(np.float32)
xrp_btc_rs_raw = np.divide(
xrp_r1, btc_r1,
out=np.zeros_like(xrp_r1),
where=(btc_r1 != 0),
).astype(np.float32)
xrp_eth_rs_raw = np.divide(
xrp_r1, eth_r1,
out=np.zeros_like(xrp_r1),
where=(eth_r1 != 0),
).astype(np.float32)
extra = pd.DataFrame({
"btc_ret_1": _rolling_zscore(btc_r1),

View File

@@ -142,8 +142,10 @@ class MLXFilter:
# nan-safe 정규화: nanmean/nanstd로 통계 계산 후 nan → 0.0 대치
# (z-score 후 0.0 = 평균값, 신경망에 줄 수 있는 가장 무난한 결측 대치값)
self._mean = np.nanmean(X_np, axis=0)
self._std = np.nanstd(X_np, axis=0) + 1e-8
mean_vals = np.nanmean(X_np, axis=0)
self._mean = np.nan_to_num(mean_vals, nan=0.0) # 전체-NaN 컬럼 → 평균 0.0
std_vals = np.nanstd(X_np, axis=0)
self._std = np.nan_to_num(std_vals, nan=1.0) + 1e-8 # 전체-NaN 컬럼 → std 1.0
X_np = (X_np - self._mean) / self._std
X_np = np.nan_to_num(X_np, nan=0.0)

View File

@@ -23,6 +23,7 @@ def test_multi_symbol_stream_get_dataframe_returns_none_when_empty():
def test_multi_symbol_stream_get_dataframe_returns_df_when_full():
import pandas as pd
from src.data_stream import _MIN_CANDLES_FOR_SIGNAL
stream = MultiSymbolStream(
symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"],
interval="1m",
@@ -32,13 +33,13 @@ def test_multi_symbol_stream_get_dataframe_returns_df_when_full():
"timestamp": 1000, "open": 1.0, "high": 1.1,
"low": 0.9, "close": 1.05, "volume": 100.0, "is_closed": True,
}
for i in range(50):
for i in range(_MIN_CANDLES_FOR_SIGNAL):
c = candle.copy()
c["timestamp"] = 1000 + i
stream.buffers["xrpusdt"].append(c)
df = stream.get_dataframe("XRPUSDT")
assert df is not None
assert len(df) == 50
assert len(df) == _MIN_CANDLES_FOR_SIGNAL
@pytest.mark.asyncio

View File

@@ -160,3 +160,51 @@ def test_oi_nan_masking_with_zeros():
feat = _calc_features_vectorized(d, sig)
assert feat["oi_change"].iloc[50:].notna().any(), "실제 OI 값 구간에 유한값이 있어야 함"
def test_rs_zero_denominator():
"""btc_r1=0일 때 RS가 inf/nan이 아닌 0.0이어야 한다 (np.divide 방식 검증)."""
import numpy as np
import pandas as pd
from src.dataset_builder import _calc_features_vectorized, _calc_signals, _calc_indicators
n = 500
np.random.seed(7)
# XRP close: 약간의 변동
xrp_close = np.cumprod(1 + np.random.randn(n) * 0.001) * 1.0
xrp_df = pd.DataFrame({
"open": xrp_close * 0.999,
"high": xrp_close * 1.005,
"low": xrp_close * 0.995,
"close": xrp_close,
"volume": np.random.rand(n) * 1000 + 500,
})
# BTC close: 완전히 고정 → btc_r1 = 0.0
btc_close = np.ones(n) * 50000.0
btc_df = pd.DataFrame({
"open": btc_close,
"high": btc_close,
"low": btc_close,
"close": btc_close,
"volume": np.random.rand(n) * 1000 + 500,
})
# ETH close: 약간의 변동 (eth_df 없으면 BTC 피처 자체가 계산 안 됨)
eth_close = np.cumprod(1 + np.random.randn(n) * 0.001) * 3000.0
eth_df = pd.DataFrame({
"open": eth_close * 0.999,
"high": eth_close * 1.005,
"low": eth_close * 0.995,
"close": eth_close,
"volume": np.random.rand(n) * 1000 + 500,
})
# _calc_features_vectorized를 직접 호출해 BTC/ETH 피처를 포함한 전체 피처를 검증
d = _calc_indicators(xrp_df)
signal_arr = _calc_signals(d)
feat = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
assert "xrp_btc_rs" in feat.columns, "xrp_btc_rs 컬럼이 있어야 함"
assert not feat["xrp_btc_rs"].isin([np.inf, -np.inf]).any(), \
"xrp_btc_rs에 inf가 있으면 안 됨"
assert not feat["xrp_btc_rs"].isna().all(), \
"xrp_btc_rs가 전부 nan이면 안 됨"

View File

@@ -49,9 +49,9 @@ def test_build_features_rs_zero_when_btc_ret_zero():
features = build_features(xrp_df, "LONG", btc_df=btc_df, eth_df=eth_df)
assert features["xrp_btc_rs"] == 0.0
def test_feature_cols_has_21_items():
def test_feature_cols_has_23_items():
from src.ml_features import FEATURE_COLS
assert len(FEATURE_COLS) == 21
assert len(FEATURE_COLS) == 23
def make_df(n=100):

View File

@@ -12,13 +12,19 @@ def make_features(side="LONG") -> pd.Series:
def test_no_model_file_is_not_loaded(tmp_path):
f = MLFilter(model_path=str(tmp_path / "nonexistent.pkl"))
f = MLFilter(
onnx_path=str(tmp_path / "nonexistent.onnx"),
lgbm_path=str(tmp_path / "nonexistent.pkl"),
)
assert not f.is_model_loaded()
def test_no_model_should_enter_returns_true(tmp_path):
"""모델 없으면 항상 진입 허용 (폴백)"""
f = MLFilter(model_path=str(tmp_path / "nonexistent.pkl"))
f = MLFilter(
onnx_path=str(tmp_path / "nonexistent.onnx"),
lgbm_path=str(tmp_path / "nonexistent.pkl"),
)
features = make_features()
assert f.should_enter(features) is True
@@ -28,7 +34,7 @@ def test_should_enter_above_threshold():
f = MLFilter(threshold=0.60)
mock_model = MagicMock()
mock_model.predict_proba.return_value = np.array([[0.35, 0.65]])
f._model = mock_model
f._lgbm_model = mock_model
features = make_features()
assert f.should_enter(features) is True
@@ -38,7 +44,7 @@ def test_should_enter_below_threshold():
f = MLFilter(threshold=0.60)
mock_model = MagicMock()
mock_model.predict_proba.return_value = np.array([[0.55, 0.45]])
f._model = mock_model
f._lgbm_model = mock_model
features = make_features()
assert f.should_enter(features) is False
@@ -48,16 +54,18 @@ def test_reload_model(tmp_path):
import joblib
# 모델 파일이 없는 상태에서 시작
model_path = tmp_path / "lgbm_filter.pkl"
f = MLFilter(model_path=str(model_path))
f = MLFilter(
onnx_path=str(tmp_path / "nonexistent.onnx"),
lgbm_path=str(tmp_path / "lgbm_filter.pkl"),
)
assert not f.is_model_loaded()
# _model을 직접 주입해서 is_model_loaded가 True인지 확인
# _lgbm_model을 직접 주입해서 is_model_loaded가 True인지 확인
mock_model = MagicMock()
f._model = mock_model
f._lgbm_model = mock_model
assert f.is_model_loaded()
# reload_model 호출 시 파일이 없으면 _try_load가 _model을 변경하지 않음
# (기존 동작 유지 - 파일 없으면 None으로 초기화하지 않음)
# reload_model은 항상 _lgbm_model/_onnx_session을 초기화 후 재로드한다.
# 파일 없으면 None으로 리셋되어 폴백 상태가 된다.
f.reload_model()
assert f.is_model_loaded() # mock_model이 유지됨
assert not f.is_model_loaded() # 파일 없으므로 폴백 상태

View File

@@ -10,23 +10,11 @@ mlx = pytest.importorskip("mlx.core", reason="MLX 미설치")
def _make_X(n: int = 4) -> pd.DataFrame:
from src.ml_features import FEATURE_COLS
rng = np.random.default_rng(0)
return pd.DataFrame(
{
"rsi": rng.uniform(20, 80, n),
"macd_hist": rng.uniform(-0.1, 0.1, n),
"bb_pct": rng.uniform(0, 1, n),
"ema_align": rng.choice([-1.0, 0.0, 1.0], n),
"stoch_k": rng.uniform(0, 100, n),
"stoch_d": rng.uniform(0, 100, n),
"atr_pct": rng.uniform(0.001, 0.05, n),
"vol_ratio": rng.uniform(0.5, 3.0, n),
"ret_1": rng.uniform(-0.01, 0.01, n),
"ret_3": rng.uniform(-0.02, 0.02, n),
"ret_5": rng.uniform(-0.03, 0.03, n),
"signal_strength": rng.integers(0, 6, n).astype(float),
"side": rng.choice([0.0, 1.0], n),
}
rng.uniform(-1.0, 1.0, (n, len(FEATURE_COLS))).astype(np.float32),
columns=FEATURE_COLS,
)
@@ -41,9 +29,10 @@ def test_mlx_gpu_device():
def test_mlx_filter_predict_shape_untrained():
"""학습 전에도 predict_proba가 (N,) 형태를 반환해야 한다."""
from src.mlx_filter import MLXFilter
from src.ml_features import FEATURE_COLS
X = _make_X(4)
model = MLXFilter(input_dim=13, hidden_dim=32)
model = MLXFilter(input_dim=len(FEATURE_COLS), hidden_dim=32)
proba = model.predict_proba(X)
assert proba.shape == (4,)
assert np.all((proba >= 0.0) & (proba <= 1.0))
@@ -52,12 +41,13 @@ def test_mlx_filter_predict_shape_untrained():
def test_mlx_filter_fit_and_predict():
"""학습 후 predict_proba가 유효한 확률값을 반환해야 한다."""
from src.mlx_filter import MLXFilter
from src.ml_features import FEATURE_COLS
n = 100
X = _make_X(n)
y = pd.Series(np.random.randint(0, 2, n))
model = MLXFilter(input_dim=13, hidden_dim=32, epochs=5, batch_size=32)
model = MLXFilter(input_dim=len(FEATURE_COLS), hidden_dim=32, epochs=5, batch_size=32)
model.fit(X, y)
proba = model.predict_proba(X)
@@ -93,12 +83,13 @@ def test_fit_with_nan_features():
def test_mlx_filter_save_load(tmp_path):
"""저장 후 로드한 모델이 동일한 예측값을 반환해야 한다."""
from src.mlx_filter import MLXFilter
from src.ml_features import FEATURE_COLS
n = 50
X = _make_X(n)
y = pd.Series(np.random.randint(0, 2, n))
model = MLXFilter(input_dim=13, hidden_dim=32, epochs=3, batch_size=32)
model = MLXFilter(input_dim=len(FEATURE_COLS), hidden_dim=32, epochs=3, batch_size=32)
model.fit(X, y)
proba_before = model.predict_proba(X)