diff --git a/models/training_log.json b/models/training_log.json index 58ec1dc..82ba125 100644 --- a/models/training_log.json +++ b/models/training_log.json @@ -31,5 +31,19 @@ "samples": 1696, "features": 21, "model_path": "models/lgbm_filter.pkl" + }, + { + "date": "2026-03-01T21:03:56.314547", + "auc": 0.5406, + "samples": 1707, + "features": 21, + "model_path": "models/lgbm_filter.pkl" + }, + { + "date": "2026-03-01T21:12:23.866860", + "auc": 0.502, + "samples": 3269, + "features": 21, + "model_path": "models/lgbm_filter.pkl" } ] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 07979a3..6011d22 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ lightgbm>=4.3.0 scikit-learn>=1.4.0 joblib>=1.3.0 pyarrow>=15.0.0 +onnxruntime>=1.18.0 diff --git a/scripts/deploy_model.sh b/scripts/deploy_model.sh index f11a457..8d74296 100755 --- a/scripts/deploy_model.sh +++ b/scripts/deploy_model.sh @@ -1,65 +1,77 @@ #!/usr/bin/env bash # 맥미니에서 학습한 모델을 LXC 컨테이너 볼륨 경로로 전송한다. -# 사용법: bash scripts/deploy_model.sh [LXC_HOST] [LXC_MODELS_PATH] +# 사용법: bash scripts/deploy_model.sh [lgbm|mlx] # # 예시: -# bash scripts/deploy_model.sh 10.1.10.28 /path/to/cointrader/models -# bash scripts/deploy_model.sh root@10.1.10.28 /root/cointrader/models +# bash scripts/deploy_model.sh # LightGBM (기본값) +# bash scripts/deploy_model.sh mlx # MLX 신경망 set -euo pipefail -LXC_HOST="${1:-root@10.1.10.24}" -LXC_MODELS_PATH="${2:-/root/cointrader/models}" -LOCAL_MODEL="models/lgbm_filter.pkl" +BACKEND="${1:-lgbm}" +LXC_HOST="root@10.1.10.24" +LXC_MODELS_PATH="/root/cointrader/models" LOCAL_LOG="models/training_log.json" -if [[ ! -f "$LOCAL_MODEL" ]]; then - echo "[오류] 모델 파일 없음: $LOCAL_MODEL" - echo "먼저 python scripts/train_model.py 를 실행하세요." - exit 1 +# ── 백엔드별 파일 목록 설정 ────────────────────────────────────────────────── +# mlx: ONNX 파일만 전송 (Linux 서버는 onnxruntime으로 추론) +# lgbm: pkl 파일 전송 +RELOAD_CMD="from src.ml_filter import MLFilter; f=MLFilter(); f.reload_model(); print('리로드 완료')" +if [ "$BACKEND" = "mlx" ]; then + LOCAL_FILES=("models/mlx_filter.weights.onnx") +else + LOCAL_FILES=("models/lgbm_filter.pkl") fi -echo "=== 모델 전송 시작 ===" -echo " 대상: ${LXC_HOST}:${LXC_MODELS_PATH}" -echo " 파일: $LOCAL_MODEL" - -# 기존 모델을 prev로 백업 (원격) -ssh "${LXC_HOST}" " - if [ -f '${LXC_MODELS_PATH}/lgbm_filter.pkl' ]; then - cp '${LXC_MODELS_PATH}/lgbm_filter.pkl' '${LXC_MODELS_PATH}/lgbm_filter_prev.pkl' - echo ' 기존 모델 백업 완료' +# ── 파일 존재 확인 ──────────────────────────────────────────────────────────── +for f in "${LOCAL_FILES[@]}"; do + if [[ ! -f "$f" ]]; then + echo "[오류] 모델 파일 없음: $f" + exit 1 fi +done + +echo "=== 모델 전송 시작 (백엔드: ${BACKEND}) ===" +echo " 대상: ${LXC_HOST}:${LXC_MODELS_PATH}" + +# ── 원격 디렉터리 생성 + lgbm 기존 모델 백업 ───────────────────────────────── +ssh "${LXC_HOST}" " mkdir -p '${LXC_MODELS_PATH}' + if [ '$BACKEND' = 'lgbm' ] && [ -f '${LXC_MODELS_PATH}/lgbm_filter.pkl' ]; then + cp '${LXC_MODELS_PATH}/lgbm_filter.pkl' '${LXC_MODELS_PATH}/lgbm_filter_prev.pkl' + echo ' 기존 lgbm 모델 백업 완료' + fi " -# 모델 파일 전송 (rsync 우선, 없으면 scp 폴백) -if command -v rsync &>/dev/null && ssh "${LXC_HOST}" "command -v rsync" &>/dev/null; then - rsync -avz --progress \ - "$LOCAL_MODEL" \ - "${LXC_HOST}:${LXC_MODELS_PATH}/lgbm_filter.pkl" -else - echo " rsync 없음 → scp 사용" - scp "$LOCAL_MODEL" "${LXC_HOST}:${LXC_MODELS_PATH}/lgbm_filter.pkl" -fi - -# 학습 로그도 함께 전송 (있을 경우) -if [[ -f "$LOCAL_LOG" ]]; then +# ── 파일 전송 헬퍼 (rsync 우선, scp 폴백) ──────────────────────────────────── +_send() { + local src="$1" dst="$2" + echo " 전송: $src → ${LXC_HOST}:$dst" if command -v rsync &>/dev/null && ssh "${LXC_HOST}" "command -v rsync" &>/dev/null; then - rsync -avz "$LOCAL_LOG" "${LXC_HOST}:${LXC_MODELS_PATH}/training_log.json" + rsync -avz --progress "$src" "${LXC_HOST}:$dst" else - scp "$LOCAL_LOG" "${LXC_HOST}:${LXC_MODELS_PATH}/training_log.json" + scp "$src" "${LXC_HOST}:$dst" fi +} + +# ── 모델 파일 전송 ──────────────────────────────────────────────────────────── +for f in "${LOCAL_FILES[@]}"; do + _send "$f" "${LXC_MODELS_PATH}/$(basename "$f")" +done + +# ── 학습 로그 전송 ──────────────────────────────────────────────────────────── +if [[ -f "$LOCAL_LOG" ]]; then + _send "$LOCAL_LOG" "${LXC_MODELS_PATH}/training_log.json" echo " 학습 로그 전송 완료" fi echo "=== 전송 완료 ===" echo "" -# 봇 컨테이너가 실행 중이면 모델 핫리로드, 아니면 건너뜀 +# ── 핫리로드 ───────────────────────────────────────────────────────────────── echo "=== 핫리로드 시도 ===" if ssh "${LXC_HOST}" "docker inspect -f '{{.State.Running}}' cointrader 2>/dev/null | grep -q true"; then - ssh "${LXC_HOST}" "docker exec cointrader python -c \ - \"from src.ml_filter import MLFilter; f=MLFilter(); f.reload_model(); print('리로드 완료')\"" + ssh "${LXC_HOST}" "docker exec cointrader python -c \"${RELOAD_CMD}\"" echo "=== 핫리로드 완료 ===" else echo " cointrader 컨테이너가 실행 중이 아닙니다. 건너뜁니다." diff --git a/scripts/train_and_deploy.sh b/scripts/train_and_deploy.sh index 4d31cf9..f84260b 100755 --- a/scripts/train_and_deploy.sh +++ b/scripts/train_and_deploy.sh @@ -1,10 +1,10 @@ #!/usr/bin/env bash # 맥미니에서 전체 학습 파이프라인을 실행하고 LXC로 배포한다. -# 사용법: bash scripts/train_and_deploy.sh [LXC_HOST] [LXC_MODELS_PATH] +# 사용법: bash scripts/train_and_deploy.sh [mlx|lgbm] # # 예시: -# bash scripts/train_and_deploy.sh -# bash scripts/train_and_deploy.sh root@10.1.10.24 /root/cointrader/models +# bash scripts/train_and_deploy.sh # LightGBM (기본값) +# bash scripts/train_and_deploy.sh mlx # MLX GPU 학습 set -euo pipefail @@ -19,37 +19,35 @@ else echo "경고: 가상환경을 찾을 수 없습니다 ($VENV_PATH). 시스템 Python을 사용합니다." >&2 fi -LXC_HOST="${1:-root@10.1.10.24}" -LXC_MODELS_PATH="${2:-/root/cointrader/models}" +BACKEND="${1:-lgbm}" cd "$PROJECT_ROOT" -echo "=== [1/3] 데이터 수집 (XRP + BTC + ETH 3심볼) ===" +echo "=== [1/3] 데이터 수집 (XRP + BTC + ETH 3심볼, 1년치) ===" python scripts/fetch_history.py \ --symbols XRPUSDT BTCUSDT ETHUSDT \ --interval 1m \ - --days 90 \ + --days 365 \ --output data/xrpusdt_1m.parquet # 결과: data/combined_1m.parquet (타임스탬프 기준 병합) echo "" echo "=== [2/3] 모델 학습 (21개 피처: XRP 13 + BTC/ETH 상관관계 8) ===" -# TRAIN_BACKEND=mlx 로 설정하면 Apple Silicon GPU(Metal)를 사용한다 (기본: lgbm) -BACKEND="${TRAIN_BACKEND:-lgbm}" +DECAY="${TIME_WEIGHT_DECAY:-2.0}" if [ "$BACKEND" = "mlx" ]; then - echo " 백엔드: MLX (Apple Silicon GPU)" - python scripts/train_mlx_model.py --data data/combined_1m.parquet + echo " 백엔드: MLX (Apple Silicon GPU), decay=${DECAY}" + python scripts/train_mlx_model.py --data data/combined_1m.parquet --decay "$DECAY" else - echo " 백엔드: LightGBM (CPU)" - python scripts/train_model.py --data data/combined_1m.parquet + echo " 백엔드: LightGBM (CPU), decay=${DECAY}" + python scripts/train_model.py --data data/combined_1m.parquet --decay "$DECAY" fi echo "" echo "=== [3/3] LXC 배포 ===" -bash scripts/deploy_model.sh "$LXC_HOST" "$LXC_MODELS_PATH" +bash scripts/deploy_model.sh "$BACKEND" echo "" echo "=== 전체 파이프라인 완료 ===" echo "" echo "봇 재시작이 필요하면:" -echo " ssh ${LXC_HOST} 'cd /root/cointrader && docker compose restart cointrader'" +echo " ssh root@10.1.10.24 'cd /root/cointrader && docker compose restart cointrader'" diff --git a/scripts/train_mlx_model.py b/scripts/train_mlx_model.py index 1da1369..a00da07 100644 --- a/scripts/train_mlx_model.py +++ b/scripts/train_mlx_model.py @@ -25,14 +25,14 @@ MLX_MODEL_PATH = Path("models/mlx_filter.weights") LOG_PATH = Path("models/training_log.json") -def train_mlx(data_path: str) -> float: +def train_mlx(data_path: str, time_weight_decay: float = 2.0) -> float: print(f"데이터 로드: {data_path}") df = pd.read_parquet(data_path) print(f"캔들 수: {len(df)}") print("\n데이터셋 생성 중...") t0 = time.perf_counter() - dataset = generate_dataset_vectorized(df) + dataset = generate_dataset_vectorized(df, time_weight_decay=time_weight_decay) t1 = time.perf_counter() print(f"데이터셋 생성 완료: {t1 - t0:.1f}초, {len(dataset)}개 샘플") @@ -46,10 +46,30 @@ def train_mlx(data_path: str) -> float: X = dataset[FEATURE_COLS] y = dataset["label"] + w = dataset["sample_weight"].values split = int(len(X) * 0.8) X_train, X_val = X.iloc[:split], X.iloc[split:] y_train, y_val = y.iloc[:split], y.iloc[split:] + w_train = w[:split] + + # --- 클래스 불균형 처리: 언더샘플링 (가중치 인덱스 보존) --- + pos_idx = np.where(y_train == 1)[0] + neg_idx = np.where(y_train == 0)[0] + + if len(neg_idx) > len(pos_idx): + np.random.seed(42) + neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False) + + balanced_idx = np.concatenate([pos_idx, neg_idx]) + np.random.shuffle(balanced_idx) + + X_train = X_train.iloc[balanced_idx] + y_train = y_train.iloc[balanced_idx] + w_train = w_train[balanced_idx] + + print(f"\n언더샘플링 적용 후 학습 데이터: {len(X_train)}개 (양성={y_train.sum()}, 음성={(y_train==0).sum()})") + # -------------------------------------- print("\nMLX 신경망 학습 시작 (GPU)...") t2 = time.perf_counter() @@ -60,7 +80,7 @@ def train_mlx(data_path: str) -> float: epochs=100, batch_size=256, ) - model.fit(X_train, y_train) + model.fit(X_train, y_train, sample_weight=w_train) t3 = time.perf_counter() print(f"학습 완료: {t3 - t2:.1f}초") @@ -83,6 +103,7 @@ def train_mlx(data_path: str) -> float: "auc": round(auc, 4), "samples": len(dataset), "train_sec": round(t3 - t2, 1), + "time_weight_decay": time_weight_decay, "model_path": str(MLX_MODEL_PATH), }) with open(LOG_PATH, "w") as f: @@ -94,8 +115,12 @@ def train_mlx(data_path: str) -> float: def main(): parser = argparse.ArgumentParser() parser.add_argument("--data", default="data/xrpusdt_1m.parquet") + parser.add_argument( + "--decay", type=float, default=2.0, + help="시간 가중치 감쇠 강도 (0=균등, 2.0=최신이 ~7.4배 높음)", + ) args = parser.parse_args() - train_mlx(args.data) + train_mlx(args.data, time_weight_decay=args.decay) if __name__ == "__main__": diff --git a/scripts/train_model.py b/scripts/train_model.py index bd6226a..59d6d65 100644 --- a/scripts/train_model.py +++ b/scripts/train_model.py @@ -146,7 +146,7 @@ def generate_dataset(df: pd.DataFrame, n_jobs: int | None = None) -> pd.DataFram return pd.DataFrame(rows) -def train(data_path: str): +def train(data_path: str, time_weight_decay: float = 2.0): print(f"데이터 로드: {data_path}") df_raw = pd.read_parquet(data_path) print(f"캔들 수: {len(df_raw)}, 컬럼: {list(df_raw.columns)}") @@ -169,7 +169,7 @@ def train(data_path: str): df = df_raw[base_cols].copy() print("데이터셋 생성 중...") - dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df) + dataset = generate_dataset_vectorized(df, btc_df=btc_df, eth_df=eth_df, time_weight_decay=time_weight_decay) if dataset.empty or "label" not in dataset.columns: raise ValueError(f"데이터셋 생성 실패: 샘플 0개. 위 오류 메시지를 확인하세요.") @@ -183,10 +183,30 @@ def train(data_path: str): print(f"사용 피처: {len(actual_feature_cols)}개 {actual_feature_cols}") X = dataset[actual_feature_cols] y = dataset["label"] + w = dataset["sample_weight"].values split = int(len(X) * 0.8) X_train, X_val = X.iloc[:split], X.iloc[split:] y_train, y_val = y.iloc[:split], y.iloc[split:] + w_train = w[:split] + + # --- 클래스 불균형 처리: 언더샘플링 (가중치 인덱스 보존) --- + pos_idx = np.where(y_train == 1)[0] + neg_idx = np.where(y_train == 0)[0] + + if len(neg_idx) > len(pos_idx): + np.random.seed(42) + neg_idx = np.random.choice(neg_idx, size=len(pos_idx), replace=False) + + balanced_idx = np.concatenate([pos_idx, neg_idx]) + np.random.shuffle(balanced_idx) + + X_train = X_train.iloc[balanced_idx] + y_train = y_train.iloc[balanced_idx] + w_train = w_train[balanced_idx] + + print(f"\n언더샘플링 적용 후 학습 데이터: {len(X_train)}개 (양성={y_train.sum()}, 음성={(y_train==0).sum()})") + # -------------------------------------- model = lgb.LGBMClassifier( n_estimators=300, @@ -201,6 +221,7 @@ def train(data_path: str): ) model.fit( X_train, y_train, + sample_weight=w_train, eval_set=[(X_val, y_val)], callbacks=[lgb.early_stopping(30, verbose=False), lgb.log_evaluation(50)], ) @@ -225,9 +246,11 @@ def train(data_path: str): log = json.load(f) log.append({ "date": datetime.now().isoformat(), + "backend": "lgbm", "auc": round(auc, 4), "samples": len(dataset), "features": len(actual_feature_cols), + "time_weight_decay": time_weight_decay, "model_path": str(MODEL_PATH), }) with open(LOG_PATH, "w") as f: @@ -239,8 +262,12 @@ def train(data_path: str): def main(): parser = argparse.ArgumentParser() parser.add_argument("--data", default="data/xrpusdt_1m.parquet") + parser.add_argument( + "--decay", type=float, default=2.0, + help="시간 가중치 감쇠 강도 (0=균등, 2.0=최신이 ~7.4배 높음)", + ) args = parser.parse_args() - train(args.data) + train(args.data, time_weight_decay=args.decay) if __name__ == "__main__": diff --git a/src/dataset_builder.py b/src/dataset_builder.py index eb8b44c..098b432 100644 --- a/src/dataset_builder.py +++ b/src/dataset_builder.py @@ -275,28 +275,26 @@ def _calc_labels_vectorized( fut_high = highs[idx + 1 : end] fut_low = lows[idx + 1 : end] - label = None + label = 0 # 미도달(타임아웃) 시 실패로 간주 + for h, l in zip(fut_high, fut_low): if signal == "LONG": - if h >= tp: - label = 1 - break if l <= sl: label = 0 break - else: - if l <= tp: + if h >= tp: label = 1 break + else: # SHORT if h >= sl: label = 0 break + if l <= tp: + label = 1 + break - if label is None: - valid_mask.append(False) - else: - labels.append(label) - valid_mask.append(True) + labels.append(label) + valid_mask.append(True) return np.array(labels, dtype=np.int8), np.array(valid_mask, dtype=bool) @@ -305,11 +303,17 @@ def generate_dataset_vectorized( df: pd.DataFrame, btc_df: pd.DataFrame | None = None, eth_df: pd.DataFrame | None = None, + time_weight_decay: float = 0.0, ) -> pd.DataFrame: """ 전체 시계열을 1회 계산해 학습 데이터셋을 생성한다. 기존 generate_dataset()의 drop-in 대체제. btc_df, eth_df가 제공되면 21개 피처로 확장한다. + + time_weight_decay: 지수 감쇠 강도. 0이면 균등 가중치. + 양수일수록 최신 샘플에 더 높은 가중치를 부여한다. + 예) 2.0 → 최신 샘플이 가장 오래된 샘플보다 e^2 ≈ 7.4배 높은 가중치. + 결과 DataFrame에 'sample_weight' 컬럼으로 포함된다. """ print(" [1/3] 전체 시계열 지표 계산 (1회)...") d = _calc_indicators(df) @@ -338,4 +342,17 @@ def generate_dataset_vectorized( feat_final = feat_all.iloc[final_idx][available_feature_cols].copy() feat_final["label"] = labels - return feat_final.reset_index(drop=True) + # 시간 가중치: 오래된 샘플 → 낮은 가중치, 최신 샘플 → 높은 가중치 + n = len(feat_final) + if time_weight_decay > 0 and n > 1: + weights = np.exp(time_weight_decay * np.linspace(0.0, 1.0, n)).astype(np.float32) + weights /= weights.mean() # 평균 1로 정규화해 학습률 스케일 유지 + print(f" 시간 가중치 적용 (decay={time_weight_decay}): " + f"min={weights.min():.3f}, max={weights.max():.3f}") + else: + weights = np.ones(n, dtype=np.float32) + + feat_final = feat_final.reset_index(drop=True) + feat_final["sample_weight"] = weights + + return feat_final diff --git a/src/label_builder.py b/src/label_builder.py index 1427ad0..7c763e0 100644 --- a/src/label_builder.py +++ b/src/label_builder.py @@ -9,21 +9,17 @@ def build_labels( stop_loss: float, side: str, ) -> Optional[int]: - """ - 진입 이후 미래 캔들을 순서대로 확인해 TP/SL 도달 여부를 판단한다. - LONG: high >= TP → 1, low <= SL → 0 - SHORT: low <= TP → 1, high >= SL → 0 - 둘 다 미도달 → None (학습 데이터에서 제외) - """ for high, low in zip(future_highs, future_lows): if side == "LONG": - if high >= take_profit: - return 1 + # 보수적 접근: 손절(SL)을 먼저 체크 if low <= stop_loss: return 0 - else: # SHORT - if low <= take_profit: + if high >= take_profit: return 1 + else: # SHORT + # 보수적 접근: 손절(SL)을 먼저 체크 if high >= stop_loss: return 0 + if low <= take_profit: + return 1 return None diff --git a/src/ml_filter.py b/src/ml_filter.py index 7d8aa3e..cbe02c9 100644 --- a/src/ml_filter.py +++ b/src/ml_filter.py @@ -1,32 +1,63 @@ from pathlib import Path import joblib +import numpy as np import pandas as pd from loguru import logger +from src.ml_features import FEATURE_COLS + +ONNX_MODEL_PATH = Path("models/mlx_filter.weights.onnx") +LGBM_MODEL_PATH = Path("models/lgbm_filter.pkl") + class MLFilter: """ - LightGBM 모델을 로드하고 진입 여부를 판단한다. - 모델 파일이 없으면 항상 진입을 허용한다 (폴백). + ML 필터. ONNX(MLX 신경망) 우선 로드, 없으면 LightGBM으로 폴백한다. + 둘 다 없으면 항상 진입을 허용한다. + + 우선순위: ONNX > LightGBM > 폴백(항상 허용) """ - def __init__(self, model_path: str = "models/lgbm_filter.pkl", threshold: float = 0.60): - self._model_path = Path(model_path) + def __init__( + self, + onnx_path: str = str(ONNX_MODEL_PATH), + lgbm_path: str = str(LGBM_MODEL_PATH), + threshold: float = 0.60, + ): + self._onnx_path = Path(onnx_path) + self._lgbm_path = Path(lgbm_path) self._threshold = threshold - self._model = None + self._onnx_session = None + self._lgbm_model = None self._try_load() def _try_load(self): - if self._model_path.exists(): + # ONNX 우선 시도 + if self._onnx_path.exists(): try: - self._model = joblib.load(self._model_path) - logger.info(f"ML 필터 모델 로드 완료: {self._model_path}") + import onnxruntime as ort + self._onnx_session = ort.InferenceSession( + str(self._onnx_path), + providers=["CPUExecutionProvider"], + ) + self._lgbm_model = None + logger.info(f"ML 필터 ONNX 모델 로드 완료: {self._onnx_path}") + return except Exception as e: - logger.warning(f"ML 필터 모델 로드 실패: {e}") - self._model = None + logger.warning(f"ONNX 모델 로드 실패: {e}") + self._onnx_session = None + + # LightGBM 폴백 + if self._lgbm_path.exists(): + try: + self._lgbm_model = joblib.load(self._lgbm_path) + logger.info(f"ML 필터 LightGBM 모델 로드 완료: {self._lgbm_path}") + except Exception as e: + logger.warning(f"LightGBM 모델 로드 실패: {e}") + self._lgbm_model = None def is_model_loaded(self) -> bool: - return self._model is not None + return self._onnx_session is not None or self._lgbm_model is not None def should_enter(self, features: pd.Series) -> bool: """ @@ -36,8 +67,13 @@ class MLFilter: if not self.is_model_loaded(): return True try: - X = features.to_frame().T - proba = self._model.predict_proba(X)[0][1] + if self._onnx_session is not None: + input_name = self._onnx_session.get_inputs()[0].name + X = features[FEATURE_COLS].values.astype(np.float32).reshape(1, -1) + proba = float(self._onnx_session.run(None, {input_name: X})[0][0]) + else: + X = features.to_frame().T + proba = float(self._lgbm_model.predict_proba(X)[0][1]) logger.debug(f"ML 필터 확률: {proba:.3f} (임계값: {self._threshold})") return bool(proba >= self._threshold) except Exception as e: @@ -46,5 +82,7 @@ class MLFilter: def reload_model(self): """재학습 후 모델을 핫 리로드한다.""" + self._onnx_session = None + self._lgbm_model = None self._try_load() logger.info("ML 필터 모델 리로드 완료") diff --git a/src/mlx_filter.py b/src/mlx_filter.py index 2698ff3..4545986 100644 --- a/src/mlx_filter.py +++ b/src/mlx_filter.py @@ -1,6 +1,7 @@ """ Apple MLX 기반 경량 신경망 필터. M4의 통합 GPU를 자동으로 활용한다. +학습 후 ONNX로 export해 Linux 서버에서 onnxruntime으로 추론한다. """ import numpy as np import pandas as pd @@ -12,6 +13,83 @@ from pathlib import Path from src.ml_features import FEATURE_COLS +def _export_onnx( + weights_npz: Path, + meta_npz: Path, + onnx_path: Path, +) -> None: + """ + MLX 가중치(.npz)를 읽어 ONNX 그래프로 변환한다. + 네트워크 구조: fc1(ReLU) → dropout(추론 시 비활성) → fc2(ReLU) → fc3 → sigmoid + """ + import onnx + from onnx import helper, TensorProto, numpy_helper + + meta = np.load(meta_npz) + mean: np.ndarray = meta["mean"].astype(np.float32) + std: np.ndarray = meta["std"].astype(np.float32) + input_dim = int(meta["input_dim"]) + hidden_dim = int(meta["hidden_dim"]) + + w = np.load(weights_npz) + # MLX save_weights 키 패턴: fc1.weight, fc1.bias, ... + fc1_w = w["fc1.weight"].astype(np.float32) # (hidden, input) + fc1_b = w["fc1.bias"].astype(np.float32) + fc2_w = w["fc2.weight"].astype(np.float32) # (hidden//2, hidden) + fc2_b = w["fc2.bias"].astype(np.float32) + fc3_w = w["fc3.weight"].astype(np.float32) # (1, hidden//2) + fc3_b = w["fc3.bias"].astype(np.float32) + + def _t(name: str, arr: np.ndarray) -> onnx.TensorProto: + return numpy_helper.from_array(arr, name=name) + + initializers = [ + _t("mean", mean), + _t("std", std), + _t("fc1_w", fc1_w), + _t("fc1_b", fc1_b), + _t("fc2_w", fc2_w), + _t("fc2_b", fc2_b), + _t("fc3_w", fc3_w), + _t("fc3_b", fc3_b), + ] + + nodes = [ + # 정규화: (x - mean) / std + helper.make_node("Sub", ["X", "mean"], ["x_sub"]), + helper.make_node("Div", ["x_sub", "std"], ["x_norm"]), + # fc1: x_norm @ fc1_w.T + fc1_b + helper.make_node("Gemm", ["x_norm", "fc1_w", "fc1_b"], ["fc1_out"], + transB=1), + helper.make_node("Relu", ["fc1_out"], ["relu1"]), + # fc2: relu1 @ fc2_w.T + fc2_b + helper.make_node("Gemm", ["relu1", "fc2_w", "fc2_b"], ["fc2_out"], + transB=1), + helper.make_node("Relu", ["fc2_out"], ["relu2"]), + # fc3: relu2 @ fc3_w.T + fc3_b → (N, 1) + helper.make_node("Gemm", ["relu2", "fc3_w", "fc3_b"], ["logits"], + transB=1), + # sigmoid → (N, 1) + helper.make_node("Sigmoid", ["logits"], ["proba_2d"]), + # squeeze: (N, 1) → (N,) + helper.make_node("Flatten", ["proba_2d"], ["proba"], axis=0), + ] + + graph = helper.make_graph( + nodes, + "mlx_filter", + inputs=[helper.make_tensor_value_info("X", TensorProto.FLOAT, [None, input_dim])], + outputs=[helper.make_tensor_value_info("proba", TensorProto.FLOAT, [None])], + initializer=initializers, + ) + model_proto = helper.make_model(graph, opset_imports=[helper.make_opsetid("", 17)]) + model_proto.ir_version = 8 + onnx.checker.check_model(model_proto) + onnx_path.parent.mkdir(exist_ok=True) + onnx.save(model_proto, str(onnx_path)) + print(f" ONNX export 완료: {onnx_path}") + + class _Net(nn.Module): """3층 MLP 이진 분류기.""" @@ -53,7 +131,12 @@ class MLXFilter: self._std: np.ndarray | None = None self._trained = False - def fit(self, X: pd.DataFrame, y: pd.Series) -> "MLXFilter": + def fit( + self, + X: pd.DataFrame, + y: pd.Series, + sample_weight: np.ndarray | None = None, + ) -> "MLXFilter": X_np = X[FEATURE_COLS].values.astype(np.float32) y_np = y.values.astype(np.float32) @@ -61,11 +144,20 @@ class MLXFilter: self._std = X_np.std(axis=0) + 1e-8 X_np = (X_np - self._mean) / self._std + w_np = sample_weight.astype(np.float32) if sample_weight is not None else None + optimizer = optim.Adam(learning_rate=self.lr) - def loss_fn(model: _Net, x: mx.array, y: mx.array) -> mx.array: + def loss_fn( + model: _Net, x: mx.array, y: mx.array, w: mx.array | None + ) -> mx.array: logits = model(x) - return nn.losses.binary_cross_entropy(logits, y, with_logits=True) + per_sample = nn.losses.binary_cross_entropy( + logits, y, with_logits=True, reduction="none" + ) + if w is not None: + return (per_sample * w).sum() / w.sum() + return per_sample.mean() loss_and_grad = nn.value_and_grad(self._model, loss_fn) @@ -78,7 +170,8 @@ class MLXFilter: batch_idx = idx[start : start + self.batch_size] x_batch = mx.array(X_np[batch_idx]) y_batch = mx.array(y_np[batch_idx]) - loss, grads = loss_and_grad(self._model, x_batch, y_batch) + w_batch = mx.array(w_np[batch_idx]) if w_np is not None else None + loss, grads = loss_and_grad(self._model, x_batch, y_batch, w_batch) optimizer.update(self._model, grads) mx.eval(self._model.parameters(), optimizer.state) epoch_loss += loss.item() @@ -114,6 +207,12 @@ class MLXFilter: input_dim=np.array(self.input_dim), hidden_dim=np.array(self.hidden_dim), ) + # ONNX export: Linux 서버에서 onnxruntime으로 추론하기 위해 변환 + try: + onnx_path = path.with_suffix(".onnx") + _export_onnx(weights_path, meta_path, onnx_path) + except ImportError: + print(" [경고] onnx 패키지 없음 → ONNX export 생략 (pip install onnx)") @classmethod def load(cls, path: str | Path) -> "MLXFilter":