Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query | |
| from sqlalchemy.orm import Session | |
| from typing import Dict, Any | |
| import json | |
| import uuid | |
| import base64 | |
| from datetime import datetime | |
| from app.database import get_db | |
| from app.models.user import User | |
| from app.models.interview import Interview, InterviewResult | |
| from app.core.security import decode_token | |
| from app.core.exceptions import AuthenticationException | |
| from app.agents.voice_interview_agent import VoiceInterviewAgent | |
| from app.services.interview_service import InterviewService | |
# APIRouter instance for this module. No route registrations that use it are
# visible in this view of the file.
router = APIRouter()
class ConnectionManager:
    """Registry of accepted WebSocket connections, keyed by connection id."""

    def __init__(self):
        # Maps connection_id -> the WebSocket accepted under that id.
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, connection_id: str):
        """Accept the handshake, then register the socket under ``connection_id``."""
        await websocket.accept()
        self.active_connections[connection_id] = websocket

    def disconnect(self, connection_id: str):
        """Drop ``connection_id`` from the registry; unknown ids are ignored.

        Only the registry entry is removed; the socket itself is not closed here.
        """
        self.active_connections.pop(connection_id, None)

    async def send_json(self, connection_id: str, data: dict):
        """Send ``data`` as JSON to ``connection_id`` if it is still registered."""
        target = self.active_connections.get(connection_id)
        if target is not None:
            await target.send_json(data)

    async def send_bytes(self, connection_id: str, data: bytes):
        """Send raw ``data`` to ``connection_id`` if it is still registered."""
        target = self.active_connections.get(connection_id)
        if target is not None:
            await target.send_bytes(data)
# Module-level ConnectionManager instance; the WebSocket handler and the
# question/answer helper functions in this file send through it.
manager = ConnectionManager()
async def get_current_user_ws(token: str, db: Session) -> User:
    """Resolve the active user identified by a WebSocket access token.

    Args:
        token: Encoded token supplied by the client.
        db: Database session used to look up the user.

    Returns:
        The matching ``User`` whose ``is_active`` flag is truthy.

    Raises:
        AuthenticationException: If the token does not decode to a payload of
            type ``"access"``, carries no usable ``sub`` claim, or the user is
            missing or inactive.
    """
    payload = decode_token(token)
    if payload is None or payload.get("type") != "access":
        raise AuthenticationException("Invalid token")

    user_id = payload.get("sub")
    if not user_id:
        raise AuthenticationException("Invalid token payload")

    # A malformed ``sub`` must surface as an authentication failure, not as a
    # bare ValueError/AttributeError from the UUID constructor.
    try:
        user_uuid = uuid.UUID(str(user_id))
    except ValueError as exc:
        raise AuthenticationException("Invalid token payload") from exc

    user = db.query(User).filter(User.id == user_uuid).first()
    if not user or not user.is_active:
        raise AuthenticationException("User not found or inactive")
    return user
async def voice_interview_websocket(
    websocket: WebSocket,
    interview_id: str,
    token: str = Query(...),
    mode: str = Query("voice"),  # "voice" or "text"
    db: Session = Depends(get_db)
):
    """
    Real-time interview WebSocket endpoint.

    NOTE(review): no ``@router.websocket(...)`` decorator is visible on this
    handler in this view of the file -- confirm the route is registered.

    Modes:
    - voice: audio answers are buffered, transcribed, then evaluated
    - text: text answers are evaluated directly

    Client -> Server (JSON text frames):
    1. {"type": "audio_chunk", "data": "<base64_audio>"}   # voice mode
    2. {"type": "audio_complete"}                          # voice mode, end of answer
    3. {"type": "answer", "answer_text": "...", "time_taken": 0}  # text mode
    4. {"type": "ping"}                                    # answered with "pong"
    5. {"type": "end_interview"}                           # marks interview abandoned
    Binary frames are appended to the audio buffer in voice mode.

    Server -> Client message types sent from this file: "welcome",
    "question", "generating_audio", "question_audio", "processing",
    "transcription", "evaluating", "feedback", "generating_report",
    "interview_complete", "error", "pong".

    Close codes used before the connection is accepted:
    - 4004: interview not found (including a malformed ``interview_id``)
    - 4000: interview already completed
    - 1008: authentication failed
    On an unexpected error the socket is closed with 1011.
    """
    connection_id = f"{interview_id}_{uuid.uuid4()}"
    # Bound up front so the disconnect handler can never hit an unbound name.
    interview = None
    try:
        # Authenticate
        user = await get_current_user_ws(token, db)

        # A malformed id cannot match any interview; report it as not found
        # instead of letting the ValueError become a 1011 server error.
        try:
            interview_uuid = uuid.UUID(interview_id)
        except ValueError:
            await websocket.close(code=4004, reason="Interview not found")
            return

        # Get interview (scoped to the authenticated user)
        interview = db.query(Interview).filter(
            Interview.id == interview_uuid,
            Interview.user_id == user.id
        ).first()
        if not interview:
            await websocket.close(code=4004, reason="Interview not found")
            return
        if interview.status == "completed":
            await websocket.close(code=4000, reason="Interview already completed")
            return

        # Accept connection
        await manager.connect(websocket, connection_id)

        # Update interview status
        if interview.status == "draft":
            interview.status = "in_progress"
            interview.started_at = datetime.utcnow()
            db.commit()

        # Get resume
        resume_data = InterviewService.get_user_resume(db, user.id)

        # Initialize Voice AI Agent
        agent = VoiceInterviewAgent(
            interview_type=interview.interview_type,
            target_role=interview.target_role,
            questions=interview.questions,
            resume_data=resume_data
        )

        # Send welcome message
        await manager.send_json(connection_id, {
            "type": "welcome",
            "message": f"Welcome to your {interview.interview_type} interview for {interview.target_role}",
            "mode": mode,
            "total_questions": agent.total_questions
        })

        # Send first question
        await send_next_question(connection_id, agent, mode)

        # Audio buffer for voice mode
        audio_buffer = bytearray()

        # Listen for messages
        while True:
            message = await websocket.receive()

            # The raw ``receive()`` hands back the disconnect event instead of
            # raising; convert it so the handler below marks the interview
            # abandoned rather than looping into a RuntimeError.
            if message.get("type") == "websocket.disconnect":
                raise WebSocketDisconnect(message.get("code", 1000))

            # ASGI servers may include both keys with one set to None, so test
            # the value rather than key presence.
            text = message.get("text")
            if text is not None:
                try:
                    data = json.loads(text)
                except ValueError:
                    data = None
                if not isinstance(data, dict):
                    # A malformed frame should not terminate the interview.
                    await manager.send_json(connection_id, {
                        "type": "error",
                        "message": "Invalid message format"
                    })
                    continue
                should_stop = await _dispatch_client_message(
                    data,
                    connection_id=connection_id,
                    agent=agent,
                    mode=mode,
                    audio_buffer=audio_buffer,
                    db=db,
                    interview=interview,
                    user=user,
                )
                if should_stop:
                    break
                continue

            # Direct binary audio data
            binary = message.get("bytes")
            if binary is not None and mode == "voice":
                audio_buffer.extend(binary)
    except WebSocketDisconnect:
        print(f"WebSocket disconnected: {connection_id}")
        if interview is not None and interview.status == "in_progress":
            interview.status = "abandoned"
            db.commit()
    except AuthenticationException:
        # Policy-violation close instead of a misleading "server error".
        await _close_quietly(websocket, code=1008, reason="Authentication failed")
    except Exception as e:
        print(f"WebSocket error: {e}")
        # Fixed reason: raw exception text can leak internals and may exceed
        # the size limit of a WebSocket close frame.
        await _close_quietly(websocket, code=1011, reason="Internal server error")
    finally:
        manager.disconnect(connection_id)


async def _dispatch_client_message(
    data: dict,
    *,
    connection_id: str,
    agent: VoiceInterviewAgent,
    mode: str,
    audio_buffer: bytearray,
    db: Session,
    interview: Interview,
    user: User,
) -> bool:
    """Handle one decoded JSON client message; return True if the client ended the interview."""
    msg_type = data.get("type")

    if msg_type == "audio_chunk":
        # Voice mode: accumulate audio chunks
        if mode == "voice":
            try:
                audio_buffer.extend(base64.b64decode(data.get("data", "")))
            except (ValueError, TypeError):
                # Bad padding (binascii.Error is a ValueError) or non-string data.
                await manager.send_json(connection_id, {
                    "type": "error",
                    "message": "Invalid audio data"
                })
    elif msg_type == "audio_complete":
        # Voice mode: process the buffered audio as one answer
        if mode == "voice" and audio_buffer:
            await process_voice_answer(
                connection_id, agent, bytes(audio_buffer), db, interview, user
            )
            audio_buffer.clear()
    elif msg_type == "answer":
        # Text mode: process text answer
        if mode == "text":
            answer_text = data.get("answer_text", "")
            time_taken = data.get("time_taken", 0)
            if not answer_text:
                await manager.send_json(connection_id, {
                    "type": "error",
                    "message": "Answer cannot be empty"
                })
                return False
            await process_text_answer(
                connection_id, agent, answer_text, time_taken, db, interview, user
            )
    elif msg_type == "ping":
        await manager.send_json(connection_id, {"type": "pong"})
    elif msg_type == "end_interview":
        interview.status = "abandoned"
        db.commit()
        return True

    return False


async def _close_quietly(websocket: WebSocket, code: int, reason: str) -> None:
    """Close the socket, tolerating one that is already closed or gone."""
    try:
        await websocket.close(code=code, reason=reason)
    except Exception as exc:
        # Best-effort cleanup at the handler boundary; report and move on.
        print(f"WebSocket close failed: {exc}")
async def send_next_question(connection_id: str, agent: VoiceInterviewAgent, mode: str):
    """Push the agent's current question to the client.

    Does nothing when the agent has no current question. A ``"question"``
    metadata message is always sent; in ``"voice"`` mode it is followed by a
    ``"generating_audio"`` notice and, when the agent returns audio, a
    ``"question_audio"`` message carrying the audio base64-encoded.
    """
    current = agent.get_current_question()
    if not current:
        return

    # Question metadata goes out in every mode.
    metadata = {
        "type": "question",
        "question_number": current["id"],
        "question_text": current["text"],
        "category": current["category"],
        "difficulty": current["difficulty"],
        "total_questions": agent.total_questions,
        "mode": mode
    }
    await manager.send_json(connection_id, metadata)

    if mode != "voice":
        return

    # Voice mode: announce, synthesize, then ship the audio if any was produced.
    notice = {
        "type": "generating_audio",
        "message": "Generating question audio..."
    }
    await manager.send_json(connection_id, notice)

    speech = agent.get_question_audio(current["text"])
    if not speech:
        return

    encoded = base64.b64encode(speech).decode('utf-8')
    await manager.send_json(connection_id, {
        "type": "question_audio",
        "data": encoded,
        "format": "mp3"
    })
async def process_voice_answer(
    connection_id: str,
    agent: VoiceInterviewAgent,
    audio_data: bytes,
    db: Session,
    interview: Interview,
    user: User
):
    """Transcribe a spoken answer, evaluate it, and advance the interview.

    Sends, in order: ``"processing"``, then either ``"error"`` (transcription
    failed or produced no text) or ``"transcription"``, ``"evaluating"`` and
    ``"feedback"``. Afterwards the interview is completed when the agent
    reports it is complete; otherwise the next question is sent in voice mode.

    Args:
        connection_id: Connection to report progress and results to.
        agent: Interview agent used for transcription and evaluation.
        audio_data: The complete audio of one answer.
        db: Database session, passed through to interview completion.
        interview: Interview being conducted.
        user: Authenticated user taking the interview.
    """
    # Send processing message
    await manager.send_json(connection_id, {
        "type": "processing",
        "message": "Transcribing your answer..."
    })

    # Transcribe audio
    transcription_result = await agent.transcribe_answer(audio_data)
    if transcription_result.get("error"):
        await manager.send_json(connection_id, {
            "type": "error",
            "message": f"Transcription failed: {transcription_result['error']}"
        })
        return

    transcript = transcription_result.get("transcript")
    voice_analysis = transcription_result.get("voice_analysis")

    # Text mode rejects empty answers before evaluation; apply the same rule
    # here so silence is not scored and does not consume a question.
    if not transcript or not str(transcript).strip():
        await manager.send_json(connection_id, {
            "type": "error",
            "message": "No speech detected. Please answer again."
        })
        return

    # Send transcription to client
    await manager.send_json(connection_id, {
        "type": "transcription",
        "text": transcript,
        "confidence": transcription_result.get("confidence", 0.0),
        "voice_analysis": voice_analysis
    })

    # Evaluate answer
    await manager.send_json(connection_id, {
        "type": "evaluating",
        "message": "Evaluating your answer..."
    })
    evaluation = await agent.evaluate_answer(
        transcript,
        voice_analysis=voice_analysis,
        # Duration comes from the voice analysis when one was produced.
        time_taken=voice_analysis.get("duration_seconds", 0) if voice_analysis else 0
    )

    # Send feedback
    await manager.send_json(connection_id, {
        "type": "feedback",
        "question_number": agent.current_question_index,
        "score": evaluation["score"],
        "feedback": evaluation["feedback"],
        "strengths": evaluation.get("strengths", []),
        "improvements": evaluation.get("improvements", [])
    })

    # Check if complete
    if agent.is_complete:
        await complete_interview(connection_id, agent, db, interview, user)
    else:
        # Send next question
        await send_next_question(connection_id, agent, "voice")
async def process_text_answer(
    connection_id: str,
    agent: VoiceInterviewAgent,
    answer_text: str,
    time_taken: int,
    db: Session,
    interview: Interview,
    user: User
):
    """Evaluate a typed answer, send feedback, and advance the interview.

    Sends a ``"processing"`` notice, evaluates ``answer_text`` through the
    agent (without voice analysis), sends a ``"feedback"`` message, and then
    either completes the interview or sends the next question in text mode.
    """
    notice = {
        "type": "processing",
        "message": "Evaluating your answer..."
    }
    await manager.send_json(connection_id, notice)

    # Text answers are evaluated without any voice analysis.
    result = await agent.evaluate_answer(answer_text, time_taken=time_taken)

    feedback_message = {
        "type": "feedback",
        "question_number": agent.current_question_index,
        "score": result["score"],
        "feedback": result["feedback"],
        "strengths": result.get("strengths", []),
        "improvements": result.get("improvements", [])
    }
    await manager.send_json(connection_id, feedback_message)

    # Finish once the agent reports completion; otherwise keep going.
    if agent.is_complete:
        await complete_interview(connection_id, agent, db, interview, user)
        return
    await send_next_question(connection_id, agent, "text")
async def complete_interview(
    connection_id: str,
    agent: VoiceInterviewAgent,
    db: Session,
    interview: Interview,
    user: User
):
    """Generate the final report, persist it, and notify the client.

    Sends ``"generating_report"``, asks the agent for the final report, stores
    an ``InterviewResult`` and marks the interview ``"completed"`` in a single
    commit, then sends ``"interview_complete"`` with the report fields.

    Args:
        connection_id: Connection to report progress and the result to.
        agent: Interview agent that produces the final report.
        db: Database session used to persist the result.
        interview: Interview being completed; updated in place.
        user: Authenticated user who took the interview.

    Raises:
        Exception: Any error raised while persisting is re-raised after the
            session has been rolled back.
    """
    await manager.send_json(connection_id, {
        "type": "generating_report",
        "message": "Generating your interview report..."
    })

    # Generate final report
    final_report = await agent.generate_final_report()

    # Save to database
    interview_result = InterviewResult(
        interview_id=interview.id,
        user_id=user.id,
        overall_score=final_report["overall_score"],
        summary=final_report["summary"],
        detailed_feedback=final_report["detailed_feedback"],
        improvement_areas=final_report["improvement_areas"],
        strengths=final_report["strengths"],
        transcript=final_report["transcript"],
        voice_analysis=final_report.get("voice_summary"),
        ai_remarks=final_report.get("next_steps", "")
    )

    # One clock read so completed_at and duration_seconds agree exactly.
    finished_at = datetime.utcnow()
    try:
        db.add(interview_result)
        # Update interview
        interview.status = "completed"
        interview.completed_at = finished_at
        if interview.started_at:
            interview.duration_seconds = int((finished_at - interview.started_at).total_seconds())
        db.commit()
    except Exception:
        # The session is shared with the WebSocket handler; leave it usable.
        db.rollback()
        raise

    # Send complete message
    await manager.send_json(connection_id, {
        "type": "interview_complete",
        "overall_score": final_report["overall_score"],
        "summary": final_report["summary"],
        "detailed_feedback": final_report["detailed_feedback"],
        "strengths": final_report["strengths"],
        "improvement_areas": final_report["improvement_areas"],
        "voice_summary": final_report.get("voice_summary"),
        "recommendation": final_report.get("recommendation"),
        "next_steps": final_report.get("next_steps"),
        "message": "Interview completed successfully!"
    })