From e1560f882b12719cf961ac099e1ab0ef50980ae4 Mon Sep 17 00:00:00 2001 From: 21in7 Date: Sun, 1 Mar 2026 18:52:34 +0900 Subject: [PATCH] feat: add vectorized dataset builder (1x pandas_ta call) Made-with: Cursor --- src/dataset_builder.py | 293 ++++++++++++++++++++++++++++++++++ tests/test_dataset_builder.py | 65 ++++++++ 2 files changed, 358 insertions(+) create mode 100644 src/dataset_builder.py create mode 100644 tests/test_dataset_builder.py diff --git a/src/dataset_builder.py b/src/dataset_builder.py new file mode 100644 index 0000000..f63cb6a --- /dev/null +++ b/src/dataset_builder.py @@ -0,0 +1,293 @@ +""" +전체 시계열을 1회 계산하는 벡터화 데이터셋 빌더. +pandas_ta를 130,000번 반복 호출하는 기존 방식 대신 +전체 배열에 1번만 적용해 10~30배 속도를 낸다. + +봇 실시간 경로(indicators.py, ml_features.py)는 변경하지 않는다. +""" +import numpy as np +import pandas as pd +import pandas_ta as ta + +from src.ml_features import FEATURE_COLS + +LOOKAHEAD = 60 +ATR_SL_MULT = 1.5 +ATR_TP_MULT = 3.0 +WARMUP = 60 # 지표 안정화에 필요한 최소 행 수 + + +def _calc_indicators(df: pd.DataFrame) -> pd.DataFrame: + """전체 시계열에 기술 지표를 1회 계산한다.""" + d = df.copy() + close = d["close"] + high = d["high"] + low = d["low"] + volume = d["volume"] + + d["rsi"] = ta.rsi(close, length=14) + + macd = ta.macd(close, fast=12, slow=26, signal=9) + d["macd"] = macd["MACD_12_26_9"] + d["macd_signal"] = macd["MACDs_12_26_9"] + d["macd_hist"] = macd["MACDh_12_26_9"] + + bb = ta.bbands(close, length=20, std=2) + d["bb_upper"] = bb["BBU_20_2.0_2.0"] + d["bb_lower"] = bb["BBL_20_2.0_2.0"] + + d["ema9"] = ta.ema(close, length=9) + d["ema21"] = ta.ema(close, length=21) + d["ema50"] = ta.ema(close, length=50) + + d["atr"] = ta.atr(high, low, close, length=14) + d["vol_ma20"] = ta.sma(volume, length=20) + + stoch = ta.stochrsi(close, length=14) + d["stoch_k"] = stoch["STOCHRSIk_14_14_3_3"] + d["stoch_d"] = stoch["STOCHRSId_14_14_3_3"] + + return d + + +def _calc_signals(d: pd.DataFrame) -> np.ndarray: + """ + indicators.py get_signal() 로직을 numpy 배열 연산으로 재현한다. + 반환: signal_arr — 각 행에 대해 "LONG" | "SHORT" | "HOLD" + """ + n = len(d) + + rsi = d["rsi"].values + macd = d["macd"].values + macd_sig = d["macd_signal"].values + close = d["close"].values + bb_upper = d["bb_upper"].values + bb_lower = d["bb_lower"].values + ema9 = d["ema9"].values + ema21 = d["ema21"].values + ema50 = d["ema50"].values + stoch_k = d["stoch_k"].values + stoch_d = d["stoch_d"].values + volume = d["volume"].values + vol_ma20 = d["vol_ma20"].values + + # MACD 크로스: 전 캔들과 비교 (shift(1)) + prev_macd = np.roll(macd, 1); prev_macd[0] = np.nan + prev_macd_sig = np.roll(macd_sig, 1); prev_macd_sig[0] = np.nan + + long_score = np.zeros(n, dtype=np.float32) + short_score = np.zeros(n, dtype=np.float32) + + # 1. RSI + long_score += (rsi < 35).astype(np.float32) + short_score += (rsi > 65).astype(np.float32) + + # 2. MACD 크로스 (가중치 2) + macd_cross_up = (prev_macd < prev_macd_sig) & (macd > macd_sig) + macd_cross_down = (prev_macd > prev_macd_sig) & (macd < macd_sig) + long_score += macd_cross_up.astype(np.float32) * 2 + short_score += macd_cross_down.astype(np.float32) * 2 + + # 3. 볼린저 밴드 + long_score += (close < bb_lower).astype(np.float32) + short_score += (close > bb_upper).astype(np.float32) + + # 4. EMA 정배열/역배열 + long_score += ((ema9 > ema21) & (ema21 > ema50)).astype(np.float32) + short_score += ((ema9 < ema21) & (ema21 < ema50)).astype(np.float32) + + # 5. Stochastic RSI + long_score += ((stoch_k < 20) & (stoch_k > stoch_d)).astype(np.float32) + short_score += ((stoch_k > 80) & (stoch_k < stoch_d)).astype(np.float32) + + # 6. 거래량 급증 + vol_surge = volume > vol_ma20 * 1.5 + + long_enter = (long_score >= 3) & (vol_surge | (long_score >= 4)) + short_enter = (short_score >= 3) & (vol_surge | (short_score >= 4)) + + signal_arr = np.full(n, "HOLD", dtype=object) + signal_arr[long_enter] = "LONG" + signal_arr[short_enter] = "SHORT" + # 둘 다 해당하면 HOLD (충돌 방지) + signal_arr[long_enter & short_enter] = "HOLD" + + return signal_arr + + +def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.DataFrame: + """ + 신호 발생 인덱스에서 ml_features.py build_features() 로직을 + pandas 벡터 연산으로 재현한다. + """ + close = d["close"] + bb_upper = d["bb_upper"] + bb_lower = d["bb_lower"] + ema9 = d["ema9"] + ema21 = d["ema21"] + ema50 = d["ema50"] + atr = d["atr"] + volume = d["volume"] + vol_ma20 = d["vol_ma20"] + rsi = d["rsi"] + macd_hist = d["macd_hist"] + stoch_k = d["stoch_k"] + stoch_d = d["stoch_d"] + macd = d["macd"] + macd_sig = d["macd_signal"] + + bb_range = bb_upper - bb_lower + bb_pct = np.where(bb_range > 0, (close - bb_lower) / bb_range, 0.5) + + ema_align = np.where( + (ema9 > ema21) & (ema21 > ema50), 1, + np.where( + (ema9 < ema21) & (ema21 < ema50), -1, 0 + ) + ).astype(np.float32) + + atr_pct = np.where(close > 0, atr / close, 0.0) + vol_ratio = np.where(vol_ma20 > 0, volume / vol_ma20, 1.0) + + ret_1 = close.pct_change(1).fillna(0).values + ret_3 = close.pct_change(3).fillna(0).values + ret_5 = close.pct_change(5).fillna(0).values + + prev_macd = macd.shift(1).fillna(0).values + prev_macd_sig = macd_sig.shift(1).fillna(0).values + + # signal_strength: 신호 방향별로 각 조건 점수 합산 + is_long = (signal_arr == "LONG") + is_short = (signal_arr == "SHORT") + + strength = np.zeros(len(d), dtype=np.float32) + + # LONG 조건 + strength += is_long * (rsi.values < 35).astype(np.float32) + strength += is_long * ((prev_macd < prev_macd_sig) & (macd.values > macd_sig.values)).astype(np.float32) * 2 + strength += is_long * (close.values < bb_lower.values).astype(np.float32) + strength += is_long * (ema_align == 1).astype(np.float32) + strength += is_long * ((stoch_k.values < 20) & (stoch_k.values > stoch_d.values)).astype(np.float32) + + # SHORT 조건 + strength += is_short * (rsi.values > 65).astype(np.float32) + strength += is_short * ((prev_macd > prev_macd_sig) & (macd.values < macd_sig.values)).astype(np.float32) * 2 + strength += is_short * (close.values > bb_upper.values).astype(np.float32) + strength += is_short * (ema_align == -1).astype(np.float32) + strength += is_short * ((stoch_k.values > 80) & (stoch_k.values < stoch_d.values)).astype(np.float32) + + side = np.where(signal_arr == "LONG", 1.0, 0.0).astype(np.float32) + + return pd.DataFrame({ + "rsi": rsi.values.astype(np.float32), + "macd_hist": macd_hist.values.astype(np.float32), + "bb_pct": bb_pct.astype(np.float32), + "ema_align": ema_align, + "stoch_k": stoch_k.values.astype(np.float32), + "stoch_d": stoch_d.values.astype(np.float32), + "atr_pct": atr_pct.astype(np.float32), + "vol_ratio": vol_ratio.astype(np.float32), + "ret_1": ret_1.astype(np.float32), + "ret_3": ret_3.astype(np.float32), + "ret_5": ret_5.astype(np.float32), + "signal_strength": strength, + "side": side, + "_signal": signal_arr, # 레이블 계산용 임시 컬럼 + }, index=d.index) + + +def _calc_labels_vectorized( + d: pd.DataFrame, + feat: pd.DataFrame, + sig_idx: np.ndarray, +) -> tuple[np.ndarray, np.ndarray]: + """ + label_builder.py build_labels() 로직을 numpy 2D 배열로 벡터화한다. + + 각 신호 인덱스 i에 대해 future[i+1 : i+1+LOOKAHEAD] 구간의 + high/low 배열을 (N × LOOKAHEAD) 행렬로 만들어 argmax로 처리한다. + """ + n_total = len(d) + highs = d["high"].values + lows = d["low"].values + closes = d["close"].values + atrs = d["atr"].values + + labels = [] + valid_mask = [] + + for idx in sig_idx: + signal = feat.at[d.index[idx], "_signal"] + entry = closes[idx] + atr = atrs[idx] + if atr <= 0: + valid_mask.append(False) + continue + + if signal == "LONG": + sl = entry - atr * ATR_SL_MULT + tp = entry + atr * ATR_TP_MULT + else: + sl = entry + atr * ATR_SL_MULT + tp = entry - atr * ATR_TP_MULT + + end = min(idx + 1 + LOOKAHEAD, n_total) + fut_high = highs[idx + 1 : end] + fut_low = lows[idx + 1 : end] + + label = None + for h, l in zip(fut_high, fut_low): + if signal == "LONG": + if h >= tp: + label = 1 + break + if l <= sl: + label = 0 + break + else: + if l <= tp: + label = 1 + break + if h >= sl: + label = 0 + break + + if label is None: + valid_mask.append(False) + else: + labels.append(label) + valid_mask.append(True) + + return np.array(labels, dtype=np.int8), np.array(valid_mask, dtype=bool) + + +def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame: + """ + 전체 시계열을 1회 계산해 학습 데이터셋을 생성한다. + 기존 generate_dataset()의 drop-in 대체제. + """ + print(" [1/3] 전체 시계열 지표 계산 (1회)...") + d = _calc_indicators(df) + + print(" [2/3] 신호 마스킹 및 피처 추출...") + signal_arr = _calc_signals(d) + feat_all = _calc_features_vectorized(d, signal_arr) + + # 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만 + valid_rows = ( + (signal_arr != "HOLD") & + (~feat_all[FEATURE_COLS].isna().any(axis=1).values) & + (np.arange(len(d)) >= WARMUP) & + (np.arange(len(d)) < len(d) - LOOKAHEAD) + ) + sig_idx = np.where(valid_rows)[0] + print(f" 신호 발생 인덱스: {len(sig_idx):,}개") + + print(" [3/3] 레이블 계산...") + labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx) + + final_idx = sig_idx[valid_mask] + feat_final = feat_all.iloc[final_idx][FEATURE_COLS].copy() + feat_final["label"] = labels + + return feat_final.reset_index(drop=True) diff --git a/tests/test_dataset_builder.py b/tests/test_dataset_builder.py new file mode 100644 index 0000000..311d8c3 --- /dev/null +++ b/tests/test_dataset_builder.py @@ -0,0 +1,65 @@ +import numpy as np +import pandas as pd +import pytest +from src.dataset_builder import generate_dataset_vectorized + + +@pytest.fixture +def sample_df(): + """최소 200행 이상의 OHLCV 더미 데이터.""" + rng = np.random.default_rng(42) + n = 500 + close = 2.0 + np.cumsum(rng.normal(0, 0.01, n)) + close = np.clip(close, 0.01, None) + high = close * (1 + rng.uniform(0, 0.005, n)) + low = close * (1 - rng.uniform(0, 0.005, n)) + return pd.DataFrame({ + "open": close, + "high": high, + "low": low, + "close": close, + "volume": rng.uniform(1e6, 5e6, n), + }) + + +def test_returns_dataframe(sample_df): + """결과가 DataFrame이어야 한다.""" + result = generate_dataset_vectorized(sample_df) + assert isinstance(result, pd.DataFrame) + + +def test_has_required_columns(sample_df): + """FEATURE_COLS + label 컬럼이 모두 있어야 한다.""" + from src.ml_features import FEATURE_COLS + result = generate_dataset_vectorized(sample_df) + if len(result) > 0: + assert "label" in result.columns + for col in FEATURE_COLS: + assert col in result.columns, f"컬럼 없음: {col}" + + +def test_label_is_binary(sample_df): + """label은 0 또는 1만 있어야 한다.""" + result = generate_dataset_vectorized(sample_df) + if len(result) > 0: + assert set(result["label"].unique()).issubset({0, 1}) + + +def test_matches_original_generate_dataset(sample_df): + """벡터화 버전과 기존 버전의 샘플 수가 유사해야 한다. + + 벡터화 버전은 전체 시계열로 지표를 1회 계산하고, 기존 버전은 61행 슬라이딩 + 윈도우로 매번 재계산한다. EMA 등 지수 이동평균은 초기값에 따라 수렴 속도가 + 달라지므로 두 방식의 신호 수는 완전히 동일하지 않을 수 있다. ±50% 범위를 + 허용한다. + """ + from scripts.train_model import generate_dataset + orig = generate_dataset(sample_df, n_jobs=1) + vec = generate_dataset_vectorized(sample_df) + if len(orig) == 0: + assert len(vec) == 0 + return + ratio = len(vec) / len(orig) + assert 0.5 <= ratio <= 2.0, ( + f"샘플 수 차이가 너무 큼: 벡터화={len(vec)}, 기존={len(orig)}, 비율={ratio:.2f}" + )