thomasm6m6's picture
Initial Freeciv OpenEnv Space
8dc7642 verified
from __future__ import annotations
import asyncio
import json
import threading
import time
from typing import Protocol
from urllib.parse import urlencode, urlparse
from urllib.request import Request, urlopen
from freeciv_env.adapter import ActionRef, RawSnapshot
class FreecivSession(Protocol):
def reset(self, seed: int | None = None) -> RawSnapshot: ...
def apply_action(self, action_ref: ActionRef) -> RawSnapshot: ...
def end_turn(self) -> RawSnapshot: ...
def close(self) -> None: ...
class _InteractiveBot:
def __init__(self, session: "LiveFreecivSession"):
from freecivbot.bot.base_bot import BaseBot
class InteractiveBotImpl(BaseBot):
def __init__(self, owner: "LiveFreecivSession"):
super().__init__()
self._owner = owner
def conduct_turn(self, pplayer, info_controls, end_turn_hook):
super().conduct_turn(pplayer, info_controls, end_turn_hook)
self._publish_snapshot()
def calculate_next_move(self):
if self._turn_active:
self._publish_snapshot()
def _publish_snapshot(self):
self._acquire_state()
self._owner._publish_snapshot(
RawSnapshot(
turn=self.turn,
state=self._turn_state,
actions=self._turn_opts,
)
)
self.impl = InteractiveBotImpl(session)
class _ConfiguredCivClient:
def __init__(self, bot, user_name: str, *, client_port: int, visual_monitor: bool = False):
from freecivbot.civclient import CivClient
class ConfiguredCivClientImpl(CivClient):
def init_control(self, ws_client):
self.ws_client = ws_client
self.init_controller()
if self.visual_monitor:
self.monitor.start_monitor()
login_message = {
"pid": 4,
"username": self.user_name,
"capability": "+Freeciv.Web.Devel-3.2",
"version_label": "-dev",
"major_version": 3,
"minor_version": 1,
"patch_version": 90,
"port": self.client_port,
"password": None,
"subject": None,
}
self.ws_client.send(login_message)
def handle_chat_msg(self, packet):
from freecivbot.utils.fc_events import E_UNDEFINED
message = packet["message"]
conn_id = packet["conn_id"]
event = packet["event"]
if message is None:
return
if event is None or event < 0 or event >= E_UNDEFINED:
print("Undefined message event type")
print(packet)
print("\r\n")
packet["event"] = event = E_UNDEFINED
if conn_id in self.clstate.connections:
message = "<b>" + self.clstate.connections[conn_id]["username"] + ":</b>" + message
else:
if "/metamessage" in message:
return
if "Metaserver message string" in message:
return
packet["message"] = message
print(packet)
print("\r\n")
if "You are logged in as" in message:
self.ws_client.send_message("/set minplayers 1")
self.prepare_game()
def handle_conn_info(self, packet):
from freecivbot.connectivity.client_state import C_S_PREPARING
from freecivbot.utils.freecivlog import freelog
pconn = self.clstate.find_conn_by_id(packet["id"])
if not packet["used"]:
if pconn is None:
freelog(f"Server removed unknown connection {packet['id']}")
return
self.clstate.client_remove_cli_conn(pconn)
pconn = None
else:
pplayer = self.player_ctrl.valid_player_by_number(packet["player_num"])
if pplayer is None:
return
packet["playing"] = pplayer
if self.clstate.has_id(packet["id"]):
self.clstate.init_state(packet)
self.clstate.conn_list_append(packet)
if self.clstate.has_id(packet["id"]) and self.clstate.cur_player() != packet["playing"]:
self.clstate.set_client_state(C_S_PREPARING)
self.impl = ConfiguredCivClientImpl(
bot,
user_name,
client_port=client_port,
visual_monitor=visual_monitor,
)
class _ConfiguredCivConnection:
def __init__(self, civ_client, base_url: str, *, owner: "LiveFreecivSession", wait_for_server: int = 120, retry_interval: int = 5):
from math import ceil
import websocket
self._websocket = websocket
self.client = civ_client
self.base_url = base_url
self._owner = owner
self._loop = None
self._owner._connection = self
self.civserverport = self._reserve_client_port(base_url, civ_client.client_port)
self.client.client_port = self.civserverport
self.proxyport = 1000 + self.civserverport
self._retry_interval = retry_interval
self._num_retries = int(ceil(wait_for_server / retry_interval))
self._cur_retry = 0
self._ws_url = self._build_ws_url(base_url)
self.network_init()
def _build_ws_url(self, base_url: str) -> str:
parsed = urlparse(base_url)
scheme = "wss" if parsed.scheme == "https" else "ws"
host = parsed.hostname or "localhost"
port = parsed.port
if port is None:
port = 443 if scheme == "wss" else 80
return f"{scheme}://{host}:{port}/civsocket/{self.proxyport}"
def _reserve_client_port(self, base_url: str, requested_port: int) -> int:
parsed = urlparse(base_url)
scheme = parsed.scheme or "http"
host = parsed.hostname or "localhost"
port = parsed.port
if port is None:
port = 443 if scheme == "https" else 80
query = urlencode({"civserverport": requested_port})
launcher_url = f"{scheme}://{host}:{port}/civclientlauncher?{query}"
request = Request(launcher_url, method="POST")
with urlopen(request, timeout=10) as response:
result = response.headers.get("result")
reserved_port = response.headers.get("port")
if result != "success" or reserved_port is None:
raise RuntimeError(f"failed to reserve freeciv client port via {launcher_url}")
return int(reserved_port)
def _retry(self):
self._cur_retry += 1
time.sleep(self._retry_interval)
return self._detect_server_up()
def _detect_server_up(self):
ws = self._websocket.WebSocket()
try:
ws.connect(self._ws_url, timeout=10)
return True
except Exception as err:
print("Connect not successful:", err, " retrying in %s seconds." % self._retry_interval)
if self._cur_retry < self._num_retries:
return self._retry()
return False
finally:
try:
ws.close()
except Exception:
pass
def network_init(self):
self._cur_retry = 0
print("Connecting to server at %s ..." % self.base_url)
if self._detect_server_up():
self.websocket_init()
else:
print("Connection could not be established!")
def websocket_init(self):
from tornado import ioloop
from freecivbot.connectivity.clinet import CivWSClient
asyncio.set_event_loop(asyncio.new_event_loop())
ioloop.IOLoop.clear_current()
self._loop = ioloop.IOLoop.current()
client = CivWSClient(self.client)
def send_json(data):
if not client._ws_connection:
raise RuntimeError("Web socket connection is closed.")
msg = json.dumps(data, separators=(",", ":"))
client._ws_connection.write_message(msg)
client.send = send_json
client.connect(self._ws_url)
try:
self._loop.start()
except KeyboardInterrupt:
client.close()
def submit(self, fn) -> None:
if self._loop is None:
raise RuntimeError("freeciv connection loop is not ready")
done = threading.Event()
error: BaseException | None = None
def run():
nonlocal error
try:
fn()
except BaseException as exc:
error = exc
finally:
done.set()
self._loop.add_callback(run)
if not done.wait(timeout=10):
raise TimeoutError("timed out dispatching action to freeciv loop")
if error is not None:
raise error
def close(self) -> None:
if self._loop is None:
return
self.submit(self.client.close)
class LiveFreecivSession:
def __init__(
self,
*,
username: str = "openenvbot",
client_port: int = 6000,
base_url: str = "http://localhost",
turn_timeout_s: float = 60.0,
):
self.username = username
self.client_port = client_port
self.base_url = base_url
self.turn_timeout_s = turn_timeout_s
self._bot_wrapper: _InteractiveBot | None = None
self._client = None
self._connection: _ConfiguredCivConnection | None = None
self._thread: threading.Thread | None = None
self._ready = threading.Event()
self._snapshot_lock = threading.Lock()
self._snapshot: RawSnapshot | None = None
self._thread_error: BaseException | None = None
self._reset_counter = 0
self._session_seed = time.monotonic_ns() % 1_000_000
def reset(self, seed: int | None = None) -> RawSnapshot:
del seed
self.close()
self._reset_counter += 1
username = self._next_username()
client_port = self.client_port + ((self._session_seed + self._reset_counter - 1) % 3)
self._ready.clear()
self._thread_error = None
self._snapshot = None
self._bot_wrapper = _InteractiveBot(self)
self._client = _ConfiguredCivClient(
self._bot_wrapper.impl,
username,
client_port=client_port,
visual_monitor=False,
).impl
def run() -> None:
try:
_ConfiguredCivConnection(self._client, self.base_url, owner=self)
except BaseException as exc: # pragma: no cover - surfaced in waiters
self._thread_error = exc
self._ready.set()
self._thread = threading.Thread(target=run, name="freeciv-live-session", daemon=True)
self._thread.start()
return self._wait_for_snapshot("reset")
def apply_action(self, action_ref: ActionRef) -> RawSnapshot:
snapshot = self._require_snapshot()
action_list = snapshot.actions[action_ref.controller]
valid_actions = action_list.get_actions(action_ref.actor_id, valid_only=True)
action = None if valid_actions is None else valid_actions.get(action_ref.raw_action_key)
if action is None:
raise ValueError(
f"action {action_ref.raw_action_key} is no longer valid for {action_ref.controller}:{action_ref.actor_id}"
)
self._ready.clear()
connection = self._require_connection()
connection.submit(lambda: action_list.trigger_validated_action(action))
return self._wait_for_snapshot(action_ref.raw_action_key)
def end_turn(self) -> RawSnapshot:
if self._bot_wrapper is None:
raise RuntimeError("session has not been reset")
self._ready.clear()
connection = self._require_connection()
connection.submit(self._bot_wrapper.impl.end_turn)
return self._wait_for_snapshot("end_turn")
def close(self) -> None:
if self._connection is not None:
try:
self._connection.close()
except Exception:
pass
elif self._client is not None:
try:
self._client.close()
except Exception:
pass
if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout=5)
self._bot_wrapper = None
self._client = None
self._connection = None
self._thread = None
self._snapshot = None
self._thread_error = None
self._ready.clear()
def _publish_snapshot(self, snapshot: RawSnapshot) -> None:
with self._snapshot_lock:
self._snapshot = snapshot
self._ready.set()
def _next_username(self) -> str:
suffix = str(self._session_seed + self._reset_counter)
prefix_len = max(1, 31 - len(suffix))
return f"{self.username[:prefix_len]}{suffix}"
def _require_connection(self) -> _ConfiguredCivConnection:
if self._connection is None:
raise RuntimeError("freeciv connection is not ready")
return self._connection
def _require_snapshot(self) -> RawSnapshot:
with self._snapshot_lock:
if self._snapshot is None:
raise RuntimeError("no live snapshot is available")
return self._snapshot
def _wait_for_snapshot(self, reason: str) -> RawSnapshot:
deadline = time.monotonic() + self.turn_timeout_s
while time.monotonic() < deadline:
if self._thread_error is not None:
raise RuntimeError(f"freeciv session failed during {reason}") from self._thread_error
if self._ready.wait(timeout=0.1):
if self._thread_error is not None:
raise RuntimeError(f"freeciv session failed during {reason}") from self._thread_error
snapshot = self._require_snapshot()
if snapshot is not None:
return snapshot
raise TimeoutError(f"timed out waiting for freeciv snapshot during {reason}")