File size: 6,936 Bytes
ab81e9c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# app.py (Version 6 - Final Correction)
import asyncio
import hmac
import os
from typing import Dict

import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
from fastapi.responses import HTMLResponse
from starlette.websockets import WebSocketState

# --- Configuration ---
# Each key can be overridden by an environment variable of the same name so
# that real secrets do not have to live in the source file. When the variable
# is unset (or empty) the historical built-in value is used, which keeps
# existing deployments and clients working unchanged.
HOST_API_KEY = os.environ.get("HOST_API_KEY") or "HOST_SUPER_SECRET_KEY_123"  # The Controller
USER_API_KEY = os.environ.get("USER_API_KEY") or "USER_SECRET_KEY_456"  # The Agent (executes commands)

# ASGI application object; the route decorators further down register their
# handlers on it, and the __main__ block serves it via the "app:app" string.
app = FastAPI()

# --- Connection and Session Management ---
class ConnectionManager:
    """Bookkeeping for connected agents, controller sessions and relay tasks.

    Three dictionaries are maintained:

    * ``agents``      -- agent ``client_id`` -> agent socket ("user" role only).
    * ``sessions``    -- controller socket -> agent socket it is attached to.
    * ``relay_tasks`` -- agent socket -> task forwarding that agent's output.
    """

    def __init__(self):
        self.agents: Dict[str, WebSocket] = {}
        self.sessions: Dict[WebSocket, WebSocket] = {}  # {controller: agent}
        self.relay_tasks: Dict[WebSocket, asyncio.Task] = {}  # {agent: task}

    async def connect(self, websocket: WebSocket, client_id: str, role: str):
        """Accept the socket and, for the "user" role, register it as an agent.

        Controllers ("host" role) are accepted but not stored; they only show
        up in ``sessions`` once they attach to an agent.
        """
        await websocket.accept()
        if role == "user":
            self.agents[client_id] = websocket
            print(f"Agent connected: {client_id}")
        elif role == "host":
            print(f"Controller connected: {client_id}")

    def _cancel_relay(self, agent_ws: WebSocket) -> None:
        """Cancel and forget the relay task registered for *agent_ws*, if any."""
        task = self.relay_tasks.pop(agent_ws, None)
        if task is not None:
            task.cancel()

    async def disconnect(self, websocket: WebSocket, client_id: str):
        """Remove every trace of a closed connection (agent or controller).

        Safe to call for any client: keys that are not present are ignored.
        """
        # Case 1: An agent disconnected
        if client_id in self.agents:
            del self.agents[client_id]
            # Snapshot first: the dict may change while we await the sends.
            attached = [c for c, a in self.sessions.items() if a is websocket]
            for controller_ws in attached:
                try:
                    await controller_ws.send_text("\n[SERVER] The agent has disconnected. Session terminated.\n> ")
                except Exception:
                    # Best-effort notification: the controller may be gone
                    # too. Unlike a bare ``except`` this lets task
                    # cancellation and interpreter shutdown propagate.
                    pass
                # pop() rather than del: the controller may have removed its
                # own session while the send above was awaited.
                self.sessions.pop(controller_ws, None)
            self._cancel_relay(websocket)
            print(f"Agent cleaned up: {client_id}")

        # Case 2: A controller disconnected
        agent_ws = self.sessions.pop(websocket, None)
        if agent_ws is not None:
            # Stop the relay that was feeding this controller, otherwise it
            # keeps reading from the agent and a later session would start a
            # second reader on the same socket. Leave it running if another
            # controller is still attached to that agent.
            if not any(a is agent_ws for a in self.sessions.values()):
                self._cancel_relay(agent_ws)
            print(f"Controller session ended for {client_id}")

        print(f"Client disconnected: {client_id}")

    async def send_to_websocket(self, websocket: WebSocket, message: str) -> bool:
        """Send *message* if the socket is still connected.

        Returns:
            True when the text was sent; False when the socket was not in the
            CONNECTED state or the send raised ``WebSocketDisconnect`` /
            ``RuntimeError``.
        """
        try:
            if websocket.client_state == WebSocketState.CONNECTED:
                await websocket.send_text(message)
                return True
        except (WebSocketDisconnect, RuntimeError):
            pass
        return False

# Single module-level manager instance; the relay task and the WebSocket
# endpoint below both read and update its state.
manager = ConnectionManager()

def get_client_role(api_key: str):
    """Translate an API key into the role it grants.

    Args:
        api_key: Key supplied by the client (taken from the URL path by the
            WebSocket endpoint, so it is untrusted input).

    Returns:
        ``"host"`` for the controller key, ``"user"`` for the agent key, and
        ``None`` for anything else (including non-string input).
    """
    if not isinstance(api_key, str):
        return None

    # Compare as bytes with hmac.compare_digest so the time taken does not
    # reveal how many leading characters of a guessed key were correct.
    # Encoding first also avoids the TypeError compare_digest raises for
    # non-ASCII str arguments.
    supplied = api_key.encode("utf-8", "surrogatepass")
    if hmac.compare_digest(supplied, HOST_API_KEY.encode("utf-8")):
        return "host"
    if hmac.compare_digest(supplied, USER_API_KEY.encode("utf-8")):
        return "user"
    return None

@app.get("/")
async def get():
    # Root route: reply with a fixed HTML banner saying the relay is up.
    banner = HTMLResponse("<h1>Relay Server is Running</h1>")
    return banner

async def relay_agent_to_controller(agent_ws: WebSocket, controller_ws: WebSocket):
    """Forward every text frame received from an agent to its controller.

    Intended to run as a background task for the lifetime of one session.
    The loop ends when the agent socket closes, when the controller can no
    longer be written to, or when the task is cancelled.

    Args:
        agent_ws: Socket to read agent output from.
        controller_ws: Socket the output is forwarded to.
    """
    try:
        while True:
            data = await agent_ws.receive_text()
            # send_to_websocket returns False once the controller is gone.
            if not await manager.send_to_websocket(controller_ws, data):
                break
    except (WebSocketDisconnect, RuntimeError):
        # WebSocketDisconnect: the agent closed the connection.
        # RuntimeError: the socket was already closed when we tried to read.
        # Either way the session is over; cleanup happens in the endpoint.
        # asyncio.CancelledError is deliberately NOT caught here so that a
        # cancel() request is reported as a cancellation, as asyncio expects.
        pass
    finally:
        print("Relay task finished.")

def _stop_relay(agent_socket: WebSocket) -> None:
    """Cancel and forget the relay task reading from *agent_socket*, if any."""
    task = manager.relay_tasks.pop(agent_socket, None)
    if task is not None:
        task.cancel()


@app.websocket("/ws/{api_key}")
async def websocket_endpoint(websocket: WebSocket, api_key: str):
    """Serve one WebSocket client, either an agent ("user") or a controller ("host").

    The role is derived from ``api_key``; unknown keys are rejected with a
    policy-violation close code. Agents simply stay registered until their
    socket closes. Controllers drive a small command protocol:

    * ``!list``               -- list connected agent ids.
    * ``!connect <agent_id>`` -- open a session with that agent.
    * ``!disconnect``         -- leave the current session.
    * ``!exit``               -- close the controller connection (outside a session).
    * anything else while in a session is forwarded verbatim to the agent.

    Whatever happens, the connection is unregistered from ``manager`` on exit.
    """
    role = get_client_role(api_key)
    if not role:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # The peer address may be unavailable; fall back to a per-object id so
    # the connection still gets a usable identifier instead of crashing.
    peer = websocket.client
    if peer is not None:
        client_id = f"{role}-{peer.host}-{peer.port}"
    else:
        client_id = f"{role}-unknown-{id(websocket)}"
    await manager.connect(websocket, client_id, role)

    try:
        if role == "user":  # Agent Logic
            await manager.send_to_websocket(websocket, f"[SERVER] Connected as agent: {client_id}. Awaiting session.")
            # The agent handler never reads from the socket itself; reading is
            # done by the relay task while a session is active. It only polls
            # the connection state once per second.
            # NOTE(review): client_state presumably only changes once some
            # reader observes the close, so an idle agent that drops while no
            # session is active may linger in the agent list -- confirm.
            while True:
                if websocket.client_state != WebSocketState.CONNECTED:
                    break
                await asyncio.sleep(1)

        elif role == "host":  # Controller Logic
            await manager.send_to_websocket(websocket, "[SERVER] Welcome, Controller! Commands: !list, !connect <agent_id>, !disconnect, !exit\n> ")
            while True:
                data = await websocket.receive_text()

                agent_socket = manager.sessions.get(websocket)
                if agent_socket is not None:  # In a session
                    if data == "!disconnect":
                        manager.sessions.pop(websocket, None)
                        _stop_relay(agent_socket)
                        await manager.send_to_websocket(websocket, "Session disconnected.\n> ")
                    elif not await manager.send_to_websocket(agent_socket, data):
                        await manager.send_to_websocket(websocket, "\n[SERVER] Failed to send command: Agent disconnected.\n> ")
                        manager.sessions.pop(websocket, None)
                        # The session is dead, so its relay must not keep running.
                        _stop_relay(agent_socket)
                    continue

                # Not in a session
                if data.startswith("!list"):
                    # Explicit branch: "header" + "" is still truthy, so an
                    # `or` fallback would never show the empty-list message.
                    if manager.agents:
                        agent_list = "Available agents:\n" + "".join(f"- {agent_id}\n" for agent_id in manager.agents)
                    else:
                        agent_list = "No agents online.\n"
                    await manager.send_to_websocket(websocket, agent_list + "> ")

                elif data.startswith("!connect"):
                    # Split on any whitespace so extra or trailing spaces
                    # around the id do not cause a spurious "not found".
                    parts = data.split(None, 1)
                    if len(parts) < 2:
                        await manager.send_to_websocket(websocket, "Usage: !connect <agent_id>\n> ")
                        continue

                    target_agent_id = parts[1].strip()
                    agent_socket = manager.agents.get(target_agent_id)
                    if agent_socket is None:
                        await manager.send_to_websocket(websocket, "Agent not found.\n> ")
                    elif any(a is agent_socket for a in manager.sessions.values()):
                        # Two relays reading the same agent socket would race
                        # for its output, so one controller per agent only.
                        await manager.send_to_websocket(websocket, "Agent is already in a session with another controller.\n> ")
                    else:
                        # Drop any stale relay left over from an earlier session.
                        _stop_relay(agent_socket)
                        manager.sessions[websocket] = agent_socket
                        manager.relay_tasks[agent_socket] = asyncio.create_task(
                            relay_agent_to_controller(agent_socket, websocket)
                        )
                        await manager.send_to_websocket(websocket, f"[*] Connected to {target_agent_id}. You are now in a session.\n")

                elif data == "!exit":
                    break
                else:
                    await manager.send_to_websocket(websocket, "You are not connected to an agent. Use !list and !connect.\n> ")

    except WebSocketDisconnect:
        pass
    finally:
        if role == "host":
            # A controller leaving mid-session must also stop the relay that
            # was feeding it; the sessions entry itself is removed below.
            active_agent = manager.sessions.get(websocket)
            if active_agent is not None:
                _stop_relay(active_agent)
        await manager.disconnect(websocket, client_id)

if __name__ == "__main__":
    # Script entry point: serve the application on all interfaces, port 7860,
    # with auto-reload enabled. The "app:app" import string presumably relies
    # on this file being named app.py -- confirm before renaming the module.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)