# api.py
import os
from dotenv import load_dotenv

# Load env BEFORE importing modules that depend on it
load_dotenv()

from fastapi import FastAPI, HTTPException, UploadFile, Form, File, BackgroundTasks
from pydantic import BaseModel, Field
from supabase import create_client
from fastapi.middleware.cors import CORSMiddleware
from supabase_ingest import process_resume
from src.extraction.job_extractor import process_single_job
from src.services.ats_service import analyze_ats_compatibility
from src.services.analysis import identify_missing_skills, generate_ai_analysis
from src.matching.similarity import calculate_granular_match_score
from src.services.clustering_service import ClusteringService
from typing import Dict, Any, Optional, List

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for dev; restrict in prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
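# In production you would typically pin the origins down, e.g.:
#   allow_origins=["https://app.example.com"]  # hypothetical frontend URL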
# Setup Supabase client
SUPABASE_URL = os.environ.get("SUPABASE_URL")
# Use Service Role Key if available to bypass RLS
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")

if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY (or SUPABASE_SERVICE_ROLE_KEY) must be set in .env")

client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Define the data we expect from the frontend
class ResumeRequest(BaseModel):
    user_id: str
    file_path: str  # e.g., "user_123/resume.pdf"
@app.post("/api/process-resume")  # route path assumed; adapt to your deployment
async def process_resume_endpoint(request: ResumeRequest):
    print(f"📡 Signal received: Process resume for {request.user_id}")
    try:
        # Delegate everything to the unified function
        extracted_data = process_resume(client, request.user_id, request.file_path)
        return {"status": "success", "data": extracted_data}
    except Exception as e:
        print(f"❌ Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
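# Example invocation against the assumed route above:
#   curl -X POST http://localhost:8000/api/process-resume \
#     -H "Content-Type: application/json" \
#     -d '{"user_id": "user_123", "file_path": "user_123/resume.pdf"}'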
# ---------------------------------------------------------------------
# WEBHOOK ENDPOINTS (Called by Supabase)
# ---------------------------------------------------------------------
class WebhookRequest(BaseModel):
    type: str
    table: str
    record: Dict[str, Any]
    # "schema" shadows a BaseModel attribute in Pydantic, so store it under
    # another name; the alias lets validation still accept the incoming "schema" key
    db_schema: str = Field(alias="schema")
    old_record: Optional[Dict[str, Any]] = None
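# Illustrative shape of a Supabase Database Webhook payload (values invented
# for the example):
# {
#   "type": "INSERT",
#   "table": "objects",
#   "schema": "storage",
#   "record": {"name": "user_123/123456_resume.pdf", "bucket_id": "resume"},
#   "old_record": null
# }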
@app.post("/webhooks/storage")  # route path assumed
async def storage_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (storage.objects insert).
    """
    print(f"🔔 Webhook received: {request.type} on {request.table}")

    # We only care about INSERTs or UPDATEs (overwrites) to the 'resume' bucket
    if request.type not in ["INSERT", "UPDATE"] or request.table != "objects":
        return {"status": "ignored"}

    # Extract file details from the record
    # Object path example: "user_123/123456_resume.pdf"
    file_path = request.record.get("name")
    bucket_id = request.record.get("bucket_id")

    # Check bucket
    if bucket_id != "resume":
        print(f"⚠️ Ignoring upload to bucket: {bucket_id}")
        return {"status": "ignored", "reason": "wrong bucket"}

    # Extract user ID (assuming folder structure: user_id/filename)
    try:
        user_id = file_path.split("/")[0]
    except Exception:
        print(f"❌ Could not extract user_id from {file_path}")
        return {"status": "error", "message": "invalid file path structure"}

    print(f"▶️ Triggering processing for {file_path}")

    # Call the processing logic
    try:
        process_resume(client, user_id, file_path)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhooks/jobs")  # route path assumed
async def jobs_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (jobs table UPDATE/INSERT).
    """
    print(f"🔔 Webhook received: {request.type} on {request.table}")

    if request.table != "jobs":
        return {"status": "ignored", "reason": "wrong table"}

    # We care about INSERT and UPDATE. For UPDATE we might want to check
    # whether the description actually changed, but for now we run it anyway.
    new_record = request.record
    job_id = new_record.get("id")
    description = new_record.get("description")
    experience_level = new_record.get("experience_level")

    if not job_id:
        print("❌ Webhook missing job_id")
        return {"status": "error", "message": "missing id"}

    print(f"▶️ Triggering job extraction for Job ID: {job_id}")

    try:
        # Re-use the global Supabase client created above
        process_single_job(client, job_id, description, experience_level)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Job processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
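# The webhook handlers above are meant to be registered as Supabase Database
# Webhooks (Dashboard -> Database -> Webhooks) pointing at this service's
# public URL; the routes used here are placeholders to adapt to your deployment.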
# --- CLUSTERING ENDPOINT ---
@app.post("/api/run-clustering")  # route path assumed
async def run_clustering_endpoint(background_tasks: BackgroundTasks):
    """
    Trigger candidate clustering asynchronously.
    Returns immediately while clustering runs in the background.
    """
    print("🚀 Starting clustering pipeline...")

    def run_clustering():
        """Background task to run clustering."""
        try:
            service = ClusteringService()
            service.run_clustering_pipeline(n_clusters=5)
            print("✅ Clustering pipeline completed successfully!")
        except Exception as e:
            print(f"❌ Clustering failed: {e}")

    # Add to background tasks
    background_tasks.add_task(run_clustering)

    return {
        "status": "started",
        "message": "Clustering pipeline started. This may take a few minutes. Refresh the page to see updated clusters."
    }
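# Note: FastAPI's BackgroundTasks runs the task in-process after the response
# is sent, so a long clustering run still ties up this worker. For heavier
# workloads a dedicated queue (e.g. Celery or RQ) would be the usual swap-in.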
@app.post("/api/analyze-ats")  # route path assumed
async def analyze_ats_endpoint(
    resume: UploadFile = File(...),
    job_description: str = Form(...)
):
    """
    Real-time ATS analysis endpoint.
    Does not save to DB (unless you want to add that logic).
    """
    print(f"🔍 Analyzing ATS compatibility for: {resume.filename}")
    try:
        result = await analyze_ats_compatibility(resume, job_description)
        return {"status": "success", "data": result}
    except Exception as e:
        print(f"❌ ATS analysis failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
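# Example invocation against the assumed route above (multipart form, since
# the endpoint takes File + Form fields):
#   curl -X POST http://localhost:8000/api/analyze-ats \
#     -F "resume=@resume.pdf" \
#     -F "job_description=Senior Python developer with FastAPI experience"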
# ---------------------------------------------------------------------
# CORE ANALYSIS LOGIC
# ---------------------------------------------------------------------
async def perform_candidate_analysis(candidate_id: str, job_id: str, force_refresh: bool = False):
    """
    Shared logic to analyze a candidate for a job.
    Checks for cached results first unless force_refresh is True.
    """
    # 0. Check for existing analysis in the applications table
    if not force_refresh:
        app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score, match_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
        if app_resp.data and app_resp.data[0].get("ai_summary"):
            app_data = app_resp.data[0]
            insights = app_data.get("ai_insights") or {}
            # Use cached missing_skills if available
            if "missing_skills" in insights:
                print(f"✅ Found fully cached analysis for {candidate_id}")
                return {
                    "summary": app_data.get("ai_summary"),
                    "strengths": insights.get("strengths") or [],
                    "weaknesses": insights.get("weaknesses") or [],
                    "score": app_data.get("AI_score") or 0,
                    # match_score holds the persisted semantic total (see step 6)
                    "semantic_score": app_data.get("match_score") or 0,
                    "score_breakdown": insights.get("score_breakdown") or {},
                    "missing_skills": insights.get("missing_skills") or []
                }
    # 1. Fetch candidate data
    prof_resp = client.table("profiles").select("*").eq("id", candidate_id).execute()
    if not prof_resp.data:
        raise HTTPException(status_code=404, detail="Candidate not found")

    # 1.5 Granular match score (vector similarity)
    semantic_result = await calculate_granular_match_score(client, candidate_id, job_id)

    profile = prof_resp.data[0]

    # 2. Fetch job data
    job_description = ""
    job_skills = []
    if job_id:
        job_resp = client.table("jobs").select("*").eq("id", job_id).execute()
        if job_resp.data:
            job = job_resp.data[0]
            job_description = job.get("description") or ""
            raw_job_skills = job.get("skills") if job.get("skills") else job.get("technical_skills")
            if isinstance(raw_job_skills, str):
                job_skills = [s.strip() for s in raw_job_skills.split(",") if s.strip()]
            elif isinstance(raw_job_skills, list):
                job_skills = raw_job_skills
    # 3. Prepare profile skills
    profile_skills = []
    raw_skills = profile.get("skills") or []
    if isinstance(raw_skills, str):
        profile_skills = [s.strip() for s in raw_skills.split(",") if s.strip()]
    else:
        # Copy, so extend() below doesn't mutate the list inside the profile record
        profile_skills = list(raw_skills)

    raw_tech_skills = profile.get("technical_skills") or []
    if isinstance(raw_tech_skills, str):
        profile_skills.extend([s.strip() for s in raw_tech_skills.split(",") if s.strip()])
    else:
        profile_skills.extend(raw_tech_skills)

    # 4. Identify missing skills (semantic)
    missing = []
    if job_skills:
        missing = identify_missing_skills(job_skills, profile_skills)
    # 4a. Handle half-cached data (a summary exists but missing_skills was never
    # stored); check the cache again in case it was partially filled
    app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
    if not force_refresh and app_resp.data and app_resp.data[0].get("ai_summary"):
        app_data = app_resp.data[0]
        insights = app_data.get("ai_insights") or {}
        return {
            "summary": app_data.get("ai_summary"),
            "strengths": insights.get("strengths") or [],
            "weaknesses": insights.get("weaknesses") or [],
            "score": app_data.get("AI_score") or 0,
            "semantic_score": semantic_result.get("total_score"),
            "score_breakdown": semantic_result.get("breakdown"),
            "missing_skills": missing
        }
    # 5. Generate fresh AI insights
    profile_summary = f"""
    Role: {profile.get('role')}
    Headline: {profile.get('headline')}
    Summary: {profile.get('summary')}
    Experience: {profile.get('experience_years')} years
    Work Experience: {profile.get('work_experience')}
    Education: {profile.get('education')}
    Skills: {", ".join(profile_skills)}
    """
    ai_insights = generate_ai_analysis(profile_summary, job_description)

    # 6. Persist to database
    try:
        breakdown = semantic_result.get("breakdown") or {}
        data_to_save = {
            "ai_summary": ai_insights.get("summary"),
            "ai_insights": {
                # Persist strengths too, so the cached paths above can return them
                "strengths": ai_insights.get("strengths") or [],
                "weaknesses": ai_insights.get("weaknesses") or [],
                "missing_skills": missing,
                "score_breakdown": breakdown
            },
            "AI_score": int(ai_insights.get("score") or 0),
            "match_score": int(semantic_result.get("total_score") or 0),
            # Granular scores mapped to table columns
            "skills_match": int(breakdown.get("skills", 0)),
            "technical_skills_match": int(breakdown.get("technical_skills", 0)),
            "work_experience_match": int(breakdown.get("experience", 0)),
            "education_match": int(breakdown.get("education", 0)),
            "certifications_match": int(breakdown.get("certifications", 0)),
            "project_match": int(breakdown.get("projects", 0))
        }
        client.table("applications").update(data_to_save).eq("user_id", candidate_id).eq("job_id", job_id).execute()
        print(f"💾 Persisted AI analysis and granular scores for candidate {candidate_id}")
    except Exception as db_err:
        print(f"⚠️ Failed to persist AI analysis: {db_err}")
    return {
        "summary": ai_insights.get("summary"),
        "strengths": ai_insights.get("strengths"),
        "weaknesses": ai_insights.get("weaknesses"),
        "score": ai_insights.get("score") or 0,
        "semantic_score": semantic_result.get("total_score"),
        "score_breakdown": semantic_result.get("breakdown"),
        "missing_skills": missing
    }
# ---------------------------------------------------------------------
# CANDIDATE ANALYSIS ENDPOINT
# ---------------------------------------------------------------------
class AnalysisRequest(BaseModel):
    candidate_id: str
    job_id: Optional[str] = None

@app.post("/api/analyze-candidate")  # route path assumed
async def analyze_candidate_endpoint(request: AnalysisRequest):
    """
    Detailed candidate analysis: summary + missing skills.
    """
    print(f"🔬 Analyzing candidate {request.candidate_id} for job {request.job_id}")

    if not request.job_id:
        raise HTTPException(status_code=400, detail="job_id is required for analysis")

    try:
        result = await perform_candidate_analysis(request.candidate_id, request.job_id)
        return {"status": "success", "data": result}
    except Exception as e:
        print(f"❌ Analysis endpoint failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
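# Example request body for the assumed route above (ids are illustrative):
#   {"candidate_id": "user_123", "job_id": "job_456"}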
@app.post("/webhooks/applications")  # route path assumed
async def applications_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (applications table UPDATE/INSERT).
    """
    print(f"🔔 Webhook received: {request.type} on {request.table}")

    if request.table != "applications":
        return {"status": "ignored", "reason": "wrong table"}

    record = request.record

    # Avoid an infinite loop: if this update already contains ai_summary, ignore it
    if request.type == "UPDATE" and record.get("ai_summary"):
        return {"status": "skipped", "reason": "already analyzed"}

    candidate_id = record.get("user_id")
    job_id = record.get("job_id")

    if not candidate_id or not job_id:
        return {"status": "error", "message": "missing ids"}

    print(f"▶️ Auto-triggering AI analysis for {candidate_id} / {job_id}")

    try:
        await perform_candidate_analysis(candidate_id, job_id)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Webhook analysis failed: {e}")
        return {"status": "error", "message": str(e)}

# Run with: uvicorn api:app --reload