- 인벤토리 캐시 기능을 추가하여, RCON으로 인벤토리를 읽지 못할 경우 이전에 성공적으로 읽은 데이터를 활용
- Lua에서 JSON 인코딩을 위한 간단한 함수를 추가하여, 일부 Factorio 버전에 `game.table_to_json`이 없을 경우 이를 대체
- `README.md`에 인벤토리 캐시 및 JSON 인코더 사용에 대한 설명 추가
- `scan_resources()`와 `mine_resource`의 반경을 확장하여 자원 탐색 실패를 줄임
299 lines
11 KiB
Python
299 lines
11 KiB
Python
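"""
context_compressor.py — hierarchical context compression

Key changes:
- Remove or minimize f-strings (avoids clashes with Lua braces)
- Query entities by position + radius
- Wrap everything in pcall (Lua side) and try/except (Python side)
- Read the inventory with get_contents() when it works, falling back to
  per-slot index access (Factorio version compatibility)
"""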
|
|
import json
|
|
import math
|
|
from factorio_rcon import FactorioRCON
|
|
|
|
# Edge length of the square zones used to bucket entities; keep it in sync
# with the zone size used by the Lua zone scan.
ZONE_SIZE = 64

# Lua prelude shared by every snippet: binds the first player to `p`, or
# prints an empty JSON object and stops when there is no such player.
P = (
    'local p = game.players[1] '
    'if not p then rcon.print("{}") return end '
)
|
|
|
|
# Depending on the Factorio version, game.table_to_json may be missing, so a
# small JSON encoder is defined in Lua and prepended to the snippets built in
# this module.
#
# Encoder contract:
#   * json_escape(s)         -> JSON-safe string body (no surrounding quotes);
#                               every control character is escaped, using
#                               \uXXXX when JSON has no short escape for it.
#   * json_encode_value(v)   -> strings, finite numbers, booleans; tables are
#                               ALWAYS encoded as objects; non-finite numbers
#                               and unsupported types become null.
#   * json_encode_object(t)  -> alias of json_encode_value for tables.
#   * json_encode_array(arr) -> encodes arr[1..#arr] as a JSON array.
JSON_HELPER_LUA = r"""
local JSON_ESCAPES = {
  ['"'] = '\\"', ['\\'] = '\\\\',
  ['\b'] = '\\b', ['\f'] = '\\f',
  ['\n'] = '\\n', ['\r'] = '\\r', ['\t'] = '\\t',
}

local function json_escape(s)
  -- one pass over quotes, backslashes and every control character
  local escaped = tostring(s):gsub('[%c"\\]', function(c)
    return JSON_ESCAPES[c] or string.format("\\u%04x", c:byte())
  end)
  return escaped
end

local function json_encode_value(v)
  local t = type(v)
  if t == "string" then
    return '"' .. json_escape(v) .. '"'
  elseif t == "number" then
    -- nan / inf have no JSON representation
    if v ~= v or v == math.huge or v == -math.huge then
      return "null"
    end
    return tostring(v)
  elseif t == "boolean" then
    return v and "true" or "false"
  elseif t == "table" then
    -- object encoding (arrays are handled by json_encode_array)
    local parts = {}
    for k, val in pairs(v) do
      local key = '"' .. json_escape(k) .. '"'
      parts[#parts + 1] = key .. ":" .. json_encode_value(val)
    end
    return "{" .. table.concat(parts, ",") .. "}"
  else
    return "null"
  end
end

local function json_encode_object(t)
  return json_encode_value(t)
end

local function json_encode_array(arr)
  local parts = {}
  local n = #arr
  for i = 1, n do
    parts[#parts + 1] = json_encode_value(arr[i])
  end
  return "[" .. table.concat(parts, ",") .. "]"
end
"""
|
|
|
|
|
|
class ContextCompressor:
    """Builds a compact, human-readable summary of a Factorio game over RCON.

    Each ``_get_*`` / ``_detect_*`` method sends one Lua snippet (the shared
    prelude ``P`` + ``JSON_HELPER_LUA`` + a body wrapped in ``pcall``) and
    parses the JSON it prints. Any failure on either side degrades to an
    empty dict/list instead of raising, so the summary is best-effort.
    """

    def __init__(self, rcon: "FactorioRCON"):
        """Keep the RCON client; it must provide ``lua(code)`` returning text."""
        self.rcon = rcon

    def get_compressed_state(self, detail_level: int = 1) -> str:
        """Return the formatted state summary.

        Args:
            detail_level: ``0`` gives the global summary only, ``1`` adds the
                per-zone table, any other value additionally appends the
                drill-down of the busiest zones (only when problems exist).
        """
        global_summary = self._get_global_summary()
        problems = self._detect_problems()
        player = self._get_player_info()

        if detail_level == 0:
            return self._format_global(global_summary, problems, player)

        zones = self._get_zone_summaries()
        if detail_level == 1:
            return self._format_level1(global_summary, zones, problems, player)

        drilldown = self._drilldown_problem_zones(zones, problems)
        return self._format_level2(global_summary, zones, problems, drilldown, player)

    # ── RCON helpers ────────────────────────────────────────────────

    def _run_lua_json(self, lua: str, expected: type):
        """Run ``lua`` over RCON and parse its output as JSON.

        Args:
            lua: complete Lua source to execute.
            expected: ``dict`` or ``list`` — the JSON container the snippet
                is supposed to print.

        Returns:
            The parsed value when it is an instance of ``expected``;
            otherwise a fresh empty ``expected()``. Never raises for
            transport or parse errors (deliberate best-effort).
        """
        opener = "{" if expected is dict else "["
        try:
            raw = self.rcon.lua(lua)
            if not isinstance(raw, str):
                return expected()
            # Tolerate leading/trailing whitespace around the payload.
            text = raw.strip()
            if not text.startswith(opener):
                return expected()
            parsed = json.loads(text)
        except Exception:
            return expected()
        return parsed if isinstance(parsed, expected) else expected()

    @staticmethod
    def _normalize_inventory(inv) -> dict:
        """Coerce an inventory payload into ``{item_name: count}``.

        Accepts the plain name->count mapping as well as collections of
        ``{"name": ..., "count": ...}`` records (a dict keyed by index or a
        list), summing counts per name. Malformed entries are skipped.
        """
        if isinstance(inv, dict):
            entries = inv.items()
        elif isinstance(inv, list):
            entries = enumerate(inv)
        else:
            return {}

        totals: dict = {}
        for key, value in entries:
            if isinstance(value, dict):
                name, count = value.get("name"), value.get("count", 0)
            else:
                name, count = key, value
            is_number = isinstance(count, (int, float)) and not isinstance(count, bool)
            if isinstance(name, str) and is_number:
                totals[name] = totals.get(name, 0) + count
        return totals

    @staticmethod
    def _zone_coord(zone: dict, axis: str) -> int:
        """Return ``zone[axis]`` as an int, or 0 when missing or not numeric."""
        try:
            return int(zone.get(axis, 0))
        except (TypeError, ValueError):
            return 0

    # ── Data collection ─────────────────────────────────────────────

    def _get_global_summary(self) -> dict:
        """Collect entity counts, research state and evolution near the player.

        Returns a dict with ``total_entities``, ``counts``,
        ``current_research``, ``research_progress``, ``researched_count`` and
        ``evolution`` (percent), or ``{}`` on any failure.
        """
        lua = P + JSON_HELPER_LUA + """
local ok = pcall(function()
  local surface = p.surface
  local force = p.force
  local result = {}
  local total = 0
  local all = surface.find_entities_filtered{position = p.position, radius = 500, force = "player"}
  for _, e in ipairs(all) do
    if e.name ~= "character" then
      result[e.name] = (result[e.name] or 0) + 1
      total = total + 1
    end
  end
  local current_tech = "none"
  local tech_progress = 0
  if force.current_research then
    current_tech = force.current_research.name
    tech_progress = math.floor(force.research_progress * 100)
  end
  local researched = 0
  for _, t in pairs(force.technologies) do
    if t.researched then researched = researched + 1 end
  end
  -- Evolution belongs to the enemy force. Try the per-surface getter
  -- first, then the plain property; each attempt is isolated so an
  -- unsupported API cannot abort the whole summary.
  local evolution = 0
  local evo_force = game.forces["enemy"] or force
  local got, value = pcall(function() return evo_force.get_evolution_factor(surface) end)
  if not (got and type(value) == "number") then
    got, value = pcall(function() return evo_force.evolution_factor end)
  end
  if got and type(value) == "number" then
    evolution = math.floor(value * 100)
  end
  rcon.print(json_encode_object({
    total_entities = total,
    counts = result,
    current_research = current_tech,
    research_progress = tech_progress,
    researched_count = researched,
    evolution = evolution
  }))
end)
if not ok then rcon.print("{}") end
"""
        return self._run_lua_json(lua, dict)

    def _get_zone_summaries(self) -> list[dict]:
        """Bucket player-force entities within radius 512 into square zones.

        Each returned dict has ``key``, ``x``, ``y``, ``entities`` and
        per-category counters; zones with two or fewer entities are dropped
        by the Lua side. Returns ``[]`` on any failure.
        """
        # Plain concatenation (no f-string) so Lua braces need no escaping.
        lua = P + JSON_HELPER_LUA + "\nlocal Z = " + str(ZONE_SIZE) + "\n" + """
local ok = pcall(function()
  local surface = p.surface
  local zones = {}
  local all = surface.find_entities_filtered{position = p.position, radius = 512, force = "player"}
  for _, e in ipairs(all) do
    local kx = math.floor(e.position.x / Z)
    local ky = math.floor(e.position.y / Z)
    local key = kx .. "," .. ky
    if not zones[key] then
      zones[key] = {key=key, x=kx*Z, y=ky*Z, entities=0, miners=0, furnaces=0, assemblers=0, belts=0, inserters=0, poles=0, turrets=0, labs=0, pumps=0}
    end
    local z = zones[key]
    z.entities = z.entities + 1
    local t = e.type
    if t == "mining-drill" then z.miners = z.miners + 1
    elseif t == "furnace" then z.furnaces = z.furnaces + 1
    elseif t == "assembling-machine" then z.assemblers = z.assemblers + 1
    elseif t == "transport-belt" then z.belts = z.belts + 1
    elseif t == "inserter" then z.inserters = z.inserters + 1
    elseif t == "electric-pole" then z.poles = z.poles + 1
    elseif t == "ammo-turret" or t == "electric-turret" then z.turrets = z.turrets + 1
    elseif e.name == "lab" then z.labs = z.labs + 1
    elseif t == "offshore-pump" then z.pumps = z.pumps + 1
    end
  end
  local result = {}
  for _, z in pairs(zones) do
    if z.entities > 2 then result[#result+1] = z end
  end
  rcon.print(json_encode_array(result))
end)
if not ok then rcon.print("[]") end
"""
        return self._run_lua_json(lua, list)

    def _detect_problems(self) -> list[str]:
        """Report fuel-starved burner machines and nearly depleted drills.

        Returns a list of user-facing problem strings (possibly empty), or
        ``[]`` on any failure.
        """
        lua = P + JSON_HELPER_LUA + """
local ok = pcall(function()
  local surface = p.surface
  local problems = {}
  local burners = surface.find_entities_filtered{position = p.position, radius = 300, type = {"mining-drill", "furnace"}}
  local no_fuel = 0
  for _, e in ipairs(burners) do
    if e.burner then
      local fi = e.burner.inventory
      if fi and fi.is_empty() then no_fuel = no_fuel + 1 end
    end
  end
  if no_fuel > 0 then problems[#problems+1] = "연료 부족: " .. no_fuel .. "개" end
  local drills = surface.find_entities_filtered{position = p.position, radius = 300, type = "mining-drill"}
  local depleting = 0
  for _, d in ipairs(drills) do
    local ore = surface.find_entities_filtered{position = d.position, radius = 3, type = "resource"}
    if #ore > 0 and ore[1].amount < 5000 then depleting = depleting + 1 end
  end
  if depleting > 0 then problems[#problems+1] = "자원 고갈 임박: " .. depleting .. "개" end
  rcon.print(json_encode_array(problems))
end)
if not ok then rcon.print("[]") end
"""
        return self._run_lua_json(lua, list)

    def _drilldown_problem_zones(self, zones: list, problems: list) -> str:
        """Describe the two busiest zones, or return "" without problems/zones."""
        if not problems or not zones:
            return ""
        busiest = sorted(zones, key=lambda z: z.get("entities", 0), reverse=True)[:2]
        lines = ["#### 주요 구역 상세"]
        for zone in busiest:
            x = self._zone_coord(zone, "x")
            y = self._zone_coord(zone, "y")
            lines.append(
                f" 구역({x},{y}): "
                f"채굴기{zone.get('miners',0)} 제련소{zone.get('furnaces',0)} "
                f"조립기{zone.get('assemblers',0)} 벨트{zone.get('belts',0)}"
            )
        return "\n".join(lines)

    def _get_player_info(self) -> dict:
        """Read the player's tile position and main-inventory item totals.

        Returns ``{"x": int, "y": int, "inventory": {name: count}}`` or
        ``{}`` on any failure. The inventory is always a flat name->count
        mapping regardless of which shape ``get_contents()`` returned.
        """
        lua = P + JSON_HELPER_LUA + """
local ok = pcall(function()
  local inv = p.get_main_inventory()
  if not inv then rcon.print("{}") return end
  local inv_summary = {}
  local function add(name, count)
    if name and count then
      inv_summary[name] = (inv_summary[name] or 0) + count
    end
  end
  -- get_contents() yields either a name -> count map or a list of
  -- {name, count, quality} records depending on the game version;
  -- both are folded into name -> count here.
  local got, contents = pcall(function() return inv.get_contents() end)
  if got and type(contents) == "table" then
    for k, v in pairs(contents) do
      if type(v) == "table" then
        add(v.name, v.count)
      else
        add(k, v)
      end
    end
  else
    -- compatibility fallback: walk the slots one by one
    for i = 1, #inv do
      local stack = inv[i]
      if stack.valid_for_read then add(stack.name, stack.count) end
    end
  end
  rcon.print(json_encode_object({
    x = math.floor(p.position.x),
    y = math.floor(p.position.y),
    inventory = inv_summary
  }))
end)
if not ok then rcon.print("{}") end
"""
        info = self._run_lua_json(lua, dict)
        if "inventory" in info:
            # Second line of defence in case an unnormalised shape slips through.
            info["inventory"] = self._normalize_inventory(info["inventory"])
        return info

    # ── Formatters ──────────────────────────────────────────────────

    def _format_global(self, g: dict, problems: list, player: dict) -> str:
        """Render the global header, detected problems and inventory."""
        lines = [
            "## 공장 상태 (글로벌 요약)",
            f"위치: ({player.get('x','?')}, {player.get('y','?')})",
            f"전체 건물 수: {g.get('total_entities', 0)}개",
            f"진화도: {g.get('evolution', 0)}%",
            f"완료 연구: {g.get('researched_count', 0)}개",
            f"진행 중 연구: {g.get('current_research','없음')} ({g.get('research_progress',0)}%)",
        ]
        if problems:
            lines.append("\n## 감지된 문제")
            for problem in problems:
                lines.append(f" ! {problem}")
        lines += self._format_inventory(player.get("inventory", {}))
        return "\n".join(lines)

    def _format_level1(self, g: dict, zones: list, problems: list, player: dict) -> str:
        """Render the global section plus the eight busiest zones."""
        lines = [self._format_global(g, problems, player), "\n## 구역별 요약"]
        ranked = sorted(zones, key=lambda z: z.get("entities", 0), reverse=True)
        for zone in ranked[:8]:
            role = self._guess_zone_role(zone)
            x = self._zone_coord(zone, "x")
            y = self._zone_coord(zone, "y")
            lines.append(
                f" [{x:+d},{y:+d}] {role} | "
                f"채굴{zone.get('miners',0)} 제련{zone.get('furnaces',0)} "
                f"조립{zone.get('assemblers',0)} 벨트{zone.get('belts',0)}"
            )
        return "\n".join(lines)

    def _format_level2(self, g, zones, problems, drilldown, player) -> str:
        """Render level 1 and append the drill-down text when non-empty."""
        base = self._format_level1(g, zones, problems, player)
        if drilldown:
            base += "\n\n" + drilldown
        return base

    def _format_inventory(self, inv: dict) -> list[str]:
        """Render up to 20 inventory items, largest count first.

        ``inv`` is normalised first, so record-style payloads are accepted
        and malformed entries are ignored instead of raising.
        """
        items = self._normalize_inventory(inv)
        if not items:
            return ["\n인벤토리: 비어있음"]
        lines = ["\n## 인벤토리 (핵심 아이템)"]
        for name, count in sorted(items.items(), key=lambda kv: -kv[1])[:20]:
            lines.append(f" {name}: {count}")
        return lines

    def _guess_zone_role(self, z: dict) -> str:
        """Label a zone from its counters; the first matching rule wins."""
        miners = z.get("miners", 0)
        if z.get("turrets", 0) > 3:
            return "방어구역"
        if z.get("labs", 0) > 0:
            return "연구실"
        if z.get("pumps", 0) > 0:
            return "정유/전력"
        if z.get("assemblers", 0) > miners:
            return "조립라인"
        if z.get("furnaces", 0) > miners:
            return "제련구역"
        if miners > 0:
            return "채굴구역"
        return "기타"
|