feat: add vectorized dataset builder (1x pandas_ta call)
Made-with: Cursor
This commit is contained in:
293
src/dataset_builder.py
Normal file
293
src/dataset_builder.py
Normal file
@@ -0,0 +1,293 @@
|
||||
"""
|
||||
전체 시계열을 1회 계산하는 벡터화 데이터셋 빌더.
|
||||
pandas_ta를 130,000번 반복 호출하는 기존 방식 대신
|
||||
전체 배열에 1번만 적용해 10~30배 속도를 낸다.
|
||||
|
||||
봇 실시간 경로(indicators.py, ml_features.py)는 변경하지 않는다.
|
||||
"""
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pandas_ta as ta
|
||||
|
||||
from src.ml_features import FEATURE_COLS
|
||||
|
||||
LOOKAHEAD = 60
|
||||
ATR_SL_MULT = 1.5
|
||||
ATR_TP_MULT = 3.0
|
||||
WARMUP = 60 # 지표 안정화에 필요한 최소 행 수
|
||||
|
||||
|
||||
def _calc_indicators(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""전체 시계열에 기술 지표를 1회 계산한다."""
|
||||
d = df.copy()
|
||||
close = d["close"]
|
||||
high = d["high"]
|
||||
low = d["low"]
|
||||
volume = d["volume"]
|
||||
|
||||
d["rsi"] = ta.rsi(close, length=14)
|
||||
|
||||
macd = ta.macd(close, fast=12, slow=26, signal=9)
|
||||
d["macd"] = macd["MACD_12_26_9"]
|
||||
d["macd_signal"] = macd["MACDs_12_26_9"]
|
||||
d["macd_hist"] = macd["MACDh_12_26_9"]
|
||||
|
||||
bb = ta.bbands(close, length=20, std=2)
|
||||
d["bb_upper"] = bb["BBU_20_2.0_2.0"]
|
||||
d["bb_lower"] = bb["BBL_20_2.0_2.0"]
|
||||
|
||||
d["ema9"] = ta.ema(close, length=9)
|
||||
d["ema21"] = ta.ema(close, length=21)
|
||||
d["ema50"] = ta.ema(close, length=50)
|
||||
|
||||
d["atr"] = ta.atr(high, low, close, length=14)
|
||||
d["vol_ma20"] = ta.sma(volume, length=20)
|
||||
|
||||
stoch = ta.stochrsi(close, length=14)
|
||||
d["stoch_k"] = stoch["STOCHRSIk_14_14_3_3"]
|
||||
d["stoch_d"] = stoch["STOCHRSId_14_14_3_3"]
|
||||
|
||||
return d
|
||||
|
||||
|
||||
def _calc_signals(d: pd.DataFrame) -> np.ndarray:
|
||||
"""
|
||||
indicators.py get_signal() 로직을 numpy 배열 연산으로 재현한다.
|
||||
반환: signal_arr — 각 행에 대해 "LONG" | "SHORT" | "HOLD"
|
||||
"""
|
||||
n = len(d)
|
||||
|
||||
rsi = d["rsi"].values
|
||||
macd = d["macd"].values
|
||||
macd_sig = d["macd_signal"].values
|
||||
close = d["close"].values
|
||||
bb_upper = d["bb_upper"].values
|
||||
bb_lower = d["bb_lower"].values
|
||||
ema9 = d["ema9"].values
|
||||
ema21 = d["ema21"].values
|
||||
ema50 = d["ema50"].values
|
||||
stoch_k = d["stoch_k"].values
|
||||
stoch_d = d["stoch_d"].values
|
||||
volume = d["volume"].values
|
||||
vol_ma20 = d["vol_ma20"].values
|
||||
|
||||
# MACD 크로스: 전 캔들과 비교 (shift(1))
|
||||
prev_macd = np.roll(macd, 1); prev_macd[0] = np.nan
|
||||
prev_macd_sig = np.roll(macd_sig, 1); prev_macd_sig[0] = np.nan
|
||||
|
||||
long_score = np.zeros(n, dtype=np.float32)
|
||||
short_score = np.zeros(n, dtype=np.float32)
|
||||
|
||||
# 1. RSI
|
||||
long_score += (rsi < 35).astype(np.float32)
|
||||
short_score += (rsi > 65).astype(np.float32)
|
||||
|
||||
# 2. MACD 크로스 (가중치 2)
|
||||
macd_cross_up = (prev_macd < prev_macd_sig) & (macd > macd_sig)
|
||||
macd_cross_down = (prev_macd > prev_macd_sig) & (macd < macd_sig)
|
||||
long_score += macd_cross_up.astype(np.float32) * 2
|
||||
short_score += macd_cross_down.astype(np.float32) * 2
|
||||
|
||||
# 3. 볼린저 밴드
|
||||
long_score += (close < bb_lower).astype(np.float32)
|
||||
short_score += (close > bb_upper).astype(np.float32)
|
||||
|
||||
# 4. EMA 정배열/역배열
|
||||
long_score += ((ema9 > ema21) & (ema21 > ema50)).astype(np.float32)
|
||||
short_score += ((ema9 < ema21) & (ema21 < ema50)).astype(np.float32)
|
||||
|
||||
# 5. Stochastic RSI
|
||||
long_score += ((stoch_k < 20) & (stoch_k > stoch_d)).astype(np.float32)
|
||||
short_score += ((stoch_k > 80) & (stoch_k < stoch_d)).astype(np.float32)
|
||||
|
||||
# 6. 거래량 급증
|
||||
vol_surge = volume > vol_ma20 * 1.5
|
||||
|
||||
long_enter = (long_score >= 3) & (vol_surge | (long_score >= 4))
|
||||
short_enter = (short_score >= 3) & (vol_surge | (short_score >= 4))
|
||||
|
||||
signal_arr = np.full(n, "HOLD", dtype=object)
|
||||
signal_arr[long_enter] = "LONG"
|
||||
signal_arr[short_enter] = "SHORT"
|
||||
# 둘 다 해당하면 HOLD (충돌 방지)
|
||||
signal_arr[long_enter & short_enter] = "HOLD"
|
||||
|
||||
return signal_arr
|
||||
|
||||
|
||||
def _calc_features_vectorized(d: pd.DataFrame, signal_arr: np.ndarray) -> pd.DataFrame:
|
||||
"""
|
||||
신호 발생 인덱스에서 ml_features.py build_features() 로직을
|
||||
pandas 벡터 연산으로 재현한다.
|
||||
"""
|
||||
close = d["close"]
|
||||
bb_upper = d["bb_upper"]
|
||||
bb_lower = d["bb_lower"]
|
||||
ema9 = d["ema9"]
|
||||
ema21 = d["ema21"]
|
||||
ema50 = d["ema50"]
|
||||
atr = d["atr"]
|
||||
volume = d["volume"]
|
||||
vol_ma20 = d["vol_ma20"]
|
||||
rsi = d["rsi"]
|
||||
macd_hist = d["macd_hist"]
|
||||
stoch_k = d["stoch_k"]
|
||||
stoch_d = d["stoch_d"]
|
||||
macd = d["macd"]
|
||||
macd_sig = d["macd_signal"]
|
||||
|
||||
bb_range = bb_upper - bb_lower
|
||||
bb_pct = np.where(bb_range > 0, (close - bb_lower) / bb_range, 0.5)
|
||||
|
||||
ema_align = np.where(
|
||||
(ema9 > ema21) & (ema21 > ema50), 1,
|
||||
np.where(
|
||||
(ema9 < ema21) & (ema21 < ema50), -1, 0
|
||||
)
|
||||
).astype(np.float32)
|
||||
|
||||
atr_pct = np.where(close > 0, atr / close, 0.0)
|
||||
vol_ratio = np.where(vol_ma20 > 0, volume / vol_ma20, 1.0)
|
||||
|
||||
ret_1 = close.pct_change(1).fillna(0).values
|
||||
ret_3 = close.pct_change(3).fillna(0).values
|
||||
ret_5 = close.pct_change(5).fillna(0).values
|
||||
|
||||
prev_macd = macd.shift(1).fillna(0).values
|
||||
prev_macd_sig = macd_sig.shift(1).fillna(0).values
|
||||
|
||||
# signal_strength: 신호 방향별로 각 조건 점수 합산
|
||||
is_long = (signal_arr == "LONG")
|
||||
is_short = (signal_arr == "SHORT")
|
||||
|
||||
strength = np.zeros(len(d), dtype=np.float32)
|
||||
|
||||
# LONG 조건
|
||||
strength += is_long * (rsi.values < 35).astype(np.float32)
|
||||
strength += is_long * ((prev_macd < prev_macd_sig) & (macd.values > macd_sig.values)).astype(np.float32) * 2
|
||||
strength += is_long * (close.values < bb_lower.values).astype(np.float32)
|
||||
strength += is_long * (ema_align == 1).astype(np.float32)
|
||||
strength += is_long * ((stoch_k.values < 20) & (stoch_k.values > stoch_d.values)).astype(np.float32)
|
||||
|
||||
# SHORT 조건
|
||||
strength += is_short * (rsi.values > 65).astype(np.float32)
|
||||
strength += is_short * ((prev_macd > prev_macd_sig) & (macd.values < macd_sig.values)).astype(np.float32) * 2
|
||||
strength += is_short * (close.values > bb_upper.values).astype(np.float32)
|
||||
strength += is_short * (ema_align == -1).astype(np.float32)
|
||||
strength += is_short * ((stoch_k.values > 80) & (stoch_k.values < stoch_d.values)).astype(np.float32)
|
||||
|
||||
side = np.where(signal_arr == "LONG", 1.0, 0.0).astype(np.float32)
|
||||
|
||||
return pd.DataFrame({
|
||||
"rsi": rsi.values.astype(np.float32),
|
||||
"macd_hist": macd_hist.values.astype(np.float32),
|
||||
"bb_pct": bb_pct.astype(np.float32),
|
||||
"ema_align": ema_align,
|
||||
"stoch_k": stoch_k.values.astype(np.float32),
|
||||
"stoch_d": stoch_d.values.astype(np.float32),
|
||||
"atr_pct": atr_pct.astype(np.float32),
|
||||
"vol_ratio": vol_ratio.astype(np.float32),
|
||||
"ret_1": ret_1.astype(np.float32),
|
||||
"ret_3": ret_3.astype(np.float32),
|
||||
"ret_5": ret_5.astype(np.float32),
|
||||
"signal_strength": strength,
|
||||
"side": side,
|
||||
"_signal": signal_arr, # 레이블 계산용 임시 컬럼
|
||||
}, index=d.index)
|
||||
|
||||
|
||||
def _calc_labels_vectorized(
|
||||
d: pd.DataFrame,
|
||||
feat: pd.DataFrame,
|
||||
sig_idx: np.ndarray,
|
||||
) -> tuple[np.ndarray, np.ndarray]:
|
||||
"""
|
||||
label_builder.py build_labels() 로직을 numpy 2D 배열로 벡터화한다.
|
||||
|
||||
각 신호 인덱스 i에 대해 future[i+1 : i+1+LOOKAHEAD] 구간의
|
||||
high/low 배열을 (N × LOOKAHEAD) 행렬로 만들어 argmax로 처리한다.
|
||||
"""
|
||||
n_total = len(d)
|
||||
highs = d["high"].values
|
||||
lows = d["low"].values
|
||||
closes = d["close"].values
|
||||
atrs = d["atr"].values
|
||||
|
||||
labels = []
|
||||
valid_mask = []
|
||||
|
||||
for idx in sig_idx:
|
||||
signal = feat.at[d.index[idx], "_signal"]
|
||||
entry = closes[idx]
|
||||
atr = atrs[idx]
|
||||
if atr <= 0:
|
||||
valid_mask.append(False)
|
||||
continue
|
||||
|
||||
if signal == "LONG":
|
||||
sl = entry - atr * ATR_SL_MULT
|
||||
tp = entry + atr * ATR_TP_MULT
|
||||
else:
|
||||
sl = entry + atr * ATR_SL_MULT
|
||||
tp = entry - atr * ATR_TP_MULT
|
||||
|
||||
end = min(idx + 1 + LOOKAHEAD, n_total)
|
||||
fut_high = highs[idx + 1 : end]
|
||||
fut_low = lows[idx + 1 : end]
|
||||
|
||||
label = None
|
||||
for h, l in zip(fut_high, fut_low):
|
||||
if signal == "LONG":
|
||||
if h >= tp:
|
||||
label = 1
|
||||
break
|
||||
if l <= sl:
|
||||
label = 0
|
||||
break
|
||||
else:
|
||||
if l <= tp:
|
||||
label = 1
|
||||
break
|
||||
if h >= sl:
|
||||
label = 0
|
||||
break
|
||||
|
||||
if label is None:
|
||||
valid_mask.append(False)
|
||||
else:
|
||||
labels.append(label)
|
||||
valid_mask.append(True)
|
||||
|
||||
return np.array(labels, dtype=np.int8), np.array(valid_mask, dtype=bool)
|
||||
|
||||
|
||||
def generate_dataset_vectorized(df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
전체 시계열을 1회 계산해 학습 데이터셋을 생성한다.
|
||||
기존 generate_dataset()의 drop-in 대체제.
|
||||
"""
|
||||
print(" [1/3] 전체 시계열 지표 계산 (1회)...")
|
||||
d = _calc_indicators(df)
|
||||
|
||||
print(" [2/3] 신호 마스킹 및 피처 추출...")
|
||||
signal_arr = _calc_signals(d)
|
||||
feat_all = _calc_features_vectorized(d, signal_arr)
|
||||
|
||||
# 신호 발생 + NaN 없음 + 미래 데이터 충분한 인덱스만
|
||||
valid_rows = (
|
||||
(signal_arr != "HOLD") &
|
||||
(~feat_all[FEATURE_COLS].isna().any(axis=1).values) &
|
||||
(np.arange(len(d)) >= WARMUP) &
|
||||
(np.arange(len(d)) < len(d) - LOOKAHEAD)
|
||||
)
|
||||
sig_idx = np.where(valid_rows)[0]
|
||||
print(f" 신호 발생 인덱스: {len(sig_idx):,}개")
|
||||
|
||||
print(" [3/3] 레이블 계산...")
|
||||
labels, valid_mask = _calc_labels_vectorized(d, feat_all, sig_idx)
|
||||
|
||||
final_idx = sig_idx[valid_mask]
|
||||
feat_final = feat_all.iloc[final_idx][FEATURE_COLS].copy()
|
||||
feat_final["label"] = labels
|
||||
|
||||
return feat_final.reset_index(drop=True)
|
||||
Reference in New Issue
Block a user