Spaces:
Paused
Paused
| import os | |
| import uuid | |
| import json | |
| import logging | |
| from flask import Blueprint, request, jsonify, send_file, url_for, current_app | |
| from flask_login import login_required, current_user | |
| from backend.models.database import db, Job, Application | |
| from backend.services.interview_engine import ( | |
| generate_first_question, | |
| generate_next_question, | |
| edge_tts_to_file_sync, | |
| whisper_stt, | |
| evaluate_answer | |
| ) | |
| # Additional imports for report generation | |
| from backend.models.database import Application | |
| from backend.services.report_generator import generate_llm_interview_report, create_pdf_report | |
| from flask import abort | |
| interview_api = Blueprint("interview_api", __name__) | |
| def start_interview(): | |
| """ | |
| Start a new interview. Generates the first question based on the user's | |
| resume/profile and the selected job. Always returns a JSON payload | |
| containing the question text and, if available, a URL to an audio | |
| rendition of the question. | |
| """ | |
| try: | |
| data = request.get_json() or {} | |
| job_id = data.get("job_id") | |
| # Validate the job and the user's application | |
| job = Job.query.get_or_404(job_id) | |
| application = Application.query.filter_by( | |
| user_id=current_user.id, | |
| job_id=job_id | |
| ).first() | |
| if not application or not application.extracted_features: | |
| return jsonify({"error": "No application/profile data found."}), 400 | |
| # Parse the candidate's profile | |
| try: | |
| profile = json.loads(application.extracted_features) | |
| except Exception as e: | |
| logging.error(f"Invalid profile JSON: {e}") | |
| return jsonify({"error": "Invalid profile JSON"}), 500 | |
| # Generate the first question using the LLM | |
| question = generate_first_question(profile, job) | |
| if not question: | |
| question = "Tell me about yourself and why you're interested in this position." | |
| # Attempt to generate a TTS audio file for the question | |
| audio_url = None | |
| try: | |
| audio_dir = "/tmp/audio" | |
| os.makedirs(audio_dir, exist_ok=True) | |
| filename = f"q_{uuid.uuid4().hex}.wav" | |
| audio_path = os.path.join(audio_dir, filename) | |
| audio_result = edge_tts_to_file_sync(question, audio_path) | |
| if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000: | |
| audio_url = url_for("interview_api.get_audio", filename=filename) | |
| logging.info(f"Audio generated successfully: {audio_url}") | |
| else: | |
| logging.warning("Audio generation failed or file too small") | |
| except Exception as e: | |
| logging.error(f"Error generating TTS audio: {e}") | |
| audio_url = None | |
| return jsonify({ | |
| "question": question, | |
| "audio_url": audio_url | |
| }) | |
| except Exception as e: | |
| logging.error(f"Error in start_interview: {e}") | |
| return jsonify({"error": "Internal server error"}), 500 | |
| import subprocess | |
| def transcribe_audio(): | |
| """Transcribe uploaded .webm audio using ffmpeg conversion and Faster-Whisper""" | |
| audio_file = request.files.get("audio") | |
| if not audio_file: | |
| return jsonify({"error": "No audio file received."}), 400 | |
| temp_dir = "/tmp/interview_temp" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| original_path = os.path.join(temp_dir, f"user_audio_{uuid.uuid4().hex}.webm") | |
| wav_path = original_path.replace(".webm", ".wav") | |
| audio_file.save(original_path) | |
| # Convert to WAV using ffmpeg | |
| try: | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", original_path, wav_path], | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.DEVNULL | |
| ) | |
| except Exception as e: | |
| logging.error(f"FFmpeg conversion failed: {e}") | |
| return jsonify({"error": "Failed to convert audio"}), 500 | |
| # Transcribe | |
| transcript = whisper_stt(wav_path) | |
| # Cleanup | |
| try: | |
| os.remove(original_path) | |
| os.remove(wav_path) | |
| except: | |
| pass | |
| if not transcript or not transcript.strip(): | |
| return jsonify({"error": "No speech detected in audio. Please try again."}), 400 | |
| return jsonify({"transcript": transcript}) | |
| # ---------------------------------------------------------------------------- | |
| # Interview report download | |
| # | |
| # Recruiters can download a PDF summarising a candidate's interview performance. | |
| # This route performs several checks: it verifies that the current user has | |
| # recruiter or admin privileges, ensures that the requested application exists | |
| # and belongs to one of the recruiter's jobs, generates a textual report via | |
| # the ``generate_llm_interview_report`` helper, converts it into a PDF, and | |
| # finally sends the PDF as a file attachment. The heavy lifting is | |
| # encapsulated in ``services/report_generator.py`` to keep this route | |
| # lightweight. | |
| def download_report(application_id: int): | |
| """Generate and return a PDF report for a candidate's interview. | |
| The ``application_id`` corresponds to the ID of the Application record | |
| representing a candidate's job application. Only recruiters (or admins) | |
| associated with the job are permitted to access this report. | |
| """ | |
| # Fetch the application or return 404 if not found | |
| application = Application.query.get_or_404(application_id) | |
| # Authorisation: ensure the current user is a recruiter or admin | |
| if current_user.role not in ('recruiter', 'admin'): | |
| # 403 Forbidden if the user lacks permissions | |
| return abort(403) | |
| # Further check that the recruiter owns the job unless admin | |
| job = getattr(application, 'job', None) | |
| if job is None: | |
| return abort(404) | |
| if current_user.role != 'admin' and job.recruiter_id != current_user.id: | |
| return abort(403) | |
| try: | |
| # Generate the textual report using the helper function. At this | |
| # stage, interview answers and evaluations are not stored server‑side, | |
| # so the report focuses on the candidate's application data and | |
| # computed skill match. Should answer/score data be persisted in | |
| # future iterations, ``generate_llm_interview_report`` can be | |
| # extended accordingly without touching this route. | |
| report_text = generate_llm_interview_report(application) | |
| # Convert the text to a PDF. The helper returns a BytesIO buffer | |
| # ready for sending via Flask's ``send_file``. Matplotlib is used | |
| # under the hood to avoid heavy dependencies like reportlab. | |
| pdf_buffer = create_pdf_report(report_text) | |
| pdf_buffer.seek(0) | |
| filename = f"{application.name.replace(' ', '_')}_interview_report.pdf" | |
| return send_file( | |
| pdf_buffer, | |
| download_name=filename, | |
| as_attachment=True, | |
| mimetype='application/pdf' | |
| ) | |
| except Exception as exc: | |
| # Log the error for debugging; return a 500 to the client | |
| logging.error(f"Error generating report for application {application_id}: {exc}") | |
| return jsonify({"error": "Failed to generate report"}), 500 | |
| def process_answer(): | |
| """ | |
| Process a user's answer and return a follow‑up question along with an | |
| evaluation. Always responds with JSON. | |
| """ | |
| try: | |
| data = request.get_json() or {} | |
| answer = data.get("answer", "").strip() | |
| question_idx = data.get("questionIndex", 0) | |
| # ``job_id`` is required to determine how many total questions are | |
| # expected for this interview. Without it we fall back to a | |
| # three‑question interview. | |
| job_id = data.get("job_id") | |
| if not answer: | |
| return jsonify({"error": "No answer provided."}), 400 | |
| # Get the current question for evaluation context | |
| current_question = data.get("current_question", "Tell me about yourself") | |
| # Evaluate the answer | |
| evaluation_result = evaluate_answer(current_question, answer) | |
| # 🔥 Save Q&A in interview_log for report | |
| try: | |
| application = Application.query.filter_by( | |
| user_id=current_user.id, | |
| job_id=job_id | |
| ).first() | |
| if application: | |
| log_data = [] | |
| if application.interview_log: | |
| try: | |
| log_data = json.loads(application.interview_log) | |
| except Exception: | |
| log_data = [] | |
| log_data.append({ | |
| "question": current_question, | |
| "answer": answer, | |
| "evaluation": evaluation_result | |
| }) | |
| application.interview_log = json.dumps(log_data, ensure_ascii=False) | |
| db.session.commit() | |
| except Exception as log_err: | |
| logging.error(f"Error saving interview log: {log_err}") | |
| # Determine the number of questions configured for this job | |
| total_questions = 4 | |
| if job_id is not None: | |
| try: | |
| job = Job.query.get(int(job_id)) | |
| if job and job.num_questions and job.num_questions > 0: | |
| total_questions = job.num_questions | |
| except Exception: | |
| # If lookup fails, keep default | |
| pass | |
| # Check completion. ``question_idx`` is zero‑based; the last index | |
| # corresponds to ``total_questions - 1``. When the current index | |
| # reaches or exceeds this value, the interview is complete. | |
| is_complete = question_idx >= (total_questions - 1) | |
| next_question_text = None | |
| audio_url = None | |
| if not is_complete: | |
| next_idx = question_idx + 1 | |
| # Determine which question to ask next. If next_idx is the last | |
| # question (i.e. equals total_questions - 1), use the final | |
| # question. Otherwise, select a follow‑up question from the | |
| # bank based on ``next_idx - 1`` (because index 0 is for the | |
| # first follow‑up). If out of range, cycle through the list. | |
| if next_idx == (total_questions - 1): | |
| next_question_text = ( | |
| "What are your salary expectations? Are you looking for a full-time or part-time role, " | |
| "and do you prefer remote or on-site work?" | |
| ) | |
| else: | |
| # 🔥 Use Qdrant-powered next question | |
| try: | |
| # You need profile + job for Qdrant context | |
| job = Job.query.get(int(job_id)) if job_id else None | |
| application = Application.query.filter_by( | |
| user_id=current_user.id, | |
| job_id=job_id | |
| ).first() | |
| profile = {} | |
| if application and application.extracted_features: | |
| profile = json.loads(application.extracted_features) | |
| conversation_history = data.get("conversation_history", []) | |
| next_question_text = generate_next_question( | |
| profile, | |
| job, | |
| conversation_history, | |
| answer | |
| ) | |
| except Exception as e: | |
| logging.error(f"Error generating next question from Qdrant: {e}") | |
| next_question_text = "Could you elaborate more on your last point?" | |
| # Try to generate audio for the next question | |
| try: | |
| audio_dir = "/tmp/audio" | |
| os.makedirs(audio_dir, exist_ok=True) | |
| filename = f"q_{uuid.uuid4().hex}.wav" | |
| audio_path = os.path.join(audio_dir, filename) | |
| audio_result = edge_tts_to_file_sync(next_question_text, audio_path) | |
| if audio_result and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000: | |
| audio_url = url_for("interview_api.get_audio", filename=filename) | |
| logging.info(f"Next question audio generated: {audio_url}") | |
| except Exception as e: | |
| logging.error(f"Error generating next question audio: {e}") | |
| audio_url = None | |
| return jsonify({ | |
| "success": True, | |
| "next_question": next_question_text, | |
| "audio_url": audio_url, | |
| "evaluation": evaluation_result, | |
| "is_complete": is_complete, | |
| "redirect_url": url_for("interview_api.interview_complete") if is_complete else None | |
| }) | |
| except Exception as e: | |
| logging.error(f"Error in process_answer: {e}") | |
| return jsonify({"error": "Error processing answer. Please try again."}), 500 | |
| def get_audio(filename: str): | |
| """Serve previously generated TTS audio from the /tmp/audio directory.""" | |
| try: | |
| # Sanitize filename to prevent directory traversal | |
| safe_name = os.path.basename(filename) | |
| if not safe_name.endswith('.wav'): | |
| return jsonify({"error": "Invalid audio file format."}), 400 | |
| audio_path = os.path.join("/tmp/audio", safe_name) | |
| if not os.path.exists(audio_path): | |
| logging.warning(f"Audio file not found: {audio_path}") | |
| return jsonify({"error": "Audio file not found."}), 404 | |
| if os.path.getsize(audio_path) == 0: | |
| logging.warning(f"Audio file is empty: {audio_path}") | |
| return jsonify({"error": "Audio file is empty."}), 404 | |
| return send_file( | |
| audio_path, | |
| mimetype="audio/wav", | |
| as_attachment=False, | |
| conditional=True # Enable range requests for better audio streaming | |
| ) | |
| except Exception as e: | |
| logging.error(f"Error serving audio file {filename}: {e}") | |
| return jsonify({"error": "Error serving audio file."}), 500 | |
| from flask import render_template | |
| def interview_complete(): | |
| """ | |
| Final interview completion page. After the last question has been | |
| answered, redirect here to show the candidate a brief summary of | |
| their overall performance. The summary consists of a percentage | |
| score and a high‑level label (e.g. "Excellent", "Good"). These | |
| values are derived from the candidate's application data and | |
| interview evaluations. | |
| The calculation mirrors the logic used in the PDF report | |
| generation: the skills match ratio contributes 40% of the final | |
| score while the average of the per‑question evaluation ratings | |
| contributes 60%. If no evaluation data is available, a default | |
| average of 0.5 is used. The resulting number is expressed as a | |
| percentage (e.g. "75%") and mapped to a descriptive label. | |
| """ | |
| score = None | |
| feedback_summary = None | |
| try: | |
| # Attempt to locate the most recent application with interview data | |
| # for the current user. Because the completion route does not | |
| # receive a job ID, we fall back to the latest application that | |
| # contains an interview_log. If none exists, the summary will | |
| # remain empty and the template will render placeholders. | |
| application = ( | |
| Application.query | |
| .filter_by(user_id=current_user.id) | |
| .filter(Application.interview_log.isnot(None)) | |
| .order_by(Application.id.desc()) | |
| .first() | |
| ) | |
| if application: | |
| # Parse candidate and job skills from stored JSON. If either | |
| # field is missing or malformed, fall back to empty lists. | |
| try: | |
| candidate_features = json.loads(application.extracted_features) if application.extracted_features else {} | |
| except Exception: | |
| candidate_features = {} | |
| candidate_skills = candidate_features.get('skills', []) or [] | |
| job_skills = [] | |
| try: | |
| job_skills = json.loads(application.job.skills) if application.job and application.job.skills else [] | |
| except Exception: | |
| job_skills = [] | |
| # Compute the skills match ratio. Normalise skills to lower | |
| # case and strip whitespace for comparison. Avoid division | |
| # by zero if the job has no listed skills. | |
| candidate_set = {s.strip().lower() for s in candidate_skills} | |
| job_set = {s.strip().lower() for s in job_skills} | |
| common = candidate_set & job_set | |
| ratio = (len(common) / len(job_set)) if job_set else 0.0 | |
| # Extract per‑question evaluations from the interview log. The | |
| # interview_log stores a list of dictionaries with keys | |
| # "question", "answer" and "evaluation". Each evaluation is | |
| # expected to include a "score" field containing text such | |
| # as "Poor", "Medium", "Good" or "Excellent". Convert | |
| # these descriptors into numeric values in the range [0.2, 1.0] | |
| # similar to the logic used in report generation. | |
| qa_scores = [] | |
| try: | |
| if application.interview_log: | |
| try: | |
| log_data = json.loads(application.interview_log) | |
| except Exception: | |
| log_data = [] | |
| for entry in log_data: | |
| score_text = str(entry.get('evaluation', {}).get('score', '')).lower() | |
| # Map textual scores to numerical values | |
| if ('excellent' in score_text) or ('5' in score_text) or ('10' in score_text): | |
| qa_scores.append(1.0) | |
| elif ('good' in score_text) or ('4' in score_text) or ('8' in score_text) or ('9' in score_text): | |
| qa_scores.append(0.8) | |
| elif ('satisfactory' in score_text) or ('medium' in score_text) or ('3' in score_text) or ('6' in score_text) or ('7' in score_text): | |
| qa_scores.append(0.6) | |
| elif ('needs improvement' in score_text) or ('poor' in score_text) or ('2' in score_text): | |
| qa_scores.append(0.4) | |
| else: | |
| qa_scores.append(0.2) | |
| except Exception: | |
| qa_scores = [] | |
| # Average the QA scores. If no scores were recorded (e.g. if | |
| # the interview_log is empty or malformed), assume a neutral | |
| # average of 0.5 to avoid penalising the candidate for missing | |
| # data. | |
| qa_average = (sum(qa_scores) / len(qa_scores)) if qa_scores else 0.5 | |
| # Weight skills match (40%) and QA average (60%) to derive | |
| # the final overall score. Convert to a percentage for | |
| # display. | |
| overall = (ratio * 0.4) + (qa_average * 0.6) | |
| percentage = overall * 100.0 | |
| # Assign a descriptive label based on the overall score. | |
| if overall >= 0.8: | |
| label = 'Excellent' | |
| elif overall >= 0.65: | |
| label = 'Good' | |
| elif overall >= 0.45: | |
| label = 'Satisfactory' | |
| else: | |
| label = 'Needs Improvement' | |
| # Format the score as a whole‑number percentage. For example | |
| # 0.753 becomes "75%". Note that rounding is applied. | |
| score = f"{percentage:.0f}%" | |
| feedback_summary = label | |
| except Exception as calc_err: | |
| # If any error occurs during calculation, fall back to None values. | |
| logging.error(f"Error computing overall interview score: {calc_err}") | |
| return render_template( | |
| "closing.html", | |
| score=score, | |
| feedback_summary=feedback_summary | |
| ) |