# api.py import os from dotenv import load_dotenv # Load env BEFORE importing modules that depend on it load_dotenv() from fastapi import FastAPI, HTTPException, UploadFile, Form, File, BackgroundTasks from pydantic import BaseModel from supabase import create_client from fastapi.middleware.cors import CORSMiddleware from supabase_ingest import process_resume from src.extraction.job_extractor import process_single_job from src.services.ats_service import analyze_ats_compatibility from src.services.analysis import identify_missing_skills, generate_ai_analysis from src.matching.similarity import calculate_granular_match_score from src.services.clustering_service import ClusteringService from typing import Dict, Any, Optional, List app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], # Allow all origins for dev; restrict in prod allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Setup Supabase Client SUPABASE_URL = os.environ.get("SUPABASE_URL") # Use Service Role Key if available to bypass RLS SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY") if not SUPABASE_URL or not SUPABASE_KEY: raise RuntimeError("SUPABASE_URL and SUPABASE_KEY (or SUPABASE_SERVICE_ROLE_KEY) must be set in .env") client = create_client(SUPABASE_URL, SUPABASE_KEY) # Define the data we expect from the frontend class ResumeRequest(BaseModel): user_id: str file_path: str # e.g., "user_123/resume.pdf" @app.post("/process-resume") async def process_resume_endpoint(request: ResumeRequest): print(f"🔔 Signal received: Process resume for {request.user_id}") try: # Delegate everything to the unified function extracted_data = process_resume(client, request.user_id, request.file_path) return {"status": "success", "data": extracted_data} except Exception as e: print(f"❌ Error: {e}") raise HTTPException(status_code=500, detail=str(e)) # --------------------------------------------------------------------- # WEBHOOK ENDPOINTS (Called by Supabase) # --------------------------------------------------------------------- class WebhookRequest(BaseModel): type: str table: str record: Dict[str, Any] schema: str old_record: Optional[Dict[str, Any]] = None @app.post("/webhook/storage") async def storage_webhook(request: WebhookRequest): """ Handles Database Webhooks from Supabase (storage.objects insert). """ print(f"🔔 Webhook received: {request.type} on {request.table}") # We only care about INSERTs or UPDATEs (overwrites) to the 'resume' bucket if request.type not in ["INSERT", "UPDATE"] or request.table != "objects": return {"status": "ignored"} # Extract file details from the record # Object path example: "user_123/123456_resume.pdf" file_path = request.record.get("name") bucket_id = request.record.get("bucket_id") # Check bucket if bucket_id != "resume": print(f"⚠️ Ignoring upload to bucket: {bucket_id}") return {"status": "ignored", "reason": "wrong bucket"} # Extract User ID (assuming folder structure: user_id/filename) try: user_id = file_path.split("/")[0] except Exception: print(f"❌ Could not extract user_id from {file_path}") return {"status": "error", "message": "invalid file path structure"} print(f"▶️ Triggering processing for {file_path}") # Call the processing logic try: process_resume(client, user_id, file_path) return {"status": "success"} except Exception as e: print(f"❌ Processing failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/webhook/jobs") async def jobs_webhook(request: WebhookRequest): """ Handles Database Webhooks from Supabase (jobs table UPDATE/INSERT). """ print(f"🔔 Webhook received: {request.type} on {request.table}") if request.table != "jobs": return {"status": "ignored", "reason": "wrong table"} # We care about INSERT and UPDATE # For UPDATE, we might want to check if description changed, but for now we runs it anyway new_record = request.record job_id = new_record.get("id") description = new_record.get("description") experience_level = new_record.get("experience_level") if not job_id: print("❌ Webhook missing job_id") return {"status": "error", "message": "missing id"} print(f"▶️ Triggering job extraction for Job ID: {job_id}") try: # Re-use global client from line 32 process_single_job(client, job_id, description, experience_level) return {"status": "success"} except Exception as e: print(f"❌ Job processing failed: {e}") raise HTTPException(status_code=500, detail=str(e)) # --- CLUSTERING ENDPOINT --- @app.post("/run-clustering") async def run_clustering_endpoint(background_tasks: BackgroundTasks): """ Trigger candidate clustering asynchronously. Returns immediately while clustering runs in the background. """ print("🚀 Starting clustering pipeline...") def run_clustering(): """Background task to run clustering.""" try: service = ClusteringService() service.run_clustering_pipeline(n_clusters=5) print("✅ Clustering pipeline completed successfully!") except Exception as e: print(f"❌ Clustering failed: {e}") # Add to background tasks background_tasks.add_task(run_clustering) return { "status": "started", "message": "Clustering pipeline started. This may take a few minutes. Refresh the page to see updated clusters." } @app.post("/analyze-ats") async def analyze_ats_endpoint( resume: UploadFile = File(...), job_description: str = Form(...) ): """ Real-time ATS analysis endpoint. Does not save to DB (unless you want to add that logic). """ print(f"🔍 Analyzing ATS compatibility for: {resume.filename}") try: result = await analyze_ats_compatibility(resume, job_description) return {"status": "success", "data": result} except Exception as e: print(f"❌ ATS Analysis failed: {e}") raise HTTPException(status_code=500, detail=str(e)) # --------------------------------------------------------------------- # CANDIDATE ANALYSIS ENDPOINT # --------------------------------------------------------------------- # --------------------------------------------------------------------- # CORE ANALYSIS LOGIC # --------------------------------------------------------------------- async def perform_candidate_analysis(candidate_id: str, job_id: str, force_refresh: bool = False): """ Shared logic to analyze a candidate for a job. Checks for cached results first unless force_refresh is True. """ # 0. Check for existing analysis in applications table if not force_refresh: app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute() if app_resp.data and app_resp.data[0].get("ai_summary"): app_data = app_resp.data[0] insights = app_data.get("ai_insights") or {} # Use cached missing_skills if available if "missing_skills" in insights: print(f"✅ Found fully cached analysis for {candidate_id}") return { "summary": app_data.get("ai_summary"), "strengths": insights.get("strengths") or [], "weaknesses": insights.get("weaknesses") or [], "score": app_data.get("AI_score") or 0, "semantic_score": app_data.get("semantic_score") or 0, "score_breakdown": insights.get("score_breakdown") or {}, "missing_skills": insights.get("missing_skills") or [] } # 1. Fetch Candidate Data prof_resp = client.table("profiles").select("*").eq("id", candidate_id).execute() if not prof_resp.data: raise HTTPException(status_code=404, detail="Candidate not found") # 0.5 Granular Match Score (Vector Similarity) semantic_result = await calculate_granular_match_score(client, candidate_id, job_id) profile = prof_resp.data[0] # 2. Fetch Job Data job_description = "" job_skills = [] if job_id: job_resp = client.table("jobs").select("*").eq("id", job_id).execute() if job_resp.data: job = job_resp.data[0] job_description = job.get("description") or "" raw_job_skills = job.get("skills") if job.get("skills") else job.get("technical_skills") if isinstance(raw_job_skills, str): job_skills = [s.strip() for s in raw_job_skills.split(",") if s.strip()] elif isinstance(raw_job_skills, list): job_skills = raw_job_skills # 3. Prepare Profile Skills profile_skills = [] raw_skills = profile.get("skills") or [] if isinstance(raw_skills, str): profile_skills = [s.strip() for s in raw_skills.split(",") if s.strip()] else: profile_skills = raw_skills raw_tech_skills = profile.get("technical_skills") or [] if isinstance(raw_tech_skills, str): profile_skills.extend([s.strip() for s in raw_tech_skills.split(",") if s.strip()]) else: profile_skills.extend(raw_tech_skills) # 4. Identify Missing Skills (Semantic) missing = [] if job_skills: missing = identify_missing_skills(job_skills, profile_skills) # 4a. If we had half-cached data (summary exists but missing_skills don't) # We check cache again just in case it was partially filled app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute() if not force_refresh and app_resp.data and app_resp.data[0].get("ai_summary"): app_data = app_resp.data[0] insights = app_data.get("ai_insights") or {} return { "summary": app_data.get("ai_summary"), "weaknesses": insights.get("weaknesses") or [], "score": app_data.get("AI_score") or 0, "semantic_score": semantic_result.get("total_score"), "score_breakdown": semantic_result.get("breakdown"), "missing_skills": missing } # 5. Generate fresh AI Insights profile_summary = f""" Role: {profile.get('role')} Headline: {profile.get('headline')} Summary: {profile.get('summary')} Experience: {profile.get('experience_years')} years Work Experience: {profile.get('work_experience')} Education: {profile.get('education')} Skills: {", ".join(profile_skills)} """ ai_insights = generate_ai_analysis(profile_summary, job_description) # 6. Persist to Database try: breakdown = semantic_result.get("breakdown") or {} data_to_save = { "ai_summary": ai_insights.get("summary"), "ai_insights": { "weaknesses": ai_insights.get("weaknesses") or [], "missing_skills": missing, "score_breakdown": breakdown }, "AI_score": int(ai_insights.get("score") or 0), "match_score": int(semantic_result.get("total_score") or 0), # Granular Scores mapping to table columns "skills_match": int(breakdown.get("skills", 0)), "technical_skills_match": int(breakdown.get("technical_skills", 0)), "work_experience_match": int(breakdown.get("experience", 0)), "education_match": int(breakdown.get("education", 0)), "certifications_match": int(breakdown.get("certifications", 0)), "project_match": int(breakdown.get("projects", 0)) } client.table("applications").update(data_to_save).eq("user_id", candidate_id).eq("job_id", job_id).execute() print(f"💾 Persisted AI analysis and granular scores for candidate {candidate_id}") except Exception as db_err: print(f"⚠️ Failed to persist AI analysis: {db_err}") return { "summary": ai_insights.get("summary"), "strengths": ai_insights.get("strengths"), "weaknesses": ai_insights.get("weaknesses"), "score": ai_insights.get("score") or 0, "semantic_score": semantic_result.get("total_score"), "score_breakdown": semantic_result.get("breakdown"), "missing_skills": missing } # --------------------------------------------------------------------- # CANDIDATE ANALYSIS ENDPOINT # --------------------------------------------------------------------- class AnalysisRequest(BaseModel): candidate_id: str job_id: Optional[str] = None @app.post("/analyze-candidate") async def analyze_candidate_endpoint(request: AnalysisRequest): """ Detailed candidate analysis: Summary + Missing Skills. """ print(f"🔬 Analyzing candidate {request.candidate_id} for job {request.job_id}") if not request.job_id: raise HTTPException(status_code=400, detail="job_id is required for analysis") try: result = await perform_candidate_analysis(request.candidate_id, request.job_id) return {"status": "success", "data": result} except Exception as e: print(f"❌ Analysis endpoint failed: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/webhook/applications") async def applications_webhook(request: WebhookRequest): """ Handles Database Webhooks from Supabase (applications table UPDATE/INSERT). """ print(f"🔔 Webhook received: {request.type} on {request.table}") if request.table != "applications": return {"status": "ignored", "reason": "wrong table"} record = request.record # Avoid infinite loop: if this update already contains ai_summary, ignore it if request.type == "UPDATE" and record.get("ai_summary"): return {"status": "skipped", "reason": "already analyzed"} candidate_id = record.get("user_id") job_id = record.get("job_id") if not candidate_id or not job_id: return {"status": "error", "message": "missing ids"} print(f"▶️ Auto-triggering AI analysis for {candidate_id} / {job_id}") try: await perform_candidate_analysis(candidate_id, job_id) return {"status": "success"} except Exception as e: print(f"❌ Webhook analysis failed: {e}") return {"status": "error", "message": str(e)} # Run with: uvicorn api:app --reload