""" state_reader.py RCON을 통해 팩토리오 게임 상태를 읽어오는 모듈 핵심 변경: - f-string 제거: Lua 중괄호와 Python f-string 충돌 방지 - position+radius 방식 사용 (area 중첩 중괄호 문제 해결) - 모든 Lua 코드 pcall로 감싸기 - 모든 Python 파싱 try/except 감싸기 """ import json import os from factorio_rcon import FactorioRCON from ore_patch_memory import ( load_ore_patch_memory, save_ore_patch_memory, update_ore_patch_memory, compute_distance_sq, ) from agent_last_action_memory import load_last_action_memory P = 'local p = game.players[1] if not p then rcon.print("{}") return end ' INVENTORY_CACHE_FILE = "inventory_memory.json" BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # Factorio 버전에 따라 game.table_to_json 이 없을 수 있어, # Lua 내부에서 간단한 JSON 인코더를 제공한다. JSON_HELPER_LUA = r""" local function json_escape(s) s = tostring(s) s = s:gsub("\\", "\\\\") s = s:gsub('"', '\\"') s = s:gsub('\b', '\\b') s = s:gsub('\f', '\\f') s = s:gsub('\n', '\\n') s = s:gsub('\r', '\\r') s = s:gsub('\t', '\\t') return s end local function json_encode_value(v) local t = type(v) if t == "string" then return '"' .. json_escape(v) .. '"' elseif t == "number" then return tostring(v) elseif t == "boolean" then return v and "true" or "false" elseif t == "table" then local parts = {} for k, val in pairs(v) do local key = '"' .. json_escape(k) .. '"' parts[#parts + 1] = key .. ":" .. json_encode_value(val) end return "{" .. table.concat(parts, ",") .. "}" else return "null" end end local function json_encode_object(t) return json_encode_value(t) end """ def _load_inventory_cache(cache_path: str) -> dict: """로컬에 저장된 마지막 인벤토리 캐시를 로드한다.""" try: if not os.path.exists(cache_path): return {} with open(cache_path, "r", encoding="utf-8") as f: raw = f.read().strip() if not raw: return {} data = json.loads(raw) return data if isinstance(data, dict) else {} except Exception: # 캐시는 진짜 동작을 막으면 안 되므로 실패해도 {} 로 폴백 return {} def _save_inventory_cache(cache_path: str, inv: dict) -> None: """성공적으로 읽은 인벤토리를 캐시에 저장한다.""" try: # 게임 아이템 목록이라도 최소한의 정합성만 보장 (숫자 값 위주) if not isinstance(inv, dict): return with open(cache_path, "w", encoding="utf-8") as f: json.dump(inv, f, ensure_ascii=False) except Exception: # 저장 실패는 비중요(다음 실행에서 다시 RCON으로 얻으려 하면 됨) pass def _try_parse_json_object(raw: str) -> tuple[dict, bool]: """ RCON 출력이 JSON으로 시작하지 않더라도(앞에 잡문/개행이 섞임 등) '{ ... }' 구간을 찾아 파싱을 시도한다. """ if not raw: return {}, False s = raw.strip() if not s: return {}, False # 1) 우선 가장 일반적인 케이스: JSON 오브젝트로 바로 시작 if s.startswith("{"): try: val = json.loads(s) return val if isinstance(val, dict) else {}, True except Exception: return {}, False # 2) '{' 위치를 찾아서 구간 파싱 시도 start = s.find("{") end = s.rfind("}") if start == -1 or end == -1 or end <= start: return {}, False candidate = s[start : end + 1] try: val = json.loads(candidate) return val if isinstance(val, dict) else {}, True except Exception: return {}, False def _count_total(inv: dict) -> int: try: if not isinstance(inv, dict): return 0 return int(sum(v for v in inv.values() if isinstance(v, (int, float)))) except Exception: return 0 class StateReader: def __init__(self, rcon: FactorioRCON): self.rcon = rcon # 현재 작업 디렉터리가 바뀌어도 동일 캐시를 찾게 보장 self.inventory_cache_path = os.path.join(BASE_DIR, INVENTORY_CACHE_FILE) def get_full_state(self) -> dict: return { "player": self.get_player_info(), "inventory": self.get_inventory(), "resources": self.scan_resources(), "buildings": self.get_buildings(), "tech": self.get_research_status(), "ore_patch_memory": load_ore_patch_memory(), "last_action_memory": load_last_action_memory(), } def get_player_info(self) -> dict: lua = P + JSON_HELPER_LUA + 'rcon.print(json_encode_object({x=math.floor(p.position.x), y=math.floor(p.position.y), health=p.character and p.character.health or 100}))' try: raw = self.rcon.lua(lua) return json.loads(raw) if raw and raw.startswith("{") else {} except Exception: return {} def get_inventory(self) -> dict: # Factorio 2.0 get_contents() 호환: pcall로 안전하게 debug_flag = "true" if os.environ.get("INV_DEBUG") == "1" else "false" lua = P + JSON_HELPER_LUA + f"local DEBUG_INV = {debug_flag}\n" + """ local ok, err = pcall(function() local inv = (p.get_main_inventory and p.get_main_inventory()) or nil local meta = { DEBUG_INV = DEBUG_INV } if not inv and p.get_inventory and defines.inventory and defines.inventory.character_main then -- 일부 환경에서는 get_main_inventory() 대신 character_main을 직접 요청해야 함 inv = p.get_inventory(defines.inventory.character_main) end if not inv then if DEBUG_INV then rcon.print(json_encode_object({ inventory = {}, debug = { _inv_error = "inv_nil", meta = meta } })) else rcon.print("{}") end return end local contents = {} meta.inv_has_get_contents = (inv.get_contents and true) or false local used_get_contents = false if inv.get_contents then -- Factorio 2.0: get_contents()가 있으면 가장 안정적으로 읽음 local c = inv.get_contents() -- 일부 환경에서 get_contents()가 nil/비테이블을 반환할 수 있음 meta.get_contents_type = type(c) if c and type(c) == "table" then contents = c used_get_contents = true local k = 0 for _ in pairs(c) do k = k + 1 end meta.get_contents_keys = k end end -- 방어적 폴백: -- get_contents()가 존재해도 빈/비정상 구조가 올 수 있어, -- out이 완전히 비면 index 기반으로 한 번 더 시도한다. if (not used_get_contents) or next(contents) == nil then contents = {} for i = 1, #inv do local stack = inv[i] if stack and stack.valid_for_read then contents[stack.name] = (contents[stack.name] or 0) + stack.count end end end -- 직렬화 실패를 줄이기 위해, out은 "아이템명(string) -> 개수(number)"만 남긴다. -- inv.get_contents()가 (1) map 형태(name->count)일 수도 있고, (2) 스택 객체 리스트 형태일 수도 있어 방어적으로 처리한다. local out = {} for name, count in pairs(contents) do if type(name) == "string" then -- map 형태: contents["iron-plate"] = 3 out[name] = tonumber(count) or 0 elseif type(count) == "table" then -- 리스트 형태: contents[1] = {name=..., count=...} (또는 유사 필드) local nm = count.name or count[1] local ct = count.count or count.amount or count[2] if nm then out[tostring(nm)] = tonumber(ct) or 0 end else -- 알 수 없는 형태는 무시 end end local ok2, json_str = pcall(function() return json_encode_object(out) end) if ok2 and json_str then if DEBUG_INV then local debug = {} local cursor = p.cursor_stack if cursor and cursor.valid_for_read then debug.cursor_name = tostring(cursor.name) debug.cursor_count = tonumber(cursor.count) or 0 else debug.cursor_name = "EMPTY" debug.cursor_count = 0 end local armor_type = (defines.inventory and defines.inventory.character_armor) or nil local armor_inv = (armor_type and p.get_inventory) and p.get_inventory(armor_type) or nil local armor_out = {} local armor_total = 0 if armor_inv and armor_inv.get_contents then local a = armor_inv.get_contents() if a and type(a) == "table" then for name, count in pairs(a) do local c = tonumber(count) or 0 armor_out[tostring(name)] = c armor_total = armor_total + c end end end debug.armor_total = armor_total debug.armor = armor_out local trash_type = (defines.inventory and defines.inventory.character_trash) or nil local trash_inv = (trash_type and p.get_inventory) and p.get_inventory(trash_type) or nil local trash_out = {} local trash_total = 0 if trash_inv and trash_inv.get_contents then local t = trash_inv.get_contents() if t and type(t) == "table" then for name, count in pairs(t) do local c = tonumber(count) or 0 trash_out[tostring(name)] = c trash_total = trash_total + c end end end debug.trash_total = trash_total debug.trash = trash_out rcon.print(json_encode_object({ inventory = out, debug = debug })) else rcon.print(json_str) end else if DEBUG_INV then rcon.print(json_encode_object({ inventory = out, debug = { _json_encode_ok2 = ok2, _json_encode_err = tostring(json_str), meta = meta } })) else rcon.print("{}") end end end) if not ok then if DEBUG_INV then rcon.print(json_encode_object({ inventory = {}, debug = { _pcall_error = tostring(err), meta = { DEBUG_INV = DEBUG_INV } } })) else rcon.print("{}") end end """ try: raw = self.rcon.lua(lua) if os.environ.get("INV_DEBUG") == "1": rs = raw if isinstance(raw, str) else str(raw) # 너무 길어지지 않게 일부만 출력 rs1 = rs.replace("\n", "\\n")[:200] print( f"[INV_DEBUG] raw_inv_resp_snip={rs1} " f"has_brace_open={'{' in rs} has_brace_close={'}' in rs}" ) inv, parsed_ok = _try_parse_json_object(raw) if os.environ.get("INV_DEBUG") == "1": print( f"[INV_DEBUG] inv_parse parsed_ok={parsed_ok} " f"inv_is_dict={isinstance(inv, dict)} " f"inv_keys={list(inv.keys())[:5] if isinstance(inv, dict) else 'na'}" ) if os.environ.get("INV_DEBUG") == "1" and parsed_ok and isinstance(inv, dict): if "inventory" in inv and "debug" in inv: debug = inv.get("debug", {}) meta = debug.get("meta", {}) cursor_name = debug.get("cursor_name", "") cursor_count = debug.get("cursor_count", 0) armor_total = debug.get("armor_total", 0) trash_total = debug.get("trash_total", 0) get_contents_type = meta.get("get_contents_type", "") get_contents_keys = meta.get("get_contents_keys", 0) print( f"[INV_DEBUG] main_items={len(inv.get('inventory',{}))} " f"cursor={cursor_name}:{cursor_count} armor_total={armor_total} trash_total={trash_total} " f"get_contents={get_contents_type}:{get_contents_keys}" ) inv = inv.get("inventory", {}) else: items = len(inv) if isinstance(inv, dict) else 0 total = _count_total(inv) print(f"[INV_DEBUG] main_parseOK | items={items} total={total} raw_snip={raw[:80]}") # 캐시 fallback 정책: # - 파싱이 성공(parsed_ok=True)했으면, inv가 {}여도 그대로 반환한다. # (캐시를 덮어쓰면 실제 진행이 반영되지 않는 반복 루프가 생긴다.) # - 파싱이 실패(parsed_ok=False)했을 때만 캐시를 사용한다. if parsed_ok: if inv: _save_inventory_cache(self.inventory_cache_path, inv) return inv cached = _load_inventory_cache(self.inventory_cache_path) if os.environ.get("INV_DEBUG") == "1": print( f"[INV_DEBUG] cache_loaded | items={len(cached)} total={_count_total(cached)} " f"path={self.inventory_cache_path}" ) return cached except Exception: # RCON 실패면 캐시로 대체 (cache도 없으면 {}) cached = _load_inventory_cache(self.inventory_cache_path) if os.environ.get("INV_DEBUG") == "1": print( f"[INV_DEBUG] cache_loaded_on_exception | items={len(cached)} total={_count_total(cached)} " f"path={self.inventory_cache_path}" ) return cached def scan_resources(self) -> dict: """position+radius 방식으로 자원 스캔 (f-string 중괄호 문제 완전 회피)""" lua = P + JSON_HELPER_LUA + """ local ok, err = pcall(function() local surface = p.surface local px, py = p.position.x, p.position.y local all_res = surface.find_entities_filtered{position = p.position, radius = 500, type = "resource"} if not all_res or #all_res == 0 then rcon.print("{}") return end local resources = {} for _, e in ipairs(all_res) do local name = e.name if not resources[name] then resources[name] = {count = 0, sx = 0, sy = 0, best_dx2 = math.huge, best_x = 0, best_y = 0} end local r = resources[name] r.count = r.count + 1 r.sx = r.sx + e.position.x r.sy = r.sy + e.position.y -- "패치 중심(평균)"이 실제 엔티티 좌표와 멀어질 수 있어, -- 플레이어 기준으로 가장 가까운 엔티티 좌표를 anchor로 함께 반환한다. local dx = e.position.x - px local dy = e.position.y - py local d2 = dx*dx + dy*dy if d2 < r.best_dx2 then r.best_dx2 = d2 r.best_x = e.position.x r.best_y = e.position.y end end local out = {} for name, r in pairs(resources) do out[name] = { count = r.count, center_x = math.floor(r.sx / r.count), center_y = math.floor(r.sy / r.count), anchor_x = r.best_x, anchor_y = r.best_y, anchor_tile_x = math.floor(r.best_x + 0.5), anchor_tile_y = math.floor(r.best_y + 0.5), } end rcon.print(json_encode_object(out)) end) if not ok then rcon.print("{}") end """ try: raw = self.rcon.lua(lua) res = json.loads(raw) if raw and raw.startswith("{") else {} # scan_resources() 결과로도 광맥 좌표를 기억한다. ore_mem = load_ore_patch_memory() changed = False if isinstance(res, dict) and res: for ore, info in res.items(): if not isinstance(info, dict): continue tx = info.get("anchor_tile_x") ty = info.get("anchor_tile_y") cnt = info.get("count") if isinstance(tx, int) and isinstance(ty, int): before_patches = ore_mem.get(ore) before_set: set[tuple[int, int]] = set() if isinstance(before_patches, list): for pp in before_patches: if not isinstance(pp, dict): continue bx = pp.get("tile_x") by = pp.get("tile_y") if isinstance(bx, int) and isinstance(by, int): before_set.add((bx, by)) ore_mem = update_ore_patch_memory( ore_mem, ore, tx, ty, count=cnt if isinstance(cnt, int) else None, ) after_patches = ore_mem.get(ore) after_set: set[tuple[int, int]] = set() if isinstance(after_patches, list): for ap in after_patches: if not isinstance(ap, dict): continue ax = ap.get("tile_x") ay = ap.get("tile_y") if isinstance(ax, int) and isinstance(ay, int): after_set.add((ax, ay)) if before_set != after_set: changed = True if changed: save_ore_patch_memory(ore_mem) return res except Exception: return {} def get_buildings(self) -> dict: """type 기반 검색 (f-string 없음, pcall 안전)""" lua = P + JSON_HELPER_LUA + """ local ok, err = pcall(function() local surface = p.surface local result = {} local all = surface.find_entities_filtered{position = p.position, radius = 300, force = "player"} for _, e in ipairs(all) do if e.name ~= "character" then result[e.name] = (result[e.name] or 0) + 1 end end rcon.print(json_encode_object(result)) end) if not ok then rcon.print("{}") end """ try: raw = self.rcon.lua(lua) return json.loads(raw) if raw and raw.startswith("{") else {} except Exception: return {} def get_research_status(self) -> dict: lua = P + JSON_HELPER_LUA + """ local ok, err = pcall(function() local force = p.force local k = 0 for name, tech in pairs(force.technologies) do if tech.researched then k = k + 1 end end rcon.print(json_encode_object({ current = force.current_research and force.current_research.name or "none", completed_count = k })) end) if not ok then rcon.print("{}") end """ try: raw = self.rcon.lua(lua) return json.loads(raw) if raw and raw.startswith("{") else {} except Exception: return {} def summarize_for_ai(self, state: dict) -> str: p = state.get("player", {}) inv = state.get("inventory", {}) res = state.get("resources", {}) bld = state.get("buildings", {}) ore_mem = state.get("ore_patch_memory", {}) or {} last_action = state.get("last_action_memory", {}) or {} lines = [ "## 현재 게임 상태", "### 플레이어", f"- 위치: ({p.get('x', '?')}, {p.get('y', '?')})", f"- 체력: {p.get('health', '?')}", "", "### 인벤토리", ] if inv: for item, count in sorted(inv.items(), key=lambda x: -x[1])[:15]: lines.append(f"- {item}: {count}개") else: lines.append("- 비어 있음") lines += ["", "### 주변 자원 패치 (반경 500타일 스캔)"] if res: px = p.get('x', 0) py = p.get('y', 0) sorted_res = sorted( res.items(), key=lambda item: ( ( item[1].get('anchor_tile_x', item[1].get('center_x', 0)) - px )**2 + ( item[1].get('anchor_tile_y', item[1].get('center_y', 0)) - py )**2 ) ) for ore, info in sorted_res: ax = info.get('anchor_tile_x', info.get('center_x', 0)) ay = info.get('anchor_tile_y', info.get('center_y', 0)) dist = int(((ax - px)**2 + (ay - py)**2)**0.5) lines.append( f"- {ore}: {info.get('count',0)}타일 " f"(앵커: {ax}, {ay}) " f"[중심: {info.get('center_x','?')}, {info.get('center_y','?')}] " f"[거리: ~{dist}타일]" ) else: lines.append("- 반경 500타일 내 자원 없음 — 더 멀리 탐색 필요") lines += ["", "### 기억된 광맥 (좌표)"] if ore_mem: px = p.get('x', 0) py = p.get('y', 0) known = [] for ore, patches in ore_mem.items(): if not isinstance(patches, list): continue for patch in patches: if not isinstance(patch, dict): continue tx = patch.get("tile_x") ty = patch.get("tile_y") if isinstance(tx, int) and isinstance(ty, int): dist = int(compute_distance_sq(px, py, tx, ty) ** 0.5) known.append((ore, patch, dist)) known.sort(key=lambda x: x[2]) for ore, info, dist in known[:10]: lines.append( f"- {ore}: ({info.get('tile_x')},{info.get('tile_y')}) " f"[거리: ~{dist}타일] count={info.get('count','?')}" ) else: lines.append("- 없음") lines += ["", "### 마지막 행동(기억)"] if last_action and isinstance(last_action, dict) and last_action.get("action"): # state_summary 크기 방지를 위해 필드만 요약 act = last_action.get("action", "") params = last_action.get("params", {}) success = last_action.get("success", None) msg = last_action.get("message", "") lines.append(f"- action={act} success={success} message={msg}") if params: try: lines.append(f"- params={json.dumps(params, ensure_ascii=False)}") except Exception: pass else: lines.append("- 없음") lines += ["", "### 건설된 건물"] if bld: for name, count in sorted(bld.items(), key=lambda x: -x[1])[:10]: lines.append(f"- {name}: {count}개") else: lines.append("- 아직 건물 없음") return "\n".join(lines)