Spaces:
Sleeping
Sleeping
| """ | |
| app.py | |
| ------ | |
| Main Flask application for HireScope AI — Resume Screening System. | |
| Includes MongoDB, Auth, Async Processing, and Sentence Transformers. | |
| """ | |
| import os | |
| import logging | |
| import concurrent.futures | |
| from functools import wraps | |
| from flask import ( | |
| Flask, render_template, request, redirect, url_for, | |
| flash, session, jsonify | |
| ) | |
| from werkzeug.middleware.proxy_fix import ProxyFix | |
| from dotenv import load_dotenv | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
# Load variables from a .env file (if present) before any os.getenv call below.
load_dotenv()

# Cloudinary credentials come from the environment; secure=True is passed so
# the SDK is configured for HTTPS.
cloudinary.config(
    **{
        "cloud_name": os.getenv("CLOUDINARY_CLOUD_NAME"),
        "api_key": os.getenv("CLOUDINARY_API_KEY"),
        "api_secret": os.getenv("CLOUDINARY_API_SECRET"),
        "secure": True,
    }
)
| from db import ( | |
| create_user, authenticate_user, insert_candidate, | |
| get_all_candidates, get_candidate_by_id, update_candidate_audio, | |
| update_candidate_audio_error, set_candidate_audio_processing, | |
| clear_all_candidates | |
| ) | |
| from werkzeug.utils import secure_filename | |
| from resume_parser import extract_text, clean_text | |
| from skill_extractor import extract_all, SKILLS_LIST | |
| from job_matcher import calculate_match_score, find_skill_gaps, rank_candidates | |
| from audio_transcriber import transcribe_from_local_file | |
# Flask application object used by the rest of this module.
app = Flask(__name__)

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
logger = logging.getLogger(__name__)

# Secret key (signs the session cookie).
app.secret_key = os.getenv("SECRET_KEY")
if not app.secret_key:
    # Surface the misconfiguration at startup instead of on the first request
    # that touches the session.
    logger.warning(
        "SECRET_KEY is not set; session-based features (login, flash messages) "
        "will fail until it is configured."
    )

# Hugging Face / Proxy Configuration: honour one hop of X-Forwarded-* headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

# Session Configuration for iframe compatibility (Hugging Face):
# SameSite=None lets the cookie travel in a cross-site iframe and requires Secure.
app.config.update(
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_SAMESITE='None',
    SESSION_COOKIE_HTTPONLY=True,
)

# Optional: Initialize Google Generative AI if key is present
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if GOOGLE_API_KEY:
    try:
        import google.generativeai as genai
        genai.configure(api_key=GOOGLE_API_KEY)
    except Exception as exc:
        # Best-effort feature: keep the app running, but leave a trace of why
        # the setup did not succeed.
        logger.warning("Google Generative AI setup failed: %s", exc)

BASE_DIR = os.path.abspath(os.path.dirname(__file__))
UPLOAD_FOLDER_RESUMES = os.path.join(BASE_DIR, "uploads", "resumes")
UPLOAD_FOLDER_AUDIO = os.path.join(BASE_DIR, "uploads", "audio")
os.makedirs(UPLOAD_FOLDER_RESUMES, exist_ok=True)
os.makedirs(UPLOAD_FOLDER_AUDIO, exist_ok=True)

ALLOWED_RESUME_EXTENSIONS = {"pdf", "docx"}
ALLOWED_AUDIO_EXTENSIONS = {"mp3", "wav", "m4a", "flac", "ogg", "webm"}

# Thread pool for async audio transcription
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def allowed_file(filename, allowed_extensions):
    """Return True when *filename* ends in one of *allowed_extensions*.

    The extension is the text after the last dot, compared case-insensitively.
    Names without a dot, empty names and non-string values (for example a
    missing filename passed as ``None``) are rejected instead of raising.

    Args:
        filename: Client-supplied file name, possibly ``None``.
        allowed_extensions: Container of lowercase extensions without the dot.

    Returns:
        bool: whether the extension is allowed.
    """
    if not isinstance(filename, str):
        return False
    _, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in allowed_extensions
# ── Authentication Helper ──
def login_required(f):
    """Decorate a view so it is only run when ``user_id`` is in the session.

    When the key is absent, a warning is flashed and the client is redirected
    to the ``login`` endpoint; otherwise the wrapped view is called with the
    original arguments and its result is returned.

    ``functools.wraps`` keeps the wrapped function's ``__name__`` and docstring.
    Without it every decorated view would be called ``decorated_function``,
    which collides when Flask derives endpoint names from function names.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "user_id" not in session:
            flash("Please log in to access this page.", "warning")
            return redirect(url_for("login"))
        return f(*args, **kwargs)

    return decorated_function
def login():
    """Render the login form and, on POST, authenticate the submitted user.

    On success the session is reset and repopulated with ``user_id``,
    ``username`` and ``role`` from the authenticated record, then the client
    is redirected to ``index``. On failure an error is flashed and the form
    is rendered again.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm how it is registered.
    """
    if request.method == "POST":
        email = request.form.get("email")
        password = request.form.get("password")

        # A missing field cannot authenticate; avoid handing None to the
        # credential check.
        user = authenticate_user(email, password) if email and password else None

        if user:
            # Drop state left by a previous login in the same browser
            # (e.g. last_candidate_id) so it cannot leak into this one.
            session.clear()
            session["user_id"] = user["_id"]
            session["username"] = user["username"]
            session["role"] = user["role"]
            flash("Logged in successfully!", "success")
            return redirect(url_for("index"))

        flash("Invalid email or password", "error")

    return render_template("login.html")
def register():
    """Render the registration form and, on POST, create the account.

    ``create_user`` returns a ``(success, message)`` pair. On success the
    client is redirected to ``login``; otherwise the message is flashed as an
    error and the form is rendered again. Submissions lacking a username,
    email or password are rejected before ``create_user`` is called.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm how it is registered.
    """
    if request.method == "POST":
        username = request.form.get("username")
        email = request.form.get("email")
        password = request.form.get("password")

        if not (username and email and password):
            # Do not pass None/empty values down to account creation.
            flash("Username, email and password are required.", "error")
        else:
            success, msg = create_user(username, email, password)
            if success:
                flash("Registration successful. Please login.", "success")
                return redirect(url_for("login"))
            flash(msg, "error")

    return render_template("register.html")
def logout():
    """Empty the session, flash a confirmation and redirect to ``login``."""
    session.clear()
    # Flash after clearing so the message itself is not wiped.
    flash("Logged out successfully.", "info")
    login_page = url_for("login")
    return redirect(login_page)
def index():
    """Render the dashboard with candidate count, average score and recents.

    Candidates are fetched for the ``user_id`` stored in the session. The
    average is taken over each candidate's ``match_score`` (0 when absent),
    rounded to one decimal, and is 0 when there are no candidates. The first
    five candidates are passed to the template as the recent list.
    """
    candidates = get_all_candidates(session.get("user_id"))

    if candidates:
        total = sum(entry.get("match_score", 0) for entry in candidates)
        avg_score = round(total / len(candidates), 1)
    else:
        avg_score = 0

    context = {
        "candidate_count": len(candidates),
        "avg_score": avg_score,
        "recent_candidates": candidates[:5],
    }
    return render_template("index.html", **context)
def upload_resume():
    """Analyse an uploaded resume, store the candidate and show the results.

    Steps:
      1. Validate the ``resume`` file from the request (present, named,
         PDF/DOCX extension).
      2. Save it into a private temporary directory, extract and clean its
         text, and extract skills/education/experience.
      3. If a ``job_description`` form field is given, compute the match
         score and skill gaps against it.
      4. Upload the file to Cloudinary (best-effort; failures are logged and
         leave ``resume_url`` empty).
      5. Optionally ask Google Generative AI for a short summary.
      6. Insert the candidate record and redirect to ``results``.

    The temporary directory is removed on every exit path, so the local copy
    no longer leaks when text extraction fails, and two uploads that share a
    file name can no longer overwrite each other on disk.

    NOTE(review): reads ``session["user_id"]`` directly, so it presumably
    relies on a login guard that is not visible in this view — confirm.
    """
    import tempfile

    if "resume" not in request.files:
        flash("No file selected.", "error")
        return redirect(url_for("index"))

    file = request.files["resume"]
    if not file.filename:
        flash("No file selected.", "error")
        return redirect(url_for("index"))

    if not allowed_file(file.filename, ALLOWED_RESUME_EXTENSIONS):
        flash("Invalid file type. Please upload PDF or DOCX.", "error")
        return redirect(url_for("index"))

    # allowed_file() guarantees a dot followed by an accepted extension.
    extension = file.filename.rsplit(".", 1)[1].lower()
    filename = secure_filename(file.filename)
    if os.path.splitext(filename)[1].lower() != f".{extension}":
        # Sanitising can strip the whole stem (e.g. non-ASCII names) and with
        # it the dot; fall back to a generic name that keeps the extension.
        # NOTE(review): presumably extract_text() relies on the extension — confirm.
        filename = f"resume_file.{extension}"

    job_description = request.form.get("job_description", "").strip()
    match_score = 0.0
    skill_gaps = {"matched": [], "missing": []}
    resume_url = ""

    # A per-request directory keeps the original file name (Cloudinary is
    # called with use_filename=True) while avoiding path collisions, and the
    # context manager deletes it however this block is left.
    with tempfile.TemporaryDirectory(dir=UPLOAD_FOLDER_RESUMES) as workdir:
        filepath = os.path.join(workdir, filename)
        file.save(filepath)

        raw_text = extract_text(filepath) or ""
        if not raw_text.strip():
            flash("Could not extract text. Please ensure the PDF/DOCX is not just scanned images.", "error")
            return redirect(url_for("index"))

        cleaned_text = clean_text(raw_text)
        extracted_info = extract_all(cleaned_text)

        if job_description:
            from skill_extractor import extract_skills
            jd_skills = extract_skills(job_description)
            match_score = calculate_match_score(
                cleaned_text, job_description, extracted_info["skills"], jd_skills
            )
            skill_gaps = find_skill_gaps(
                extracted_info["skills"], job_description, SKILLS_LIST, jd_skills
            )

        # --- Cloudinary Upload Resume (best-effort) ---
        try:
            cloudinary_response = cloudinary.uploader.upload(
                filepath,
                resource_type="auto",
                folder="resume_screener/resumes",
                use_filename=True,
                unique_filename=True,
            )
            resume_url = cloudinary_response.get("secure_url", "")
        except Exception as e:
            logger.error("Cloudinary upload failed: %s", e)
            resume_url = ""

    # AI Summary using Google Gen AI (if configured)
    ai_summary = ""
    if GOOGLE_API_KEY:
        try:
            import google.generativeai as genai
            model = genai.GenerativeModel('gemini-flash-latest')
            prompt = (
                "Summarize this candidate in 2 to 3 short sentences emphasizing "
                "their top skills, experience, and education based on this resume "
                f"text:\n{cleaned_text[:3000]}"
            )
            response = model.generate_content(prompt)
            ai_summary = response.text.strip()
        except Exception as e:
            logger.error("Generative AI Error (Summary): %s", e)

    # Display name derived from the stored file name.
    candidate_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ").title()

    candidate_data = {
        "name": candidate_name,
        "filename": filename,
        "resume_url": resume_url,
        "resume_text": cleaned_text,
        "raw_text_preview": raw_text[:500],
        "ai_summary": ai_summary,
        "skills": extracted_info["skills"],
        "education": extracted_info["education"],
        "experience": extracted_info["experience"],
        "match_score": match_score,
        "skill_gaps": skill_gaps,
        "job_description": job_description,
        "audio_transcription": None,
        "uploaded_by": session["user_id"],
    }

    # Save to MongoDB
    candidate_id = insert_candidate(candidate_data)
    session["last_candidate_id"] = str(candidate_id)

    flash("Resume analyzed successfully!", "success")
    return redirect(url_for("results"))
def process_audio_local(candidate_id, local_audio_path, audio_url):
    """Transcribe a locally saved audio file and record the outcome.

    ``transcribe_from_local_file`` returns a dict; when ``result["success"]``
    is truthy the text and language are stored for the candidate, otherwise
    ``result["error"]`` is stored. The local audio file is deleted afterwards
    on every exit path, including when transcription or the database update
    raises, so failed jobs do not leave files behind.

    Args:
        candidate_id: Identifier of the candidate the audio belongs to.
        local_audio_path: Path of the audio file on local disk.
        audio_url: URL stored alongside the result (may be empty).
    """
    logger.info("Starting LOCAL transcription for candidate_id=%s, file=%s", candidate_id, local_audio_path)
    try:
        result = transcribe_from_local_file(local_audio_path)
        if result["success"]:
            update_candidate_audio(candidate_id, result["text"], result["language"], audio_url)
            logger.info("Transcription saved for candidate_id=%s", candidate_id)
        else:
            update_candidate_audio_error(candidate_id, result["error"], audio_url)
            logger.error("Transcription failed for candidate_id=%s: %s", candidate_id, result["error"])
    finally:
        # Clean up local audio file after transcription; cleanup is
        # best-effort, so a failure is logged and never masks the outcome.
        try:
            if local_audio_path and os.path.exists(local_audio_path):
                os.remove(local_audio_path)
                logger.info("Cleaned up local audio: %s", local_audio_path)
        except OSError as exc:
            logger.warning("Could not remove local audio %s: %s", local_audio_path, exc)
| def _handle_transcription_future(future, candidate_id, audio_url): | |
| exc = future.exception() | |
| if exc is None: | |
| return | |
| error_msg = f"Background transcription crashed: {exc}" | |
| logger.exception("Unhandled transcription error for candidate_id=%s", candidate_id) | |
| update_candidate_audio_error(candidate_id, error_msg, audio_url) | |
def upload_audio():
    """Attach an audio file to a candidate and queue its transcription.

    The candidate is identified by the ``candidate_id`` form field or, failing
    that, ``last_candidate_id`` from the session. After validation the audio
    is saved locally, uploaded to Cloudinary (best-effort), the candidate is
    marked as processing, and ``process_audio_local`` is submitted to the
    thread pool. The client is then redirected to ``results``.

    NOTE(review): the candidate is looked up by id only; whether it belongs
    to the current user is not checked here — confirm that is intended.
    """
    import uuid

    # Get candidate_id from form (sent from results page) or session
    candidate_id = request.form.get("candidate_id") or session.get("last_candidate_id")
    if not candidate_id:
        flash("Please upload and analyze a resume first before attaching audio.", "error")
        return redirect(url_for("index"))

    candidate = get_candidate_by_id(candidate_id)
    if not candidate:
        flash("Candidate not found. Please upload a resume first.", "error")
        return redirect(url_for("index"))

    if "audio" not in request.files:
        flash("No audio file selected.", "error")
        return redirect(url_for("results"))

    file = request.files["audio"]
    if not file.filename:
        flash("No audio file selected.", "error")
        return redirect(url_for("results"))

    if not allowed_file(file.filename, ALLOWED_AUDIO_EXTENSIONS):
        flash("Invalid audio format. Supported: MP3, WAV, M4A, FLAC, OGG, WEBM", "error")
        return redirect(url_for("results"))

    # allowed_file() guarantees a dot followed by an accepted extension.
    extension = file.filename.rsplit(".", 1)[1].lower()
    filename = secure_filename(file.filename)
    if os.path.splitext(filename)[1].lower() != f".{extension}":
        # Sanitising removed the stem/extension; keep the validated extension.
        filename = f"audio_file.{extension}"

    # Build names from the id stored on the record rather than raw form text.
    candidate_key = str(candidate["_id"])

    # Save audio LOCALLY first; transcription reads this file. The random
    # token keeps a re-upload from touching a file an earlier job still uses.
    local_audio_path = os.path.join(
        UPLOAD_FOLDER_AUDIO, f"{candidate_key}_{uuid.uuid4().hex[:8]}_{filename}"
    )
    file.save(local_audio_path)
    logger.info("Audio saved locally at: %s (%d bytes)", local_audio_path, os.path.getsize(local_audio_path))

    # Upload to Cloudinary for storage; transcription does not depend on it.
    audio_url = ""
    try:
        cloudinary_response = cloudinary.uploader.upload(
            local_audio_path,
            resource_type="video",
            folder="resume_screener/audio",
            public_id=f"{candidate_key}_{os.path.splitext(filename)[0]}",
            use_filename=False,
            overwrite=True,
        )
        audio_url = cloudinary_response.get("secure_url", "")
    except Exception as exc:
        logger.warning("Audio Cloudinary upload failed (will still transcribe locally): %s", exc)
        audio_url = ""  # Not critical — transcription uses local file

    set_candidate_audio_processing(candidate_id, audio_url)

    # Transcribe from LOCAL file (not from Cloudinary URL)
    future = None
    try:
        future = executor.submit(process_audio_local, candidate_id, local_audio_path, audio_url)
        future.add_done_callback(
            lambda f, cid=candidate_id, aurl=audio_url: _handle_transcription_future(f, cid, aurl)
        )
    except Exception as exc:
        error_msg = f"Failed to queue transcription task: {exc}"
        logger.exception(error_msg)
        update_candidate_audio_error(candidate_id, error_msg, audio_url)
        if future is None:
            # The worker that normally deletes the file was never started.
            try:
                os.remove(local_audio_path)
            except OSError as cleanup_exc:
                logger.warning("Could not remove local audio %s: %s", local_audio_path, cleanup_exc)
        flash(error_msg, "error")
        return redirect(url_for("results"))

    session["last_candidate_id"] = str(candidate_id)
    session["awaiting_transcription_for"] = candidate_key
    logger.info("Queued LOCAL transcription for candidate_id=%s", candidate_id)

    flash("Audio uploaded successfully! Transcription is processing in the background.", "info")
    return redirect(url_for("results"))
def results():
    """Render the results page for the candidate remembered in the session.

    ``transcription_pending`` is passed as True while the session is waiting
    on this candidate's audio and its ``audio_transcription`` status is not
    yet ``completed`` or ``failed``. The waiting marker is removed from the
    session once a final status is seen, or when no candidate is found.
    """
    last_id = session.get("last_candidate_id")
    candidate = None
    if last_id:
        candidate = get_candidate_by_id(last_id)

    transcription_pending = False
    awaiting_for = session.get("awaiting_transcription_for")

    if not candidate:
        # Nothing to wait for without a candidate.
        session.pop("awaiting_transcription_for", None)
    elif awaiting_for == str(candidate["_id"]):
        transcript = candidate.get("audio_transcription")
        if transcript and transcript.get("status") in {"completed", "failed"}:
            session.pop("awaiting_transcription_for", None)
        else:
            transcription_pending = True

    all_candidates = get_all_candidates(session.get("user_id"))
    return render_template(
        "results.html",
        candidate=candidate,
        candidate_count=len(all_candidates),
        transcription_pending=transcription_pending,
    )
# ── API: Transcription Status (AJAX polling) ──
def transcription_status(candidate_id):
    """Return the candidate's transcription state as JSON.

    Responds with ``{"status": "not_found"}`` and HTTP 404 when the candidate
    does not exist, ``{"status": "none"}`` when no audio transcription is
    stored, and otherwise the stored status, text, language and error.

    NOTE(review): no ownership check against the current user is visible
    here — confirm that is intended.
    """
    record = get_candidate_by_id(candidate_id)
    if not record:
        return jsonify(status="not_found"), 404

    transcript = record.get("audio_transcription")
    if not transcript:
        return jsonify(status="none")

    return jsonify(
        status=transcript.get("status", "unknown"),
        text=transcript.get("text", ""),
        language=transcript.get("language", ""),
        error=transcript.get("error"),
    )
# ── API: Candidate Profile (for modal) ──
def candidate_profile(candidate_id):
    """Return a candidate's profile fields as JSON, or a 404 error payload.

    The full ``resume_text`` is deliberately left out to keep the response
    small; only the preview and the derived fields are returned.

    NOTE(review): no ownership check against the current user is visible
    here — confirm that is intended.
    """
    record = get_candidate_by_id(candidate_id)
    if not record:
        return jsonify({"error": "not found"}), 404

    # Field name -> value used when the record lacks that field.
    fallbacks = {
        "name": "Unknown",
        "filename": "",
        "resume_url": "",
        "ai_summary": "",
        "skills": [],
        "education": [],
        "experience": [],
        "match_score": 0,
        "skill_gaps": {"matched": [], "missing": []},
        "job_description": "",
        "audio_transcription": None,
        "raw_text_preview": "",
    }

    profile = {"_id": record["_id"]}
    for field, fallback in fallbacks.items():
        profile[field] = record.get(field, fallback)
    return jsonify(profile)
def ranking():
    """Render the ranking page, re-scoring candidates against a posted JD.

    On GET (or a POST with an empty job description, or with no candidates)
    the current user's candidates are shown in the order they were fetched.
    On POST with a job description, every candidate's match score and skill
    gaps are recomputed and written back to the database, the candidates are
    fetched again, and ``rank_candidates`` produces the displayed order.
    """
    job_description = ""
    candidates = get_all_candidates(session.get("user_id"))
    ranked = list(candidates)

    if request.method == "POST":
        job_description = request.form.get("job_description", "").strip()

    # job_description can only be non-empty after a POST.
    if job_description and candidates:
        from skill_extractor import extract_skills, SKILLS_LIST
        from job_matcher import calculate_match_score, find_skill_gaps, rank_candidates
        from db import candidates_collection
        from bson.objectid import ObjectId

        jd_skills = extract_skills(job_description)

        # Recompute metrics for each candidate and persist them.
        for entry in candidates:
            entry_id = entry["_id"]
            entry_skills = entry.get("skills", [])
            refreshed = {
                "job_description": job_description,
                "match_score": calculate_match_score(
                    entry.get("resume_text", ""), job_description, entry_skills, jd_skills
                ),
                "skill_gaps": find_skill_gaps(
                    entry_skills, job_description, SKILLS_LIST, jd_skills
                ),
            }
            try:
                candidates_collection.update_one(
                    {"_id": ObjectId(entry_id)},
                    {"$set": refreshed},
                )
            except Exception as e:
                # One failed update must not stop the remaining candidates.
                logger.error(f"Error updating candidate {entry_id} during re-ranking: {e}")

        # Work from the stored values so the page reflects what was saved.
        candidates = get_all_candidates(session.get("user_id"))
        ranked = rank_candidates(candidates, job_description, jd_skills)

    return render_template(
        "ranking.html",
        ranked=ranked,
        job_description=job_description,
        candidate_count=len(candidates),
    )
def clear():
    """Delete the current user's candidates and return to the dashboard.

    Also forgets ``last_candidate_id`` in the session, since the candidate it
    pointed to has been removed.
    """
    current_user = session.get("user_id")
    clear_all_candidates(current_user)
    session.pop("last_candidate_id", None)
    flash("All candidate data cleared successfully.", "info")
    dashboard = url_for("index")
    return redirect(dashboard)
if __name__ == "__main__":
    # Direct-run entry point. The interactive debugger allows arbitrary code
    # execution, so it is opt-in via FLASK_DEBUG rather than always on.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, port=5000)