ChandimaPrabath commited on
Commit
54ebe81
·
verified ·
1 Parent(s): 9c8dee3

Update relay.py

Browse files
Files changed (1) hide show
  1. relay.py +132 -116
relay.py CHANGED
@@ -1,116 +1,132 @@
1
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect
2
- from fastapi.middleware.cors import CORSMiddleware
3
- import json
4
- from datetime import datetime
5
- import httpx
6
- from dotenv import load_dotenv
7
- import os
8
-
9
- load_dotenv()
10
-
11
- SERVICES_URL = os.getenv("SERVICES_URL")
12
-
13
- app = FastAPI()
14
-
15
- app.add_middleware(
16
- CORSMiddleware,
17
- allow_origins=["*"], # Allow all origins; use specific domains for security
18
- allow_credentials=True,
19
- allow_methods=["*"],
20
- allow_headers=["*"],
21
- )
22
-
23
- # Dictionary to hold active connections
24
- active_connections = {}
25
-
26
- # Dictionary to store undelivered messages
27
- message_store = {}
28
-
29
- async def register_client(websocket: WebSocket, username: str):
30
- """Register a new client."""
31
- active_connections[username] = websocket
32
- print(f"DEBUG: {username} connected.")
33
-
34
- # Deliver undelivered messages if any
35
- if username in message_store:
36
- for message in message_store[username]:
37
- print(f"DEBUG: Sending undelivered message to {username}: {message}")
38
- await websocket.send_text(json.dumps(message))
39
- del message_store[username] # Clear delivered messages
40
- print(f"DEBUG: Cleared stored messages for {username}")
41
-
42
- async def unregister_client(username: str):
43
- """Unregister a client."""
44
- if username in active_connections:
45
- del active_connections[username]
46
- print(f"DEBUG: {username} disconnected.")
47
-
48
- @app.websocket("/ws")
49
- async def relay_server(websocket: WebSocket):
50
- """Relay server handling WebSocket connections."""
51
- username = None
52
- try:
53
- await websocket.accept()
54
- print("DEBUG: WebSocket connection accepted.")
55
-
56
- # Initial login
57
- login_data = await websocket.receive_text()
58
- print(f"DEBUG: Received login data: {login_data}")
59
- login_details = json.loads(login_data)
60
- username = login_details.get("username")
61
- password = login_details.get("password")
62
-
63
- # Authenticate with services.py (user management)
64
- print(f"DEBUG: Authenticating user: {username}")
65
- async with httpx.AsyncClient() as client:
66
- response = await client.post(f"{SERVICES_URL}/login", json={"username": username, "password": password})
67
-
68
- if response.status_code == 200:
69
- print(f"DEBUG: Authentication successful for {username}")
70
- await websocket.send_text(json.dumps({"status": "success", "message": "Authenticated"}))
71
- await register_client(websocket, username)
72
- else:
73
- print(f"DEBUG: Authentication failed for {username}")
74
- await websocket.send_text(json.dumps({"status": "error", "message": "Invalid credentials"}))
75
- return
76
-
77
- # Relay messages
78
- while True:
79
- try:
80
- message = await websocket.receive_text()
81
- print(f"DEBUG: Received message: {message}")
82
- msg_data = json.loads(message)
83
- recipient = msg_data.get("recipient")
84
- msg_content = msg_data.get("message")
85
- timestamp = datetime.now().isoformat()
86
-
87
- # Create message object
88
- message_obj = {
89
- "from": username,
90
- "message": msg_content,
91
- "timestamp": timestamp
92
- }
93
-
94
- # Validate recipient and send/deliver message
95
- print(f"DEBUG: Sending message to recipient: {recipient}")
96
- if recipient in active_connections:
97
- recipient_socket = active_connections[recipient]
98
- await recipient_socket.send_text(json.dumps(message_obj))
99
- print(f"DEBUG: Message sent to {recipient}: {message_obj}")
100
- else:
101
- # Store undelivered message
102
- if recipient not in message_store:
103
- message_store[recipient] = []
104
- message_store[recipient].append(message_obj)
105
- print(f"DEBUG: Message stored for {recipient}: {message_obj}")
106
- await websocket.send_text(json.dumps({"status": "success", "message": "Message stored for delivery"}))
107
- except Exception as e:
108
- # Handle errors gracefully
109
- await websocket.send_text(json.dumps({"status": "error", "message": "Message processing error"}))
110
- print(f"DEBUG: Error processing message: {e}")
111
-
112
- except WebSocketDisconnect:
113
- print(f"DEBUG: Connection with {username} closed.")
114
- finally:
115
- if username:
116
- await unregister_client(username)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
2
+ from fastapi.middleware.cors import CORSMiddleware
3
+ import json
4
+ from datetime import datetime
5
+ import httpx
6
+ from dotenv import load_dotenv
7
+ import os
8
+
9
+ load_dotenv()
10
+
11
+ SERVICES_URL = os.getenv("SERVICES_URL")
12
+
13
+ app = FastAPI()
14
+
15
+ app.add_middleware(
16
+ CORSMiddleware,
17
+ allow_origins=["*"], # Allow all origins; use specific domains for security
18
+ allow_credentials=True,
19
+ allow_methods=["*"],
20
+ allow_headers=["*"],
21
+ )
22
+
23
+ # Dictionary to hold active connections
24
+ active_connections = {}
25
+
26
+ # Dictionary to store undelivered messages
27
+ message_store = {}
28
+
29
+ async def register_client(websocket: WebSocket, username: str):
30
+ """Register a new client."""
31
+ active_connections[username] = websocket
32
+ print(f"DEBUG: {username} connected.")
33
+
34
+ # Deliver undelivered messages if any
35
+ if username in message_store:
36
+ for message in message_store[username]:
37
+ print(f"DEBUG: Sending undelivered message to {username}: {message}")
38
+ await websocket.send_text(json.dumps(message))
39
+ del message_store[username] # Clear delivered messages
40
+ print(f"DEBUG: Cleared stored messages for {username}")
41
+
42
+ async def unregister_client(username: str):
43
+ """Unregister a client."""
44
+ if username in active_connections:
45
+ del active_connections[username]
46
+ print(f"DEBUG: {username} disconnected.")
47
+
48
+ @app.websocket("/ws")
49
+ async def relay_server(websocket: WebSocket):
50
+ """Relay server handling WebSocket connections."""
51
+ username = None
52
+ try:
53
+ await websocket.accept()
54
+ print("DEBUG: WebSocket connection accepted.")
55
+
56
+ # Initial login or signup
57
+ login_data = await websocket.receive_text()
58
+ print(f"DEBUG: Received initial data: {login_data}")
59
+ user_data = json.loads(login_data)
60
+ action = user_data.get("action") # 'login' or 'signup'
61
+ username = user_data.get("username")
62
+ password = user_data.get("password")
63
+
64
+ if action == "signup":
65
+ # Attempt user registration
66
+ print(f"DEBUG: Registering new user: {username}")
67
+ async with httpx.AsyncClient() as client:
68
+ response = await client.post(f"{SERVICES_URL}/register", json={"username": username, "password": password})
69
+ if response.status_code == 200:
70
+ print(f"DEBUG: Registration successful for {username}")
71
+ await websocket.send_text(json.dumps({"status": "success", "message": "Registration successful"}))
72
+ else:
73
+ error_message = response.json().get("detail", "Registration failed")
74
+ print(f"DEBUG: Registration failed for {username}: {error_message}")
75
+ await websocket.send_text(json.dumps({"status": "error", "message": error_message}))
76
+ return
77
+
78
+ # Proceed with login
79
+ print(f"DEBUG: Authenticating user: {username}")
80
+ async with httpx.AsyncClient() as client:
81
+ response = await client.post(f"{SERVICES_URL}/login", json={"username": username, "password": password})
82
+
83
+ if response.status_code == 200:
84
+ print(f"DEBUG: Authentication successful for {username}")
85
+ await websocket.send_text(json.dumps({"status": "success", "message": "Authenticated"}))
86
+ await register_client(websocket, username)
87
+ else:
88
+ error_message = response.json().get("detail", "Invalid credentials")
89
+ print(f"DEBUG: Authentication failed for {username}: {error_message}")
90
+ await websocket.send_text(json.dumps({"status": "error", "message": error_message}))
91
+ return
92
+
93
+ # Relay messages
94
+ while True:
95
+ try:
96
+ message = await websocket.receive_text()
97
+ print(f"DEBUG: Received message: {message}")
98
+ msg_data = json.loads(message)
99
+ recipient = msg_data.get("recipient")
100
+ msg_content = msg_data.get("message")
101
+ timestamp = datetime.now().isoformat()
102
+
103
+ # Create message object
104
+ message_obj = {
105
+ "from": username,
106
+ "message": msg_content,
107
+ "timestamp": timestamp
108
+ }
109
+
110
+ # Validate recipient and send/deliver message
111
+ print(f"DEBUG: Sending message to recipient: {recipient}")
112
+ if recipient in active_connections:
113
+ recipient_socket = active_connections[recipient]
114
+ await recipient_socket.send_text(json.dumps(message_obj))
115
+ print(f"DEBUG: Message sent to {recipient}: {message_obj}")
116
+ else:
117
+ # Store undelivered message
118
+ if recipient not in message_store:
119
+ message_store[recipient] = []
120
+ message_store[recipient].append(message_obj)
121
+ print(f"DEBUG: Message stored for {recipient}: {message_obj}")
122
+ await websocket.send_text(json.dumps({"status": "success", "message": "Message stored for delivery"}))
123
+ except Exception as e:
124
+ # Handle errors gracefully
125
+ await websocket.send_text(json.dumps({"status": "error", "message": "Message processing error"}))
126
+ print(f"DEBUG: Error processing message: {e}")
127
+
128
+ except WebSocketDisconnect:
129
+ print(f"DEBUG: Connection with {username} closed.")
130
+ finally:
131
+ if username:
132
+ await unregister_client(username)