dwfwfwfwf commited on
Commit
ab81e9c
·
verified ·
1 Parent(s): b4b2ca8

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +153 -0
app.py ADDED
@@ -0,0 +1,153 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py (Version 6 - Final Correction)
2
+ import asyncio
3
+ from typing import Dict
4
+ import uvicorn
5
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
6
+ from fastapi.responses import HTMLResponse
7
+ from starlette.websockets import WebSocketState # <--- ADD THIS IMPORT
8
+
9
+ # --- Configuration ---
10
+ HOST_API_KEY = "HOST_SUPER_SECRET_KEY_123" # The Controller
11
+ USER_API_KEY = "USER_SECRET_KEY_456" # The Agent (executes commands)
12
+
13
+ app = FastAPI()
14
+
15
+ # --- Connection and Session Management ---
16
+ class ConnectionManager:
17
+ """A robust class to manage agents, controllers, and their sessions."""
18
+ def __init__(self):
19
+ self.agents: Dict[str, WebSocket] = {}
20
+ self.sessions: Dict[WebSocket, WebSocket] = {} # {controller: agent}
21
+ self.relay_tasks: Dict[WebSocket, asyncio.Task] = {} # {agent: task}
22
+
23
+ async def connect(self, websocket: WebSocket, client_id: str, role: str):
24
+ await websocket.accept()
25
+ if role == "user":
26
+ self.agents[client_id] = websocket
27
+ print(f"Agent connected: {client_id}")
28
+ elif role == "host":
29
+ print(f"Controller connected: {client_id}")
30
+
31
+ async def disconnect(self, websocket: WebSocket, client_id: str):
32
+ """A single, robust function to clean up any connection."""
33
+ # Case 1: An agent disconnected
34
+ if client_id in self.agents:
35
+ del self.agents[client_id]
36
+ controller_ws = next((c for c, a in self.sessions.items() if a == websocket), None)
37
+ if controller_ws:
38
+ try:
39
+ await controller_ws.send_text("\n[SERVER] The agent has disconnected. Session terminated.\n> ")
40
+ except:
41
+ pass
42
+ del self.sessions[controller_ws]
43
+ if websocket in self.relay_tasks:
44
+ self.relay_tasks[websocket].cancel()
45
+ del self.relay_tasks[websocket]
46
+ print(f"Agent cleaned up: {client_id}")
47
+
48
+ # Case 2: A controller disconnected
49
+ if websocket in self.sessions:
50
+ del self.sessions[websocket]
51
+ print(f"Controller session ended for {client_id}")
52
+
53
+ print(f"Client disconnected: {client_id}")
54
+
55
+ async def send_to_websocket(self, websocket: WebSocket, message: str):
56
+ """A safe wrapper for sending messages to prevent crashes."""
57
+ try:
58
+ if websocket.client_state == WebSocketState.CONNECTED:
59
+ await websocket.send_text(message)
60
+ return True
61
+ except (WebSocketDisconnect, RuntimeError):
62
+ pass
63
+ return False
64
+
65
+ manager = ConnectionManager()
66
+
67
+ def get_client_role(api_key: str):
68
+ if api_key == HOST_API_KEY: return "host"
69
+ if api_key == USER_API_KEY: return "user"
70
+ return None
71
+
72
+ @app.get("/")
73
+ async def get(): return HTMLResponse("<h1>Relay Server is Running</h1>")
74
+
75
+ async def relay_agent_to_controller(agent_ws: WebSocket, controller_ws: WebSocket):
76
+ """The dedicated task to forward an agent's output to its controller."""
77
+ try:
78
+ while True:
79
+ data = await agent_ws.receive_text()
80
+ if not await manager.send_to_websocket(controller_ws, data):
81
+ break
82
+ except (WebSocketDisconnect, asyncio.CancelledError):
83
+ pass
84
+ finally:
85
+ print("Relay task finished.")
86
+
87
+ @app.websocket("/ws/{api_key}")
88
+ async def websocket_endpoint(websocket: WebSocket, api_key: str):
89
+ role = get_client_role(api_key)
90
+ if not role:
91
+ await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
92
+ return
93
+
94
+ client_id = f"{role}-{websocket.client.host}-{websocket.client.port}"
95
+ await manager.connect(websocket, client_id, role)
96
+
97
+ try:
98
+ if role == "user": # Agent Logic
99
+ await manager.send_to_websocket(websocket, f"[SERVER] Connected as agent: {client_id}. Awaiting session.")
100
+ while True:
101
+ # *** THIS IS THE CORRECTED LINE ***
102
+ if websocket.client_state != WebSocketState.CONNECTED:
103
+ break
104
+ await asyncio.sleep(1)
105
+
106
+ elif role == "host": # Controller Logic
107
+ await manager.send_to_websocket(websocket, "[SERVER] Welcome, Controller! Commands: !list, !connect <agent_id>, !disconnect, !exit\n> ")
108
+ while True:
109
+ data = await websocket.receive_text()
110
+
111
+ if websocket in manager.sessions:
112
+ agent_socket = manager.sessions[websocket]
113
+ if data == "!disconnect":
114
+ manager.sessions.pop(websocket, None)
115
+ if agent_socket in manager.relay_tasks:
116
+ manager.relay_tasks[agent_socket].cancel()
117
+ del manager.relay_tasks[agent_socket]
118
+ await manager.send_to_websocket(websocket, "Session disconnected.\n> ")
119
+ else:
120
+ if not await manager.send_to_websocket(agent_socket, data):
121
+ await manager.send_to_websocket(websocket, "\n[SERVER] Failed to send command: Agent disconnected.\n> ")
122
+ manager.sessions.pop(websocket, None)
123
+ else: # Not in a session
124
+ if data.startswith("!list"):
125
+ agent_list = "Available agents:\n" + "".join(f"- {_id}\n" for _id in manager.agents.keys()) or "No agents online.\n"
126
+ await manager.send_to_websocket(websocket, agent_list + "> ")
127
+
128
+ elif data.startswith("!connect"):
129
+ parts = data.split(" ", 1)
130
+ if len(parts) < 2:
131
+ await manager.send_to_websocket(websocket, "Usage: !connect <agent_id>\n> ")
132
+ continue
133
+
134
+ target_agent_id = parts[1]
135
+ if target_agent_id in manager.agents:
136
+ agent_socket = manager.agents[target_agent_id]
137
+ manager.sessions[websocket] = agent_socket
138
+ task = asyncio.create_task(relay_agent_to_controller(agent_socket, websocket))
139
+ manager.relay_tasks[agent_socket] = task
140
+ await manager.send_to_websocket(websocket, f"[*] Connected to {target_agent_id}. You are now in a session.\n")
141
+ else:
142
+ await manager.send_to_websocket(websocket, "Agent not found.\n> ")
143
+
144
+ elif data == "!exit": break
145
+ else: await manager.send_to_websocket(websocket, "You are not connected to an agent. Use !list and !connect.\n> ")
146
+
147
+ except WebSocketDisconnect:
148
+ pass
149
+ finally:
150
+ await manager.disconnect(websocket, client_id)
151
+
152
+ if __name__ == "__main__":
153
+ uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)