File size: 6,936 Bytes
ab81e9c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# app.py (Version 6 - Final Correction)
import asyncio
import hmac
import os
from typing import Dict

import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
from fastapi.responses import HTMLResponse
from starlette.websockets import WebSocketState
# --- Configuration ---
# The API key a client presents in the WebSocket URL selects its role.
# Each key may be overridden through an environment variable of the same name
# so that real secrets do not have to live in source control; the literals
# below are only fallbacks (also used when the variable is set but empty) and
# should be replaced in any real deployment.
HOST_API_KEY = os.environ.get("HOST_API_KEY") or "HOST_SUPER_SECRET_KEY_123"  # The Controller
USER_API_KEY = os.environ.get("USER_API_KEY") or "USER_SECRET_KEY_456"  # The Agent (executes commands)
# ASGI application object: the route decorators in this file register their
# handlers on it, and uvicorn is pointed at it via the "app:app" import string.
app = FastAPI()
# --- Connection and Session Management ---
class ConnectionManager:
    """Track connected agents, controller sessions, and their relay tasks.

    State is held in three dictionaries:

    * ``agents``      -- client id -> agent socket, for every connected agent.
    * ``sessions``    -- controller socket -> agent socket it is attached to.
    * ``relay_tasks`` -- agent socket -> task forwarding that agent's output.
    """

    def __init__(self):
        self.agents: Dict[str, WebSocket] = {}
        self.sessions: Dict[WebSocket, WebSocket] = {}  # {controller: agent}
        self.relay_tasks: Dict[WebSocket, asyncio.Task] = {}  # {agent: task}

    async def connect(self, websocket: WebSocket, client_id: str, role: str):
        """Accept the socket and, for agents, register it under ``client_id``.

        Args:
            websocket: The incoming connection; it is accepted here.
            client_id: Identifier the caller assigned to this connection.
            role: ``"user"`` for an agent or ``"host"`` for a controller.
                Any other value is accepted but not registered or logged.
        """
        await websocket.accept()
        if role == "user":
            self.agents[client_id] = websocket
            print(f"Agent connected: {client_id}")
        elif role == "host":
            print(f"Controller connected: {client_id}")

    def _stop_relay(self, agent_ws: WebSocket) -> None:
        """Cancel and forget the relay task registered for ``agent_ws``, if any."""
        task = self.relay_tasks.pop(agent_ws, None)
        if task is not None:
            task.cancel()

    async def disconnect(self, websocket: WebSocket, client_id: str):
        """Remove every trace of a connection, whichever role it had.

        Args:
            websocket: The socket that went away.
            client_id: The identifier it was registered/logged under.
        """
        # Case 1: An agent disconnected
        if client_id in self.agents:
            del self.agents[client_id]
            controller_ws = next(
                (c for c, a in self.sessions.items() if a is websocket), None
            )
            if controller_ws is not None:
                try:
                    await self.send_to_websocket(
                        controller_ws,
                        "\n[SERVER] The agent has disconnected. Session terminated.\n> ",
                    )
                except Exception as exc:
                    # The notice is best-effort: a failure to deliver it must
                    # not stop the cleanup below. Unlike a bare ``except`` this
                    # still lets task cancellation propagate.
                    print(f"Could not notify controller: {exc!r}")
                del self.sessions[controller_ws]
            self._stop_relay(websocket)
            print(f"Agent cleaned up: {client_id}")

        # Case 2: A controller disconnected
        agent_ws = self.sessions.pop(websocket, None)
        if agent_ws is not None:
            # Without this the relay task would keep reading from the agent
            # and stay registered after its controller is gone.
            self._stop_relay(agent_ws)
            print(f"Controller session ended for {client_id}")

        print(f"Client disconnected: {client_id}")

    async def send_to_websocket(self, websocket: WebSocket, message: str) -> bool:
        """Send ``message`` as a text frame without raising on a dead socket.

        Args:
            websocket: Destination socket.
            message: Text to send.

        Returns:
            ``True`` if the frame was sent; ``False`` if the socket was not in
            the connected state or the send raised ``WebSocketDisconnect`` or
            ``RuntimeError``.
        """
        try:
            if websocket.client_state == WebSocketState.CONNECTED:
                await websocket.send_text(message)
                return True
        except (WebSocketDisconnect, RuntimeError):
            pass
        return False


# Single shared manager used by the route handlers in this module.
manager = ConnectionManager()
def get_client_role(api_key: str):
    """Map a presented API key to the role it grants.

    Args:
        api_key: The key supplied by the connecting client.

    Returns:
        ``"host"`` when the key equals ``HOST_API_KEY``, ``"user"`` when it
        equals ``USER_API_KEY``, otherwise ``None``.
    """
    # Compare as bytes: hmac.compare_digest rejects non-ASCII ``str`` input.
    # "surrogatepass" keeps the encode from raising on any input string, so
    # an odd key is simply rejected, as it was with ``==``.
    presented = api_key.encode("utf-8", "surrogatepass")
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key was correct.
    if hmac.compare_digest(presented, HOST_API_KEY.encode("utf-8")):
        return "host"
    if hmac.compare_digest(presented, USER_API_KEY.encode("utf-8")):
        return "user"
    return None
@app.get("/")
async def get():
    """Serve a small HTML page confirming that the relay server is up."""
    banner = "<h1>Relay Server is Running</h1>"
    return HTMLResponse(banner)
async def relay_agent_to_controller(agent_ws: WebSocket, controller_ws: WebSocket):
    """Forward each text frame received from an agent to its controller.

    Runs until the agent disconnects, a send to the controller fails, or the
    task is cancelled.

    Args:
        agent_ws: Socket to read text frames from.
        controller_ws: Socket the frames are forwarded to.

    Raises:
        asyncio.CancelledError: Propagated when the task is cancelled, so the
            task ends in the cancelled state instead of appearing to finish
            normally.
    """
    try:
        while True:
            data = await agent_ws.receive_text()
            if not await manager.send_to_websocket(controller_ws, data):
                break
    except (WebSocketDisconnect, RuntimeError):
        # Starlette raises RuntimeError when a receive is attempted on a
        # socket that has already been closed or reported a disconnect;
        # treat that the same as a normal disconnect rather than letting it
        # surface as an unretrieved task exception.
        pass
    finally:
        print("Relay task finished.")
def _format_agent_list() -> str:
    """Render the ids of all connected agents, or a notice if there are none."""
    if not manager.agents:
        return "No agents online.\n"
    return "Available agents:\n" + "".join(
        f"- {agent_id}\n" for agent_id in manager.agents
    )


def _end_session(controller_ws: WebSocket) -> None:
    """Drop the controller's session and stop the relay task feeding it."""
    agent_ws = manager.sessions.pop(controller_ws, None)
    if agent_ws is None:
        return
    task = manager.relay_tasks.pop(agent_ws, None)
    if task is not None:
        task.cancel()


async def _open_session(controller_ws: WebSocket, command: str) -> None:
    """Handle ``!connect <agent_id>``: attach the controller to an idle agent."""
    parts = command.split(" ", 1)
    target_agent_id = parts[1].strip() if len(parts) == 2 else ""
    if not target_agent_id:
        await manager.send_to_websocket(controller_ws, "Usage: !connect <agent_id>\n> ")
        return

    agent_socket = manager.agents.get(target_agent_id)
    if agent_socket is None:
        await manager.send_to_websocket(controller_ws, "Agent not found.\n> ")
        return

    # Only one relay task may read from an agent socket at a time, so refuse
    # a second controller instead of starting a competing reader.
    if agent_socket in manager.sessions.values():
        await manager.send_to_websocket(controller_ws, "Agent is already in a session.\n> ")
        return

    # A relay task left over from an earlier session must not keep reading.
    stale_task = manager.relay_tasks.pop(agent_socket, None)
    if stale_task is not None:
        stale_task.cancel()

    manager.sessions[controller_ws] = agent_socket
    manager.relay_tasks[agent_socket] = asyncio.create_task(
        relay_agent_to_controller(agent_socket, controller_ws)
    )
    await manager.send_to_websocket(
        controller_ws,
        f"[*] Connected to {target_agent_id}. You are now in a session.\n",
    )


async def _agent_loop(websocket: WebSocket, client_id: str) -> None:
    """Keep an agent connection open until its socket leaves the connected state."""
    await manager.send_to_websocket(
        websocket, f"[SERVER] Connected as agent: {client_id}. Awaiting session."
    )
    # The handler deliberately does not read from the socket: reading is left
    # to the relay task started when a controller connects.
    # NOTE(review): client_state appears to change only when something reads
    # from the socket, so an idle agent that drops is probably not noticed
    # until a session is opened -- confirm against Starlette's behaviour.
    while websocket.client_state == WebSocketState.CONNECTED:
        await asyncio.sleep(1)


async def _controller_loop(websocket: WebSocket) -> None:
    """Read controller input; forward it in-session, else interpret commands."""
    await manager.send_to_websocket(
        websocket,
        "[SERVER] Welcome, Controller! Commands: !list, !connect <agent_id>, !disconnect, !exit\n> ",
    )
    while True:
        data = await websocket.receive_text()

        agent_socket = manager.sessions.get(websocket)
        if agent_socket is not None:
            # In a session everything except !disconnect goes to the agent.
            if data == "!disconnect":
                _end_session(websocket)
                await manager.send_to_websocket(websocket, "Session disconnected.\n> ")
            elif not await manager.send_to_websocket(agent_socket, data):
                await manager.send_to_websocket(
                    websocket,
                    "\n[SERVER] Failed to send command: Agent disconnected.\n> ",
                )
                _end_session(websocket)
            continue

        # Not in a session
        if data.startswith("!list"):
            await manager.send_to_websocket(websocket, _format_agent_list() + "> ")
        elif data.startswith("!connect"):
            await _open_session(websocket, data)
        elif data == "!exit":
            break
        else:
            await manager.send_to_websocket(
                websocket,
                "You are not connected to an agent. Use !list and !connect.\n> ",
            )


@app.websocket("/ws/{api_key}")
async def websocket_endpoint(websocket: WebSocket, api_key: str):
    """Authenticate a WebSocket client by API key and run its role's loop.

    Args:
        websocket: The incoming connection.
        api_key: Key from the URL path; it selects the agent or controller
            role. Connections with an unrecognised key are closed with the
            policy-violation code.
    """
    role = get_client_role(api_key)
    if not role:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # The peer address can be absent; fall back to a per-connection unique id.
    client = websocket.client
    peer = f"{client.host}-{client.port}" if client else f"unknown-{id(websocket)}"
    client_id = f"{role}-{peer}"

    await manager.connect(websocket, client_id, role)
    try:
        if role == "user":  # Agent Logic
            await _agent_loop(websocket, client_id)
        elif role == "host":  # Controller Logic
            await _controller_loop(websocket)
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(websocket, client_id)
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860, with
    # auto-reload enabled.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )
|