Spaces:
Paused
Paused
| import os | |
| import uuid | |
| import json | |
| import logging | |
| from flask import Blueprint, request, jsonify, send_file, url_for, current_app | |
| from flask_login import login_required, current_user | |
| from backend.models.database import db, Job, Application | |
| from backend.services.interview_engine import ( | |
| generate_first_question, | |
| edge_tts_to_file_sync, | |
| whisper_stt, | |
| evaluate_answer | |
| ) | |
# Flask blueprint for this module; the url_for("interview_api.get_audio", ...) calls below resolve against this blueprint name.
| interview_api = Blueprint("interview_api", __name__) | |
def start_interview():
    """
    Start a new interview for the current user and the selected job.

    Reads ``job_id`` from the JSON request body, looks up the job and the
    current user's application for it, and generates the first question from
    the profile stored in ``application.extracted_features``. A TTS rendition
    of the question is attempted on a best-effort basis.

    Returns:
        A JSON response ``{"question": ..., "audio_url": ...}``;
        ``audio_url`` is ``None`` when no usable audio file was produced.
        Errors are JSON ``{"error": ...}`` with status 400 (missing
        ``job_id`` or no application/profile data), 404 (unknown job) or
        500 (stored profile is not valid JSON, or any unexpected failure).
    """
    # NOTE(review): no route or login decorators are visible on this function
    # in this view; it reads current_user.id, so confirm it is only reachable
    # by authenticated users.
    try:
        # silent=True: a missing or malformed JSON body becomes a clean 400
        # below instead of an exception that would surface as a 500.
        data = request.get_json(silent=True)
        job_id = data.get("job_id") if isinstance(data, dict) else None
        if job_id is None:
            return jsonify({"error": "Missing job_id."}), 400

        # Look the job up explicitly rather than with get_or_404(): the 404
        # exception it raises would be caught by the broad handler at the
        # bottom of this function and reported as a 500.
        job = Job.query.get(job_id)
        if job is None:
            return jsonify({"error": "Job not found."}), 404

        application = Application.query.filter_by(
            user_id=current_user.id,
            job_id=job_id
        ).first()
        if not application or not application.extracted_features:
            return jsonify({"error": "No application/profile data found."}), 400

        # Parse the candidate's stored profile
        try:
            profile = json.loads(application.extracted_features)
        except (TypeError, ValueError) as e:
            logging.error(f"Invalid profile JSON: {e}")
            return jsonify({"error": "Invalid profile JSON"}), 500

        # Generate the first question, falling back to a generic opener
        question = generate_first_question(profile, job)
        if not question:
            question = "Tell me about yourself and why you're interested in this position."

        # Best-effort TTS: any failure here leaves audio_url as None and the
        # question text is still returned.
        audio_url = None
        try:
            audio_dir = "/tmp/audio"
            os.makedirs(audio_dir, exist_ok=True)
            filename = f"q_{uuid.uuid4().hex}.wav"
            audio_path = os.path.join(audio_dir, filename)
            audio_result = edge_tts_to_file_sync(question, audio_path)
            # Files of 1000 bytes or less are treated as failed generations.
            if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
                audio_url = url_for("interview_api.get_audio", filename=filename)
                logging.info(f"Audio generated successfully: {audio_url}")
            else:
                logging.warning("Audio generation failed or file too small")
        except Exception as e:
            logging.exception(f"Error generating TTS audio: {e}")

        return jsonify({
            "question": question,
            "audio_url": audio_url
        })
    except Exception as e:
        # Last-resort boundary so the client always receives JSON.
        logging.exception(f"Error in start_interview: {e}")
        return jsonify({"error": "Internal server error"}), 500
| import subprocess | |
def transcribe_audio():
    """
    Transcribe an uploaded ``.webm`` recording.

    The file posted under the ``audio`` form field is saved to
    ``/tmp/interview_temp``, converted to WAV with ``ffmpeg`` and passed to
    ``whisper_stt``. Both temporary files are removed before returning,
    whatever the outcome.

    Returns:
        JSON ``{"transcript": ...}`` on success. Errors are JSON
        ``{"error": ...}`` with status 400 (no file uploaded, or an empty
        transcript) or 500 (ffmpeg conversion or transcription failed).
    """
    audio_file = request.files.get("audio")
    if not audio_file:
        return jsonify({"error": "No audio file received."}), 400

    temp_dir = "/tmp/interview_temp"
    os.makedirs(temp_dir, exist_ok=True)
    # Build both paths from one unique stem instead of string-replacing the
    # extension, which would also rewrite ".webm" anywhere else in the path.
    stem = os.path.join(temp_dir, f"user_audio_{uuid.uuid4().hex}")
    original_path = stem + ".webm"
    wav_path = stem + ".wav"

    try:
        audio_file.save(original_path)

        # Convert to WAV using ffmpeg. check=True turns a non-zero exit code
        # into CalledProcessError; without it a failed conversion would go
        # unnoticed and a missing/partial WAV would be handed to whisper_stt.
        try:
            subprocess.run(
                ["ffmpeg", "-y", "-i", original_path, wav_path],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            # OSError covers a missing ffmpeg binary.
            logging.error(f"FFmpeg conversion failed: {e}")
            return jsonify({"error": "Failed to convert audio"}), 500

        # Transcribe
        try:
            transcript = whisper_stt(wav_path)
        except Exception as e:
            logging.exception(f"Transcription failed: {e}")
            return jsonify({"error": "Failed to transcribe audio"}), 500
    finally:
        # Cleanup runs on every exit path; each file is removed independently
        # so a failure on one does not leave the other behind.
        for path in (original_path, wav_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logging.warning(f"Could not remove temp file {path}: {e}")

    if not transcript or not transcript.strip():
        return jsonify({"error": "No speech detected in audio. Please try again."}), 400
    return jsonify({"transcript": transcript})
def process_answer():
    """
    Process a user's answer and return a follow-up question along with an
    evaluation. Always responds with JSON.

    Request JSON fields:
        answer: the candidate's answer text (required, non-blank string).
        questionIndex: zero-based index of the question just answered
            (defaults to 0; must be convertible to an integer).
        current_question: text of the question just answered, passed to
            ``evaluate_answer`` as context (defaults to a generic opener).

    Returns:
        JSON with ``success``, ``next_question``, ``audio_url``,
        ``evaluation`` and ``is_complete``. ``next_question`` and
        ``audio_url`` are ``None`` once the interview is complete;
        ``audio_url`` is also ``None`` when TTS generation fails.
        Errors are JSON ``{"error": ...}`` with status 400 (missing answer or
        invalid ``questionIndex``) or 500 (unexpected failure).
    """
    try:
        # silent=True plus the dict check keeps a missing, malformed or
        # non-object JSON body on the 400 path instead of raising.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # A null or non-string answer is treated the same as a blank one.
        raw_answer = data.get("answer")
        answer = raw_answer.strip() if isinstance(raw_answer, str) else ""
        if not answer:
            return jsonify({"error": "No answer provided."}), 400

        # Clients may send the index as a string; coerce it so the numeric
        # comparisons below cannot raise TypeError.
        try:
            question_idx = int(data.get("questionIndex", 0))
        except (TypeError, ValueError, OverflowError):
            return jsonify({"error": "Invalid questionIndex."}), 400

        # Question text used as evaluation context; `or` also covers an
        # explicit null sent by the client.
        current_question = data.get("current_question") or "Tell me about yourself"

        # Evaluate the answer
        evaluation_result = evaluate_answer(current_question, answer)

        # Determine completion (3 questions in total, zero-based index)
        is_complete = question_idx >= 2
        next_question_text = None
        audio_url = None

        if not is_complete:
            # The question following index 1 is the last one this route
            # delivers and always covers salary expectations and working
            # arrangement. Once the client reports index 2, the interview is
            # complete and no further question is generated.
            if question_idx == 0:
                next_question_text = "Can you describe a challenging project you've worked on and how you overcame the difficulties?"
            elif question_idx == 1:
                next_question_text = (
                    "What are your salary expectations? Are you looking for a full-time or part-time role, "
                    "and do you prefer remote or on-site work?"
                )
            else:
                # Fallback for unexpected (e.g. negative) indices.
                next_question_text = "Do you have any questions about the role or our company?"

            # Best-effort TTS for the next question; failures leave
            # audio_url as None and the text is still returned.
            try:
                audio_dir = "/tmp/audio"
                os.makedirs(audio_dir, exist_ok=True)
                filename = f"q_{uuid.uuid4().hex}.wav"
                audio_path = os.path.join(audio_dir, filename)
                audio_result = edge_tts_to_file_sync(next_question_text, audio_path)
                # Files of 1000 bytes or less are treated as failed generations.
                if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
                    audio_url = url_for("interview_api.get_audio", filename=filename)
                    logging.info(f"Next question audio generated: {audio_url}")
                else:
                    logging.warning("Audio generation failed or file too small")
            except Exception as e:
                logging.exception(f"Error generating next question audio: {e}")

        return jsonify({
            "success": True,
            "next_question": next_question_text,
            "audio_url": audio_url,
            "evaluation": evaluation_result,
            "is_complete": is_complete
        })
    except Exception as e:
        # Last-resort boundary so the client always receives JSON.
        logging.exception(f"Error in process_answer: {e}")
        return jsonify({"error": "Error processing answer. Please try again."}), 500
def get_audio(filename: str):
    """
    Send back a TTS clip that was written earlier to /tmp/audio.

    Only the final path component of ``filename`` is used, and it must end in
    ``.wav``. Responds with the WAV file, or with a JSON ``{"error": ...}``
    body and status 400 (wrong extension), 404 (file missing or zero bytes)
    or 500 (any unexpected failure).
    """
    def _error(message, status):
        # Shared shape for every JSON error this endpoint returns.
        return jsonify({"error": message}), status

    try:
        # Keep only the last path component so callers cannot escape the
        # audio directory.
        wav_name = os.path.basename(filename)
        if not wav_name.endswith('.wav'):
            return _error("Invalid audio file format.", 400)

        wav_path = os.path.join("/tmp/audio", wav_name)

        if not os.path.exists(wav_path):
            logging.warning(f"Audio file not found: {wav_path}")
            return _error("Audio file not found.", 404)

        if not os.path.getsize(wav_path):
            logging.warning(f"Audio file is empty: {wav_path}")
            return _error("Audio file is empty.", 404)

        # conditional=True enables range requests for better audio streaming.
        return send_file(
            wav_path,
            mimetype="audio/wav",
            as_attachment=False,
            conditional=True,
        )
    except Exception as e:
        logging.error(f"Error serving audio file {filename}: {e}")
        return _error("Error serving audio file.", 500)