"""Live freeciv-web session driver.

Wraps the ``freecivbot`` client so that a game can be stepped synchronously
(reset / apply one action / end turn) from a caller thread while the network
event loop runs on a background thread.
"""

from __future__ import annotations

import asyncio
import json
import threading
import time
from typing import Protocol
from urllib.parse import urlencode, urlparse
from urllib.request import Request, urlopen

from freeciv_env.adapter import ActionRef, RawSnapshot


class FreecivSession(Protocol):
    """Structural interface every freeciv session backend provides."""

    def reset(self, seed: int | None = None) -> RawSnapshot: ...

    def apply_action(self, action_ref: ActionRef) -> RawSnapshot: ...

    def end_turn(self) -> RawSnapshot: ...

    def close(self) -> None: ...


def _split_base_url(base_url: str) -> tuple[str, str, int]:
    """Split *base_url* into ``(scheme, host, port)`` with defaults applied.

    A missing scheme defaults to ``"http"``, a missing host to
    ``"localhost"``, and a missing port to 443 for https and 80 otherwise.
    """
    parsed = urlparse(base_url)
    scheme = parsed.scheme or "http"
    host = parsed.hostname or "localhost"
    port = parsed.port
    if port is None:
        port = 443 if scheme == "https" else 80
    return scheme, host, port


class _InteractiveBot:
    """Holder for a ``BaseBot`` subclass that publishes snapshots to a session.

    The subclass is defined inside ``__init__`` so that ``freecivbot`` is only
    imported when a bot is actually created.
    """

    def __init__(self, session: "LiveFreecivSession"):
        from freecivbot.bot.base_bot import BaseBot

        class InteractiveBotImpl(BaseBot):
            """Bot that hands its state to the owner instead of choosing moves."""

            def __init__(self, owner: "LiveFreecivSession"):
                super().__init__()
                self._owner = owner

            def conduct_turn(self, pplayer, info_controls, end_turn_hook):
                # Let the base class do its turn handling, then expose the state.
                super().conduct_turn(pplayer, info_controls, end_turn_hook)
                self._publish_snapshot()

            def calculate_next_move(self):
                # No move is chosen here; while the turn is active the refreshed
                # state is published for the owner to act upon.
                if self._turn_active:
                    self._publish_snapshot()

            def _publish_snapshot(self):
                self._acquire_state()
                self._owner._publish_snapshot(
                    RawSnapshot(
                        turn=self.turn,
                        state=self._turn_state,
                        actions=self._turn_opts,
                    )
                )

        self.impl = InteractiveBotImpl(session)


class _ConfiguredCivClient:
    """Holder for a ``CivClient`` subclass with customised packet handlers.

    The configured client instance is exposed as ``self.impl``.
    """

    def __init__(self, bot, user_name: str, *, client_port: int, visual_monitor: bool = False):
        from freecivbot.civclient import CivClient

        class ConfiguredCivClientImpl(CivClient):
            """``CivClient`` with overridden login, chat and conn-info handling."""

            def init_control(self, ws_client):
                """Initialise controllers and send the login packet."""
                self.ws_client = ws_client
                self.init_controller()
                if self.visual_monitor:
                    self.monitor.start_monitor()
                login_message = {
                    "pid": 4,
                    "username": self.user_name,
                    "capability": "+Freeciv.Web.Devel-3.2",
                    "version_label": "-dev",
                    "major_version": 3,
                    "minor_version": 1,
                    "patch_version": 90,
                    "port": self.client_port,
                    "password": None,
                    "subject": None,
                }
                self.ws_client.send(login_message)

            def handle_chat_msg(self, packet):
                """Echo a chat packet and start game preparation after login."""
                from freecivbot.utils.fc_events import E_UNDEFINED

                message = packet["message"]
                conn_id = packet["conn_id"]
                event = packet["event"]

                if message is None:
                    return
                if event is None or event < 0 or event >= E_UNDEFINED:
                    print("Undefined message event type")
                    print(packet)
                    print("\r\n")
                    packet["event"] = event = E_UNDEFINED

                if conn_id in self.clstate.connections:
                    # Prefix messages from known connections with the sender.
                    message = "" + self.clstate.connections[conn_id]["username"] + ":" + message
                else:
                    # Metaserver chatter is dropped without being printed.
                    if "/metamessage" in message:
                        return
                    if "Metaserver message string" in message:
                        return

                packet["message"] = message
                print(packet)
                print("\r\n")

                if "You are logged in as" in message:
                    # Login confirmed: allow a single-player game and prepare it.
                    self.ws_client.send_message("/set minplayers 1")
                    self.prepare_game()

            def handle_conn_info(self, packet):
                """Track connection add/remove packets in the client state."""
                from freecivbot.connectivity.client_state import C_S_PREPARING
                from freecivbot.utils.freecivlog import freelog

                pconn = self.clstate.find_conn_by_id(packet["id"])
                if not packet["used"]:
                    # The server is dropping this connection.
                    if pconn is None:
                        freelog(f"Server removed unknown connection {packet['id']}")
                        return
                    self.clstate.client_remove_cli_conn(pconn)
                    pconn = None
                else:
                    pplayer = self.player_ctrl.valid_player_by_number(packet["player_num"])
                    if pplayer is None:
                        return
                    packet["playing"] = pplayer
                    if self.clstate.has_id(packet["id"]):
                        self.clstate.init_state(packet)
                    self.clstate.conn_list_append(packet)

                # NOTE(review): on the removal path this reads packet["playing"],
                # which is only assigned in the branch above — confirm the server
                # packet always carries that key.
                if self.clstate.has_id(packet["id"]) and self.clstate.cur_player() != packet["playing"]:
                    self.clstate.set_client_state(C_S_PREPARING)

        self.impl = ConfiguredCivClientImpl(
            bot,
            user_name,
            client_port=client_port,
            visual_monitor=visual_monitor,
        )


class _ConfiguredCivConnection:
    """Reserve a server port, wait for the server, then run the websocket loop.

    Constructing an instance blocks: ``__init__`` ends by calling
    ``network_init``, which starts the tornado IOLoop once the server answers.
    It is therefore expected to be created on a dedicated thread.
    """

    def __init__(
        self,
        civ_client,
        base_url: str,
        *,
        owner: "LiveFreecivSession",
        wait_for_server: int = 120,
        retry_interval: int = 5,
    ):
        from math import ceil

        import websocket

        self._websocket = websocket
        self.client = civ_client
        self.base_url = base_url
        self._owner = owner
        self._loop = None
        # Register with the owner before blocking so it can reach this object.
        self._owner._connection = self

        self.civserverport = self._reserve_client_port(base_url, civ_client.client_port)
        self.client.client_port = self.civserverport
        self.proxyport = 1000 + self.civserverport

        self._retry_interval = retry_interval
        self._num_retries = int(ceil(wait_for_server / retry_interval))
        self._cur_retry = 0
        self._ws_url = self._build_ws_url(base_url)
        self.network_init()

    def _build_ws_url(self, base_url: str) -> str:
        """Return the ``civsocket`` websocket URL for this connection's proxy port."""
        scheme, host, port = _split_base_url(base_url)
        ws_scheme = "wss" if scheme == "https" else "ws"
        return f"{ws_scheme}://{host}:{port}/civsocket/{self.proxyport}"

    def _reserve_client_port(self, base_url: str, requested_port: int) -> int:
        """POST to ``/civclientlauncher`` and return the port the server reports.

        Raises:
            RuntimeError: if the ``result`` response header is not ``"success"``
                or the ``port`` header is missing.
        """
        scheme, host, port = _split_base_url(base_url)
        query = urlencode({"civserverport": requested_port})
        launcher_url = f"{scheme}://{host}:{port}/civclientlauncher?{query}"
        request = Request(launcher_url, method="POST")
        with urlopen(request, timeout=10) as response:
            result = response.headers.get("result")
            reserved_port = response.headers.get("port")
        if result != "success" or reserved_port is None:
            raise RuntimeError(f"failed to reserve freeciv client port via {launcher_url}")
        return int(reserved_port)

    def _retry(self):
        """Consume one retry, sleep for the retry interval, then probe again."""
        self._cur_retry += 1
        time.sleep(self._retry_interval)
        return self._detect_server_up()

    def _probe_server(self) -> bool:
        """Try a single websocket connect; always close the probe socket."""
        ws = self._websocket.WebSocket()
        try:
            ws.connect(self._ws_url, timeout=10)
            return True
        except Exception as err:
            print("Connect not successful:", err, " retrying in %s seconds." % self._retry_interval)
            return False
        finally:
            try:
                ws.close()
            except Exception:
                # Closing a socket that never connected may fail; nothing to do.
                pass

    def _detect_server_up(self):
        """Return True once the server accepts a websocket, False when retries run out.

        Implemented as a loop rather than recursion so each probe socket is
        closed before the next wait and stack depth does not depend on the
        retry budget.
        """
        while True:
            if self._probe_server():
                return True
            if self._cur_retry >= self._num_retries:
                return False
            self._cur_retry += 1
            time.sleep(self._retry_interval)

    def network_init(self):
        """Wait for the server and, if reachable, start the websocket loop."""
        self._cur_retry = 0
        print("Connecting to server at %s ..." % self.base_url)
        if self._detect_server_up():
            self.websocket_init()
        else:
            print("Connection could not be established!")

    def websocket_init(self):
        """Create a fresh event loop, connect the client and run the loop.

        Blocks in ``IOLoop.start()`` until the loop is stopped.
        """
        from tornado import ioloop

        from freecivbot.connectivity.clinet import CivWSClient

        # Install a new asyncio loop for the current thread before asking
        # tornado for its current IOLoop.
        asyncio.set_event_loop(asyncio.new_event_loop())
        ioloop.IOLoop.clear_current()
        self._loop = ioloop.IOLoop.current()

        client = CivWSClient(self.client)

        def send_json(data):
            # Replacement sender: serialise compactly and write to the socket.
            if not client._ws_connection:
                raise RuntimeError("Web socket connection is closed.")
            msg = json.dumps(data, separators=(",", ":"))
            client._ws_connection.write_message(msg)

        client.send = send_json
        client.connect(self._ws_url)
        try:
            self._loop.start()
        except KeyboardInterrupt:
            client.close()

    def submit(self, fn) -> None:
        """Run *fn* on the connection's IOLoop and wait for it to finish.

        Raises:
            RuntimeError: if the loop has not been created yet.
            TimeoutError: if *fn* has not completed within 10 seconds.
            BaseException: whatever *fn* raised, re-raised in the caller.
        """
        if self._loop is None:
            raise RuntimeError("freeciv connection loop is not ready")

        done = threading.Event()
        error: BaseException | None = None

        def run():
            nonlocal error
            try:
                fn()
            except BaseException as exc:
                # Carry the failure back to the submitting thread.
                error = exc
            finally:
                done.set()

        self._loop.add_callback(run)
        if not done.wait(timeout=10):
            raise TimeoutError("timed out dispatching action to freeciv loop")
        if error is not None:
            raise error

    def close(self) -> None:
        """Close the client on the loop; a no-op if the loop never started."""
        if self._loop is None:
            return
        self.submit(self.client.close)


class LiveFreecivSession:
    """Synchronous session facade over a live freeciv-web server.

    Each ``reset`` starts a new client on a background thread; the action
    methods then block until the bot publishes the next ``RawSnapshot``.
    """

    def __init__(
        self,
        *,
        username: str = "openenvbot",
        client_port: int = 6000,
        base_url: str = "http://localhost",
        turn_timeout_s: float = 60.0,
    ):
        self.username = username
        self.client_port = client_port
        self.base_url = base_url
        self.turn_timeout_s = turn_timeout_s

        self._bot_wrapper: _InteractiveBot | None = None
        self._client = None
        self._connection: _ConfiguredCivConnection | None = None
        self._thread: threading.Thread | None = None
        # Set whenever a new snapshot (or a thread failure) is available.
        self._ready = threading.Event()
        self._snapshot_lock = threading.Lock()
        self._snapshot: RawSnapshot | None = None
        self._thread_error: BaseException | None = None
        self._reset_counter = 0
        # Per-session offset used to vary usernames and ports between runs.
        self._session_seed = time.monotonic_ns() % 1_000_000

    def reset(self, seed: int | None = None) -> RawSnapshot:
        """Tear down any running game, start a new one, return its first snapshot.

        *seed* is accepted for interface compatibility and ignored.
        """
        del seed
        self.close()

        self._reset_counter += 1
        username = self._next_username()
        # Rotate over three consecutive ports starting at ``client_port``.
        client_port = self.client_port + ((self._session_seed + self._reset_counter - 1) % 3)

        self._ready.clear()
        self._thread_error = None
        self._snapshot = None
        self._bot_wrapper = _InteractiveBot(self)
        self._client = _ConfiguredCivClient(
            self._bot_wrapper.impl,
            username,
            client_port=client_port,
            visual_monitor=False,
        ).impl

        def run() -> None:
            try:
                # Blocks for the lifetime of the connection's event loop.
                _ConfiguredCivConnection(self._client, self.base_url, owner=self)
            except BaseException as exc:  # pragma: no cover - surfaced in waiters
                self._thread_error = exc
                self._ready.set()

        self._thread = threading.Thread(target=run, name="freeciv-live-session", daemon=True)
        self._thread.start()
        return self._wait_for_snapshot("reset")

    def apply_action(self, action_ref: ActionRef) -> RawSnapshot:
        """Trigger one currently valid action and return the resulting snapshot.

        Raises:
            ValueError: if the referenced action is not valid in the current
                snapshot.
            RuntimeError: if there is no snapshot or connection.
        """
        snapshot = self._require_snapshot()
        action_list = snapshot.actions[action_ref.controller]
        valid_actions = action_list.get_actions(action_ref.actor_id, valid_only=True)
        action = None if valid_actions is None else valid_actions.get(action_ref.raw_action_key)
        if action is None:
            raise ValueError(
                f"action {action_ref.raw_action_key} is no longer valid for {action_ref.controller}:{action_ref.actor_id}"
            )

        # Clear before dispatching so the wait below sees only a new publish.
        self._ready.clear()
        connection = self._require_connection()
        connection.submit(lambda: action_list.trigger_validated_action(action))
        return self._wait_for_snapshot(action_ref.raw_action_key)

    def end_turn(self) -> RawSnapshot:
        """End the current turn and return the next snapshot.

        Raises:
            RuntimeError: if ``reset`` has not been called.
        """
        if self._bot_wrapper is None:
            raise RuntimeError("session has not been reset")
        self._ready.clear()
        connection = self._require_connection()
        connection.submit(self._bot_wrapper.impl.end_turn)
        return self._wait_for_snapshot("end_turn")

    def close(self) -> None:
        """Best-effort teardown of the connection, client, thread and state."""
        if self._connection is not None:
            try:
                self._connection.close()
            except Exception:
                # Deliberately ignored: teardown must not fail the caller.
                pass
        elif self._client is not None:
            try:
                self._client.close()
            except Exception:
                # Deliberately ignored: teardown must not fail the caller.
                pass

        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=5)

        self._bot_wrapper = None
        self._client = None
        self._connection = None
        self._thread = None
        self._snapshot = None
        self._thread_error = None
        self._ready.clear()

    def _publish_snapshot(self, snapshot: RawSnapshot) -> None:
        """Store *snapshot* and wake any waiter (called by the bot)."""
        with self._snapshot_lock:
            self._snapshot = snapshot
            self._ready.set()

    def _next_username(self) -> str:
        """Return the base username truncated and suffixed with a counter.

        The prefix is cut so prefix plus suffix stay within 31 characters
        (presumably the server's name limit — confirm).
        """
        suffix = str(self._session_seed + self._reset_counter)
        prefix_len = max(1, 31 - len(suffix))
        return f"{self.username[:prefix_len]}{suffix}"

    def _require_connection(self) -> _ConfiguredCivConnection:
        """Return the active connection or raise ``RuntimeError``."""
        if self._connection is None:
            raise RuntimeError("freeciv connection is not ready")
        return self._connection

    def _require_snapshot(self) -> RawSnapshot:
        """Return the latest snapshot or raise ``RuntimeError`` if none exists."""
        with self._snapshot_lock:
            if self._snapshot is None:
                raise RuntimeError("no live snapshot is available")
            return self._snapshot

    def _raise_if_thread_failed(self, reason: str) -> None:
        """Re-raise a background-thread failure as ``RuntimeError``."""
        if self._thread_error is not None:
            raise RuntimeError(f"freeciv session failed during {reason}") from self._thread_error

    def _wait_for_snapshot(self, reason: str) -> RawSnapshot:
        """Block until a snapshot is published or ``turn_timeout_s`` elapses.

        Raises:
            RuntimeError: if the background thread recorded an error, or the
                ready flag was set without a snapshot being stored.
            TimeoutError: if nothing was published before the deadline.
        """
        deadline = time.monotonic() + self.turn_timeout_s
        while time.monotonic() < deadline:
            self._raise_if_thread_failed(reason)
            # Poll in short slices so a thread failure is noticed promptly.
            if self._ready.wait(timeout=0.1):
                self._raise_if_thread_failed(reason)
                # _require_snapshot raises on None, so no extra check is needed.
                return self._require_snapshot()
        raise TimeoutError(f"timed out waiting for freeciv snapshot during {reason}")