Files
factorio-ai-agent/state_reader.py
21in7 25eaa7f6cd feat: 추가된 메모리 기능으로 광맥 및 마지막 행동 저장
- 에이전트가 발견한 광맥 좌표를 `ore_patch_memory.json`에 저장하여 재시작 시 활용 가능
- 마지막 실행한 행동과 결과를 `agent_last_action_memory.json`에 저장하여 다음 상태 요약에서 참고 가능
- `state_reader.py`에서 메모리 로드 및 상태 요약에 포함
- `ai_planner.py`에서 시스템 프롬프트에 기억된 광맥 및 마지막 행동 관련 가이드 추가
- `README.md`에 새로운 메모리 기능 설명 추가
2026-03-25 23:34:25 +09:00

603 lines
23 KiB
Python

"""
state_reader.py
RCON을 통해 팩토리오 게임 상태를 읽어오는 모듈
핵심 변경:
- f-string 제거: Lua 중괄호와 Python f-string 충돌 방지
- position+radius 방식 사용 (area 중첩 중괄호 문제 해결)
- 모든 Lua 코드 pcall로 감싸기
- 모든 Python 파싱 try/except 감싸기
"""
import json
import os
from factorio_rcon import FactorioRCON
from ore_patch_memory import (
load_ore_patch_memory,
save_ore_patch_memory,
update_ore_patch_memory,
compute_distance_sq,
)
from agent_last_action_memory import load_last_action_memory
P = 'local p = game.players[1] if not p then rcon.print("{}") return end '
INVENTORY_CACHE_FILE = "inventory_memory.json"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Factorio 버전에 따라 game.table_to_json 이 없을 수 있어,
# Lua 내부에서 간단한 JSON 인코더를 제공한다.
JSON_HELPER_LUA = r"""
local function json_escape(s)
s = tostring(s)
s = s:gsub("\\", "\\\\")
s = s:gsub('"', '\\"')
s = s:gsub('\b', '\\b')
s = s:gsub('\f', '\\f')
s = s:gsub('\n', '\\n')
s = s:gsub('\r', '\\r')
s = s:gsub('\t', '\\t')
return s
end
local function json_encode_value(v)
local t = type(v)
if t == "string" then
return '"' .. json_escape(v) .. '"'
elseif t == "number" then
return tostring(v)
elseif t == "boolean" then
return v and "true" or "false"
elseif t == "table" then
local parts = {}
for k, val in pairs(v) do
local key = '"' .. json_escape(k) .. '"'
parts[#parts + 1] = key .. ":" .. json_encode_value(val)
end
return "{" .. table.concat(parts, ",") .. "}"
else
return "null"
end
end
local function json_encode_object(t)
return json_encode_value(t)
end
"""
def _load_inventory_cache(cache_path: str) -> dict:
"""로컬에 저장된 마지막 인벤토리 캐시를 로드한다."""
try:
if not os.path.exists(cache_path):
return {}
with open(cache_path, "r", encoding="utf-8") as f:
raw = f.read().strip()
if not raw:
return {}
data = json.loads(raw)
return data if isinstance(data, dict) else {}
except Exception:
# 캐시는 진짜 동작을 막으면 안 되므로 실패해도 {} 로 폴백
return {}
def _save_inventory_cache(cache_path: str, inv: dict) -> None:
"""성공적으로 읽은 인벤토리를 캐시에 저장한다."""
try:
# 게임 아이템 목록이라도 최소한의 정합성만 보장 (숫자 값 위주)
if not isinstance(inv, dict):
return
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(inv, f, ensure_ascii=False)
except Exception:
# 저장 실패는 비중요(다음 실행에서 다시 RCON으로 얻으려 하면 됨)
pass
def _try_parse_json_object(raw: str) -> tuple[dict, bool]:
"""
RCON 출력이 JSON으로 시작하지 않더라도(앞에 잡문/개행이 섞임 등)
'{ ... }' 구간을 찾아 파싱을 시도한다.
"""
if not raw:
return {}, False
s = raw.strip()
if not s:
return {}, False
# 1) 우선 가장 일반적인 케이스: JSON 오브젝트로 바로 시작
if s.startswith("{"):
try:
val = json.loads(s)
return val if isinstance(val, dict) else {}, True
except Exception:
return {}, False
# 2) '{' 위치를 찾아서 구간 파싱 시도
start = s.find("{")
end = s.rfind("}")
if start == -1 or end == -1 or end <= start:
return {}, False
candidate = s[start : end + 1]
try:
val = json.loads(candidate)
return val if isinstance(val, dict) else {}, True
except Exception:
return {}, False
def _count_total(inv: dict) -> int:
try:
if not isinstance(inv, dict):
return 0
return int(sum(v for v in inv.values() if isinstance(v, (int, float))))
except Exception:
return 0
class StateReader:
def __init__(self, rcon: FactorioRCON):
self.rcon = rcon
# 현재 작업 디렉터리가 바뀌어도 동일 캐시를 찾게 보장
self.inventory_cache_path = os.path.join(BASE_DIR, INVENTORY_CACHE_FILE)
def get_full_state(self) -> dict:
return {
"player": self.get_player_info(),
"inventory": self.get_inventory(),
"resources": self.scan_resources(),
"buildings": self.get_buildings(),
"tech": self.get_research_status(),
"ore_patch_memory": load_ore_patch_memory(),
"last_action_memory": load_last_action_memory(),
}
def get_player_info(self) -> dict:
lua = P + JSON_HELPER_LUA + 'rcon.print(json_encode_object({x=math.floor(p.position.x), y=math.floor(p.position.y), health=p.character and p.character.health or 100}))'
try:
raw = self.rcon.lua(lua)
return json.loads(raw) if raw and raw.startswith("{") else {}
except Exception:
return {}
def get_inventory(self) -> dict:
# Factorio 2.0 get_contents() 호환: pcall로 안전하게
debug_flag = "true" if os.environ.get("INV_DEBUG") == "1" else "false"
lua = P + JSON_HELPER_LUA + f"local DEBUG_INV = {debug_flag}\n" + """
local ok, err = pcall(function()
local inv = (p.get_main_inventory and p.get_main_inventory()) or nil
local meta = { DEBUG_INV = DEBUG_INV }
if not inv and p.get_inventory and defines.inventory and defines.inventory.character_main then
-- 일부 환경에서는 get_main_inventory() 대신 character_main을 직접 요청해야 함
inv = p.get_inventory(defines.inventory.character_main)
end
if not inv then
if DEBUG_INV then
rcon.print(json_encode_object({ inventory = {}, debug = { _inv_error = "inv_nil", meta = meta } }))
else
rcon.print("{}")
end
return
end
local contents = {}
meta.inv_has_get_contents = (inv.get_contents and true) or false
if inv.get_contents then
-- Factorio 2.0: get_contents()가 있으면 가장 안정적으로 읽음
local c = inv.get_contents()
-- 일부 환경에서 get_contents()가 nil/비테이블을 반환할 수 있음
meta.get_contents_type = type(c)
if c and type(c) == "table" then
local k = 0
for _ in pairs(c) do
k = k + 1
end
meta.get_contents_keys = k
contents = c
end
else
-- 호환용 폴백
for i = 1, #inv do
local stack = inv[i]
if stack and stack.valid_for_read then
contents[stack.name] = (contents[stack.name] or 0) + stack.count
end
end
end
-- 직렬화 실패를 줄이기 위해, out은 "아이템명(string) -> 개수(number)"만 남긴다.
-- inv.get_contents()가 (1) map 형태(name->count)일 수도 있고, (2) 스택 객체 리스트 형태일 수도 있어 방어적으로 처리한다.
local out = {}
for name, count in pairs(contents) do
if type(name) == "string" then
-- map 형태: contents["iron-plate"] = 3
out[name] = tonumber(count) or 0
elseif type(count) == "table" then
-- 리스트 형태: contents[1] = {name=..., count=...} (또는 유사 필드)
local nm = count.name or count[1]
local ct = count.count or count.amount or count[2]
if nm then
out[tostring(nm)] = tonumber(ct) or 0
end
else
-- 알 수 없는 형태는 무시
end
end
local ok2, json_str = pcall(function() return json_encode_object(out) end)
if ok2 and json_str then
if DEBUG_INV then
local debug = {}
local cursor = p.cursor_stack
if cursor and cursor.valid_for_read then
debug.cursor_name = tostring(cursor.name)
debug.cursor_count = tonumber(cursor.count) or 0
else
debug.cursor_name = "EMPTY"
debug.cursor_count = 0
end
local armor_type = (defines.inventory and defines.inventory.character_armor) or nil
local armor_inv = (armor_type and p.get_inventory) and p.get_inventory(armor_type) or nil
local armor_out = {}
local armor_total = 0
if armor_inv and armor_inv.get_contents then
local a = armor_inv.get_contents()
if a and type(a) == "table" then
for name, count in pairs(a) do
local c = tonumber(count) or 0
armor_out[tostring(name)] = c
armor_total = armor_total + c
end
end
end
debug.armor_total = armor_total
debug.armor = armor_out
local trash_type = (defines.inventory and defines.inventory.character_trash) or nil
local trash_inv = (trash_type and p.get_inventory) and p.get_inventory(trash_type) or nil
local trash_out = {}
local trash_total = 0
if trash_inv and trash_inv.get_contents then
local t = trash_inv.get_contents()
if t and type(t) == "table" then
for name, count in pairs(t) do
local c = tonumber(count) or 0
trash_out[tostring(name)] = c
trash_total = trash_total + c
end
end
end
debug.trash_total = trash_total
debug.trash = trash_out
rcon.print(json_encode_object({ inventory = out, debug = debug }))
else
rcon.print(json_str)
end
else
if DEBUG_INV then
rcon.print(json_encode_object({
inventory = out,
debug = { _json_encode_ok2 = ok2, _json_encode_err = tostring(json_str), meta = meta }
}))
else
rcon.print("{}")
end
end
end)
if not ok then
if DEBUG_INV then
rcon.print(json_encode_object({ inventory = {}, debug = { _pcall_error = tostring(err), meta = { DEBUG_INV = DEBUG_INV } } }))
else
rcon.print("{}")
end
end
"""
try:
raw = self.rcon.lua(lua)
if os.environ.get("INV_DEBUG") == "1":
rs = raw if isinstance(raw, str) else str(raw)
# 너무 길어지지 않게 일부만 출력
rs1 = rs.replace("\n", "\\n")[:200]
print(
f"[INV_DEBUG] raw_inv_resp_snip={rs1} "
f"has_brace_open={'{' in rs} has_brace_close={'}' in rs}"
)
inv, parsed_ok = _try_parse_json_object(raw)
if os.environ.get("INV_DEBUG") == "1":
print(
f"[INV_DEBUG] inv_parse parsed_ok={parsed_ok} "
f"inv_is_dict={isinstance(inv, dict)} "
f"inv_keys={list(inv.keys())[:5] if isinstance(inv, dict) else 'na'}"
)
if os.environ.get("INV_DEBUG") == "1" and parsed_ok and isinstance(inv, dict):
if "inventory" in inv and "debug" in inv:
debug = inv.get("debug", {})
meta = debug.get("meta", {})
cursor_name = debug.get("cursor_name", "")
cursor_count = debug.get("cursor_count", 0)
armor_total = debug.get("armor_total", 0)
trash_total = debug.get("trash_total", 0)
get_contents_type = meta.get("get_contents_type", "")
get_contents_keys = meta.get("get_contents_keys", 0)
print(
f"[INV_DEBUG] main_items={len(inv.get('inventory',{}))} "
f"cursor={cursor_name}:{cursor_count} armor_total={armor_total} trash_total={trash_total} "
f"get_contents={get_contents_type}:{get_contents_keys}"
)
inv = inv.get("inventory", {})
else:
items = len(inv) if isinstance(inv, dict) else 0
total = _count_total(inv)
print(f"[INV_DEBUG] main_parseOK | items={items} total={total} raw_snip={raw[:80]}")
# fallback only: RCON에서 읽어온 inv가 비어있으면 캐시 사용
# (inv가 {}인 원인이 "진짜 빈 인벤토리"든 "읽기 실패"든, 정책상 캐시 fallback을 한다)
if inv:
_save_inventory_cache(self.inventory_cache_path, inv)
return inv
cached = _load_inventory_cache(self.inventory_cache_path)
if os.environ.get("INV_DEBUG") == "1":
print(
f"[INV_DEBUG] cache_loaded | items={len(cached)} total={_count_total(cached)} "
f"path={self.inventory_cache_path}"
)
return cached
except Exception:
# RCON 실패면 캐시로 대체 (cache도 없으면 {})
cached = _load_inventory_cache(self.inventory_cache_path)
if os.environ.get("INV_DEBUG") == "1":
print(
f"[INV_DEBUG] cache_loaded_on_exception | items={len(cached)} total={_count_total(cached)} "
f"path={self.inventory_cache_path}"
)
return cached
def scan_resources(self) -> dict:
"""position+radius 방식으로 자원 스캔 (f-string 중괄호 문제 완전 회피)"""
lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
local surface = p.surface
local px, py = p.position.x, p.position.y
local all_res = surface.find_entities_filtered{position = p.position, radius = 500, type = "resource"}
if not all_res or #all_res == 0 then
rcon.print("{}")
return
end
local resources = {}
for _, e in ipairs(all_res) do
local name = e.name
if not resources[name] then
resources[name] = {count = 0, sx = 0, sy = 0, best_dx2 = math.huge, best_x = 0, best_y = 0}
end
local r = resources[name]
r.count = r.count + 1
r.sx = r.sx + e.position.x
r.sy = r.sy + e.position.y
-- "패치 중심(평균)"이 실제 엔티티 좌표와 멀어질 수 있어,
-- 플레이어 기준으로 가장 가까운 엔티티 좌표를 anchor로 함께 반환한다.
local dx = e.position.x - px
local dy = e.position.y - py
local d2 = dx*dx + dy*dy
if d2 < r.best_dx2 then
r.best_dx2 = d2
r.best_x = e.position.x
r.best_y = e.position.y
end
end
local out = {}
for name, r in pairs(resources) do
out[name] = {
count = r.count,
center_x = math.floor(r.sx / r.count),
center_y = math.floor(r.sy / r.count),
anchor_x = r.best_x,
anchor_y = r.best_y,
anchor_tile_x = math.floor(r.best_x + 0.5),
anchor_tile_y = math.floor(r.best_y + 0.5),
}
end
rcon.print(json_encode_object(out))
end)
if not ok then rcon.print("{}") end
"""
try:
raw = self.rcon.lua(lua)
res = json.loads(raw) if raw and raw.startswith("{") else {}
# scan_resources() 결과로도 광맥 좌표를 기억한다.
ore_mem = load_ore_patch_memory()
changed = False
if isinstance(res, dict) and res:
for ore, info in res.items():
if not isinstance(info, dict):
continue
tx = info.get("anchor_tile_x")
ty = info.get("anchor_tile_y")
cnt = info.get("count")
if isinstance(tx, int) and isinstance(ty, int):
before_patches = ore_mem.get(ore)
before_set: set[tuple[int, int]] = set()
if isinstance(before_patches, list):
for pp in before_patches:
if not isinstance(pp, dict):
continue
bx = pp.get("tile_x")
by = pp.get("tile_y")
if isinstance(bx, int) and isinstance(by, int):
before_set.add((bx, by))
ore_mem = update_ore_patch_memory(
ore_mem,
ore,
tx,
ty,
count=cnt if isinstance(cnt, int) else None,
)
after_patches = ore_mem.get(ore)
after_set: set[tuple[int, int]] = set()
if isinstance(after_patches, list):
for ap in after_patches:
if not isinstance(ap, dict):
continue
ax = ap.get("tile_x")
ay = ap.get("tile_y")
if isinstance(ax, int) and isinstance(ay, int):
after_set.add((ax, ay))
if before_set != after_set:
changed = True
if changed:
save_ore_patch_memory(ore_mem)
return res
except Exception:
return {}
def get_buildings(self) -> dict:
"""type 기반 검색 (f-string 없음, pcall 안전)"""
lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
local surface = p.surface
local result = {}
local all = surface.find_entities_filtered{position = p.position, radius = 300, force = "player"}
for _, e in ipairs(all) do
if e.name ~= "character" then
result[e.name] = (result[e.name] or 0) + 1
end
end
rcon.print(json_encode_object(result))
end)
if not ok then rcon.print("{}") end
"""
try:
raw = self.rcon.lua(lua)
return json.loads(raw) if raw and raw.startswith("{") else {}
except Exception:
return {}
def get_research_status(self) -> dict:
lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
local force = p.force
local k = 0
for name, tech in pairs(force.technologies) do
if tech.researched then k = k + 1 end
end
rcon.print(json_encode_object({
current = force.current_research and force.current_research.name or "none",
completed_count = k
}))
end)
if not ok then rcon.print("{}") end
"""
try:
raw = self.rcon.lua(lua)
return json.loads(raw) if raw and raw.startswith("{") else {}
except Exception:
return {}
def summarize_for_ai(self, state: dict) -> str:
p = state.get("player", {})
inv = state.get("inventory", {})
res = state.get("resources", {})
bld = state.get("buildings", {})
ore_mem = state.get("ore_patch_memory", {}) or {}
last_action = state.get("last_action_memory", {}) or {}
lines = [
"## 현재 게임 상태",
"### 플레이어",
f"- 위치: ({p.get('x', '?')}, {p.get('y', '?')})",
f"- 체력: {p.get('health', '?')}",
"",
"### 인벤토리",
]
if inv:
for item, count in sorted(inv.items(), key=lambda x: -x[1])[:15]:
lines.append(f"- {item}: {count}")
else:
lines.append("- 비어 있음")
lines += ["", "### 주변 자원 패치 (반경 500타일 스캔)"]
if res:
px = p.get('x', 0)
py = p.get('y', 0)
sorted_res = sorted(
res.items(),
key=lambda item: (
(
item[1].get('anchor_tile_x', item[1].get('center_x', 0)) - px
)**2
+ (
item[1].get('anchor_tile_y', item[1].get('center_y', 0)) - py
)**2
)
)
for ore, info in sorted_res:
ax = info.get('anchor_tile_x', info.get('center_x', 0))
ay = info.get('anchor_tile_y', info.get('center_y', 0))
dist = int(((ax - px)**2 + (ay - py)**2)**0.5)
lines.append(
f"- {ore}: {info.get('count',0)}타일 "
f"(앵커: {ax}, {ay}) "
f"[중심: {info.get('center_x','?')}, {info.get('center_y','?')}] "
f"[거리: ~{dist}타일]"
)
else:
lines.append("- 반경 500타일 내 자원 없음 — 더 멀리 탐색 필요")
lines += ["", "### 기억된 광맥 (좌표)"]
if ore_mem:
px = p.get('x', 0)
py = p.get('y', 0)
known = []
for ore, patches in ore_mem.items():
if not isinstance(patches, list):
continue
for patch in patches:
if not isinstance(patch, dict):
continue
tx = patch.get("tile_x")
ty = patch.get("tile_y")
if isinstance(tx, int) and isinstance(ty, int):
dist = int(compute_distance_sq(px, py, tx, ty) ** 0.5)
known.append((ore, patch, dist))
known.sort(key=lambda x: x[2])
for ore, info, dist in known[:10]:
lines.append(
f"- {ore}: ({info.get('tile_x')},{info.get('tile_y')}) "
f"[거리: ~{dist}타일] count={info.get('count','?')}"
)
else:
lines.append("- 없음")
lines += ["", "### 마지막 행동(기억)"]
if last_action and isinstance(last_action, dict) and last_action.get("action"):
# state_summary 크기 방지를 위해 필드만 요약
act = last_action.get("action", "")
params = last_action.get("params", {})
success = last_action.get("success", None)
msg = last_action.get("message", "")
lines.append(f"- action={act} success={success} message={msg}")
if params:
try:
lines.append(f"- params={json.dumps(params, ensure_ascii=False)}")
except Exception:
pass
else:
lines.append("- 없음")
lines += ["", "### 건설된 건물"]
if bld:
for name, count in sorted(bld.items(), key=lambda x: -x[1])[:10]:
lines.append(f"- {name}: {count}")
else:
lines.append("- 아직 건물 없음")
return "\n".join(lines)