Files
factorio-ai-agent/context_compressor.py
21in7 8c90e80582 feat: 인벤토리 캐시 및 JSON 인코더 추가
- 인벤토리 캐시 기능을 추가하여, RCON으로 인벤토리를 읽지 못할 경우 이전에 성공적으로 읽은 데이터를 활용
- Lua에서 JSON 인코딩을 위한 간단한 함수 추가, 일부 Factorio 버전에서 `game.table_to_json`이 없을 경우 대체
- `README.md`에 인벤토리 캐시 및 JSON 인코더 사용에 대한 설명 추가
- `scan_resources()`와 `mine_resource`의 반경을 확장하여 자원 탐색 실패를 줄임
2026-03-25 23:03:08 +09:00

299 lines
11 KiB
Python

"""
context_compressor.py — 계층적 컨텍스트 압축
핵심 변경:
- f-string 제거 또는 최소화 (Lua 중괄호 충돌 방지)
- position+radius 방식 사용
- pcall + try/except 완전 감싸기
- get_contents() 대신 인덱스 접근 (Factorio 2.0 호환)
"""
import json
import math
from factorio_rcon import FactorioRCON
# Edge length, in map tiles, of the square zones that entities are bucketed
# into (the zone-summary Lua snippet buckets positions by the same value).
ZONE_SIZE = 64
# Lua prelude placed in front of every snippet: binds `p` to game.players[1];
# when that player does not exist it prints "{}" and stops the command.
P = 'local p = game.players[1] if not p then rcon.print("{}") return end '
# Depending on the Factorio version, game.table_to_json may be missing, so a
# small JSON encoder is provided inside Lua and prepended to every snippet.
#
# Defines four Lua locals:
#   json_escape(s)          -> s with JSON string escapes applied
#   json_encode_value(v)    -> JSON text for a string/number/boolean/table;
#                              tables are always written as JSON objects
#   json_encode_object(t)   -> alias of json_encode_value
#   json_encode_array(arr)  -> JSON array built from arr[1..#arr]
#
# Kept as a raw string so the Lua backslash escapes reach Lua unchanged.
JSON_HELPER_LUA = r"""
local function json_escape(s)
  s = tostring(s)
  s = s:gsub("\\", "\\\\")
  s = s:gsub('"', '\\"')
  s = s:gsub('\b', '\\b')
  s = s:gsub('\f', '\\f')
  s = s:gsub('\n', '\\n')
  s = s:gsub('\r', '\\r')
  s = s:gsub('\t', '\\t')
  -- JSON forbids raw control bytes inside strings: escape the remaining ones
  s = s:gsub("[\1-\31]", function(c) return string.format("\\u%04x", c:byte()) end)
  return s
end
local function json_encode_value(v)
  local t = type(v)
  if t == "string" then
    return '"' .. json_escape(v) .. '"'
  elseif t == "number" then
    -- NaN and +/-inf have no JSON representation
    if v ~= v or v == math.huge or v == -math.huge then
      return "null"
    end
    return tostring(v)
  elseif t == "boolean" then
    return v and "true" or "false"
  elseif t == "table" then
    -- object encoding (arrays are handled by json_encode_array)
    local parts = {}
    for k, val in pairs(v) do
      local key = '"' .. json_escape(k) .. '"'
      parts[#parts + 1] = key .. ":" .. json_encode_value(val)
    end
    return "{" .. table.concat(parts, ",") .. "}"
  else
    return "null"
  end
end
local function json_encode_object(t)
  return json_encode_value(t)
end
local function json_encode_array(arr)
  local parts = {}
  local n = #arr
  for i = 1, n do
    parts[#parts + 1] = json_encode_value(arr[i])
  end
  return "[" .. table.concat(parts, ",") .. "]"
end
"""
class ContextCompressor:
    """Builds a tiered, human-readable summary of a Factorio game over RCON.

    Each ``_get_*`` / ``_detect_*`` method sends one Lua snippet through
    ``self.rcon.lua`` and parses the JSON text the snippet prints. Any failure
    (Lua error, RCON error, malformed JSON) degrades to an empty dict/list
    instead of raising, so a summary can always be produced.
    """

    def __init__(self, rcon: "FactorioRCON"):
        """Store the RCON client; it must expose ``lua(script) -> str``."""
        self.rcon = rcon

    def get_compressed_state(self, detail_level: int = 1) -> str:
        """Return the factory summary as text.

        Args:
            detail_level: 0 -> global summary only; 1 -> global summary plus
                per-zone lines; any other value -> level 1 plus a drill-down
                of the busiest zones (only when problems were detected).
        """
        global_summary = self._get_global_summary()
        problems = self._detect_problems()
        player = self._get_player_info()
        if detail_level == 0:
            return self._format_global(global_summary, problems, player)
        zones = self._get_zone_summaries()
        if detail_level == 1:
            return self._format_level1(global_summary, zones, problems, player)
        drilldown = self._drilldown_problem_zones(zones, problems)
        return self._format_level2(global_summary, zones, problems, drilldown, player)

    # ── RCON helpers ────────────────────────────────────────────────

    def _query_json(self, lua: str, expected: type):
        """Run ``lua`` over RCON and decode its printed JSON.

        Returns the decoded value when it is an instance of ``expected``
        (``dict`` or ``list``); otherwise, or on any error, returns an empty
        ``expected()``. Surrounding whitespace in the reply is ignored.
        """
        try:
            raw = self.rcon.lua(lua)
            if not raw:
                return expected()
            parsed = json.loads(raw.strip())
        except Exception:
            # Deliberate best-effort: a failed query must not abort the summary.
            return expected()
        return parsed if isinstance(parsed, expected) else expected()

    def _get_global_summary(self) -> dict:
        """Count player-force entities within 500 tiles and read research/evolution.

        Returns a dict with keys ``total_entities``, ``counts``,
        ``current_research``, ``research_progress``, ``researched_count`` and
        ``evolution`` (percent), or ``{}`` on failure.
        """
        # Evolution is read from the "enemy" force (falling back to the
        # player's force if it is missing). The accessor differs between
        # Factorio versions, so both forms are tried under their own pcall;
        # an unsupported accessor then yields 0 instead of discarding the
        # whole summary.
        lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
  local surface = p.surface
  local force = p.force
  local result = {}
  local total = 0
  local all = surface.find_entities_filtered{position = p.position, radius = 500, force = "player"}
  for _, e in ipairs(all) do
    if e.name ~= "character" then
      result[e.name] = (result[e.name] or 0) + 1
      total = total + 1
    end
  end
  local current_tech = "none"
  local tech_progress = 0
  if force.current_research then
    current_tech = force.current_research.name
    tech_progress = math.floor(force.research_progress * 100)
  end
  local researched = 0
  for _, t in pairs(force.technologies) do
    if t.researched then researched = researched + 1 end
  end
  local evolution = 0
  local enemy = game.forces["enemy"] or force
  local got, value = pcall(function() return enemy.get_evolution_factor(surface) end)
  if not (got and type(value) == "number") then
    got, value = pcall(function() return enemy.evolution_factor end)
  end
  if got and type(value) == "number" then
    evolution = math.floor(value * 100)
  end
  rcon.print(json_encode_object({
    total_entities = total,
    counts = result,
    current_research = current_tech,
    research_progress = tech_progress,
    researched_count = researched,
    evolution = evolution
  }))
end)
if not ok then rcon.print("{}") end
"""
        return self._query_json(lua, dict)

    def _get_zone_summaries(self) -> list[dict]:
        """Bucket player-force entities within 512 tiles into square zones.

        Returns one dict per zone holding more than two entities, with keys
        ``key``, ``x``, ``y``, ``entities`` and per-category counters, or
        ``[]`` on failure.
        """
        # The zone size comes from the module constant so Python and Lua
        # cannot drift apart; plain concatenation avoids f-string/brace clashes.
        lua = P + JSON_HELPER_LUA + "\nlocal Z = " + str(int(ZONE_SIZE)) + "\n" + """
local ok, err = pcall(function()
  local surface = p.surface
  local zones = {}
  local all = surface.find_entities_filtered{position = p.position, radius = 512, force = "player"}
  for _, e in ipairs(all) do
    local kx = math.floor(e.position.x / Z)
    local ky = math.floor(e.position.y / Z)
    local key = kx .. "," .. ky
    if not zones[key] then
      zones[key] = {key=key, x=kx*Z, y=ky*Z, entities=0, miners=0, furnaces=0, assemblers=0, belts=0, inserters=0, poles=0, turrets=0, labs=0, pumps=0}
    end
    local z = zones[key]
    z.entities = z.entities + 1
    local t = e.type
    if t == "mining-drill" then z.miners = z.miners + 1
    elseif t == "furnace" then z.furnaces = z.furnaces + 1
    elseif t == "assembling-machine" then z.assemblers = z.assemblers + 1
    elseif t == "transport-belt" then z.belts = z.belts + 1
    elseif t == "inserter" then z.inserters = z.inserters + 1
    elseif t == "electric-pole" then z.poles = z.poles + 1
    elseif t == "ammo-turret" or t == "electric-turret" then z.turrets = z.turrets + 1
    elseif t == "lab" then z.labs = z.labs + 1
    elseif t == "offshore-pump" then z.pumps = z.pumps + 1
    end
  end
  local result = {}
  for _, z in pairs(zones) do
    if z.entities > 2 then result[#result+1] = z end
  end
  rcon.print(json_encode_array(result))
end)
if not ok then rcon.print("[]") end
"""
        return self._query_json(lua, list)

    def _detect_problems(self) -> list[str]:
        """Return problem messages for fuel-less burners and depleting drills.

        Scans mining drills and furnaces within 300 tiles of the player: a
        burner entity with an empty fuel inventory counts as "no fuel"; a
        drill whose first resource entity within 3 tiles holds less than 5000
        counts as "depleting". Returns ``[]`` on failure.
        """
        lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
  local surface = p.surface
  local problems = {}
  local burners = surface.find_entities_filtered{position = p.position, radius = 300, type = {"mining-drill", "furnace"}}
  local no_fuel = 0
  for _, e in ipairs(burners) do
    if e.burner then
      local fi = e.burner.inventory
      if fi and fi.is_empty() then no_fuel = no_fuel + 1 end
    end
  end
  if no_fuel > 0 then problems[#problems+1] = "연료 부족: " .. no_fuel end
  local drills = surface.find_entities_filtered{position = p.position, radius = 300, type = "mining-drill"}
  local depleting = 0
  for _, d in ipairs(drills) do
    local ore = surface.find_entities_filtered{position = d.position, radius = 3, type = "resource"}
    if #ore > 0 and ore[1].amount < 5000 then depleting = depleting + 1 end
  end
  if depleting > 0 then problems[#problems+1] = "자원 고갈 임박: " .. depleting end
  rcon.print(json_encode_array(problems))
end)
if not ok then rcon.print("[]") end
"""
        return self._query_json(lua, list)

    def _drilldown_problem_zones(self, zones: list, problems: list) -> str:
        """Describe the two zones with the most entities, or "" if nothing to report."""
        if not problems or not zones:
            return ""
        busiest = sorted(zones, key=lambda z: z.get("entities", 0), reverse=True)[:2]
        lines = ["#### 주요 구역 상세"]
        for zone in busiest:
            # .get keeps a zone with missing coordinates from raising KeyError.
            lines.append(
                f" 구역({zone.get('x', '?')},{zone.get('y', '?')}): "
                f"채굴기{zone.get('miners',0)} 제련소{zone.get('furnaces',0)} "
                f"조립기{zone.get('assemblers',0)} 벨트{zone.get('belts',0)}"
            )
        return "\n".join(lines)

    def _get_player_info(self) -> dict:
        """Return ``{"x", "y", "inventory"}`` for the player, or ``{}`` on failure.

        ``inventory`` maps item name -> total count.
        """
        # get_contents() may return either a name -> count dictionary or an
        # array of {name, count, ...} records depending on the game version;
        # both are folded into name -> count here. If the call is unavailable
        # the slots are read one by one.
        lua = P + JSON_HELPER_LUA + """
local ok, err = pcall(function()
  local inv = p.get_main_inventory()
  if not inv then rcon.print("{}") return end
  local inv_summary = {}
  local function add(name, count)
    if name and count then
      inv_summary[name] = (inv_summary[name] or 0) + count
    end
  end
  local got, contents = pcall(function() return inv.get_contents() end)
  if got and type(contents) == "table" then
    for k, v in pairs(contents) do
      if type(v) == "table" then
        add(v.name, v.count)
      else
        add(k, v)
      end
    end
  else
    for i = 1, #inv do
      local stack = inv[i]
      if stack.valid_for_read then add(stack.name, stack.count) end
    end
  end
  rcon.print(json_encode_object({
    x = math.floor(p.position.x),
    y = math.floor(p.position.y),
    inventory = inv_summary
  }))
end)
if not ok then rcon.print("{}") end
"""
        return self._query_json(lua, dict)

    # ── Formatters ──────────────────────────────────────────────────

    @staticmethod
    def _signed(value) -> str:
        """Format a coordinate with an explicit sign; "?" when it is not a number."""
        try:
            return format(int(value), "+d")
        except (TypeError, ValueError, OverflowError):
            return "?"

    @staticmethod
    def _normalize_inventory(inv) -> dict:
        """Fold an inventory payload into ``{item_name: total_count}``.

        Accepts a plain name -> count dict, or a dict/list whose values are
        ``{"name": ..., "count": ...}`` records. Entries without a string
        name and a numeric count are ignored.
        """
        if isinstance(inv, dict):
            entries = inv.items()
        elif isinstance(inv, list):
            entries = enumerate(inv)
        else:
            return {}
        totals: dict = {}
        for key, value in entries:
            if isinstance(value, dict):
                name, count = value.get("name"), value.get("count")
            else:
                name, count = key, value
            is_number = isinstance(count, (int, float)) and not isinstance(count, bool)
            if isinstance(name, str) and is_number:
                totals[name] = totals.get(name, 0) + count
        return totals

    def _format_global(self, g: dict, problems: list, player: dict) -> str:
        """Render position, global counters, detected problems and inventory."""
        lines = [
            "## 공장 상태 (글로벌 요약)",
            f"위치: ({player.get('x','?')}, {player.get('y','?')})",
            f"전체 건물 수: {g.get('total_entities', 0)}",
            f"진화도: {g.get('evolution', 0)}%",
            f"완료 연구: {g.get('researched_count', 0)}",
            f"진행 중 연구: {g.get('current_research','없음')} ({g.get('research_progress',0)}%)",
        ]
        if problems:
            lines.append("\n## 감지된 문제")
            lines.extend(f" ! {problem}" for problem in problems)
        lines += self._format_inventory(player.get("inventory", {}))
        return "\n".join(lines)

    def _format_level1(self, g: dict, zones: list, problems: list, player: dict) -> str:
        """Global summary followed by one line for each of the 8 busiest zones."""
        lines = [self._format_global(g, problems, player), "\n## 구역별 요약"]
        busiest = sorted(zones, key=lambda z: z.get("entities", 0), reverse=True)[:8]
        for zone in busiest:
            role = self._guess_zone_role(zone)
            lines.append(
                f" [{self._signed(zone.get('x'))},{self._signed(zone.get('y'))}] {role} | "
                f"채굴{zone.get('miners',0)} 제련{zone.get('furnaces',0)} "
                f"조립{zone.get('assemblers',0)} 벨트{zone.get('belts',0)}"
            )
        return "\n".join(lines)

    def _format_level2(self, g, zones, problems, drilldown, player) -> str:
        """Level-1 text with the drill-down section appended when non-empty."""
        base = self._format_level1(g, zones, problems, player)
        if drilldown:
            base += "\n\n" + drilldown
        return base

    def _format_inventory(self, inv: dict) -> list[str]:
        """Return inventory lines: the 20 largest stacks, largest first."""
        totals = self._normalize_inventory(inv)
        if not totals:
            return ["\n인벤토리: 비어있음"]
        lines = ["\n## 인벤토리 (핵심 아이템)"]
        # Stable sort on negated count: ties keep their incoming order.
        for name, count in sorted(totals.items(), key=lambda item: -item[1])[:20]:
            lines.append(f" {name}: {count}")
        return lines

    def _guess_zone_role(self, z: dict) -> str:
        """Label a zone from its counters; the first matching rule wins."""
        if z.get("turrets", 0) > 3:
            return "방어구역"
        if z.get("labs", 0) > 0:
            return "연구실"
        if z.get("pumps", 0) > 0:
            return "정유/전력"
        miners = z.get("miners", 0)
        if z.get("assemblers", 0) > miners:
            return "조립라인"
        if z.get("furnaces", 0) > miners:
            return "제련구역"
        if miners > 0:
            return "채굴구역"
        return "기타"