nexus-relay / relay.py
ChandimaPrabath's picture
Upload relay.py
c856944 verified
raw
history blame
4.74 kB
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
import json
from datetime import datetime
import httpx
from dotenv import load_dotenv
import os
load_dotenv()
SERVICES_URL = os.getenv("SERVICES_URL")
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins; use specific domains for security
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Dictionary to hold active connections
active_connections = {}
# Dictionary to store undelivered messages
message_store = {}
async def register_client(websocket: WebSocket, username: str):
"""Register a new client."""
active_connections[username] = websocket
print(f"DEBUG: {username} connected.")
# Deliver undelivered messages if any
if username in message_store:
for message in message_store[username]:
print(f"DEBUG: Sending undelivered message to {username}: {message}")
await websocket.send_text(json.dumps(message))
del message_store[username] # Clear delivered messages
print(f"DEBUG: Cleared stored messages for {username}")
async def unregister_client(username: str):
"""Unregister a client."""
if username in active_connections:
del active_connections[username]
print(f"DEBUG: {username} disconnected.")
@app.websocket("/ws")
async def relay_server(websocket: WebSocket):
"""Relay server handling WebSocket connections."""
username = None
try:
await websocket.accept()
print("DEBUG: WebSocket connection accepted.")
# Initial login
login_data = await websocket.receive_text()
print(f"DEBUG: Received login data: {login_data}")
login_details = json.loads(login_data)
username = login_details.get("username")
password = login_details.get("password")
# Authenticate with services.py (user management)
print(f"DEBUG: Authenticating user: {username}")
async with httpx.AsyncClient() as client:
response = await client.post(f"{SERVICES_URL}/login", json={"username": username, "password": password})
if response.status_code == 200:
print(f"DEBUG: Authentication successful for {username}")
await websocket.send_text(json.dumps({"status": "success", "message": "Authenticated"}))
await register_client(websocket, username)
else:
print(f"DEBUG: Authentication failed for {username}")
await websocket.send_text(json.dumps({"status": "error", "message": "Invalid credentials"}))
return
# Relay messages
while True:
try:
message = await websocket.receive_text()
print(f"DEBUG: Received message: {message}")
msg_data = json.loads(message)
recipient = msg_data.get("recipient")
msg_content = msg_data.get("message")
timestamp = datetime.now().isoformat()
# Create message object
message_obj = {
"from": username,
"message": msg_content,
"timestamp": timestamp
}
# Validate recipient and send/deliver message
print(f"DEBUG: Sending message to recipient: {recipient}")
if recipient in active_connections:
recipient_socket = active_connections[recipient]
await recipient_socket.send_text(json.dumps(message_obj))
print(f"DEBUG: Message sent to {recipient}: {message_obj}")
else:
# Store undelivered message
if recipient not in message_store:
message_store[recipient] = []
message_store[recipient].append(message_obj)
print(f"DEBUG: Message stored for {recipient}: {message_obj}")
await websocket.send_text(json.dumps({"status": "success", "message": "Message stored for delivery"}))
except Exception as e:
# Handle errors gracefully
await websocket.send_text(json.dumps({"status": "error", "message": "Message processing error"}))
print(f"DEBUG: Error processing message: {e}")
except WebSocketDisconnect:
print(f"DEBUG: Connection with {username} closed.")
finally:
if username:
await unregister_client(username)