from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse import json import logging from typing import Dict, List from datetime import datetime # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI(title="Transcript WebSocket Backend", version="1.0.0") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your frontend URL allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Store active connections and transcripts class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.transcripts: List[Dict] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) logger.info(f"Client connected. Total connections: {len(self.active_connections)}") def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) logger.info(f"Client disconnected. Total connections: {len(self.active_connections)}") async def send_personal_message(self, message: str, websocket: WebSocket): try: await websocket.send_text(message) except Exception as e: logger.error(f"Error sending message: {e}") self.disconnect(websocket) async def broadcast(self, message: str): disconnected = [] for connection in self.active_connections: try: await connection.send_text(message) except Exception as e: logger.error(f"Error broadcasting message: {e}") disconnected.append(connection) # Remove disconnected connections for connection in disconnected: self.disconnect(connection) def add_transcript(self, transcript_data: Dict): """Add a transcript entry with timestamp""" transcript_entry = { "timestamp": datetime.now().isoformat(), "type": transcript_data.get("type", "unknown"), "data": transcript_data.get("data", {}), "raw": transcript_data } self.transcripts.append(transcript_entry) logger.info(f"Added transcript: {transcript_entry['type']}") def get_transcripts(self) -> List[Dict]: """Get all transcripts""" return self.transcripts def clear_transcripts(self): """Clear all transcripts""" self.transcripts.clear() logger.info("Cleared all transcripts") manager = ConnectionManager() # WebSocket endpoint @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: str): await manager.connect(websocket) try: while True: # Receive message from frontend data = await websocket.receive_text() message = json.loads(data) logger.info(f"Received from client {client_id}: {message}") # Handle different message types message_type = message.get("type", "unknown") if message_type in ["ontranscript", "onresponsetext", "onfunction", "onerror", "onclose", "onopen", "onready", "onsessionended"]: # Store transcript/response data manager.add_transcript(message) # Broadcast to all connected clients (for real-time viewing) await manager.broadcast(json.dumps({ "type": "transcript_update", "data": { "timestamp": datetime.now().isoformat(), "message_type": message_type, "content": message.get("data", {}), "client_id": client_id } })) # Send acknowledgment back to sender await manager.send_personal_message( json.dumps({ "type": "acknowledgment", "data": { "status": "received", "message_type": message_type, "timestamp": datetime.now().isoformat() } }), websocket ) elif message_type == "get_transcripts": # Send all stored transcripts await manager.send_personal_message( json.dumps({ "type": "transcripts_data", "data": { "transcripts": manager.get_transcripts(), "count": len(manager.get_transcripts()) } }), websocket ) elif message_type == "clear_transcripts": # Clear all transcripts manager.clear_transcripts() await manager.broadcast(json.dumps({ "type": "transcripts_cleared", "data": { "timestamp": datetime.now().isoformat(), "message": "All transcripts cleared" } })) else: # Handle unknown message types logger.warning(f"Unknown message type: {message_type}") await manager.send_personal_message( json.dumps({ "type": "error", "data": { "message": f"Unknown message type: {message_type}", "timestamp": datetime.now().isoformat() } }), websocket ) except WebSocketDisconnect: logger.info(f"Client {client_id} disconnected") manager.disconnect(websocket) except Exception as e: logger.error(f"Error in WebSocket connection: {e}") manager.disconnect(websocket) # REST API endpoints for transcript management @app.get("/transcripts") async def get_transcripts(): """Get all stored transcripts""" return { "transcripts": manager.get_transcripts(), "count": len(manager.get_transcripts()), "timestamp": datetime.now().isoformat() } @app.delete("/transcripts") async def clear_transcripts(): """Clear all stored transcripts""" manager.clear_transcripts() return { "message": "All transcripts cleared", "timestamp": datetime.now().isoformat() } @app.get("/transcripts/count") async def get_transcript_count(): """Get the count of stored transcripts""" return { "count": len(manager.get_transcripts()), "timestamp": datetime.now().isoformat() } # Health check endpoint @app.get("/health") async def health_check(): return { "status": "healthy", "connections": len(manager.active_connections), "transcripts_count": len(manager.get_transcripts()), "timestamp": datetime.now().isoformat() } # Root endpoint with real-time transcript viewer @app.get("/", response_class=HTMLResponse) async def root(): return """
📝 No transcripts yet
Connect your AirFGPL app to see real-time updates