feat: OI nan 마스킹 / epsilon 통일 / 정밀도 우선 임계값 #1

Merged
gihyeon merged 5 commits from feature/oi-nan-epsilon-precision-threshold into main 2026-03-01 23:57:32 +09:00
6 changed files with 172 additions and 28 deletions

View File

@@ -118,8 +118,26 @@ def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float:
val_proba = model.predict_proba(X_val) val_proba = model.predict_proba(X_val)
auc = roc_auc_score(y_val, val_proba) auc = roc_auc_score(y_val, val_proba)
print(f"\n검증 AUC: {auc:.4f}")
print(classification_report(y_val, (val_proba >= 0.60).astype(int))) # 최적 임계값 탐색: 최소 재현율(0.15) 조건부 정밀도 최대화
from sklearn.metrics import precision_recall_curve
precisions, recalls, thresholds = precision_recall_curve(y_val, val_proba)
precisions, recalls = precisions[:-1], recalls[:-1]
MIN_RECALL = 0.15
valid_idx = np.where(recalls >= MIN_RECALL)[0]
if len(valid_idx) > 0:
best_idx = valid_idx[np.argmax(precisions[valid_idx])]
best_thr = float(thresholds[best_idx])
best_prec = float(precisions[best_idx])
best_rec = float(recalls[best_idx])
else:
best_thr, best_prec, best_rec = 0.50, 0.0, 0.0
print(f" [경고] recall >= {MIN_RECALL} 조건 만족 임계값 없음 → 기본값 0.50 사용")
print(f"\n검증 AUC: {auc:.4f} | 최적 임계값: {best_thr:.4f} "
f"(Precision={best_prec:.3f}, Recall={best_rec:.3f})")
print(classification_report(y_val, (val_proba >= best_thr).astype(int), zero_division=0))
MLX_MODEL_PATH.parent.mkdir(exist_ok=True) MLX_MODEL_PATH.parent.mkdir(exist_ok=True)
model.save(MLX_MODEL_PATH) model.save(MLX_MODEL_PATH)
@@ -133,6 +151,9 @@ def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float:
"date": datetime.now().isoformat(), "date": datetime.now().isoformat(),
"backend": "mlx", "backend": "mlx",
"auc": round(auc, 4), "auc": round(auc, 4),
"best_threshold": round(best_thr, 4),
"best_precision": round(best_prec, 3),
"best_recall": round(best_rec, 3),
"samples": len(dataset), "samples": len(dataset),
"train_sec": round(t3 - t2, 1), "train_sec": round(t3 - t2, 1),
"time_weight_decay": time_weight_decay, "time_weight_decay": time_weight_decay,

View File

@@ -233,16 +233,26 @@ def train(data_path: str, time_weight_decay: float = 2.0):
val_proba = model.predict_proba(X_val)[:, 1] val_proba = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, val_proba) auc = roc_auc_score(y_val, val_proba)
# 최적 임계값 탐색 (F1 기준)
thresholds = np.arange(0.40, 0.70, 0.05) # 최적 임계값 탐색: 최소 재현율(0.15) 조건부 정밀도 최대화
best_thr, best_f1 = 0.50, 0.0 from sklearn.metrics import precision_recall_curve
for thr in thresholds: precisions, recalls, thresholds = precision_recall_curve(y_val, val_proba)
pred = (val_proba >= thr).astype(int) # precision_recall_curve의 마지막 원소는 (1.0, 0.0)이므로 제외
from sklearn.metrics import f1_score precisions, recalls = precisions[:-1], recalls[:-1]
f1 = f1_score(y_val, pred, zero_division=0)
if f1 > best_f1: MIN_RECALL = 0.15
best_f1, best_thr = f1, thr valid_idx = np.where(recalls >= MIN_RECALL)[0]
print(f"\n검증 AUC: {auc:.4f} | 최적 임계값: {best_thr:.2f} (F1={best_f1:.3f})") if len(valid_idx) > 0:
best_idx = valid_idx[np.argmax(precisions[valid_idx])]
best_thr = float(thresholds[best_idx])
best_prec = float(precisions[best_idx])
best_rec = float(recalls[best_idx])
else:
best_thr, best_prec, best_rec = 0.50, 0.0, 0.0
print(f" [경고] recall >= {MIN_RECALL} 조건 만족 임계값 없음 → 기본값 0.50 사용")
print(f"\n검증 AUC: {auc:.4f} | 최적 임계값: {best_thr:.4f} "
f"(Precision={best_prec:.3f}, Recall={best_rec:.3f})")
print(classification_report(y_val, (val_proba >= best_thr).astype(int), zero_division=0)) print(classification_report(y_val, (val_proba >= best_thr).astype(int), zero_division=0))
if MODEL_PATH.exists(): if MODEL_PATH.exists():
@@ -262,6 +272,9 @@ def train(data_path: str, time_weight_decay: float = 2.0):
"date": datetime.now().isoformat(), "date": datetime.now().isoformat(),
"backend": "lgbm", "backend": "lgbm",
"auc": round(auc, 4), "auc": round(auc, 4),
"best_threshold": round(best_thr, 4),
"best_precision": round(best_prec, 3),
"best_recall": round(best_rec, 3),
"samples": len(dataset), "samples": len(dataset),
"features": len(actual_feature_cols), "features": len(actual_feature_cols),
"time_weight_decay": time_weight_decay, "time_weight_decay": time_weight_decay,

View File

@@ -116,12 +116,11 @@ def _calc_signals(d: pd.DataFrame) -> np.ndarray:
def _rolling_zscore(arr: np.ndarray, window: int = 288) -> np.ndarray: def _rolling_zscore(arr: np.ndarray, window: int = 288) -> np.ndarray:
"""rolling window z-score 정규화. 15분봉 기준 3일(288캔들) 윈도우. """rolling window z-score 정규화. nan은 전파된다(nan-safe).
절대값 피처(ATR, 수익률 등)를 레짐 변화에 무관하게 만든다. 15분봉 기준 3일(288캔들) 윈도우. min_periods=1로 초반 데이터도 활용."""
min_periods=1로 초반 데이터도 활용하며, ddof=0(모표준편차)으로 계산한다."""
s = pd.Series(arr.astype(np.float64)) s = pd.Series(arr.astype(np.float64))
r = s.rolling(window=window, min_periods=1) r = s.rolling(window=window, min_periods=1)
mean = r.mean() mean = r.mean() # pandas rolling은 nan을 자동으로 건너뜀
std = r.std(ddof=0) std = r.std(ddof=0)
std = std.where(std >= 1e-8, other=1e-8) std = std.where(std >= 1e-8, other=1e-8)
z = (s - mean) / std z = (s - mean) / std
@@ -155,7 +154,7 @@ def _calc_features_vectorized(
macd_sig = d["macd_signal"] macd_sig = d["macd_signal"]
bb_range = bb_upper - bb_lower bb_range = bb_upper - bb_lower
bb_pct = np.where(bb_range > 0, (close - bb_lower) / bb_range, 0.5) bb_pct = (close - bb_lower) / (bb_range + 1e-8)
ema_align = np.where( ema_align = np.where(
(ema9 > ema21) & (ema21 > ema50), 1, (ema9 > ema21) & (ema21 > ema50), 1,
@@ -164,8 +163,8 @@ def _calc_features_vectorized(
) )
).astype(np.float32) ).astype(np.float32)
atr_pct = np.where(close > 0, atr / close, 0.0) atr_pct = atr / (close + 1e-8)
vol_ratio = np.where(vol_ma20 > 0, volume / vol_ma20, 1.0) vol_ratio = volume / (vol_ma20 + 1e-8)
ret_1 = close.pct_change(1).fillna(0).values ret_1 = close.pct_change(1).fillna(0).values
ret_3 = close.pct_change(3).fillna(0).values ret_3 = close.pct_change(3).fillna(0).values
@@ -243,8 +242,8 @@ def _calc_features_vectorized(
eth_r5 = _align(eth_ret_5, n).astype(np.float32) eth_r5 = _align(eth_ret_5, n).astype(np.float32)
xrp_r1 = ret_1.astype(np.float32) xrp_r1 = ret_1.astype(np.float32)
xrp_btc_rs_raw = np.where(btc_r1 != 0, xrp_r1 / btc_r1, 0.0).astype(np.float32) xrp_btc_rs_raw = (xrp_r1 / (btc_r1 + 1e-8)).astype(np.float32)
xrp_eth_rs_raw = np.where(eth_r1 != 0, xrp_r1 / eth_r1, 0.0).astype(np.float32) xrp_eth_rs_raw = (xrp_r1 / (eth_r1 + 1e-8)).astype(np.float32)
extra = pd.DataFrame({ extra = pd.DataFrame({
"btc_ret_1": _rolling_zscore(btc_r1), "btc_ret_1": _rolling_zscore(btc_r1),
@@ -258,10 +257,18 @@ def _calc_features_vectorized(
}, index=d.index) }, index=d.index)
result = pd.concat([result, extra], axis=1) result = pd.concat([result, extra], axis=1)
# OI 변화율 / 펀딩비 피처 (parquet에 컬럼이 있으면 z-score, 없으면 0) # OI 변화율 / 펀딩비 피처
# OI는 최근 30일치만 제공되므로 이전 구간은 0으로 채워진 채로 들어옴 # 컬럼 없으면 전체 nan, 있으면 0.0 구간(데이터 미제공 구간)을 nan으로 마스킹
oi_raw = d["oi_change"].values if "oi_change" in d.columns else np.zeros(len(d)) # LightGBM은 nan을 자체 처리; MLX는 fit()에서 nanmean/nanstd + nan_to_num 처리
fr_raw = d["funding_rate"].values if "funding_rate" in d.columns else np.zeros(len(d)) if "oi_change" in d.columns:
oi_raw = np.where(d["oi_change"].values == 0.0, np.nan, d["oi_change"].values)
else:
oi_raw = np.full(len(d), np.nan)
if "funding_rate" in d.columns:
fr_raw = np.where(d["funding_rate"].values == 0.0, np.nan, d["funding_rate"].values)
else:
fr_raw = np.full(len(d), np.nan)
result["oi_change"] = _rolling_zscore(oi_raw.astype(np.float64)) result["oi_change"] = _rolling_zscore(oi_raw.astype(np.float64))
result["funding_rate"] = _rolling_zscore(fr_raw.astype(np.float64)) result["funding_rate"] = _rolling_zscore(fr_raw.astype(np.float64))
@@ -356,7 +363,12 @@ def generate_dataset_vectorized(
feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df) feat_all = _calc_features_vectorized(d, signal_arr, btc_df=btc_df, eth_df=eth_df)
# 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만 # 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만
available_cols_for_nan_check = [c for c in FEATURE_COLS if c in feat_all.columns] # oi_change/funding_rate는 선택적 피처(컬럼 없으면 전체 nan)이므로 NaN 체크에서 제외
OPTIONAL_COLS = {"oi_change", "funding_rate"}
available_cols_for_nan_check = [
c for c in FEATURE_COLS
if c in feat_all.columns and c not in OPTIONAL_COLS
]
valid_rows = ( valid_rows = (
(signal_arr != "HOLD") & (signal_arr != "HOLD") &
(~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) & (~feat_all[available_cols_for_nan_check].isna().any(axis=1).values) &

View File

@@ -140,9 +140,12 @@ class MLXFilter:
X_np = X[FEATURE_COLS].values.astype(np.float32) X_np = X[FEATURE_COLS].values.astype(np.float32)
y_np = y.values.astype(np.float32) y_np = y.values.astype(np.float32)
self._mean = X_np.mean(axis=0) # nan-safe 정규화: nanmean/nanstd로 통계 계산 후 nan → 0.0 대치
self._std = X_np.std(axis=0) + 1e-8 # (z-score 후 0.0 = 평균값, 신경망에 줄 수 있는 가장 무난한 결측 대치값)
self._mean = np.nanmean(X_np, axis=0)
self._std = np.nanstd(X_np, axis=0) + 1e-8
X_np = (X_np - self._mean) / self._std X_np = (X_np - self._mean) / self._std
X_np = np.nan_to_num(X_np, nan=0.0)
w_np = sample_weight.astype(np.float32) if sample_weight is not None else None w_np = sample_weight.astype(np.float32) if sample_weight is not None else None
@@ -186,6 +189,7 @@ class MLXFilter:
X_np = X[FEATURE_COLS].values.astype(np.float32) X_np = X[FEATURE_COLS].values.astype(np.float32)
if self._trained and self._mean is not None: if self._trained and self._mean is not None:
X_np = (X_np - self._mean) / self._std X_np = (X_np - self._mean) / self._std
X_np = np.nan_to_num(X_np, nan=0.0)
x = mx.array(X_np) x = mx.array(X_np)
self._model.eval() self._model.eval()
logits = self._model(x) logits = self._model(x)

View File

@@ -91,3 +91,72 @@ def test_matches_original_generate_dataset(sample_df):
assert 0.5 <= ratio <= 2.0, ( assert 0.5 <= ratio <= 2.0, (
f"샘플 수 차이가 너무 큼: 벡터화={len(vec)}, 기존={len(orig)}, 비율={ratio:.2f}" f"샘플 수 차이가 너무 큼: 벡터화={len(vec)}, 기존={len(orig)}, 비율={ratio:.2f}"
) )
def test_epsilon_no_division_by_zero():
    """Extreme inputs (bb_range=0, close constant, vol_ma20 degenerate) must not
    yield inf in any numeric feature column."""
    import numpy as np
    import pandas as pd
    from src.dataset_builder import _calc_features_vectorized, _calc_signals, _calc_indicators

    # A flat price series forces bb_upper == bb_lower, i.e. bb_range == 0.
    length = 100
    ones = np.ones(length)
    frame = pd.DataFrame(
        {"open": ones, "high": ones, "low": ones, "close": ones, "volume": ones}
    )

    indicators = _calc_indicators(frame)
    signals = _calc_signals(indicators)
    features = _calc_features_vectorized(indicators, signals)

    numeric = features.select_dtypes(include=[np.number]).columns
    has_inf = features[numeric].isin([np.inf, -np.inf]).any().any()
    assert not has_inf, \
        "inf 값이 있으면 안 됨"
def test_oi_nan_masking_no_column():
    """When the input frame has no oi_change column, the feature must be all-nan."""
    import numpy as np
    import pandas as pd
    from src.dataset_builder import _calc_features_vectorized, _calc_signals, _calc_indicators

    rows = 100
    np.random.seed(0)
    frame = pd.DataFrame(
        {
            "open": np.random.uniform(1, 2, rows),
            "high": np.random.uniform(2, 3, rows),
            "low": np.random.uniform(0.5, 1, rows),
            "close": np.random.uniform(1, 2, rows),
            "volume": np.random.uniform(1000, 5000, rows),
        }
    )

    indicators = _calc_indicators(frame)
    features = _calc_features_vectorized(indicators, _calc_signals(indicators))
    assert features["oi_change"].isna().all(), "oi_change 컬럼 없을 때 전부 nan이어야 함"
def test_oi_nan_masking_with_zeros():
    """An oi_change column's 0.0 entries (data-unavailable periods) must be nan-masked.

    The first 50 rows carry literal 0.0 (no OI data) and must come out nan
    after masking + rolling z-score; the last 50 rows carry real values and
    must retain at least some finite entries.
    """
    import numpy as np
    import pandas as pd
    from src.dataset_builder import _calc_features_vectorized, _calc_signals, _calc_indicators
    n = 100
    np.random.seed(0)
    df = pd.DataFrame({
        "open": np.random.uniform(1, 2, n),
        "high": np.random.uniform(2, 3, n),
        "low": np.random.uniform(0.5, 1, n),
        "close": np.random.uniform(1, 2, n),
        "volume": np.random.uniform(1000, 5000, n),
        # First half: 0.0 placeholder (OI not provided); second half: real values.
        "oi_change": np.concatenate([np.zeros(50), np.random.uniform(-0.1, 0.1, 50)]),
    })
    d = _calc_indicators(df)
    sig = _calc_signals(d)
    feat = _calc_features_vectorized(d, sig)
    # Fix: the original test never verified the masking it claims to test —
    # it only inspected the real-value segment, so it would pass even if the
    # 0.0 placeholders were left unmasked. Assert the zero segment is nan too
    # (nan propagates through the rolling z-score for those rows).
    assert feat["oi_change"].iloc[:50].isna().all(), \
        "0.0(미제공) 구간은 nan으로 마스킹되어야 함"
    assert feat["oi_change"].iloc[50:].notna().any(), "실제 OI 값 구간에 유한값이 있어야 함"

View File

@@ -65,6 +65,31 @@ def test_mlx_filter_fit_and_predict():
assert np.all((proba >= 0.0) & (proba <= 1.0)) assert np.all((proba >= 0.0) & (proba <= 1.0))
def test_fit_with_nan_features():
    """Training must complete normally when the oi_change feature contains nan."""
    import numpy as np
    import pandas as pd
    from src.mlx_filter import MLXFilter
    from src.ml_features import FEATURE_COLS

    rows = 300
    np.random.seed(42)
    frame = pd.DataFrame(
        np.random.randn(rows, len(FEATURE_COLS)).astype(np.float32),
        columns=FEATURE_COLS,
    )
    # Blank out the first half of oi_change to simulate a missing-data period.
    half = rows // 2
    frame["oi_change"] = np.where(np.arange(rows) < half, np.nan, frame["oi_change"])
    labels = pd.Series((np.random.rand(rows) > 0.5).astype(np.float32))

    clf = MLXFilter(input_dim=len(FEATURE_COLS), hidden_dim=32, epochs=3)
    clf.fit(frame, labels)  # must not raise despite the nan block

    probs = clf.predict_proba(frame)
    assert not np.any(np.isnan(probs)), "예측 확률에 nan이 없어야 함"
    assert probs.min() >= 0.0 and probs.max() <= 1.0
def test_mlx_filter_save_load(tmp_path): def test_mlx_filter_save_load(tmp_path):
"""저장 후 로드한 모델이 동일한 예측값을 반환해야 한다.""" """저장 후 로드한 모델이 동일한 예측값을 반환해야 한다."""
from src.mlx_filter import MLXFilter from src.mlx_filter import MLXFilter