""" factorio_rcon.py 팩토리오 서버에 RCON으로 연결하고 Lua 명령을 실행하는 모듈 """ import socket import struct import time # 모든 Lua 코드 앞에 붙일 플레이어 참조 헬퍼 # RCON에서는 game.player가 nil이므로 game.players[1]을 사용 PLAYER_HELPER = """ local p = game.players[1] if not p then rcon.print("NO_PLAYER") return end if not p.character then rcon.print("NO_CHARACTER") return end """ class FactorioRCON: """팩토리오 RCON 클라이언트""" def __init__(self, host="127.0.0.1", port=25575, password="factorio_ai"): self.host = host self.port = port self.password = password self.sock = None self._req_id = 1 def connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(10) self.sock.connect((self.host, self.port)) # 인증 self._send_packet(3, self.password) # type 3 = AUTH resp = self._recv_packet() if resp["id"] == -1: raise Exception("RCON 인증 실패: 비밀번호를 확인하세요") print(f"[RCON] 팩토리오 서버 연결 성공 ({self.host}:{self.port})") def disconnect(self): if self.sock: self.sock.close() self.sock = None def run(self, lua_command: str) -> str: """Lua 명령 실행 후 결과 반환 (silent-command로 콘솔 스팸 방지)""" cmd = f"/silent-command {lua_command}" req_id = self._send_packet(2, cmd) # type 2 = EXECCOMMAND result = self._recv_packet() return result.get("body", "").strip() def lua(self, code: str) -> str: """여러 줄 Lua 코드 실행""" return self.run(code) def lua_with_player(self, code: str) -> str: """플레이어 참조(p)를 자동으로 주입한 Lua 코드 실행. code 안에서 'p'로 플레이어를 참조할 수 있음.""" return self.run(PLAYER_HELPER + code) def check_player(self) -> bool: """접속한 플레이어가 있는지 확인""" result = self.run(""" local count = 0 for _ in pairs(game.players) do count = count + 1 end if count == 0 then rcon.print("NO_PLAYER") else local p = game.players[1] if p.character then rcon.print("OK:" .. p.name .. ":" .. string.format("%.0f,%.0f", p.position.x, p.position.y)) else rcon.print("NO_CHARACTER:" .. p.name) end end """) return result.startswith("OK") def _send_packet(self, ptype: int, body: str) -> int: req_id = self._req_id self._req_id += 1 encoded = body.encode("utf-8") + b"\x00" size = 4 + 4 + len(encoded) + 1 data = struct.pack(" dict: raw = self._recv_bytes(4) size = struct.unpack(" bytes: buf = b"" while len(buf) < n: chunk = self.sock.recv(n - len(buf)) if not chunk: raise Exception("RCON 연결이 끊어졌습니다") buf += chunk return buf def __enter__(self): self.connect() return self def __exit__(self, *args): self.disconnect()