""" app.py ------ Main Flask application for HireScope AI — Resume Screening System. Includes MongoDB, Auth, Async Processing, and Sentence Transformers. """ import os import logging import concurrent.futures from functools import wraps from flask import ( Flask, render_template, request, redirect, url_for, flash, session, jsonify ) from werkzeug.middleware.proxy_fix import ProxyFix from dotenv import load_dotenv import cloudinary import cloudinary.uploader import cloudinary.api # Load environment variables load_dotenv() # Cloudinary Configuration cloudinary.config( cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"), api_key=os.getenv("CLOUDINARY_API_KEY"), api_secret=os.getenv("CLOUDINARY_API_SECRET"), secure=True, ) from db import ( create_user, authenticate_user, insert_candidate, get_all_candidates, get_candidate_by_id, update_candidate_audio, update_candidate_audio_error, set_candidate_audio_processing, clear_all_candidates ) from werkzeug.utils import secure_filename from resume_parser import extract_text, clean_text from skill_extractor import extract_all, SKILLS_LIST from job_matcher import calculate_match_score, find_skill_gaps, rank_candidates from audio_transcriber import transcribe_from_local_file app = Flask(__name__) logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s [%(name)s] %(message)s", ) logger = logging.getLogger(__name__) # Secret key app.secret_key = os.getenv("SECRET_KEY") # Hugging Face / Proxy Configuration app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # Session Configuration for iframe compatibility (Hugging Face) app.config.update( SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='None', SESSION_COOKIE_HTTPONLY=True, ) # Optional: Initialize Google Generative AI if key is present GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") if GOOGLE_API_KEY: try: import google.generativeai as genai genai.configure(api_key=GOOGLE_API_KEY) except Exception: pass BASE_DIR = os.path.abspath(os.path.dirname(__file__)) UPLOAD_FOLDER_RESUMES = os.path.join(BASE_DIR, "uploads", "resumes") UPLOAD_FOLDER_AUDIO = os.path.join(BASE_DIR, "uploads", "audio") os.makedirs(UPLOAD_FOLDER_RESUMES, exist_ok=True) os.makedirs(UPLOAD_FOLDER_AUDIO, exist_ok=True) ALLOWED_RESUME_EXTENSIONS = {"pdf", "docx"} ALLOWED_AUDIO_EXTENSIONS = {"mp3", "wav", "m4a", "flac", "ogg", "webm"} # Thread pool for async audio transcription executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) def allowed_file(filename, allowed_extensions): return "." in filename and filename.rsplit(".", 1)[1].lower() in allowed_extensions # ── Authentication Helper ── def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if "user_id" not in session: flash("Please log in to access this page.", "warning") return redirect(url_for("login")) return f(*args, **kwargs) return decorated_function @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": email = request.form.get("email") password = request.form.get("password") user = authenticate_user(email, password) if user: session["user_id"] = user["_id"] session["username"] = user["username"] session["role"] = user["role"] flash("Logged in successfully!", "success") return redirect(url_for("index")) else: flash("Invalid email or password", "error") return render_template("login.html") @app.route("/register", methods=["GET", "POST"]) def register(): if request.method == "POST": username = request.form.get("username") email = request.form.get("email") password = request.form.get("password") success, msg = create_user(username, email, password) if success: flash("Registration successful. Please login.", "success") return redirect(url_for("login")) else: flash(msg, "error") return render_template("register.html") @app.route("/logout") def logout(): session.clear() flash("Logged out successfully.", "info") return redirect(url_for("login")) @app.route("/") @login_required def index(): candidates = get_all_candidates(session.get("user_id")) # Calculate stats avg_score = 0 if candidates: scores = [c.get("match_score", 0) for c in candidates] avg_score = round(sum(scores) / len(scores), 1) return render_template( "index.html", candidate_count=len(candidates), avg_score=avg_score, recent_candidates=candidates[:5] ) @app.route("/upload", methods=["POST"]) @login_required def upload_resume(): if "resume" not in request.files: flash("No file selected.", "error") return redirect(url_for("index")) file = request.files["resume"] if file.filename == "": flash("No file selected.", "error") return redirect(url_for("index")) if not allowed_file(file.filename, ALLOWED_RESUME_EXTENSIONS): flash("Invalid file type. Please upload PDF or DOCX.", "error") return redirect(url_for("index")) filename = secure_filename(file.filename) if not filename: filename = "resume_file" filepath = os.path.join(UPLOAD_FOLDER_RESUMES, filename) file.save(filepath) raw_text = extract_text(filepath) if not raw_text.strip(): flash("Could not extract text. Please ensure the PDF/DOCX is not just scanned images.", "error") return redirect(url_for("index")) cleaned_text = clean_text(raw_text) extracted_info = extract_all(cleaned_text) job_description = request.form.get("job_description", "").strip() match_score = 0.0 skill_gaps = {"matched": [], "missing": []} jd_skills = [] if job_description: from skill_extractor import extract_skills jd_skills = extract_skills(job_description) match_score = calculate_match_score(cleaned_text, job_description, extracted_info["skills"], jd_skills) skill_gaps = find_skill_gaps(extracted_info["skills"], job_description, SKILLS_LIST, jd_skills) # --- Cloudinary Upload Resume --- resume_url = "" try: cloudinary_response = cloudinary.uploader.upload( filepath, resource_type="auto", folder="resume_screener/resumes", use_filename=True, unique_filename=True, ) # Simply use the secure_url provided by Cloudinary resume_url = cloudinary_response.get("secure_url", "") except Exception as e: logger.error("Cloudinary upload failed: %s", e) resume_url = "" # Clean up local file after processing try: os.remove(filepath) except Exception: pass # AI Summary using Google Gen AI (if configured) ai_summary = "" if GOOGLE_API_KEY: try: import google.generativeai as genai model = genai.GenerativeModel('gemini-flash-latest') prompt = f"Summarize this candidate in 2 to 3 short sentences emphasizing their top skills, experience, and education based on this resume text:\n{cleaned_text[:3000]}" response = model.generate_content(prompt) ai_summary = response.text.strip() except Exception as e: logger.error(f"Generative AI Error (Summary): {e}") candidate_name = os.path.splitext(filename)[0].replace("_", " ").replace("-", " ").title() candidate_data = { "name": candidate_name, "filename": filename, "resume_url": resume_url, "resume_text": cleaned_text, "raw_text_preview": raw_text[:500], "ai_summary": ai_summary, "skills": extracted_info["skills"], "education": extracted_info["education"], "experience": extracted_info["experience"], "match_score": match_score, "skill_gaps": skill_gaps, "job_description": job_description, "audio_transcription": None, "uploaded_by": session["user_id"] } # Save to MongoDB candidate_id = insert_candidate(candidate_data) session["last_candidate_id"] = str(candidate_id) flash("Resume analyzed successfully!", "success") return redirect(url_for("results")) def process_audio_local(candidate_id, local_audio_path, audio_url): """ Process audio transcription from a LOCAL file (not URL). This avoids Cloudinary download issues entirely. The audio is saved locally first, transcribed, then cleaned up. """ logger.info("Starting LOCAL transcription for candidate_id=%s, file=%s", candidate_id, local_audio_path) result = transcribe_from_local_file(local_audio_path) if result["success"]: update_candidate_audio(candidate_id, result["text"], result["language"], audio_url) logger.info("Transcription saved for candidate_id=%s", candidate_id) else: update_candidate_audio_error(candidate_id, result["error"], audio_url) logger.error("Transcription failed for candidate_id=%s: %s", candidate_id, result["error"]) # Clean up local audio file after transcription try: if local_audio_path and os.path.exists(local_audio_path): os.remove(local_audio_path) logger.info("Cleaned up local audio: %s", local_audio_path) except Exception: pass def _handle_transcription_future(future, candidate_id, audio_url): exc = future.exception() if exc is None: return error_msg = f"Background transcription crashed: {exc}" logger.exception("Unhandled transcription error for candidate_id=%s", candidate_id) update_candidate_audio_error(candidate_id, error_msg, audio_url) @app.route("/upload_audio", methods=["POST"]) @login_required def upload_audio(): # Get candidate_id from form (sent from results page) or session candidate_id = request.form.get("candidate_id") or session.get("last_candidate_id") if not candidate_id: flash("Please upload and analyze a resume first before attaching audio.", "error") return redirect(url_for("index")) candidate = get_candidate_by_id(candidate_id) if not candidate: flash("Candidate not found. Please upload a resume first.", "error") return redirect(url_for("index")) if "audio" not in request.files: flash("No audio file selected.", "error") return redirect(url_for("results")) file = request.files["audio"] if file.filename == "": flash("No audio file selected.", "error") return redirect(url_for("results")) if not allowed_file(file.filename, ALLOWED_AUDIO_EXTENSIONS): flash("Invalid audio format. Supported: MP3, WAV, M4A, FLAC, OGG, WEBM", "error") return redirect(url_for("results")) filename = secure_filename(file.filename) or "audio_file" ext = os.path.splitext(filename)[1].lower() if not ext: ext = ".mp3" # === KEY FIX: Save audio LOCALLY first, then transcribe from local file === local_audio_path = os.path.join(UPLOAD_FOLDER_AUDIO, f"{candidate_id}_{filename}") file.save(local_audio_path) logger.info("Audio saved locally at: %s (%d bytes)", local_audio_path, os.path.getsize(local_audio_path)) # Upload to Cloudinary for storage (non-blocking for transcription) audio_url = "" try: cloudinary_response = cloudinary.uploader.upload( local_audio_path, resource_type="video", folder="resume_screener/audio", public_id=f"{candidate_id}_{os.path.splitext(filename)[0]}", use_filename=False, overwrite=True, ) audio_url = cloudinary_response.get("secure_url", "") except Exception as exc: logger.warning("Audio Cloudinary upload failed (will still transcribe locally): %s", exc) audio_url = "" # Not critical — transcription uses local file set_candidate_audio_processing(candidate_id, audio_url) # === Transcribe from LOCAL file (not from Cloudinary URL) === try: future = executor.submit(process_audio_local, candidate_id, local_audio_path, audio_url) future.add_done_callback( lambda f, cid=candidate_id, aurl=audio_url: _handle_transcription_future(f, cid, aurl) ) except Exception as exc: error_msg = f"Failed to queue transcription task: {exc}" logger.exception(error_msg) update_candidate_audio_error(candidate_id, error_msg, audio_url) flash(error_msg, "error") return redirect(url_for("results")) session["last_candidate_id"] = str(candidate_id) session["awaiting_transcription_for"] = str(candidate["_id"]) logger.info("Queued LOCAL transcription for candidate_id=%s", candidate_id) flash("Audio uploaded successfully! Transcription is processing in the background.", "info") return redirect(url_for("results")) @app.route("/results") @login_required def results(): candidate_id = session.get("last_candidate_id") candidate = get_candidate_by_id(candidate_id) if candidate_id else None transcription_pending = False awaiting_for = session.get("awaiting_transcription_for") if candidate and awaiting_for == str(candidate["_id"]): audio_transcription = candidate.get("audio_transcription") if audio_transcription and audio_transcription.get("status") in {"completed", "failed"}: session.pop("awaiting_transcription_for", None) else: transcription_pending = True elif not candidate: session.pop("awaiting_transcription_for", None) candidates = get_all_candidates(session.get("user_id")) return render_template( "results.html", candidate=candidate, candidate_count=len(candidates), transcription_pending=transcription_pending ) # ── API: Transcription Status (AJAX polling) ── @app.route("/api/transcription_status/") @login_required def transcription_status(candidate_id): candidate = get_candidate_by_id(candidate_id) if not candidate: return jsonify({"status": "not_found"}), 404 audio = candidate.get("audio_transcription") if not audio: return jsonify({"status": "none"}) return jsonify({ "status": audio.get("status", "unknown"), "text": audio.get("text", ""), "language": audio.get("language", ""), "error": audio.get("error"), }) # ── API: Candidate Profile (for modal) ── @app.route("/api/candidate/") @login_required def candidate_profile(candidate_id): candidate = get_candidate_by_id(candidate_id) if not candidate: return jsonify({"error": "not found"}), 404 # Don't send the full resume text to keep response small return jsonify({ "_id": candidate["_id"], "name": candidate.get("name", "Unknown"), "filename": candidate.get("filename", ""), "resume_url": candidate.get("resume_url", ""), "ai_summary": candidate.get("ai_summary", ""), "skills": candidate.get("skills", []), "education": candidate.get("education", []), "experience": candidate.get("experience", []), "match_score": candidate.get("match_score", 0), "skill_gaps": candidate.get("skill_gaps", {"matched": [], "missing": []}), "job_description": candidate.get("job_description", ""), "audio_transcription": candidate.get("audio_transcription"), "raw_text_preview": candidate.get("raw_text_preview", ""), }) @app.route("/ranking", methods=["GET", "POST"]) @login_required def ranking(): candidates = get_all_candidates(session.get("user_id")) job_description = "" ranked = list(candidates) if request.method == "POST": job_description = request.form.get("job_description", "").strip() if job_description and candidates: from skill_extractor import extract_skills, SKILLS_LIST from job_matcher import calculate_match_score, find_skill_gaps, rank_candidates from db import candidates_collection from bson.objectid import ObjectId jd_skills = extract_skills(job_description) # Recalculate score and gaps for all candidates and update in DB for candidate in candidates: candidate_id = candidate["_id"] candidate_skills = candidate.get("skills", []) resume_text = candidate.get("resume_text", "") # Calculate new metrics based on new JD new_match_score = calculate_match_score(resume_text, job_description, candidate_skills, jd_skills) new_skill_gaps = find_skill_gaps(candidate_skills, job_description, SKILLS_LIST, jd_skills) # Update DB directly try: candidates_collection.update_one( {"_id": ObjectId(candidate_id)}, {"$set": { "job_description": job_description, "match_score": new_match_score, "skill_gaps": new_skill_gaps }} ) except Exception as e: logger.error(f"Error updating candidate {candidate_id} during re-ranking: {e}") # Re-fetch the updated candidates from the database candidates = get_all_candidates(session.get("user_id")) # Rank the newly fetched candidates ranked = rank_candidates(candidates, job_description, jd_skills) return render_template("ranking.html", ranked=ranked, job_description=job_description, candidate_count=len(candidates)) @app.route("/clear") @login_required def clear(): clear_all_candidates(session.get("user_id")) session.pop("last_candidate_id", None) flash("All candidate data cleared successfully.", "info") return redirect(url_for("index")) if __name__ == "__main__": app.run(debug=True, port=5000)