"""WebSocket endpoint and handlers for real-time voice/text mock interviews."""
import base64
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect
from sqlalchemy.orm import Session

from app.agents.voice_interview_agent import VoiceInterviewAgent
from app.core.exceptions import AuthenticationException
from app.core.security import decode_token
from app.database import get_db
from app.models.interview import Interview, InterviewResult
from app.models.user import User
from app.services.interview_service import InterviewService

logger = logging.getLogger(__name__)

router = APIRouter()

# Values accepted for the ``mode`` query parameter of the interview socket.
SUPPORTED_MODES = ("voice", "text")


class ConnectionManager:
    """Registry of accepted WebSocket connections, keyed by connection id."""

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, connection_id: str):
        """Accept the WebSocket handshake and register it under ``connection_id``."""
        await websocket.accept()
        self.active_connections[connection_id] = websocket

    def disconnect(self, connection_id: str):
        """Unregister ``connection_id``; a no-op if it is not registered."""
        self.active_connections.pop(connection_id, None)

    async def send_json(self, connection_id: str, data: dict):
        """Send ``data`` as a JSON text frame; skipped if the id is not registered."""
        websocket = self.active_connections.get(connection_id)
        if websocket is not None:
            await websocket.send_json(data)

    async def send_bytes(self, connection_id: str, data: bytes):
        """Send ``data`` as a binary frame; skipped if the id is not registered."""
        websocket = self.active_connections.get(connection_id)
        if websocket is not None:
            await websocket.send_bytes(data)


manager = ConnectionManager()


async def get_current_user_ws(token: str, db: Session) -> User:
    """Authenticate a WebSocket client from its access token.

    Args:
        token: Token passed by the client in the ``token`` query parameter.
        db: Database session used to load the user.

    Returns:
        The active ``User`` named by the token's ``sub`` claim.

    Raises:
        AuthenticationException: if the token cannot be decoded, is not an
            access token, has a missing or malformed ``sub`` claim, or names a
            user that does not exist or is inactive.
    """
    payload = decode_token(token)
    if payload is None or payload.get("type") != "access":
        raise AuthenticationException("Invalid token")

    user_id = payload.get("sub")
    if not user_id:
        raise AuthenticationException("Invalid token payload")

    try:
        user_uuid = uuid.UUID(str(user_id))
    except ValueError as exc:
        # A malformed subject is an authentication failure, not a server error.
        raise AuthenticationException("Invalid token payload") from exc

    user = db.query(User).filter(User.id == user_uuid).first()
    if not user or not user.is_active:
        raise AuthenticationException("User not found or inactive")
    return user


@router.websocket("/interview/{interview_id}")
async def voice_interview_websocket(
    websocket: WebSocket,
    interview_id: str,
    token: str = Query(...),
    mode: str = Query("voice"),  # "voice" or "text"
    db: Session = Depends(get_db),
):
    """
    Real-time Voice Interview WebSocket

    Modes:
    - voice: answers arrive as audio (base64 ``audio_chunk`` messages or raw
      binary frames); questions are also sent as generated audio
    - text: answers arrive as text; questions are sent as text only

    Client → Server:
    1. {"type": "audio_chunk", "data": "<base64 audio>"}        # voice mode
       (raw binary frames are appended to the same audio buffer in voice mode)
    2. {"type": "audio_complete"}                               # voice mode: answer audio finished
    3. {"type": "answer", "answer_text": "...", "time_taken": ...}  # text mode
    4. {"type": "ping"}                                         # keep-alive
    5. {"type": "end_interview"}                                # stop early

    Server → Client:
    1. {"type": "welcome", "message": "...", "mode": "...", "total_questions": N}
    2. {"type": "question", "question_number": ..., "question_text": "...",
        "category": ..., "difficulty": ..., "total_questions": N, "mode": "..."}
    3. {"type": "generating_audio", "message": "..."}           # voice mode
    4. {"type": "question_audio", "data": "<base64>", "format": "mp3"}  # voice mode
    5. {"type": "processing" | "evaluating" | "generating_report", "message": "..."}
    6. {"type": "transcription", "text": "...", "confidence": ..., "voice_analysis": ...}
    7. {"type": "feedback", "question_number": ..., "score": ..., "feedback": "...",
        "strengths": [...], "improvements": [...]}
    8. {"type": "interview_complete", "overall_score": ..., "summary": ...,
        "detailed_feedback": ..., "strengths": ..., "improvement_areas": ...,
        "voice_summary": ..., "recommendation": ..., "next_steps": ..., "message": "..."}
    9. {"type": "error", "message": "..."}
    10. {"type": "pong"}

    Close codes: 4001 authentication failed, 4004 interview not found,
    4000 interview already completed, 4400 unsupported mode,
    1000 client ended the interview, 1011 server error.
    """
    connection_id = f"{interview_id}_{uuid.uuid4()}"
    # Bound before the try so the disconnect handler never sees an unbound name.
    interview: Optional[Interview] = None

    try:
        user = await get_current_user_ws(token, db)

        if mode not in SUPPORTED_MODES:
            # Any other mode would silently ignore every answer message.
            await websocket.close(code=4400, reason="Unsupported mode")
            return

        interview = _get_user_interview(db, interview_id, user)
        if interview is None:
            await websocket.close(code=4004, reason="Interview not found")
            return
        if interview.status == "completed":
            await websocket.close(code=4000, reason="Interview already completed")
            return

        await manager.connect(websocket, connection_id)

        if interview.status == "draft":
            interview.status = "in_progress"
            interview.started_at = datetime.utcnow()
            db.commit()

        resume_data = InterviewService.get_user_resume(db, user.id)
        agent = VoiceInterviewAgent(
            interview_type=interview.interview_type,
            target_role=interview.target_role,
            questions=interview.questions,
            resume_data=resume_data,
        )

        await manager.send_json(connection_id, {
            "type": "welcome",
            "message": f"Welcome to your {interview.interview_type} interview for {interview.target_role}",
            "mode": mode,
            "total_questions": agent.total_questions,
        })
        await send_next_question(connection_id, agent, mode)

        await _run_message_loop(websocket, connection_id, agent, mode, db, interview, user)
        # The loop only returns on "end_interview": close cleanly instead of
        # letting the server drop the transport.
        await _close_quietly(websocket, code=1000)

    except AuthenticationException:
        await _close_quietly(websocket, code=4001, reason="Authentication failed")
    except WebSocketDisconnect:
        logger.info("WebSocket disconnected: %s", connection_id)
        if interview is not None and interview.status == "in_progress":
            interview.status = "abandoned"
            db.commit()
    except Exception:
        # Details stay in the server log: close reasons are client-visible and
        # RFC 6455 caps them at 123 bytes, so str(e) is unsafe there.
        logger.exception("WebSocket error on %s", connection_id)
        await _close_quietly(websocket, code=1011, reason="Server error")
    finally:
        manager.disconnect(connection_id)


def _get_user_interview(db: Session, interview_id: str, user: User) -> Optional[Interview]:
    """Return the interview ``interview_id`` owned by ``user``, or None.

    A malformed ``interview_id`` is treated as "not found" rather than a server error.
    """
    try:
        interview_uuid = uuid.UUID(interview_id)
    except ValueError:
        return None
    return db.query(Interview).filter(
        Interview.id == interview_uuid,
        Interview.user_id == user.id,
    ).first()


def _parse_client_message(raw_text: str) -> Optional[Dict[str, Any]]:
    """Decode a JSON text frame; return None unless it is a JSON object."""
    try:
        data = json.loads(raw_text)
    except ValueError:
        return None
    return data if isinstance(data, dict) else None


async def _close_quietly(websocket: WebSocket, code: int, reason: Optional[str] = None) -> None:
    """Close ``websocket`` best-effort; errors (e.g. already closed) are logged and ignored."""
    try:
        await websocket.close(code=code, reason=reason)
    except Exception:
        logger.debug("Ignoring error while closing WebSocket", exc_info=True)


async def _run_message_loop(
    websocket: WebSocket,
    connection_id: str,
    agent: VoiceInterviewAgent,
    mode: str,
    db: Session,
    interview: Interview,
    user: User,
) -> None:
    """Read and dispatch client frames until the client sends ``end_interview``.

    Raises:
        WebSocketDisconnect: when the client disconnects.
    """
    audio_buffer = bytearray()

    while True:
        message = await websocket.receive()

        if message["type"] == "websocket.disconnect":
            # Raw receive() reports a disconnect as a message instead of raising,
            # and calling receive() again afterwards raises RuntimeError.
            raise WebSocketDisconnect(message.get("code", 1000))

        # Per the ASGI spec either key may be missing or None: test the values.
        raw_text = message.get("text")
        raw_bytes = message.get("bytes")

        if raw_text is None:
            if raw_bytes is not None and mode == "voice":
                # Direct binary audio data
                audio_buffer.extend(raw_bytes)
            continue

        data = _parse_client_message(raw_text)
        if data is None:
            await manager.send_json(connection_id, {
                "type": "error",
                "message": "Invalid message: expected a JSON object",
            })
            continue

        msg_type = data.get("type")

        if msg_type in ("answer", "audio_complete") and interview.status == "completed":
            # complete_interview() has already stored a result; evaluating more
            # answers could reach it again and persist a duplicate InterviewResult.
            audio_buffer.clear()
            await manager.send_json(connection_id, {
                "type": "error",
                "message": "Interview already completed",
            })
            continue

        if msg_type == "audio_chunk":
            # Voice mode: accumulate audio chunks
            if mode == "voice":
                try:
                    audio_buffer.extend(base64.b64decode(data.get("data", "")))
                except (ValueError, TypeError):
                    # binascii.Error (bad padding) is a ValueError subclass;
                    # TypeError covers a non-string "data" field.
                    await manager.send_json(connection_id, {
                        "type": "error",
                        "message": "Invalid audio chunk encoding",
                    })
        elif msg_type == "audio_complete":
            # Voice mode: process the buffered answer audio
            if mode == "voice" and audio_buffer:
                await process_voice_answer(
                    connection_id, agent, bytes(audio_buffer), db, interview, user
                )
                audio_buffer.clear()
        elif msg_type == "answer":
            # Text mode: process text answer
            if mode == "text":
                answer_text = data.get("answer_text", "")
                time_taken = data.get("time_taken", 0)
                if not answer_text:
                    await manager.send_json(connection_id, {
                        "type": "error",
                        "message": "Answer cannot be empty",
                    })
                    continue
                await process_text_answer(
                    connection_id, agent, answer_text, time_taken, db, interview, user
                )
        elif msg_type == "ping":
            await manager.send_json(connection_id, {"type": "pong"})
        elif msg_type == "end_interview":
            # Never downgrade an interview that already completed.
            if interview.status == "in_progress":
                interview.status = "abandoned"
                db.commit()
            return


async def send_next_question(connection_id: str, agent: VoiceInterviewAgent, mode: str):
    """Send the agent's current question; in voice mode also send its TTS audio.

    Does nothing when the agent has no current question.
    """
    question = agent.get_current_question()
    if not question:
        return

    # Send question metadata
    await manager.send_json(connection_id, {
        "type": "question",
        "question_number": question["id"],
        "question_text": question["text"],
        "category": question["category"],
        "difficulty": question["difficulty"],
        "total_questions": agent.total_questions,
        "mode": mode,
    })

    if mode != "voice":
        return

    await manager.send_json(connection_id, {
        "type": "generating_audio",
        "message": "Generating question audio...",
    })

    # NOTE(review): get_question_audio is called synchronously (not awaited); if
    # it performs blocking I/O it stalls the event loop for every connection.
    # Consider starlette.concurrency.run_in_threadpool.
    audio_data = agent.get_question_audio(question["text"])
    if audio_data:
        await manager.send_json(connection_id, {
            "type": "question_audio",
            "data": base64.b64encode(audio_data).decode("utf-8"),
            "format": "mp3",
        })


async def _send_feedback_and_advance(
    connection_id: str,
    agent: VoiceInterviewAgent,
    evaluation: Dict[str, Any],
    mode: str,
    db: Session,
    interview: Interview,
    user: User,
) -> None:
    """Send evaluation feedback, then finish the interview or send the next question."""
    await manager.send_json(connection_id, {
        "type": "feedback",
        "question_number": agent.current_question_index,
        "score": evaluation["score"],
        "feedback": evaluation["feedback"],
        "strengths": evaluation.get("strengths", []),
        "improvements": evaluation.get("improvements", []),
    })

    if agent.is_complete:
        await complete_interview(connection_id, agent, db, interview, user)
    else:
        await send_next_question(connection_id, agent, mode)


async def process_voice_answer(
    connection_id: str,
    agent: VoiceInterviewAgent,
    audio_data: bytes,
    db: Session,
    interview: Interview,
    user: User,
):
    """Transcribe and evaluate a spoken answer, then advance the interview.

    If transcription reports an error, an ``error`` message is sent and the
    answer is not evaluated.
    """
    await manager.send_json(connection_id, {
        "type": "processing",
        "message": "Transcribing your answer...",
    })

    transcription_result = await agent.transcribe_answer(audio_data)
    if transcription_result.get("error"):
        await manager.send_json(connection_id, {
            "type": "error",
            "message": f"Transcription failed: {transcription_result['error']}",
        })
        return

    transcript = transcription_result["transcript"]
    voice_analysis = transcription_result.get("voice_analysis")

    await manager.send_json(connection_id, {
        "type": "transcription",
        "text": transcript,
        "confidence": transcription_result.get("confidence", 0.0),
        "voice_analysis": voice_analysis,
    })

    await manager.send_json(connection_id, {
        "type": "evaluating",
        "message": "Evaluating your answer...",
    })
    evaluation = await agent.evaluate_answer(
        transcript,
        voice_analysis=voice_analysis,
        time_taken=voice_analysis.get("duration_seconds", 0) if voice_analysis else 0,
    )

    await _send_feedback_and_advance(connection_id, agent, evaluation, "voice", db, interview, user)


async def process_text_answer(
    connection_id: str,
    agent: VoiceInterviewAgent,
    answer_text: str,
    time_taken: int,
    db: Session,
    interview: Interview,
    user: User,
):
    """Evaluate a typed answer (no voice analysis), then advance the interview."""
    await manager.send_json(connection_id, {
        "type": "processing",
        "message": "Evaluating your answer...",
    })

    evaluation = await agent.evaluate_answer(answer_text, time_taken=time_taken)

    await _send_feedback_and_advance(connection_id, agent, evaluation, "text", db, interview, user)


async def complete_interview(
    connection_id: str,
    agent: VoiceInterviewAgent,
    db: Session,
    interview: Interview,
    user: User,
):
    """Generate the final report, persist it, mark the interview completed, and send it.

    Adds an ``InterviewResult`` row, sets ``status``/``completed_at`` (and
    ``duration_seconds`` when ``started_at`` is set), commits, then sends an
    ``interview_complete`` message with the report fields.
    """
    await manager.send_json(connection_id, {
        "type": "generating_report",
        "message": "Generating your interview report...",
    })

    final_report = await agent.generate_final_report()

    interview_result = InterviewResult(
        interview_id=interview.id,
        user_id=user.id,
        overall_score=final_report["overall_score"],
        summary=final_report["summary"],
        detailed_feedback=final_report["detailed_feedback"],
        improvement_areas=final_report["improvement_areas"],
        strengths=final_report["strengths"],
        transcript=final_report["transcript"],
        voice_analysis=final_report.get("voice_summary"),
        ai_remarks=final_report.get("next_steps", ""),
    )
    db.add(interview_result)

    # One timestamp so completed_at and duration_seconds agree exactly.
    now = datetime.utcnow()
    interview.status = "completed"
    interview.completed_at = now
    if interview.started_at:
        interview.duration_seconds = int((now - interview.started_at).total_seconds())
    db.commit()

    await manager.send_json(connection_id, {
        "type": "interview_complete",
        "overall_score": final_report["overall_score"],
        "summary": final_report["summary"],
        "detailed_feedback": final_report["detailed_feedback"],
        "strengths": final_report["strengths"],
        "improvement_areas": final_report["improvement_areas"],
        "voice_summary": final_report.get("voice_summary"),
        "recommendation": final_report.get("recommendation"),
        "next_steps": final_report.get("next_steps"),
        "message": "Interview completed successfully!",
    })