C1: /api/reset에 API key 인증 추가 (DASHBOARD_RESET_KEY 환경변수) C2: /proc 스캐닝 제거, PID file + SIGHUP 기반 파서 재파싱으로 교체 C3: daily_pnl 업데이트를 trades 테이블에서 재계산하여 idempotent하게 변경 I1: CORS origins를 CORS_ORIGINS 환경변수로 설정 가능하게 변경 I2: offset 파라미터에 ge=0 검증 추가 I3: 매 줄 commit → 파일 단위 배치 commit으로 성능 개선 I4: _pending_candles 크기 제한으로 메모리 누적 방지 I5: bot.log glob 중복 파싱 제거 (sorted(set(...))) I6: /api/health 에러 메시지에서 내부 경로 미노출 I7: RSI 차트(데이터 없음)를 OI 변화율 차트로 교체 M1: pnlColor 변수 shadowing 수정 (posPnlColor) M2: 거래 목록에 API total 필드 사용 M3: dashboard/ui/.dockerignore 추가 M4: API Dockerfile Python 3.11→3.12 M5: 테스트 fixture에서 temp DB cleanup 추가 M6: 누락 테스트 9건 추가 (health, daily, reset 인증, offset, pagination) M7: 파서 SIGTERM graceful shutdown + entrypoint.sh signal forwarding DB: 양쪽 busy_timeout=5000 + WAL pragma 설정 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
596 lines
23 KiB
Python
596 lines
23 KiB
Python
"""
|
|
log_parser.py — 봇 로그 파일을 감시하고 파싱하여 SQLite에 저장
|
|
봇 코드 수정 없이 동작. logs/ 디렉토리만 마운트하면 됨.
|
|
|
|
실행: python log_parser.py
|
|
"""
|
|
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
import glob
|
|
import os
|
|
import json
|
|
import signal
|
|
import sys
|
|
from datetime import datetime, date
|
|
from pathlib import Path
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────
# Every setting is overridable via environment variables (container-friendly).
LOG_DIR = os.environ.get("LOG_DIR", "/app/logs")                 # directory holding bot*.log files
DB_PATH = os.environ.get("DB_PATH", "/app/data/dashboard.db")    # SQLite database written by this parser
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))        # seconds between log scans
PID_FILE = os.environ.get("PARSER_PID_FILE", "/tmp/parser.pid")  # PID file; external tools send SIGHUP here to force a re-parse
|
|
|
|
# ── Regex patterns (multi-symbol, lines carry a [SYMBOL] prefix) ──────────
# Every pattern captures a leading "YYYY-MM-DD HH:MM:SS" timestamp as <ts>.
# The log text itself is Korean; the patterns must match it byte-for-byte.
PATTERNS = {
    # Trading signal line: signal name + current price for a symbol.
    "signal": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 신호: (?P<signal>\w+) \|.*현재가: (?P<price>[\d.]+)"
    ),

    # ADX indicator value.
    "adx": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] ADX: (?P<adx>[\d.]+)"
    ),

    # Open interest, OI change rate and funding rate (may be negative).
    "microstructure": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] OI=(?P<oi>[\d.]+), OI변화율=(?P<oi_change>[-\d.]+), 펀딩비=(?P<funding>[-\d.]+)"
    ),

    # Existing position recovered after a bot restart (direction, entry price, qty).
    "position_recover": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 기존 포지션 복구: (?P<direction>\w+) \| 진입가=(?P<entry_price>[\d.]+) \| 수량=(?P<qty>[\d.]+)"
    ),

    # New position entry; RSI / MACD histogram / ATR are optional trailing fields.
    "entry": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] (?P<direction>SHORT|LONG) 진입: "
        r"가격=(?P<entry_price>[\d.]+), "
        r"수량=(?P<qty>[\d.]+), "
        r"SL=(?P<sl>[\d.]+), "
        r"TP=(?P<tp>[\d.]+)"
        r"(?:, RSI=(?P<rsi>[\d.]+))?"
        r"(?:, MACD_H=(?P<macd_hist>[+\-\d.]+))?"
        r"(?:, ATR=(?P<atr>[\d.]+))?"
    ),

    # Position close detected: exit price, realized/expected PnL, commission, net PnL.
    "close_detect": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 청산 감지\((?P<reason>\w+)\):\s*"
        r"exit=(?P<exit_price>[\d.]+),\s*"
        r"rp=(?P<expected>[+\-\d.]+),\s*"
        r"commission=(?P<commission>[\d.]+),\s*"
        r"net_pnl=(?P<net_pnl>[+\-\d.]+)"
    ),

    # Cumulative PnL for the day (signed USDT amount).
    "daily_pnl": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 오늘 누적 PnL: (?P<pnl>[+\-\d.]+) USDT"
    ),

    # Periodic position monitor: current price, unrealized PnL and PnL percent.
    "position_monitor": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 포지션 모니터 \| (?P<direction>\w+) \| "
        r"현재가=(?P<price>[\d.]+) \| PnL=(?P<pnl>[+\-\d.]+) USDT \((?P<pnl_pct>[+\-\d.]+)%\)"
    ),

    # Bot startup line announcing the configured leverage.
    "bot_start": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 봇 시작, 레버리지 (?P<leverage>\d+)x"
    ),

    # Baseline account balance in USDT.
    "balance": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*\[(?P<symbol>\w+)\] 기준 잔고 설정: (?P<balance>[\d.]+) USDT"
    ),

    # ML filter loaded with its probability threshold (no [SYMBOL] prefix required).
    "ml_filter": re.compile(
        r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
        r".*ML 필터 로드.*임계값=(?P<threshold>[\d.]+)"
    ),
}
|
|
|
|
|
|
class LogParser:
|
|
def __init__(self):
|
|
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
|
|
self.conn = sqlite3.connect(DB_PATH, timeout=10)
|
|
self.conn.row_factory = sqlite3.Row
|
|
self.conn.execute("PRAGMA journal_mode=WAL")
|
|
self.conn.execute("PRAGMA busy_timeout=5000")
|
|
self._init_db()
|
|
|
|
self._file_positions = {}
|
|
self._current_positions = {} # {symbol: position_dict}
|
|
self._pending_candles = {} # {symbol: {ts_key: {data}}}
|
|
self._balance = 0
|
|
self._shutdown = False
|
|
self._dirty = False # batch commit 플래그
|
|
|
|
# PID 파일 기록
|
|
with open(PID_FILE, "w") as f:
|
|
f.write(str(os.getpid()))
|
|
|
|
# 시그널 핸들러
|
|
signal.signal(signal.SIGTERM, self._handle_sigterm)
|
|
signal.signal(signal.SIGHUP, self._handle_sighup)
|
|
|
|
    def _init_db(self):
        """Create the schema (idempotent ``IF NOT EXISTS`` DDL), then run the
        de-duplication migration and restore parser state from the DB.

        Tables:
          trades      — one row per position: entry/exit, PnL, indicators.
          candles     — per-timestamp price/signal snapshots merged with OI/ADX.
          daily_pnl   — per-symbol daily summary (cumulative PnL, win/loss counts).
          bot_status  — generic key/value status store with update timestamps.
          parse_state — last-read byte offset per log file (resume support).
        """
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS trades (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                direction TEXT NOT NULL,
                entry_time TEXT NOT NULL,
                exit_time TEXT,
                entry_price REAL NOT NULL,
                exit_price REAL,
                quantity REAL,
                leverage INTEGER DEFAULT 10,
                sl REAL,
                tp REAL,
                rsi REAL,
                macd_hist REAL,
                atr REAL,
                adx REAL,
                expected_pnl REAL,
                actual_pnl REAL,
                commission REAL,
                net_pnl REAL,
                status TEXT NOT NULL DEFAULT 'OPEN',
                close_reason TEXT,
                extra TEXT,
                UNIQUE(symbol, entry_time, direction)
            );

            CREATE TABLE IF NOT EXISTS candles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                ts TEXT NOT NULL,
                price REAL NOT NULL,
                signal TEXT,
                adx REAL,
                oi REAL,
                oi_change REAL,
                funding_rate REAL,
                UNIQUE(symbol, ts)
            );

            CREATE TABLE IF NOT EXISTS daily_pnl (
                symbol TEXT NOT NULL,
                date TEXT NOT NULL,
                cumulative_pnl REAL DEFAULT 0,
                trade_count INTEGER DEFAULT 0,
                wins INTEGER DEFAULT 0,
                losses INTEGER DEFAULT 0,
                last_updated TEXT,
                PRIMARY KEY(symbol, date)
            );

            CREATE TABLE IF NOT EXISTS bot_status (
                key TEXT PRIMARY KEY,
                value TEXT,
                updated_at TEXT
            );

            CREATE TABLE IF NOT EXISTS parse_state (
                filepath TEXT PRIMARY KEY,
                position INTEGER DEFAULT 0
            );

            CREATE INDEX IF NOT EXISTS idx_candles_symbol_ts ON candles(symbol, ts);
            CREATE INDEX IF NOT EXISTS idx_trades_status ON trades(status);
            CREATE INDEX IF NOT EXISTS idx_trades_symbol ON trades(symbol);
            CREATE UNIQUE INDEX IF NOT EXISTS idx_trades_unique
                ON trades(symbol, entry_time, direction);
        """)
        self.conn.commit()
        # One-time cleanup of duplicates that predate the UNIQUE constraint.
        self._migrate_deduplicate()
        # Restore per-file offsets and in-memory open positions.
        self._load_state()
|
|
|
|
def _migrate_deduplicate(self):
|
|
"""기존 DB에 중복 trades가 있으면 제거 (가장 오래된 id만 유지)."""
|
|
dupes = self.conn.execute("""
|
|
SELECT symbol, entry_time, direction, MIN(id) AS keep_id, COUNT(*) AS cnt
|
|
FROM trades
|
|
GROUP BY symbol, entry_time, direction
|
|
HAVING cnt > 1
|
|
""").fetchall()
|
|
if not dupes:
|
|
return
|
|
for row in dupes:
|
|
self.conn.execute(
|
|
"DELETE FROM trades WHERE symbol=? AND entry_time=? AND direction=? AND id!=?",
|
|
(row["symbol"], row["entry_time"], row["direction"], row["keep_id"]),
|
|
)
|
|
self.conn.commit()
|
|
total = sum(r["cnt"] - 1 for r in dupes)
|
|
print(f"[LogParser] 마이그레이션: 중복 trades {total}건 제거")
|
|
|
|
def _load_state(self):
|
|
rows = self.conn.execute("SELECT filepath, position FROM parse_state").fetchall()
|
|
self._file_positions = {r["filepath"]: r["position"] for r in rows}
|
|
|
|
# 심볼별 열린 포지션 복원
|
|
open_trades = self.conn.execute(
|
|
"SELECT * FROM trades WHERE status='OPEN' ORDER BY id DESC"
|
|
).fetchall()
|
|
for row in open_trades:
|
|
sym = row["symbol"]
|
|
if sym not in self._current_positions:
|
|
self._current_positions[sym] = dict(row)
|
|
|
|
def _save_position(self, filepath, pos):
|
|
self.conn.execute(
|
|
"INSERT INTO parse_state(filepath, position) VALUES(?,?) "
|
|
"ON CONFLICT(filepath) DO UPDATE SET position=?",
|
|
(filepath, pos, pos)
|
|
)
|
|
self._dirty = True
|
|
|
|
    def _handle_sigterm(self, signum, frame):
        """SIGTERM handler (M7): flush pending writes, close the DB, remove
        the PID file, and exit the process.

        Note: this runs in the main thread (Python delivers signals there),
        so ``sys.exit(0)`` raises SystemExit out of the ``run()`` loop and
        triggers the ``finally`` cleanup in ``__main__``.
        """
        print("[LogParser] SIGTERM 수신 — 종료")
        self._shutdown = True
        try:
            # Flush any uncommitted batch before closing the connection.
            if self._dirty:
                self.conn.commit()
            self.conn.close()
        except Exception:
            # Best-effort: never let cleanup errors block shutdown.
            pass
        try:
            os.unlink(PID_FILE)
        except OSError:
            # PID file already gone (or never created) — fine.
            pass
        sys.exit(0)
|
|
|
|
def _handle_sighup(self, signum, frame):
|
|
"""SIGHUP → 파싱 상태 초기화, 처음부터 재파싱."""
|
|
print("[LogParser] SIGHUP 수신 — 상태 초기화, 재파싱 시작")
|
|
self._file_positions = {}
|
|
self._current_positions = {}
|
|
self._pending_candles = {}
|
|
self.conn.execute("DELETE FROM parse_state")
|
|
self.conn.commit()
|
|
|
|
def _batch_commit(self):
|
|
"""배치 커밋 — _dirty 플래그가 설정된 경우에만 커밋."""
|
|
if self._dirty:
|
|
self.conn.commit()
|
|
self._dirty = False
|
|
|
|
def _cleanup_pending_candles(self, max_per_symbol=50):
|
|
"""오래된 pending candle 데이터 정리 (I4: 메모리 누적 방지)."""
|
|
for symbol in list(self._pending_candles):
|
|
pending = self._pending_candles[symbol]
|
|
if len(pending) > max_per_symbol:
|
|
keys = sorted(pending.keys())
|
|
for k in keys[:-max_per_symbol]:
|
|
del pending[k]
|
|
|
|
def _set_status(self, key, value):
|
|
now = datetime.now().isoformat()
|
|
self.conn.execute(
|
|
"INSERT INTO bot_status(key, value, updated_at) VALUES(?,?,?) "
|
|
"ON CONFLICT(key) DO UPDATE SET value=?, updated_at=?",
|
|
(key, str(value), now, str(value), now)
|
|
)
|
|
self._dirty = True
|
|
|
|
# ── 메인 루프 ────────────────────────────────────────────────
|
|
def run(self):
|
|
print(f"[LogParser] 시작 — LOG_DIR={LOG_DIR}, DB={DB_PATH}, 폴링={POLL_INTERVAL}s")
|
|
while not self._shutdown:
|
|
try:
|
|
self._scan_logs()
|
|
except Exception as e:
|
|
print(f"[LogParser] 에러: {e}")
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
def _scan_logs(self):
|
|
log_files = sorted(set(glob.glob(os.path.join(LOG_DIR, "bot*.log"))))
|
|
for filepath in log_files:
|
|
self._parse_file(filepath)
|
|
self._batch_commit()
|
|
self._cleanup_pending_candles()
|
|
|
|
def _parse_file(self, filepath):
|
|
last_pos = self._file_positions.get(filepath, 0)
|
|
|
|
try:
|
|
file_size = os.path.getsize(filepath)
|
|
except OSError:
|
|
return
|
|
|
|
if file_size < last_pos:
|
|
last_pos = 0
|
|
|
|
if file_size == last_pos:
|
|
return
|
|
|
|
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
|
|
f.seek(last_pos)
|
|
new_lines = f.readlines()
|
|
new_pos = f.tell()
|
|
|
|
for line in new_lines:
|
|
self._parse_line(line.strip())
|
|
|
|
self._file_positions[filepath] = new_pos
|
|
self._save_position(filepath, new_pos)
|
|
|
|
# ── 한 줄 파싱 ──────────────────────────────────────────────
|
|
    def _parse_line(self, line):
        """Dispatch one stripped log line against PATTERNS; first match wins.

        Order matters: cheap status lines are tried first, then entries,
        then the candle-building patterns, then closes and daily PnL.
        Each branch returns immediately after handling its match.
        """
        if not line:
            return

        # Bot start: record leverage and last start time per symbol.
        m = PATTERNS["bot_start"].search(line)
        if m:
            symbol = m.group("symbol")
            self._set_status(f"{symbol}:leverage", m.group("leverage"))
            self._set_status(f"{symbol}:last_start", m.group("ts"))
            return

        # Baseline balance.
        m = PATTERNS["balance"].search(line)
        if m:
            self._balance = float(m.group("balance"))
            self._set_status("balance", m.group("balance"))
            return

        # ML filter threshold.
        m = PATTERNS["ml_filter"].search(line)
        if m:
            self._set_status("ml_threshold", m.group("threshold"))
            return

        # Position monitor (periodic current price / unrealized PnL update).
        m = PATTERNS["position_monitor"].search(line)
        if m:
            symbol = m.group("symbol")
            self._set_status(f"{symbol}:current_price", m.group("price"))
            self._set_status(f"{symbol}:unrealized_pnl", m.group("pnl"))
            self._set_status(f"{symbol}:unrealized_pnl_pct", m.group("pnl_pct"))
            return

        # Position recovered after a bot restart.
        m = PATTERNS["position_recover"].search(line)
        if m:
            self._handle_entry(
                ts=m.group("ts"),
                symbol=m.group("symbol"),
                direction=m.group("direction"),
                entry_price=float(m.group("entry_price")),
                qty=float(m.group("qty")),
                is_recovery=True,
            )
            return

        # New position entry (RSI/MACD_H/ATR groups are optional in the regex).
        m = PATTERNS["entry"].search(line)
        if m:
            self._handle_entry(
                ts=m.group("ts"),
                symbol=m.group("symbol"),
                direction=m.group("direction"),
                entry_price=float(m.group("entry_price")),
                qty=float(m.group("qty")),
                sl=float(m.group("sl")),
                tp=float(m.group("tp")),
                rsi=float(m.group("rsi")) if m.group("rsi") else None,
                macd_hist=float(m.group("macd_hist")) if m.group("macd_hist") else None,
                atr=float(m.group("atr")) if m.group("atr") else None,
            )
            return

        # OI / funding rate — buffered per minute-key, merged into the next candle.
        m = PATTERNS["microstructure"].search(line)
        if m:
            symbol = m.group("symbol")
            ts_key = m.group("ts")[:16]  # minute resolution "YYYY-MM-DD HH:MM"
            if symbol not in self._pending_candles:
                self._pending_candles[symbol] = {}
            if ts_key not in self._pending_candles[symbol]:
                self._pending_candles[symbol][ts_key] = {}
            self._pending_candles[symbol][ts_key].update({
                "oi": float(m.group("oi")),
                "oi_change": float(m.group("oi_change")),
                "funding": float(m.group("funding")),
            })
            return

        # ADX — buffered the same way as microstructure data.
        m = PATTERNS["adx"].search(line)
        if m:
            symbol = m.group("symbol")
            ts_key = m.group("ts")[:16]
            if symbol not in self._pending_candles:
                self._pending_candles[symbol] = {}
            if ts_key not in self._pending_candles[symbol]:
                self._pending_candles[symbol][ts_key] = {}
            self._pending_candles[symbol][ts_key]["adx"] = float(m.group("adx"))
            return

        # Signal + price line — emits one candle row, consuming buffered extras.
        m = PATTERNS["signal"].search(line)
        if m:
            symbol = m.group("symbol")
            ts = m.group("ts")
            ts_key = ts[:16]
            price = float(m.group("price"))
            # NOTE(review): local name shadows the module-level `signal` import
            # (harmless here — this branch doesn't use the signal module).
            signal = m.group("signal")
            # pop() consumes the buffered OI/ADX data for this minute, if any.
            extra = self._pending_candles.get(symbol, {}).pop(ts_key, {})

            self._set_status(f"{symbol}:current_price", str(price))
            self._set_status(f"{symbol}:current_signal", signal)
            self._set_status(f"{symbol}:last_candle_time", ts)

            try:
                # Upsert keyed on (symbol, ts); re-parses overwrite in place.
                self.conn.execute(
                    """INSERT INTO candles(symbol, ts, price, signal, adx, oi, oi_change, funding_rate)
                       VALUES(?,?,?,?,?,?,?,?)
                       ON CONFLICT(symbol, ts) DO UPDATE SET
                           price=?, signal=?, adx=?, oi=?, oi_change=?, funding_rate=?""",
                    (symbol, ts, price, signal,
                     extra.get("adx"), extra.get("oi"), extra.get("oi_change"), extra.get("funding"),
                     price, signal,
                     extra.get("adx"), extra.get("oi"), extra.get("oi_change"), extra.get("funding")),
                )
                self._dirty = True
            except Exception as e:
                print(f"[LogParser] 캔들 저장 에러: {e}")
            return

        # Close detection — full PnL breakdown from the bot.
        m = PATTERNS["close_detect"].search(line)
        if m:
            self._handle_close(
                ts=m.group("ts"),
                symbol=m.group("symbol"),
                exit_price=float(m.group("exit_price")),
                expected_pnl=float(m.group("expected")),
                commission=float(m.group("commission")),
                net_pnl=float(m.group("net_pnl")),
                reason=m.group("reason"),
            )
            return

        # Daily cumulative PnL — upsert keyed on (symbol, date); idempotent (C3).
        m = PATTERNS["daily_pnl"].search(line)
        if m:
            symbol = m.group("symbol")
            ts = m.group("ts")
            day = ts[:10]  # "YYYY-MM-DD"
            pnl = float(m.group("pnl"))
            self.conn.execute(
                """INSERT INTO daily_pnl(symbol, date, cumulative_pnl, last_updated)
                   VALUES(?,?,?,?)
                   ON CONFLICT(symbol, date) DO UPDATE SET cumulative_pnl=?, last_updated=?""",
                (symbol, day, pnl, ts, pnl, ts)
            )
            self._dirty = True
            self._set_status(f"{symbol}:daily_pnl", str(pnl))
            return
|
|
|
|
# ── 포지션 진입 핸들러 ───────────────────────────────────────
|
|
def _handle_entry(self, ts, symbol, direction, entry_price, qty,
|
|
leverage=None, sl=None, tp=None, is_recovery=False,
|
|
rsi=None, macd_hist=None, atr=None):
|
|
if leverage is None:
|
|
row = self.conn.execute(
|
|
"SELECT value FROM bot_status WHERE key=?",
|
|
(f"{symbol}:leverage",),
|
|
).fetchone()
|
|
leverage = int(row["value"]) if row else 10
|
|
|
|
# 중복 체크 — 같은 심볼+방향의 OPEN 포지션이 이미 있으면 스킵
|
|
current = self._current_positions.get(symbol)
|
|
if current and current.get("direction") == direction:
|
|
return
|
|
|
|
existing = self.conn.execute(
|
|
"SELECT id, entry_price FROM trades WHERE status='OPEN' AND symbol=? AND direction=?",
|
|
(symbol, direction),
|
|
).fetchone()
|
|
if existing:
|
|
self._current_positions[symbol] = {
|
|
"id": existing["id"],
|
|
"direction": direction,
|
|
"entry_price": existing["entry_price"],
|
|
"entry_time": ts,
|
|
}
|
|
return
|
|
|
|
cur = self.conn.execute(
|
|
"""INSERT OR IGNORE INTO trades(symbol, direction, entry_time, entry_price,
|
|
quantity, leverage, sl, tp, status, extra, rsi, macd_hist, atr)
|
|
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)""",
|
|
(symbol, direction, ts,
|
|
entry_price, qty, leverage, sl, tp, "OPEN",
|
|
json.dumps({"recovery": is_recovery}),
|
|
rsi, macd_hist, atr),
|
|
)
|
|
self._dirty = True
|
|
self._current_positions[symbol] = {
|
|
"id": cur.lastrowid,
|
|
"direction": direction,
|
|
"entry_price": entry_price,
|
|
"entry_time": ts,
|
|
}
|
|
self._set_status(f"{symbol}:position_status", "OPEN")
|
|
self._set_status(f"{symbol}:position_direction", direction)
|
|
self._set_status(f"{symbol}:position_entry_price", str(entry_price))
|
|
print(f"[LogParser] {symbol} 포지션 진입: {direction} @ {entry_price} (recovery={is_recovery})")
|
|
|
|
# ── 포지션 청산 핸들러 ───────────────────────────────────────
|
|
    def _handle_close(self, ts, symbol, exit_price, expected_pnl, commission, net_pnl, reason):
        """Close the symbol's OPEN trade, prune stale duplicates, and refresh
        the daily summary by recomputing it from the trades table (C3).

        The newest OPEN row (highest id) is treated as the real position;
        any older OPEN rows for the symbol are considered stale and deleted.
        """
        # Only this symbol's OPEN trades are affected; newest first.
        open_trades = self.conn.execute(
            "SELECT id FROM trades WHERE status='OPEN' AND symbol=? ORDER BY id DESC",
            (symbol,),
        ).fetchall()

        if not open_trades:
            # Close seen without a tracked open position — warn and clear memory.
            print(f"[LogParser] 경고: {symbol} 청산 감지했으나 열린 포지션 없음")
            self._current_positions.pop(symbol, None)
            return

        # Close the newest OPEN row with the full PnL breakdown.
        # NOTE(review): actual_pnl is filled with expected_pnl here — looks
        # intentional (the log's "rp" value serves as both) but worth confirming.
        primary_id = open_trades[0]["id"]
        self.conn.execute(
            """UPDATE trades SET
                   exit_time=?, exit_price=?, expected_pnl=?,
                   actual_pnl=?, commission=?, net_pnl=?,
                   status='CLOSED', close_reason=?
               WHERE id=?""",
            (ts, exit_price, expected_pnl,
             expected_pnl, commission, net_pnl,
             reason, primary_id)
        )

        self._dirty = True

        # Any additional OPEN rows for the symbol are duplicates — delete them.
        if len(open_trades) > 1:
            stale_ids = [r["id"] for r in open_trades[1:]]
            self.conn.execute(
                f"DELETE FROM trades WHERE id IN ({','.join('?' * len(stale_ids))})",
                stale_ids,
            )
            print(f"[LogParser] {symbol} 중복 OPEN 거래 {len(stale_ids)}건 삭제")

        # Recompute the day's counters from CLOSED trades so re-parsing the
        # same log lines never double-counts (idempotent, C3).
        day = ts[:10]
        row = self.conn.execute(
            """SELECT COUNT(*) as cnt,
                      SUM(CASE WHEN net_pnl > 0 THEN 1 ELSE 0 END) as wins,
                      SUM(CASE WHEN net_pnl <= 0 THEN 1 ELSE 0 END) as losses
               FROM trades WHERE status='CLOSED' AND symbol=? AND exit_time LIKE ?""",
            (symbol, f"{day}%"),
        ).fetchone()
        # cumulative_pnl is deliberately untouched here — the bot's own
        # "daily PnL" log line maintains it via the daily_pnl pattern.
        self.conn.execute(
            """INSERT INTO daily_pnl(symbol, date, trade_count, wins, losses, last_updated)
               VALUES(?, ?, ?, ?, ?, ?)
               ON CONFLICT(symbol, date) DO UPDATE SET
                   trade_count=?, wins=?, losses=?, last_updated=?""",
            (symbol, day, row["cnt"], row["wins"], row["losses"], ts,
             row["cnt"], row["wins"], row["losses"], ts),
        )
        self._dirty = True

        self._set_status(f"{symbol}:position_status", "NONE")
        print(f"[LogParser] {symbol} 포지션 청산: {reason} @ {exit_price}, PnL={net_pnl}")
        self._current_positions.pop(symbol, None)
|
|
|
|
|
|
if __name__ == "__main__":
    parser = LogParser()
    try:
        parser.run()
    finally:
        # Best-effort PID-file cleanup on any exit path; the SIGTERM handler
        # may already have removed it, hence the swallowed OSError.
        try:
            os.unlink(PID_FILE)
        except OSError:
            pass