Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import threading | |
| import time | |
| from typing import Protocol | |
| from urllib.parse import urlencode, urlparse | |
| from urllib.request import Request, urlopen | |
| from freeciv_env.adapter import ActionRef, RawSnapshot | |
| class FreecivSession(Protocol): | |
| def reset(self, seed: int | None = None) -> RawSnapshot: ... | |
| def apply_action(self, action_ref: ActionRef) -> RawSnapshot: ... | |
| def end_turn(self) -> RawSnapshot: ... | |
| def close(self) -> None: ... | |
| class _InteractiveBot: | |
| def __init__(self, session: "LiveFreecivSession"): | |
| from freecivbot.bot.base_bot import BaseBot | |
| class InteractiveBotImpl(BaseBot): | |
| def __init__(self, owner: "LiveFreecivSession"): | |
| super().__init__() | |
| self._owner = owner | |
| def conduct_turn(self, pplayer, info_controls, end_turn_hook): | |
| super().conduct_turn(pplayer, info_controls, end_turn_hook) | |
| self._publish_snapshot() | |
| def calculate_next_move(self): | |
| if self._turn_active: | |
| self._publish_snapshot() | |
| def _publish_snapshot(self): | |
| self._acquire_state() | |
| self._owner._publish_snapshot( | |
| RawSnapshot( | |
| turn=self.turn, | |
| state=self._turn_state, | |
| actions=self._turn_opts, | |
| ) | |
| ) | |
| self.impl = InteractiveBotImpl(session) | |
| class _ConfiguredCivClient: | |
| def __init__(self, bot, user_name: str, *, client_port: int, visual_monitor: bool = False): | |
| from freecivbot.civclient import CivClient | |
| class ConfiguredCivClientImpl(CivClient): | |
| def init_control(self, ws_client): | |
| self.ws_client = ws_client | |
| self.init_controller() | |
| if self.visual_monitor: | |
| self.monitor.start_monitor() | |
| login_message = { | |
| "pid": 4, | |
| "username": self.user_name, | |
| "capability": "+Freeciv.Web.Devel-3.2", | |
| "version_label": "-dev", | |
| "major_version": 3, | |
| "minor_version": 1, | |
| "patch_version": 90, | |
| "port": self.client_port, | |
| "password": None, | |
| "subject": None, | |
| } | |
| self.ws_client.send(login_message) | |
| def handle_chat_msg(self, packet): | |
| from freecivbot.utils.fc_events import E_UNDEFINED | |
| message = packet["message"] | |
| conn_id = packet["conn_id"] | |
| event = packet["event"] | |
| if message is None: | |
| return | |
| if event is None or event < 0 or event >= E_UNDEFINED: | |
| print("Undefined message event type") | |
| print(packet) | |
| print("\r\n") | |
| packet["event"] = event = E_UNDEFINED | |
| if conn_id in self.clstate.connections: | |
| message = "<b>" + self.clstate.connections[conn_id]["username"] + ":</b>" + message | |
| else: | |
| if "/metamessage" in message: | |
| return | |
| if "Metaserver message string" in message: | |
| return | |
| packet["message"] = message | |
| print(packet) | |
| print("\r\n") | |
| if "You are logged in as" in message: | |
| self.ws_client.send_message("/set minplayers 1") | |
| self.prepare_game() | |
| def handle_conn_info(self, packet): | |
| from freecivbot.connectivity.client_state import C_S_PREPARING | |
| from freecivbot.utils.freecivlog import freelog | |
| pconn = self.clstate.find_conn_by_id(packet["id"]) | |
| if not packet["used"]: | |
| if pconn is None: | |
| freelog(f"Server removed unknown connection {packet['id']}") | |
| return | |
| self.clstate.client_remove_cli_conn(pconn) | |
| pconn = None | |
| else: | |
| pplayer = self.player_ctrl.valid_player_by_number(packet["player_num"]) | |
| if pplayer is None: | |
| return | |
| packet["playing"] = pplayer | |
| if self.clstate.has_id(packet["id"]): | |
| self.clstate.init_state(packet) | |
| self.clstate.conn_list_append(packet) | |
| if self.clstate.has_id(packet["id"]) and self.clstate.cur_player() != packet["playing"]: | |
| self.clstate.set_client_state(C_S_PREPARING) | |
| self.impl = ConfiguredCivClientImpl( | |
| bot, | |
| user_name, | |
| client_port=client_port, | |
| visual_monitor=visual_monitor, | |
| ) | |
| class _ConfiguredCivConnection: | |
| def __init__(self, civ_client, base_url: str, *, owner: "LiveFreecivSession", wait_for_server: int = 120, retry_interval: int = 5): | |
| from math import ceil | |
| import websocket | |
| self._websocket = websocket | |
| self.client = civ_client | |
| self.base_url = base_url | |
| self._owner = owner | |
| self._loop = None | |
| self._owner._connection = self | |
| self.civserverport = self._reserve_client_port(base_url, civ_client.client_port) | |
| self.client.client_port = self.civserverport | |
| self.proxyport = 1000 + self.civserverport | |
| self._retry_interval = retry_interval | |
| self._num_retries = int(ceil(wait_for_server / retry_interval)) | |
| self._cur_retry = 0 | |
| self._ws_url = self._build_ws_url(base_url) | |
| self.network_init() | |
| def _build_ws_url(self, base_url: str) -> str: | |
| parsed = urlparse(base_url) | |
| scheme = "wss" if parsed.scheme == "https" else "ws" | |
| host = parsed.hostname or "localhost" | |
| port = parsed.port | |
| if port is None: | |
| port = 443 if scheme == "wss" else 80 | |
| return f"{scheme}://{host}:{port}/civsocket/{self.proxyport}" | |
| def _reserve_client_port(self, base_url: str, requested_port: int) -> int: | |
| parsed = urlparse(base_url) | |
| scheme = parsed.scheme or "http" | |
| host = parsed.hostname or "localhost" | |
| port = parsed.port | |
| if port is None: | |
| port = 443 if scheme == "https" else 80 | |
| query = urlencode({"civserverport": requested_port}) | |
| launcher_url = f"{scheme}://{host}:{port}/civclientlauncher?{query}" | |
| request = Request(launcher_url, method="POST") | |
| with urlopen(request, timeout=10) as response: | |
| result = response.headers.get("result") | |
| reserved_port = response.headers.get("port") | |
| if result != "success" or reserved_port is None: | |
| raise RuntimeError(f"failed to reserve freeciv client port via {launcher_url}") | |
| return int(reserved_port) | |
| def _retry(self): | |
| self._cur_retry += 1 | |
| time.sleep(self._retry_interval) | |
| return self._detect_server_up() | |
| def _detect_server_up(self): | |
| ws = self._websocket.WebSocket() | |
| try: | |
| ws.connect(self._ws_url, timeout=10) | |
| return True | |
| except Exception as err: | |
| print("Connect not successful:", err, " retrying in %s seconds." % self._retry_interval) | |
| if self._cur_retry < self._num_retries: | |
| return self._retry() | |
| return False | |
| finally: | |
| try: | |
| ws.close() | |
| except Exception: | |
| pass | |
| def network_init(self): | |
| self._cur_retry = 0 | |
| print("Connecting to server at %s ..." % self.base_url) | |
| if self._detect_server_up(): | |
| self.websocket_init() | |
| else: | |
| print("Connection could not be established!") | |
| def websocket_init(self): | |
| from tornado import ioloop | |
| from freecivbot.connectivity.clinet import CivWSClient | |
| asyncio.set_event_loop(asyncio.new_event_loop()) | |
| ioloop.IOLoop.clear_current() | |
| self._loop = ioloop.IOLoop.current() | |
| client = CivWSClient(self.client) | |
| def send_json(data): | |
| if not client._ws_connection: | |
| raise RuntimeError("Web socket connection is closed.") | |
| msg = json.dumps(data, separators=(",", ":")) | |
| client._ws_connection.write_message(msg) | |
| client.send = send_json | |
| client.connect(self._ws_url) | |
| try: | |
| self._loop.start() | |
| except KeyboardInterrupt: | |
| client.close() | |
| def submit(self, fn) -> None: | |
| if self._loop is None: | |
| raise RuntimeError("freeciv connection loop is not ready") | |
| done = threading.Event() | |
| error: BaseException | None = None | |
| def run(): | |
| nonlocal error | |
| try: | |
| fn() | |
| except BaseException as exc: | |
| error = exc | |
| finally: | |
| done.set() | |
| self._loop.add_callback(run) | |
| if not done.wait(timeout=10): | |
| raise TimeoutError("timed out dispatching action to freeciv loop") | |
| if error is not None: | |
| raise error | |
| def close(self) -> None: | |
| if self._loop is None: | |
| return | |
| self.submit(self.client.close) | |
| class LiveFreecivSession: | |
| def __init__( | |
| self, | |
| *, | |
| username: str = "openenvbot", | |
| client_port: int = 6000, | |
| base_url: str = "http://localhost", | |
| turn_timeout_s: float = 60.0, | |
| ): | |
| self.username = username | |
| self.client_port = client_port | |
| self.base_url = base_url | |
| self.turn_timeout_s = turn_timeout_s | |
| self._bot_wrapper: _InteractiveBot | None = None | |
| self._client = None | |
| self._connection: _ConfiguredCivConnection | None = None | |
| self._thread: threading.Thread | None = None | |
| self._ready = threading.Event() | |
| self._snapshot_lock = threading.Lock() | |
| self._snapshot: RawSnapshot | None = None | |
| self._thread_error: BaseException | None = None | |
| self._reset_counter = 0 | |
| self._session_seed = time.monotonic_ns() % 1_000_000 | |
| def reset(self, seed: int | None = None) -> RawSnapshot: | |
| del seed | |
| self.close() | |
| self._reset_counter += 1 | |
| username = self._next_username() | |
| client_port = self.client_port + ((self._session_seed + self._reset_counter - 1) % 3) | |
| self._ready.clear() | |
| self._thread_error = None | |
| self._snapshot = None | |
| self._bot_wrapper = _InteractiveBot(self) | |
| self._client = _ConfiguredCivClient( | |
| self._bot_wrapper.impl, | |
| username, | |
| client_port=client_port, | |
| visual_monitor=False, | |
| ).impl | |
| def run() -> None: | |
| try: | |
| _ConfiguredCivConnection(self._client, self.base_url, owner=self) | |
| except BaseException as exc: # pragma: no cover - surfaced in waiters | |
| self._thread_error = exc | |
| self._ready.set() | |
| self._thread = threading.Thread(target=run, name="freeciv-live-session", daemon=True) | |
| self._thread.start() | |
| return self._wait_for_snapshot("reset") | |
| def apply_action(self, action_ref: ActionRef) -> RawSnapshot: | |
| snapshot = self._require_snapshot() | |
| action_list = snapshot.actions[action_ref.controller] | |
| valid_actions = action_list.get_actions(action_ref.actor_id, valid_only=True) | |
| action = None if valid_actions is None else valid_actions.get(action_ref.raw_action_key) | |
| if action is None: | |
| raise ValueError( | |
| f"action {action_ref.raw_action_key} is no longer valid for {action_ref.controller}:{action_ref.actor_id}" | |
| ) | |
| self._ready.clear() | |
| connection = self._require_connection() | |
| connection.submit(lambda: action_list.trigger_validated_action(action)) | |
| return self._wait_for_snapshot(action_ref.raw_action_key) | |
| def end_turn(self) -> RawSnapshot: | |
| if self._bot_wrapper is None: | |
| raise RuntimeError("session has not been reset") | |
| self._ready.clear() | |
| connection = self._require_connection() | |
| connection.submit(self._bot_wrapper.impl.end_turn) | |
| return self._wait_for_snapshot("end_turn") | |
| def close(self) -> None: | |
| if self._connection is not None: | |
| try: | |
| self._connection.close() | |
| except Exception: | |
| pass | |
| elif self._client is not None: | |
| try: | |
| self._client.close() | |
| except Exception: | |
| pass | |
| if self._thread is not None and self._thread.is_alive(): | |
| self._thread.join(timeout=5) | |
| self._bot_wrapper = None | |
| self._client = None | |
| self._connection = None | |
| self._thread = None | |
| self._snapshot = None | |
| self._thread_error = None | |
| self._ready.clear() | |
| def _publish_snapshot(self, snapshot: RawSnapshot) -> None: | |
| with self._snapshot_lock: | |
| self._snapshot = snapshot | |
| self._ready.set() | |
| def _next_username(self) -> str: | |
| suffix = str(self._session_seed + self._reset_counter) | |
| prefix_len = max(1, 31 - len(suffix)) | |
| return f"{self.username[:prefix_len]}{suffix}" | |
| def _require_connection(self) -> _ConfiguredCivConnection: | |
| if self._connection is None: | |
| raise RuntimeError("freeciv connection is not ready") | |
| return self._connection | |
| def _require_snapshot(self) -> RawSnapshot: | |
| with self._snapshot_lock: | |
| if self._snapshot is None: | |
| raise RuntimeError("no live snapshot is available") | |
| return self._snapshot | |
| def _wait_for_snapshot(self, reason: str) -> RawSnapshot: | |
| deadline = time.monotonic() + self.turn_timeout_s | |
| while time.monotonic() < deadline: | |
| if self._thread_error is not None: | |
| raise RuntimeError(f"freeciv session failed during {reason}") from self._thread_error | |
| if self._ready.wait(timeout=0.1): | |
| if self._thread_error is not None: | |
| raise RuntimeError(f"freeciv session failed during {reason}") from self._thread_error | |
| snapshot = self._require_snapshot() | |
| if snapshot is not None: | |
| return snapshot | |
| raise TimeoutError(f"timed out waiting for freeciv snapshot during {reason}") | |