diff --git a/README.md b/README.md new file mode 100644 index 0000000..1ca59af --- /dev/null +++ b/README.md @@ -0,0 +1,167 @@ +# CoinTrader + +Binance Futures 자동매매 봇. 복합 기술 지표와 LightGBM ML 필터를 결합하여 XRPUSDT(기본) 선물 포지션을 자동으로 진입·청산하며, Discord로 실시간 알림을 전송합니다. + +--- + +## 주요 기능 + +- **복합 기술 지표 신호**: RSI, MACD 크로스, 볼린저 밴드, EMA 정/역배열, Stochastic RSI, 거래량 급증 — 3개 이상 일치 시 진입 +- **ML 필터 (LightGBM)**: 기술 지표 신호를 한 번 더 검증하여 오진입 차단 (모델 없으면 자동 폴백) +- **ATR 기반 손절/익절**: 변동성에 따라 동적으로 SL/TP 계산 (1.5× / 3.0× ATR) +- **리스크 관리**: 트레이드당 리스크 비율, 최대 포지션 수, 일일 손실 한도 제어 +- **포지션 복구**: 봇 재시작 시 기존 포지션 자동 감지 및 상태 복원 +- **자동 재학습**: 매일 새벽 3시 ML 모델 재학습 및 핫 리로드 +- **Discord 알림**: 진입·청산·오류 이벤트 실시간 웹훅 알림 +- **CI/CD**: Jenkins + Gitea Container Registry 기반 Docker 이미지 자동 빌드·배포 + +--- + +## 프로젝트 구조 + +``` +cointrader/ +├── main.py # 진입점 +├── src/ +│ ├── bot.py # 메인 트레이딩 루프 +│ ├── config.py # 환경변수 기반 설정 +│ ├── exchange.py # Binance Futures API 클라이언트 +│ ├── data_stream.py # WebSocket 1분봉 스트림 +│ ├── indicators.py # 기술 지표 계산 및 신호 생성 +│ ├── ml_filter.py # LightGBM 진입 필터 +│ ├── ml_features.py # ML 피처 빌더 +│ ├── label_builder.py # 학습 레이블 생성 +│ ├── retrainer.py # 모델 자동 재학습 스케줄러 +│ ├── risk_manager.py # 리스크 관리 +│ ├── notifier.py # Discord 웹훅 알림 +│ └── logger_setup.py # Loguru 로거 설정 +├── scripts/ +│ ├── fetch_history.py # 과거 데이터 수집 +│ └── train_model.py # ML 모델 수동 학습 +├── models/ # 학습된 모델 저장 (.pkl) +├── data/ # 과거 데이터 캐시 +├── logs/ # 로그 파일 +├── tests/ # 테스트 코드 +├── Dockerfile +├── docker-compose.yml +├── Jenkinsfile +└── requirements.txt +``` + +--- + +## 빠른 시작 + +### 1. 환경변수 설정 + +```bash +cp .env.example .env +``` + +`.env` 파일을 열어 아래 값을 채웁니다. + +```env +BINANCE_API_KEY=your_api_key +BINANCE_API_SECRET=your_api_secret +SYMBOL=XRPUSDT +LEVERAGE=10 +RISK_PER_TRADE=0.02 +DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/... +``` + +### 2. 로컬 실행 + +```bash +pip install -r requirements.txt +python main.py +``` + +### 3. Docker Compose로 실행 + +```bash +docker compose up -d +``` + +로그 확인: + +```bash +docker compose logs -f cointrader +``` + +--- + +## ML 모델 학습 + +봇은 모델 파일(`models/lgbm_filter.pkl`)이 없으면 ML 필터 없이 동작합니다. 최초 실행 전 또는 수동 재학습 시 아래 순서로 진행합니다. + +```bash +# 1. 과거 데이터 수집 +python scripts/fetch_history.py + +# 2. 모델 학습 +python scripts/train_model.py +``` + +학습된 모델은 `models/lgbm_filter.pkl`에 저장되며, 봇이 실행 중이면 매일 새벽 3시에 자동으로 재학습·리로드됩니다. + +--- + +## 매매 전략 + +| 지표 | 롱 조건 | 숏 조건 | 가중치 | +|------|---------|---------|--------| +| RSI (14) | < 35 | > 65 | 1 | +| MACD 크로스 | 골든크로스 | 데드크로스 | 2 | +| 볼린저 밴드 | 하단 이탈 | 상단 돌파 | 1 | +| EMA 정배열 (9/21/50) | 정배열 | 역배열 | 1 | +| Stochastic RSI | < 20 + K>D | > 80 + K **이 봇은 실제 자산을 거래합니다.** 운영 전 반드시 Binance Testnet에서 충분히 검증하세요. +> 과거 수익이 미래 수익을 보장하지 않습니다. 투자 손실에 대한 책임은 사용자 본인에게 있습니다. diff --git a/scripts/fetch_history.py b/scripts/fetch_history.py index 510d46b..d3404b3 100644 --- a/scripts/fetch_history.py +++ b/scripts/fetch_history.py @@ -2,6 +2,10 @@ 바이낸스 선물 REST API로 과거 캔들 데이터를 수집해 parquet으로 저장한다. 사용법: python scripts/fetch_history.py --symbol XRPUSDT --interval 1m --days 90 """ +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent)) + import asyncio import argparse from datetime import datetime, timedelta diff --git a/scripts/train_model.py b/scripts/train_model.py index 828ed41..516041d 100644 --- a/scripts/train_model.py +++ b/scripts/train_model.py @@ -2,9 +2,15 @@ 과거 캔들 데이터로 LightGBM 필터 모델을 학습하고 저장한다. 사용법: python scripts/train_model.py --data data/xrpusdt_1m.parquet """ +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent)) + import argparse import json +import os from datetime import datetime +from multiprocessing import Pool, cpu_count from pathlib import Path import joblib @@ -12,7 +18,6 @@ import lightgbm as lgb import numpy as np import pandas as pd from sklearn.metrics import roc_auc_score, classification_report -from sklearn.model_selection import TimeSeriesSplit from src.indicators import Indicators from src.ml_features import build_features, FEATURE_COLS @@ -26,61 +31,100 @@ PREV_MODEL_PATH = Path("models/lgbm_filter_prev.pkl") LOG_PATH = Path("models/training_log.json") -def generate_dataset(df: pd.DataFrame) -> pd.DataFrame: - """신호 발생 시점마다 피처와 레이블을 생성한다.""" - rows = [] +def _process_index(args: tuple) -> dict | None: + """단일 인덱스에 대해 피처+레이블을 계산한다. Pool worker 함수.""" + i, df_values, df_columns = args + df = pd.DataFrame(df_values, columns=df_columns) + + window = df.iloc[i - 60: i + 1].copy() + ind = Indicators(window) + df_ind = ind.calculate_all() + + if df_ind.isna().any().any(): + return None + + signal = ind.get_signal(df_ind) + if signal == "HOLD": + return None + + entry_price = float(df_ind["close"].iloc[-1]) + atr = float(df_ind["atr"].iloc[-1]) + if atr <= 0: + return None + + stop_loss = entry_price - atr * ATR_SL_MULT if signal == "LONG" else entry_price + atr * ATR_SL_MULT + take_profit = entry_price + atr * ATR_TP_MULT if signal == "LONG" else entry_price - atr * ATR_TP_MULT + + future = df.iloc[i + 1: i + 1 + LOOKAHEAD] + label = build_labels( + future_closes=future["close"].tolist(), + future_highs=future["high"].tolist(), + future_lows=future["low"].tolist(), + take_profit=take_profit, + stop_loss=stop_loss, + side=signal, + ) + if label is None: + return None + + features = build_features(df_ind, signal) + row = features.to_dict() + row["label"] = label + return row + + +def generate_dataset(df: pd.DataFrame, n_jobs: int | None = None) -> pd.DataFrame: + """신호 발생 시점마다 피처와 레이블을 병렬로 생성한다.""" total = len(df) + indices = range(60, total - LOOKAHEAD) - for i in range(60, total - LOOKAHEAD): - window = df.iloc[i - 60: i + 1].copy() - ind = Indicators(window) - df_ind = ind.calculate_all() + workers = n_jobs or max(1, cpu_count() - 1) + print(f" 병렬 처리: {workers}코어 사용 (총 {len(indices):,}개 인덱스)") - if df_ind.isna().any().any(): - continue + # DataFrame을 numpy로 변환해서 worker 간 전달 비용 최소화 + df_values = df.values + df_columns = list(df.columns) + task_args = [(i, df_values, df_columns) for i in indices] - signal = ind.get_signal(df_ind) - if signal == "HOLD": - continue + rows = [] + errors = [] + chunk = max(1, len(task_args) // (workers * 10)) + with Pool(processes=workers) as pool: + for idx, result in enumerate(pool.imap(_process_index, task_args, chunksize=chunk)): + if isinstance(result, dict): + rows.append(result) + elif result is not None: + errors.append(result) + if (idx + 1) % 10000 == 0: + print(f" 진행: {idx + 1:,}/{len(task_args):,} | 샘플: {len(rows):,}개") - entry_price = float(df_ind["close"].iloc[-1]) - atr = float(df_ind["atr"].iloc[-1]) - if atr <= 0: - continue + if errors: + print(f" [경고] worker 오류 {len(errors)}건: {errors[0]}") - stop_loss = entry_price - atr * ATR_SL_MULT if signal == "LONG" else entry_price + atr * ATR_SL_MULT - take_profit = entry_price + atr * ATR_TP_MULT if signal == "LONG" else entry_price - atr * ATR_TP_MULT - - future = df.iloc[i + 1: i + 1 + LOOKAHEAD] - label = build_labels( - future_closes=future["close"].tolist(), - future_highs=future["high"].tolist(), - future_lows=future["low"].tolist(), - take_profit=take_profit, - stop_loss=stop_loss, - side=signal, - ) - if label is None: - continue - - features = build_features(df_ind, signal) - row = features.to_dict() - row["label"] = label - rows.append(row) - - if len(rows) % 500 == 0: - print(f" 샘플 생성 중: {len(rows)}개 (인덱스 {i}/{total})") + if not rows: + print(" [오류] 생성된 샘플이 없습니다. worker 예외 여부를 확인합니다...") + # 단일 프로세스로 첫 번째 인덱스를 직접 실행해서 예외 확인 + try: + test_result = _process_index(task_args[0]) + print(f" 단일 실행 결과: {test_result}") + except Exception as e: + import traceback + print(f" 단일 실행 예외:\n{traceback.format_exc()}") return pd.DataFrame(rows) -def train(data_path: str): +def train(data_path: str, n_jobs: int | None = None): print(f"데이터 로드: {data_path}") df = pd.read_parquet(data_path) print(f"캔들 수: {len(df)}") print("데이터셋 생성 중...") - dataset = generate_dataset(df) + dataset = generate_dataset(df, n_jobs=n_jobs) + + if dataset.empty or "label" not in dataset.columns: + raise ValueError(f"데이터셋 생성 실패: 샘플 0개. 위 오류 메시지를 확인하세요.") + print(f"학습 샘플: {len(dataset)}개 (양성={dataset['label'].sum():.0f}, 음성={(dataset['label']==0).sum():.0f})") if len(dataset) < 200: @@ -143,8 +187,10 @@ def train(data_path: str): def main(): parser = argparse.ArgumentParser() parser.add_argument("--data", default="data/xrpusdt_1m.parquet") + parser.add_argument("--jobs", type=int, default=None, + help="병렬 worker 수 (기본: CPU 수 - 1)") args = parser.parse_args() - train(args.data) + train(args.data, n_jobs=args.jobs) if __name__ == "__main__":