feat: replace GLM API with local Ollama structured output, remove ~400 lines of JSON repair code
This commit is contained in:
593
ai_planner.py
593
ai_planner.py
@@ -8,21 +8,20 @@ ai_planner.py — 순수 AI 플레이 버전
|
||||
- 건설은 건설 거리 내에서만 가능 → 배치 전 move 필수
|
||||
- AI가 이 제약을 이해하고 행동 순서를 계획해야 함
|
||||
|
||||
JSON 파싱 강화:
|
||||
- GLM 응답이 잘리거나 마크다운으로 감싸져도 복구
|
||||
- 최대 2회 재시도
|
||||
LLM 백엔드: 로컬 Ollama (structured output으로 JSON 스키마 강제)
|
||||
- OLLAMA_HOST: Ollama 서버 주소 (기본값: http://192.168.50.67:11434)
|
||||
- OLLAMA_MODEL: 사용할 모델 (기본값: qwen3:14b)
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import traceback
|
||||
import ollama
|
||||
|
||||
|
||||
GLM_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"
|
||||
GLM_MODEL = "GLM-4.5-Air"
|
||||
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "qwen3:14b")
|
||||
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://192.168.50.67:11434")
|
||||
|
||||
|
||||
SYSTEM_PROMPT = """당신은 팩토리오 게임을 순수하게 플레이하는 AI 에이전트입니다.
|
||||
@@ -56,17 +55,6 @@ state_reader가 상태 요약에 포함하는 `마지막 행동(기억)` 섹션
|
||||
- 자동화 연구팩: iron-gear-wheel + iron-plate → assembling-machine
|
||||
- 건물 배치 전 반드시: 1) 인벤토리에 아이템 있는지 2) 가까이 있는지 확인
|
||||
|
||||
## 응답 형식 — 반드시 순수 JSON만 반환, 다른 텍스트 절대 금지
|
||||
{
|
||||
"thinking": "현재 상태 분석. 인벤토리/위치/자원 확인 후 판단 (자유롭게 서술)",
|
||||
"current_goal": "지금 달성하려는 목표",
|
||||
"actions": [
|
||||
{"action": "행동유형", "params": {...}, "reason": "이 행동이 필요한 이유"},
|
||||
최대 8개
|
||||
],
|
||||
"after_this": "이 시퀀스 완료 후 다음 계획"
|
||||
}
|
||||
|
||||
## 전체 action 목록
|
||||
|
||||
### 탐색 (★ 자원 없을 때 최우선! 걸으면서 자원 스캔)
|
||||
@@ -101,6 +89,12 @@ state_reader가 상태 요약에 포함하는 `마지막 행동(기억)` 섹션
|
||||
- "place_entity" → {"name": str, "x": int, "y": int, "direction": "north|south|east|west"}
|
||||
주의: 1) 인벤토리에 아이템 필요 2) 가까이 있어야 함 (약 10칸 내)
|
||||
|
||||
### 건설 자동화 (Blueprint)
|
||||
- "build_smelting_line" → {"ore": "iron-ore|copper-ore", "x": int, "y": int, "furnace_count": int}
|
||||
★ 제련소 라인을 한 번에 배치. furnace_count개의 stone-furnace를 y축 방향으로 일렬 배치.
|
||||
★ 각 furnace에 석탄 + 광석 자동 투입. 인벤토리에 stone-furnace와 연료가 있어야 함.
|
||||
★ 예: {"action": "build_smelting_line", "params": {"ore": "iron-ore", "x": -90, "y": -70, "furnace_count": 4}}
|
||||
|
||||
### 벨트 라인 (걸어다니면서 하나씩 배치 — 시간 많이 걸림)
|
||||
- "place_belt_line" → {"from_x": int, "from_y": int, "to_x": int, "to_y": int}
|
||||
|
||||
@@ -114,71 +108,18 @@ state_reader가 상태 요약에 포함하는 `마지막 행동(기억)` 섹션
|
||||
- "start_research" → {"tech": "automation"}
|
||||
|
||||
### 대기
|
||||
- "wait" → {"seconds": int}
|
||||
|
||||
## 절대 중요: 순수 JSON만 출력하세요. ```json 같은 마크다운 블록, 설명 텍스트, 주석 없이 오직 { } 만. 또한 응답의 첫 비공백 문자는 반드시 `{` 입니다."""
|
||||
- "wait" → {"seconds": int}"""
|
||||
|
||||
|
||||
def _glm_debug_enabled() -> bool:
|
||||
v = os.environ.get("GLM_DEBUG", "").strip().lower()
|
||||
def _debug_enabled() -> bool:
|
||||
v = os.environ.get("AI_DEBUG", "").strip().lower()
|
||||
return v in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
def describe_glm_exception(exc: BaseException) -> str:
|
||||
"""
|
||||
GLM HTTP 호출 실패 시 원인 분류용 문자열.
|
||||
- URLError.reason 안의 SSL/소켓 예외, errno 등을 함께 적어
|
||||
'TimeoutError' 한 줄만으로는 알 수 없는 정보를 남긴다.
|
||||
"""
|
||||
parts: list[str] = [type(exc).__name__]
|
||||
msg = str(exc).strip()
|
||||
if msg:
|
||||
parts.append(msg[:800])
|
||||
|
||||
if isinstance(exc, urllib.error.URLError):
|
||||
r = exc.reason
|
||||
parts.append(f"URLError.reason={type(r).__name__}: {r!s}"[:500])
|
||||
if isinstance(r, OSError):
|
||||
parts.append(f"reason.errno={getattr(r, 'errno', None)!r}")
|
||||
we = getattr(r, "winerror", None)
|
||||
if we is not None:
|
||||
parts.append(f"reason.winerror={we!r}")
|
||||
inner = getattr(r, "__cause__", None) or getattr(r, "__context__", None)
|
||||
if inner is not None:
|
||||
parts.append(f"chained={type(inner).__name__}: {inner!s}"[:300])
|
||||
|
||||
if isinstance(exc, OSError) and not isinstance(exc, urllib.error.URLError):
|
||||
parts.append(f"errno={exc.errno!r}")
|
||||
if getattr(exc, "winerror", None) is not None:
|
||||
parts.append(f"winerror={exc.winerror!r}")
|
||||
|
||||
# 힌트(자주 나오는 케이스)
|
||||
low = " ".join(parts).lower()
|
||||
if "timed out" in low or "timeout" in low:
|
||||
parts.append(
|
||||
"hint=응답 수신 지연/차단 — GLM_HTTP_TIMEOUT_SECONDS 증가, "
|
||||
"서버·Docker 네트워크 egress, 프록시(HTTPS_PROXY) 확인"
|
||||
)
|
||||
if "connection refused" in low or "econnrefused" in low:
|
||||
parts.append("hint=대상 포트 닫힘 또는 잘못된 호스트(프록시/방화벽)")
|
||||
if "certificate" in low or "ssl" in low or "tls" in low:
|
||||
parts.append("hint=SSL 검증 실패 — 기업 프록시/자체 CA, 시스템 시간 오차")
|
||||
if "name or service not known" in low or "getaddrinfo" in low:
|
||||
parts.append("hint=DNS 실패 — 컨테이너/호스트 resolv.conf")
|
||||
|
||||
return " | ".join(parts)
|
||||
|
||||
|
||||
class AIPlanner:
|
||||
def __init__(self):
|
||||
self.api_key = os.environ.get("ZAI_API_KEY", "")
|
||||
if not self.api_key:
|
||||
raise ValueError("ZAI_API_KEY 환경변수를 설정하세요.")
|
||||
|
||||
self.step = 0
|
||||
self.feedback_log: list[dict] = []
|
||||
self._last_glm_finish_reason: str | None = None
|
||||
# GLM 전부 실패 시 explore 방향 순환 (동일 방향 탐색 루프 완화)
|
||||
self._fallback_explore_turn = 0
|
||||
self.long_term_goal = (
|
||||
"완전 자동화 달성: "
|
||||
@@ -190,113 +131,25 @@ class AIPlanner:
|
||||
self.step += 1
|
||||
feedback_text = self._format_feedback()
|
||||
|
||||
user_message_base = (
|
||||
user_message = (
|
||||
f"## 스텝 {self.step}\n\n"
|
||||
f"### 현재 게임 상태\n{state_summary}\n\n"
|
||||
f"{feedback_text}"
|
||||
f"### 장기 목표\n{self.long_term_goal}\n\n"
|
||||
"현재 상태를 분석하고, 장기 목표를 향해 지금 해야 할 행동 시퀀스를 계획하세요.\n"
|
||||
"⚠️ 순수 플레이입니다. 건설/채굴/삽입 전에 반드시 move로 가까이 이동하세요.\n"
|
||||
"⚠️ 제작은 재료가 있어야 합니다. 인벤토리를 확인하세요.\n"
|
||||
"반드시 JSON만 반환하세요. 마크다운 블록(```)이나 설명 텍스트 없이 순수 JSON만. 그리고 응답의 첫 비공백 문자는 반드시 `{` 입니다."
|
||||
"⚠️ 제작은 재료가 있어야 합니다. 인벤토리를 확인하세요."
|
||||
)
|
||||
|
||||
print(f"\n[GLM] 생각 중...")
|
||||
print(f"\n[AI] 생각 중... (model={OLLAMA_MODEL}, host={OLLAMA_HOST})")
|
||||
|
||||
# JSON 스키마를 위반한 경우(예: 분석 텍스트만 반환)에는 재시도 프롬프트를 강화한다.
|
||||
repair_suffix = ""
|
||||
repair_applied_once = False
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
raw = self._call_glm(user_message_base + repair_suffix, attempt=attempt)
|
||||
plan = self._parse_json(raw)
|
||||
break
|
||||
except (
|
||||
ValueError,
|
||||
json.JSONDecodeError,
|
||||
TimeoutError,
|
||||
ConnectionError,
|
||||
urllib.error.URLError,
|
||||
OSError,
|
||||
) as e:
|
||||
detail = describe_glm_exception(e)
|
||||
parse_no_brace = isinstance(e, ValueError) and "JSON 파싱 실패 ('{' 없음)" in str(e)
|
||||
|
||||
# 첫 시도에서 이미 finish_reason=length + 비JSON 텍스트면,
|
||||
# repair 재시도도 같은 패턴으로 반복되는 경우가 많아 즉시 폴백한다.
|
||||
if parse_no_brace and self._last_glm_finish_reason == "length":
|
||||
if _glm_debug_enabled():
|
||||
print("[GLM][디버그] finish_reason=length + 비JSON -> 즉시 휴리스틱 폴백")
|
||||
plan = self._fallback_plan_from_summary(
|
||||
state_summary,
|
||||
last_error=detail,
|
||||
)
|
||||
break
|
||||
|
||||
# JSON-only repair를 이미 한 번 적용했는데도 여전히 비JSON 텍스트만 오는 경우:
|
||||
# 추가 API 호출을 반복하지 말고 즉시 상태 기반 휴리스틱으로 진행한다.
|
||||
if parse_no_brace and repair_applied_once:
|
||||
if _glm_debug_enabled():
|
||||
print("[GLM][디버그] repair 후에도 비JSON 응답 -> 즉시 휴리스틱 폴백")
|
||||
plan = self._fallback_plan_from_summary(
|
||||
state_summary,
|
||||
last_error=detail,
|
||||
)
|
||||
break
|
||||
|
||||
if attempt < 2:
|
||||
if isinstance(e, ValueError) and (
|
||||
str(e).startswith("JSON 파싱 실패")
|
||||
or "JSON 파싱 실패" in str(e)
|
||||
or "actions 스키마" in str(e)
|
||||
):
|
||||
repair_suffix = (
|
||||
"\n\n[중요] JSON-only. 분석/설명/마크다운 금지.\n"
|
||||
"응답은 반드시 첫 비공백 문자가 `{` 이고, 마지막도 `}` 입니다.\n"
|
||||
"아래 JSON 스키마를 그대로(키/구조 동일) 반환하세요:\n"
|
||||
"{\"thinking\":\"\",\"current_goal\":\"\",\"actions\":[{\"action\":\"wait\",\"params\":{\"seconds\":1},\"reason\":\"repair\"}],\"after_this\":\"재시도\"}"
|
||||
)
|
||||
repair_applied_once = True
|
||||
if _glm_debug_enabled():
|
||||
print("[GLM][디버그] JSON-only repair 프롬프트 적용")
|
||||
# 429 Rate limit이면 prompt 길이를 늘리면 안 되므로 repair_suffix를 끈다.
|
||||
if isinstance(e, ConnectionError) and (
|
||||
"429" in detail or "rate limit" in detail.lower() or "Too Many Requests" in detail
|
||||
):
|
||||
repair_suffix = ""
|
||||
|
||||
print(
|
||||
f"[경고] GLM 처리 실패 (시도 {attempt+1}/3): "
|
||||
f"{type(e).__name__} 재시도..."
|
||||
)
|
||||
print(f" [GLM 원인] {detail}")
|
||||
if _glm_debug_enabled():
|
||||
traceback.print_exc()
|
||||
sleep_s = 2 + attempt * 3
|
||||
if isinstance(e, ConnectionError) and (
|
||||
"429" in detail or "rate limit" in detail.lower() or "Too Many Requests" in detail
|
||||
):
|
||||
# retry_after=...를 우선 사용하고, 없으면 기본 백오프를 적용한다.
|
||||
m = re.search(r"retry_after=([0-9]+(?:\\.[0-9]+)?)s", detail)
|
||||
ra = float(m.group(1)) if m else None
|
||||
if ra is not None:
|
||||
sleep_s = max(sleep_s, ra)
|
||||
else:
|
||||
# Rate limit은 보통 짧게 끝나지 않으므로 더 길게 기다린다.
|
||||
sleep_s = max(sleep_s, 20 + attempt * 10)
|
||||
if _glm_debug_enabled():
|
||||
print(f"[GLM][디버그] 429 백오프 대기: {sleep_s:.1f}s")
|
||||
time.sleep(sleep_s)
|
||||
continue
|
||||
print(f"[오류] GLM 처리 3회 실패. 상태 요약 기반 휴리스틱 폴백 사용.")
|
||||
print(f" [GLM 원인] {detail}")
|
||||
if _glm_debug_enabled():
|
||||
traceback.print_exc()
|
||||
plan = self._fallback_plan_from_summary(
|
||||
state_summary,
|
||||
last_error=detail,
|
||||
)
|
||||
try:
|
||||
plan = self._call_ollama(user_message)
|
||||
except Exception as e:
|
||||
print(f"[오류] Ollama 호출 실패: {e}")
|
||||
if _debug_enabled():
|
||||
traceback.print_exc()
|
||||
plan = self._fallback_plan_from_summary(state_summary, last_error=str(e))
|
||||
|
||||
thinking = plan.get("thinking", "")
|
||||
if thinking:
|
||||
@@ -306,8 +159,6 @@ class AIPlanner:
|
||||
if not isinstance(actions, list):
|
||||
actions = []
|
||||
if not actions:
|
||||
# 모델이 형식은 맞췄지만 actions가 비어 있으면 main 루프가 10초 대기 재시도만 반복한다.
|
||||
# 이런 경우도 즉시 휴리스틱 플랜으로 전환해 진행을 유지한다.
|
||||
plan = self._fallback_plan_from_summary(
|
||||
state_summary,
|
||||
last_error="LLM returned empty actions",
|
||||
@@ -324,6 +175,45 @@ class AIPlanner:
|
||||
print(f"[AI] {len(actions)}개 행동 계획됨")
|
||||
return actions
|
||||
|
||||
def _call_ollama(self, user_message: str) -> dict:
|
||||
t0 = time.perf_counter()
|
||||
client = ollama.Client(host=OLLAMA_HOST)
|
||||
response = client.chat(
|
||||
model=OLLAMA_MODEL,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_message},
|
||||
],
|
||||
format={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"thinking": {"type": "string"},
|
||||
"current_goal": {"type": "string"},
|
||||
"actions": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {"type": "string"},
|
||||
"params": {"type": "object"},
|
||||
"reason": {"type": "string"},
|
||||
},
|
||||
"required": ["action", "params"],
|
||||
},
|
||||
},
|
||||
"after_this": {"type": "string"},
|
||||
},
|
||||
"required": ["actions"],
|
||||
},
|
||||
options={"temperature": 0.3},
|
||||
)
|
||||
dt = time.perf_counter() - t0
|
||||
content = response.message.content
|
||||
print(f"[AI] 응답 수신 ({dt:.2f}s, {len(content)}자)")
|
||||
if _debug_enabled():
|
||||
print(f"[AI][디버그] raw={content[:300]}")
|
||||
return json.loads(content)
|
||||
|
||||
@staticmethod
|
||||
def _ensure_move_before_build_actions(actions: list[dict]) -> list[dict]:
|
||||
"""
|
||||
@@ -386,345 +276,6 @@ class AIPlanner:
|
||||
)
|
||||
return "\n".join(lines) + "\n\n"
|
||||
|
||||
def _call_glm(self, user_message: str, attempt: int) -> str:
|
||||
# attempt가 올라갈수록 "분석 텍스트"가 길어지면서 JSON이 잘리는 패턴이 있어
|
||||
# retry에서는 출력 길이를 줄인다.
|
||||
repair_mode = "JSON 요구사항을 위반했습니다" in user_message or "응답의 첫 비공백 문자는 반드시" in user_message
|
||||
|
||||
base_max_tokens = int(os.environ.get("GLM_MAX_TOKENS", "2000"))
|
||||
retry_max_tokens = int(os.environ.get("GLM_RETRY_MAX_TOKENS", "900"))
|
||||
repair_max_tokens = int(os.environ.get("GLM_REPAIR_MAX_TOKENS", "250"))
|
||||
|
||||
if repair_mode:
|
||||
max_tokens = repair_max_tokens
|
||||
else:
|
||||
max_tokens = base_max_tokens if attempt == 0 else min(base_max_tokens, retry_max_tokens)
|
||||
|
||||
temperature = 0.3
|
||||
if repair_mode:
|
||||
# JSON 포맷 준수율을 높이기 위해 변동성을 낮춘다.
|
||||
temperature = float(os.environ.get("GLM_REPAIR_TEMPERATURE", "0.0"))
|
||||
|
||||
payload = json.dumps({
|
||||
"model": GLM_MODEL,
|
||||
"messages": [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": user_message},
|
||||
],
|
||||
"temperature": temperature,
|
||||
"max_tokens": max_tokens,
|
||||
}).encode("utf-8")
|
||||
|
||||
prompt_chars = len(user_message)
|
||||
system_chars = len(SYSTEM_PROMPT)
|
||||
http_timeout = float(os.environ.get("GLM_HTTP_TIMEOUT_SECONDS", "120"))
|
||||
|
||||
if _glm_debug_enabled():
|
||||
print(
|
||||
f"[GLM][디버그] POST {GLM_API_URL} | "
|
||||
f"timeout={http_timeout}s | payload_bytes={len(payload)} | "
|
||||
f"model={GLM_MODEL}"
|
||||
)
|
||||
|
||||
req = urllib.request.Request(
|
||||
GLM_API_URL,
|
||||
data = payload,
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
},
|
||||
method = "POST",
|
||||
)
|
||||
try:
|
||||
t_total0 = time.perf_counter()
|
||||
t_payload0 = time.perf_counter()
|
||||
# payload 직렬화 직후(대략)부터 타임라인 측정
|
||||
_t0 = time.perf_counter()
|
||||
with urllib.request.urlopen(req, timeout=http_timeout) as resp:
|
||||
raw_text = resp.read().decode("utf-8")
|
||||
t_read_done = time.perf_counter()
|
||||
|
||||
t_json0 = time.perf_counter()
|
||||
data = json.loads(raw_text)
|
||||
if _glm_debug_enabled():
|
||||
finish_reason = data.get("choices", [{}])[0].get("finish_reason")
|
||||
print(f"[GLM][디버그] finish_reason={finish_reason!r}")
|
||||
finish_reason = data.get("choices", [{}])[0].get("finish_reason")
|
||||
self._last_glm_finish_reason = str(finish_reason) if finish_reason is not None else None
|
||||
content = self._extract_glm_assistant_text(data).strip()
|
||||
if not content and _glm_debug_enabled():
|
||||
# content가 비어있으면 아래 파서에서 원인 추적이 어려워지므로 raw 일부를 남긴다.
|
||||
finish_reason = data.get("choices", [{}])[0].get("finish_reason")
|
||||
print(
|
||||
"[경고] GLM 응답 assistant text 비어있음 | "
|
||||
f"finish_reason={finish_reason!r} | "
|
||||
f"raw_preview={raw_text[:600]!r}"
|
||||
)
|
||||
t_json_done = time.perf_counter()
|
||||
|
||||
dt_total = time.perf_counter() - t_total0
|
||||
dt_payload = t_payload0 - t_total0
|
||||
dt_read = t_read_done - _t0
|
||||
dt_json = t_json_done - t_json0
|
||||
|
||||
print(
|
||||
"[GLM] 타이밍 | "
|
||||
f"attempt {attempt+1}/3 | "
|
||||
f"total {dt_total:.2f}s | "
|
||||
f"http_read {dt_read:.2f}s | "
|
||||
f"json_parse {dt_json:.2f}s | "
|
||||
f"prompt_chars {prompt_chars} | "
|
||||
f"system_chars {system_chars} | "
|
||||
f"max_tokens {max_tokens} | "
|
||||
f"resp_chars {len(raw_text)}"
|
||||
)
|
||||
return content
|
||||
except urllib.error.HTTPError as e:
|
||||
self._last_glm_finish_reason = None
|
||||
body = ""
|
||||
try:
|
||||
body = e.read().decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
body = ""
|
||||
|
||||
retry_after: float | None = None
|
||||
try:
|
||||
# urllib.error.HTTPError는 headers를 들고 있는 경우가 많다.
|
||||
ra = None
|
||||
try:
|
||||
ra = getattr(e, "headers", None).get("Retry-After")
|
||||
except Exception:
|
||||
ra = None
|
||||
if ra:
|
||||
ra_s = str(ra).strip()
|
||||
retry_after = float(ra_s)
|
||||
except Exception:
|
||||
retry_after = None
|
||||
|
||||
raise ConnectionError(
|
||||
f"GLM API HTTP {e.code}: {body[:1200]}"
|
||||
+ (f" | retry_after={retry_after:.1f}s" if retry_after is not None else "")
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def _extract_glm_assistant_text(data: dict) -> str:
|
||||
"""
|
||||
GLM 응답에서 사용자가 기대하는 assistant 텍스트를 뽑는다.
|
||||
|
||||
관찰 케이스:
|
||||
- finish_reason='length' 인데 message.content가 ''로 오고,
|
||||
message.reasoning_content에 실제 출력(JSON)이 포함되는 패턴이 있다.
|
||||
"""
|
||||
choices = data.get("choices") if isinstance(data, dict) else None
|
||||
if not choices or not isinstance(choices, list):
|
||||
return ""
|
||||
choice0 = choices[0] if choices else {}
|
||||
if not isinstance(choice0, dict):
|
||||
return ""
|
||||
msg = choice0.get("message", {})
|
||||
if not isinstance(msg, dict):
|
||||
return ""
|
||||
|
||||
content = msg.get("content") or ""
|
||||
reasoning = msg.get("reasoning_content") or ""
|
||||
|
||||
if not isinstance(content, str):
|
||||
content = ""
|
||||
if not isinstance(reasoning, str):
|
||||
reasoning = ""
|
||||
|
||||
# Heuristic:
|
||||
# - 모델이 "JSON만 반환"을 어기면 content에 분석/설명 텍스트가 들어올 수 있음.
|
||||
# - 그 경우 reasoning_content 쪽에 실제 JSON이 들어있는 패턴을 우선 복구한다.
|
||||
def looks_like_json(s: str) -> bool:
|
||||
if not s:
|
||||
return False
|
||||
if '"actions"' in s or '"current_goal"' in s:
|
||||
return True
|
||||
# 최소 토큰 기반 (finish_reason=length에서 특히 reasoning에 JSON이 들어오는 케이스)
|
||||
return ("{" in s) or ("[" in s)
|
||||
|
||||
content_stripped = content.strip()
|
||||
reasoning_stripped = reasoning.strip()
|
||||
|
||||
if _glm_debug_enabled():
|
||||
c_preview = (content_stripped[:120] + ("..." if len(content_stripped) > 120 else ""))
|
||||
r_preview = (reasoning_stripped[:120] + ("..." if len(reasoning_stripped) > 120 else ""))
|
||||
print(
|
||||
"[GLM][디버그] extract_assistant_text | "
|
||||
f"content_len={len(content_stripped)} reasoning_len={len(reasoning_stripped)} | "
|
||||
f"content_looks_json={looks_like_json(content_stripped)} reasoning_looks_json={looks_like_json(reasoning_stripped)} | "
|
||||
f"content_preview={c_preview!r} | reasoning_preview={r_preview!r}"
|
||||
)
|
||||
|
||||
if looks_like_json(content_stripped):
|
||||
return content_stripped
|
||||
if looks_like_json(reasoning_stripped):
|
||||
return reasoning_stripped
|
||||
# 둘 다 JSON처럼 보이지 않더라도, content가 있으면 먼저 반환(기존 동작 유지)
|
||||
if content_stripped:
|
||||
return content_stripped
|
||||
return reasoning_stripped
|
||||
|
||||
def _parse_json(self, raw: str) -> dict:
|
||||
text = raw.strip()
|
||||
if "<think>" in text:
|
||||
text = text.split("</think>")[-1].strip()
|
||||
if text.startswith("```"):
|
||||
text = "\n".join(
|
||||
l for l in text.splitlines()
|
||||
if not l.strip().startswith("```")
|
||||
).strip()
|
||||
try:
|
||||
loaded = json.loads(text)
|
||||
if isinstance(loaded, dict):
|
||||
return loaded
|
||||
if isinstance(loaded, list):
|
||||
return self._plan_from_actions_array(loaded)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
if not text:
|
||||
raise ValueError("JSON 파싱 실패: raw가 비어있습니다.")
|
||||
|
||||
start = text.find("{")
|
||||
if start == -1:
|
||||
# GLM이 상단 레벨에서 JSON 배열(`[ ... ]`)로 응답하는 경우를 허용
|
||||
arr_start = text.find("[")
|
||||
if arr_start != -1:
|
||||
candidate = self._extract_balanced_json_candidate(text, arr_start)
|
||||
try:
|
||||
loaded = json.loads(candidate)
|
||||
if isinstance(loaded, list):
|
||||
return self._plan_from_actions_array(loaded)
|
||||
except json.JSONDecodeError:
|
||||
# 아래 공통 오류 메시지로 떨어져서 fallback 사용
|
||||
pass
|
||||
|
||||
first_non_ws = next((c for c in text if not c.isspace()), "")
|
||||
raise ValueError(
|
||||
"JSON 파싱 실패 ('{' 없음): first_non_ws="
|
||||
+ repr(first_non_ws)
|
||||
+ "\n"
|
||||
+ raw[:300]
|
||||
)
|
||||
# 응답 안에 { ... }가 여러 개(예: 인벤토리 하위 객체, 분석용 JSON)가 섞일 수 있다.
|
||||
# decide() 파이프라인은 "actions" 스키마를 가진 계획 객체가 필요하므로,
|
||||
# candidates 중 actions를 포함하는 것을 우선 선택한다.
|
||||
search_pos = start
|
||||
for _ in range(10):
|
||||
next_start = text.find("{", search_pos)
|
||||
if next_start == -1:
|
||||
break
|
||||
candidate = self._extract_balanced_json_candidate(text, next_start)
|
||||
if candidate:
|
||||
try:
|
||||
loaded = json.loads(candidate)
|
||||
if (
|
||||
isinstance(loaded, dict)
|
||||
and isinstance(loaded.get("actions"), list)
|
||||
):
|
||||
return loaded
|
||||
except json.JSONDecodeError:
|
||||
# candidate가 중간에 잘려 실패한 경우 다음 '{'를 시도
|
||||
pass
|
||||
search_pos = next_start + 1
|
||||
|
||||
# 그래도 actions 스키마를 찾지 못하면, 잘림 복구 로직으로 마지막 수습을 시도한다.
|
||||
partial = text[start:]
|
||||
repaired = self._repair_truncated_json(partial)
|
||||
try:
|
||||
loaded = json.loads(repaired)
|
||||
if isinstance(loaded, dict):
|
||||
return loaded
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
first_non_ws = next((c for c in text if not c.isspace()), "")
|
||||
raise ValueError(
|
||||
"JSON 파싱 실패: actions 스키마를 포함한 후보를 찾지 못함 "
|
||||
"(first_non_ws="
|
||||
+ repr(first_non_ws)
|
||||
+ ")\n"
|
||||
+ raw[:400]
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_balanced_json_candidate(text: str, start: int) -> str:
|
||||
"""
|
||||
text[start:]에서 중괄호/대괄호 균형이 맞는 첫 구간을 잘라 후보 JSON 문자열을 만든다.
|
||||
"""
|
||||
brace_depth = 0
|
||||
bracket_depth = 0
|
||||
in_string = False
|
||||
escape = False
|
||||
end = start
|
||||
|
||||
for i in range(start, len(text)):
|
||||
c = text[i]
|
||||
if escape:
|
||||
escape = False
|
||||
continue
|
||||
if c == '\\' and in_string:
|
||||
escape = True
|
||||
continue
|
||||
if c == '"' and not escape:
|
||||
in_string = not in_string
|
||||
continue
|
||||
if in_string:
|
||||
continue
|
||||
if c == '{':
|
||||
brace_depth += 1
|
||||
elif c == '}':
|
||||
brace_depth -= 1
|
||||
elif c == '[':
|
||||
bracket_depth += 1
|
||||
elif c == ']':
|
||||
bracket_depth -= 1
|
||||
|
||||
if brace_depth == 0 and bracket_depth == 0:
|
||||
if i > start:
|
||||
end = i + 1
|
||||
break
|
||||
return text[start:end]
|
||||
|
||||
@staticmethod
|
||||
def _plan_from_actions_array(loaded: list[object]) -> dict:
|
||||
"""
|
||||
GLM이 JSON 배열(`[{"action": ...}, ...]`)로 응답하는 경우를
|
||||
decide()에서 기대하는 JSON 객체 구조로 래핑한다.
|
||||
"""
|
||||
if not all(isinstance(x, dict) for x in loaded):
|
||||
raise ValueError("JSON 파싱 실패: actions 배열 원소가 객체(dict)가 아닙니다.")
|
||||
actions = loaded # type: ignore[assignment]
|
||||
if actions and "action" not in actions[0]:
|
||||
raise ValueError("JSON 파싱 실패: actions 배열이 행동 형식(action 키)을 포함하지 않습니다.")
|
||||
return {
|
||||
"thinking": "",
|
||||
"current_goal": "",
|
||||
"actions": actions,
|
||||
"after_this": "재시도",
|
||||
}
|
||||
|
||||
def _repair_truncated_json(self, text: str) -> str:
|
||||
if '"actions"' not in text:
|
||||
return '{"thinking":"응답 잘림","current_goal":"탐색","actions":[],"after_this":"재시도"}'
|
||||
last_complete = -1
|
||||
for m in re.finditer(r'"reason"\s*:\s*"[^"]*"\s*\}', text):
|
||||
last_complete = m.end()
|
||||
if last_complete > 0:
|
||||
result = text[:last_complete]
|
||||
open_brackets = result.count('[') - result.count(']')
|
||||
open_braces = result.count('{') - result.count('}')
|
||||
# JSON이 '...,' 로 끝나는 경우를 방지
|
||||
if result.rstrip().endswith(","):
|
||||
result = result.rstrip()[:-1]
|
||||
result += ']' * max(0, open_brackets)
|
||||
if '"after_this"' not in result and open_braces > 0:
|
||||
result += ',"after_this":"계속 진행"'
|
||||
result += '}' * max(0, open_braces)
|
||||
return result
|
||||
return '{"thinking":"응답 잘림","current_goal":"탐색","actions":[],"after_this":"재시도"}'
|
||||
|
||||
def set_goal(self, goal: str):
|
||||
self.long_term_goal = goal
|
||||
self.feedback_log.clear()
|
||||
@@ -732,7 +283,7 @@ class AIPlanner:
|
||||
|
||||
def _fallback_plan_from_summary(self, state_summary: str, last_error: str = "") -> dict:
|
||||
"""
|
||||
GLM 실패 시에도 상태 요약(주변 패치·기억된 광맥 좌표)이 있으면
|
||||
Ollama 실패 시에도 상태 요약(주변 패치·기억된 광맥 좌표)이 있으면
|
||||
무한 explore 루프 대신 mine_resource / move 를 선택한다.
|
||||
"""
|
||||
pos = self._parse_player_position(state_summary)
|
||||
@@ -758,28 +309,28 @@ class AIPlanner:
|
||||
|
||||
if best is not None:
|
||||
ore, ox, oy, dist = best
|
||||
move_threshold = float(os.environ.get("GLM_FALLBACK_MOVE_THRESHOLD", "200"))
|
||||
move_threshold = float(os.environ.get("FALLBACK_MOVE_THRESHOLD", "200"))
|
||||
actions: list[dict] = []
|
||||
if px is not None and py is not None and dist > move_threshold:
|
||||
actions.append({
|
||||
"action": "move",
|
||||
"params": {"x": ox, "y": oy},
|
||||
"reason": f"GLM 폴백: 광맥까지 약 {dist:.0f}타일 — 먼저 이동",
|
||||
"reason": f"폴백: 광맥까지 약 {dist:.0f}타일 — 먼저 이동",
|
||||
})
|
||||
actions.append({
|
||||
"action": "mine_resource",
|
||||
"params": {"ore": ore, "count": 35},
|
||||
"reason": "GLM 폴백: 상태에 표시된 인근 광맥 채굴",
|
||||
"reason": "폴백: 상태에 표시된 인근 광맥 채굴",
|
||||
})
|
||||
err_note = f" ({last_error})" if last_error else ""
|
||||
return {
|
||||
"thinking": (
|
||||
f"GLM API를 사용할 수 없어 상태 요약의 광맥({ore}, 앵커 {ox},{oy})으로 "
|
||||
f"Ollama를 사용할 수 없어 상태 요약의 광맥({ore}, 앵커 {ox},{oy})으로 "
|
||||
f"채굴을 시도합니다.{err_note}"
|
||||
),
|
||||
"current_goal": f"{ore} 채굴 (휴리스틱)",
|
||||
"actions": actions,
|
||||
"after_this": "GLM 복구 시 정상 계획으로 복귀",
|
||||
"after_this": "Ollama 복구 시 정상 계획으로 복귀",
|
||||
}
|
||||
|
||||
dirs = [
|
||||
@@ -791,7 +342,7 @@ class AIPlanner:
|
||||
err_note = f" ({last_error})" if last_error else ""
|
||||
return {
|
||||
"thinking": (
|
||||
f"GLM API 실패이며 상태 요약에서 광맥 좌표를 찾지 못해 "
|
||||
f"Ollama 실패이며 상태 요약에서 광맥 좌표를 찾지 못해 "
|
||||
f"{direction} 방향으로 탐색합니다.{err_note}"
|
||||
),
|
||||
"current_goal": "주변 탐색 (휴리스틱)",
|
||||
@@ -799,7 +350,7 @@ class AIPlanner:
|
||||
{
|
||||
"action": "explore",
|
||||
"params": {"direction": direction, "max_steps": 200},
|
||||
"reason": "GLM 폴백: 광맥 정보 없음 — 탐색",
|
||||
"reason": "폴백: 광맥 정보 없음 — 탐색",
|
||||
},
|
||||
],
|
||||
"after_this": "자원 발견 후 채굴",
|
||||
|
||||
@@ -1,87 +1,47 @@
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from unittest.mock import patch, MagicMock
|
||||
from ai_planner import AIPlanner
|
||||
|
||||
|
||||
class TestAIPlannerParseJson(unittest.TestCase):
|
||||
class TestAIPlannerFallback(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# AIPlanner 생성 시 ZAI_API_KEY가 필요하므로 테스트에서는 더미를 주입한다.
|
||||
os.environ.setdefault("ZAI_API_KEY", "dummy")
|
||||
self.planner = AIPlanner()
|
||||
|
||||
def test_parse_json_object(self):
|
||||
raw = (
|
||||
def test_fallback_uses_ore_anchor_from_summary(self):
|
||||
summary = "- 위치: (0, 0)\n- iron-ore: 100타일 (앵커: 50, 30)\n"
|
||||
plan = self.planner._fallback_plan_from_summary(summary)
|
||||
actions = plan["actions"]
|
||||
self.assertTrue(any(a["action"] == "mine_resource" for a in actions))
|
||||
|
||||
def test_fallback_explore_when_no_anchors(self):
|
||||
summary = "- 위치: (0, 0)\n"
|
||||
plan = self.planner._fallback_plan_from_summary(summary)
|
||||
actions = plan["actions"]
|
||||
self.assertTrue(any(a["action"] == "explore" for a in actions))
|
||||
|
||||
def test_decide_returns_actions_from_ollama(self):
|
||||
mock_response = MagicMock()
|
||||
mock_response.message.content = (
|
||||
'{"thinking":"t","current_goal":"g",'
|
||||
'"actions":[{"action":"explore","params":{"direction":"east","max_steps":1},"reason":"x"}],'
|
||||
'"after_this":"a"}'
|
||||
)
|
||||
plan = self.planner._parse_json(raw)
|
||||
self.assertEqual(plan["current_goal"], "g")
|
||||
self.assertEqual(len(plan["actions"]), 1)
|
||||
self.assertEqual(plan["actions"][0]["action"], "explore")
|
||||
with patch("ai_planner.ollama.Client") as MockClient:
|
||||
MockClient.return_value.chat.return_value = mock_response
|
||||
actions = self.planner.decide("## 스텝 1\n현재 상태: 초기")
|
||||
self.assertIsInstance(actions, list)
|
||||
self.assertTrue(len(actions) >= 1)
|
||||
self.assertEqual(actions[0]["action"], "explore")
|
||||
|
||||
def test_parse_json_array_top_level(self):
|
||||
raw = '[{"action":"explore","params":{"direction":"east","max_steps":1},"reason":"x"}]'
|
||||
plan = self.planner._parse_json(raw)
|
||||
self.assertEqual(len(plan["actions"]), 1)
|
||||
self.assertEqual(plan["actions"][0]["action"], "explore")
|
||||
self.assertIn("after_this", plan)
|
||||
def test_decide_falls_back_when_ollama_raises(self):
|
||||
summary = "- 위치: (10, 10)\n- iron-ore: 50타일 (앵커: 60, 10)\n"
|
||||
with patch("ai_planner.ollama.Client") as MockClient:
|
||||
MockClient.return_value.chat.side_effect = Exception("connection refused")
|
||||
actions = self.planner.decide(summary)
|
||||
self.assertIsInstance(actions, list)
|
||||
self.assertTrue(len(actions) >= 1)
|
||||
|
||||
def test_parse_json_array_with_code_fence(self):
|
||||
raw = (
|
||||
"```json\n"
|
||||
'[{"action":"explore","params":{"direction":"east","max_steps":1},"reason":"x"}]\n'
|
||||
"```"
|
||||
)
|
||||
plan = self.planner._parse_json(raw)
|
||||
self.assertEqual(len(plan["actions"]), 1)
|
||||
self.assertEqual(plan["actions"][0]["action"], "explore")
|
||||
|
||||
def test_extract_glm_text_prefers_content_then_reasoning(self):
|
||||
# content가 비어있고 reasoning_content에 JSON이 들어있는 케이스
|
||||
fake = {
|
||||
"choices": [
|
||||
{
|
||||
"finish_reason": "length",
|
||||
"message": {
|
||||
"content": "",
|
||||
"reasoning_content": '{"thinking":"t","current_goal":"g","actions":[],"after_this":"a"}',
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
extracted = self.planner._extract_glm_assistant_text(fake)
|
||||
self.assertIn('"current_goal":"g"', extracted)
|
||||
|
||||
def test_extract_glm_text_uses_reasoning_when_content_has_no_json(self):
|
||||
fake = {
|
||||
"choices": [
|
||||
{
|
||||
"finish_reason": "length",
|
||||
"message": {
|
||||
"content": "1. **Current State Analysis:**\n- Location: (0, 0)\n- Inventory: {...}",
|
||||
"reasoning_content": '{"thinking":"t","current_goal":"g","actions":[{"action":"explore","params":{"direction":"east","max_steps":1},"reason":"x"}],"after_this":"a"}',
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
extracted = self.planner._extract_glm_assistant_text(fake)
|
||||
self.assertIn('"current_goal":"g"', extracted)
|
||||
|
||||
def test_parse_json_selects_actions_object_when_multiple_json_objects_exist(self):
|
||||
# 분석 텍스트 안에 먼저 나오는 하위 JSON({ "foo": 1 })이 있고,
|
||||
# 뒤에 실제 계획 JSON({ "actions": [...] })이 있는 경우를 검증한다.
|
||||
raw = (
|
||||
"1. Analyze...\n"
|
||||
'{"foo": 1}\n'
|
||||
"2. Continue...\n"
|
||||
'{"thinking":"t","current_goal":"g",'
|
||||
'"actions":[{"action":"explore","params":{"direction":"east","max_steps":1},"reason":"x"}],'
|
||||
'"after_this":"a"}'
|
||||
)
|
||||
plan = self.planner._parse_json(raw)
|
||||
self.assertEqual(plan["current_goal"], "g")
|
||||
self.assertEqual(len(plan["actions"]), 1)
|
||||
self.assertEqual(plan["actions"][0]["action"], "explore")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user