Muhammed Sameer
Revert NLTK auto-download to fix port timeout
efb9c07
# api.py
import os
from dotenv import load_dotenv
# Load env BEFORE importing modules that depend on it
load_dotenv()
from fastapi import FastAPI, HTTPException, UploadFile, Form, File, BackgroundTasks
from pydantic import BaseModel
from supabase import create_client
from fastapi.middleware.cors import CORSMiddleware
from supabase_ingest import process_resume
from src.extraction.job_extractor import process_single_job
from src.services.ats_service import analyze_ats_compatibility
from src.services.analysis import identify_missing_skills, generate_ai_analysis
from src.matching.similarity import calculate_granular_match_score
from src.services.clustering_service import ClusteringService
from typing import Dict, Any, Optional, List
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for dev; restrict in prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Setup Supabase Client
SUPABASE_URL = os.environ.get("SUPABASE_URL")
# Use Service Role Key if available to bypass RLS
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")
if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY (or SUPABASE_SERVICE_ROLE_KEY) must be set in .env")
client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Define the data we expect from the frontend
class ResumeRequest(BaseModel):
    user_id: str
    file_path: str  # e.g., "user_123/resume.pdf"
@app.post("/process-resume")
async def process_resume_endpoint(request: ResumeRequest):
print(f"πŸ”” Signal received: Process resume for {request.user_id}")
try:
# Delegate everything to the unified function
extracted_data = process_resume(client, request.user_id, request.file_path)
return {"status": "success", "data": extracted_data}
except Exception as e:
print(f"❌ Error: {e}")
raise HTTPException(status_code=500, detail=str(e))
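# A minimal smoke test for the endpoint above (the ids and path are placeholders,
# assuming the API is running locally on port 8000):
#   curl -X POST http://localhost:8000/process-resume \
#     -H "Content-Type: application/json" \
#     -d '{"user_id": "user_123", "file_path": "user_123/resume.pdf"}'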
# ---------------------------------------------------------------------
# WEBHOOK ENDPOINTS (Called by Supabase)
# ---------------------------------------------------------------------
class WebhookRequest(BaseModel):
    type: str
    table: str
    record: Dict[str, Any]
    schema: str
    old_record: Optional[Dict[str, Any]] = None
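# For reference, a Supabase Database Webhook delivers a JSON body shaped roughly like
# the following (fields abridged; the exact record columns depend on the source table):
#   {
#     "type": "INSERT",
#     "table": "objects",
#     "schema": "storage",
#     "record": {"name": "user_123/resume.pdf", "bucket_id": "resume", ...},
#     "old_record": null
#   }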
@app.post("/webhook/storage")
async def storage_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (storage.objects insert).
"""
print(f"πŸ”” Webhook received: {request.type} on {request.table}")
# We only care about INSERTs or UPDATEs (overwrites) to the 'resume' bucket
if request.type not in ["INSERT", "UPDATE"] or request.table != "objects":
return {"status": "ignored"}
# Extract file details from the record
# Object path example: "user_123/123456_resume.pdf"
file_path = request.record.get("name")
bucket_id = request.record.get("bucket_id")
# Check bucket
if bucket_id != "resume":
print(f"⚠️ Ignoring upload to bucket: {bucket_id}")
return {"status": "ignored", "reason": "wrong bucket"}
# Extract User ID (assuming folder structure: user_id/filename)
try:
user_id = file_path.split("/")[0]
except Exception:
print(f"❌ Could not extract user_id from {file_path}")
return {"status": "error", "message": "invalid file path structure"}
print(f"▢️ Triggering processing for {file_path}")
# Call the processing logic
try:
process_resume(client, user_id, file_path)
return {"status": "success"}
except Exception as e:
print(f"❌ Processing failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/jobs")
async def jobs_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (jobs table UPDATE/INSERT).
"""
print(f"πŸ”” Webhook received: {request.type} on {request.table}")
if request.table != "jobs":
return {"status": "ignored", "reason": "wrong table"}
# We care about INSERT and UPDATE
# For UPDATE, we might want to check if description changed, but for now we runs it anyway
new_record = request.record
job_id = new_record.get("id")
description = new_record.get("description")
experience_level = new_record.get("experience_level")
if not job_id:
print("❌ Webhook missing job_id")
return {"status": "error", "message": "missing id"}
print(f"▢️ Triggering job extraction for Job ID: {job_id}")
try:
# Re-use global client from line 32
process_single_job(client, job_id, description, experience_level)
return {"status": "success"}
except Exception as e:
print(f"❌ Job processing failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
# --- CLUSTERING ENDPOINT ---
@app.post("/run-clustering")
async def run_clustering_endpoint(background_tasks: BackgroundTasks):
"""
Trigger candidate clustering asynchronously.
Returns immediately while clustering runs in the background.
"""
print("πŸš€ Starting clustering pipeline...")
def run_clustering():
"""Background task to run clustering."""
try:
service = ClusteringService()
service.run_clustering_pipeline(n_clusters=5)
print("βœ… Clustering pipeline completed successfully!")
except Exception as e:
print(f"❌ Clustering failed: {e}")
# Add to background tasks
background_tasks.add_task(run_clustering)
return {
"status": "started",
"message": "Clustering pipeline started. This may take a few minutes. Refresh the page to see updated clusters."
}
@app.post("/analyze-ats")
async def analyze_ats_endpoint(
    resume: UploadFile = File(...),
    job_description: str = Form(...)
):
    """
    Real-time ATS analysis endpoint.
    Does not save to DB (unless you want to add that logic).
    """
    print(f"πŸ” Analyzing ATS compatibility for: {resume.filename}")
    try:
        result = await analyze_ats_compatibility(resume, job_description)
        return {"status": "success", "data": result}
    except Exception as e:
        print(f"❌ ATS Analysis failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
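# Because this endpoint takes an UploadFile plus a form field, it expects
# multipart/form-data rather than JSON. A hypothetical local test could look like:
#   curl -X POST http://localhost:8000/analyze-ats \
#     -F "resume=@resume.pdf" \
#     -F "job_description=Senior Python developer with FastAPI experience"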
# ---------------------------------------------------------------------
# CORE ANALYSIS LOGIC
# ---------------------------------------------------------------------
async def perform_candidate_analysis(candidate_id: str, job_id: str, force_refresh: bool = False):
"""
Shared logic to analyze a candidate for a job.
Checks for cached results first unless force_refresh is True.
"""
# 0. Check for existing analysis in applications table
if not force_refresh:
app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
if app_resp.data and app_resp.data[0].get("ai_summary"):
app_data = app_resp.data[0]
insights = app_data.get("ai_insights") or {}
# Use cached missing_skills if available
if "missing_skills" in insights:
print(f"βœ… Found fully cached analysis for {candidate_id}")
return {
"summary": app_data.get("ai_summary"),
"strengths": insights.get("strengths") or [],
"weaknesses": insights.get("weaknesses") or [],
"score": app_data.get("AI_score") or 0,
"semantic_score": app_data.get("semantic_score") or 0,
"score_breakdown": insights.get("score_breakdown") or {},
"missing_skills": insights.get("missing_skills") or []
}
# 1. Fetch Candidate Data
prof_resp = client.table("profiles").select("*").eq("id", candidate_id).execute()
if not prof_resp.data:
raise HTTPException(status_code=404, detail="Candidate not found")
# 0.5 Granular Match Score (Vector Similarity)
semantic_result = await calculate_granular_match_score(client, candidate_id, job_id)
profile = prof_resp.data[0]
# 2. Fetch Job Data
job_description = ""
job_skills = []
if job_id:
job_resp = client.table("jobs").select("*").eq("id", job_id).execute()
if job_resp.data:
job = job_resp.data[0]
job_description = job.get("description") or ""
raw_job_skills = job.get("skills") if job.get("skills") else job.get("technical_skills")
if isinstance(raw_job_skills, str):
job_skills = [s.strip() for s in raw_job_skills.split(",") if s.strip()]
elif isinstance(raw_job_skills, list):
job_skills = raw_job_skills
# 3. Prepare Profile Skills
profile_skills = []
raw_skills = profile.get("skills") or []
if isinstance(raw_skills, str):
profile_skills = [s.strip() for s in raw_skills.split(",") if s.strip()]
else:
profile_skills = raw_skills
raw_tech_skills = profile.get("technical_skills") or []
if isinstance(raw_tech_skills, str):
profile_skills.extend([s.strip() for s in raw_tech_skills.split(",") if s.strip()])
else:
profile_skills.extend(raw_tech_skills)
# 4. Identify Missing Skills (Semantic)
missing = []
if job_skills:
missing = identify_missing_skills(job_skills, profile_skills)
# 4a. If we had half-cached data (summary exists but missing_skills don't)
# We check cache again just in case it was partially filled
app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
if not force_refresh and app_resp.data and app_resp.data[0].get("ai_summary"):
app_data = app_resp.data[0]
insights = app_data.get("ai_insights") or {}
return {
"summary": app_data.get("ai_summary"),
"weaknesses": insights.get("weaknesses") or [],
"score": app_data.get("AI_score") or 0,
"semantic_score": semantic_result.get("total_score"),
"score_breakdown": semantic_result.get("breakdown"),
"missing_skills": missing
}
# 5. Generate fresh AI Insights
profile_summary = f"""
Role: {profile.get('role')}
Headline: {profile.get('headline')}
Summary: {profile.get('summary')}
Experience: {profile.get('experience_years')} years
Work Experience: {profile.get('work_experience')}
Education: {profile.get('education')}
Skills: {", ".join(profile_skills)}
"""
ai_insights = generate_ai_analysis(profile_summary, job_description)
# 6. Persist to Database
try:
breakdown = semantic_result.get("breakdown") or {}
data_to_save = {
"ai_summary": ai_insights.get("summary"),
"ai_insights": {
"weaknesses": ai_insights.get("weaknesses") or [],
"missing_skills": missing,
"score_breakdown": breakdown
},
"AI_score": int(ai_insights.get("score") or 0),
"match_score": int(semantic_result.get("total_score") or 0),
# Granular Scores mapping to table columns
"skills_match": int(breakdown.get("skills", 0)),
"technical_skills_match": int(breakdown.get("technical_skills", 0)),
"work_experience_match": int(breakdown.get("experience", 0)),
"education_match": int(breakdown.get("education", 0)),
"certifications_match": int(breakdown.get("certifications", 0)),
"project_match": int(breakdown.get("projects", 0))
}
client.table("applications").update(data_to_save).eq("user_id", candidate_id).eq("job_id", job_id).execute()
print(f"πŸ’Ύ Persisted AI analysis and granular scores for candidate {candidate_id}")
except Exception as db_err:
print(f"⚠️ Failed to persist AI analysis: {db_err}")
return {
"summary": ai_insights.get("summary"),
"strengths": ai_insights.get("strengths"),
"weaknesses": ai_insights.get("weaknesses"),
"score": ai_insights.get("score") or 0,
"semantic_score": semantic_result.get("total_score"),
"score_breakdown": semantic_result.get("breakdown"),
"missing_skills": missing
}
# ---------------------------------------------------------------------
# CANDIDATE ANALYSIS ENDPOINT
# ---------------------------------------------------------------------
class AnalysisRequest(BaseModel):
    candidate_id: str
    job_id: Optional[str] = None
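# The request body for the endpoint below is plain JSON, e.g. (ids are placeholders):
#   {"candidate_id": "user_123", "job_id": "job_456"}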
@app.post("/analyze-candidate")
async def analyze_candidate_endpoint(request: AnalysisRequest):
"""
Detailed candidate analysis: Summary + Missing Skills.
"""
print(f"πŸ”¬ Analyzing candidate {request.candidate_id} for job {request.job_id}")
if not request.job_id:
raise HTTPException(status_code=400, detail="job_id is required for analysis")
try:
result = await perform_candidate_analysis(request.candidate_id, request.job_id)
return {"status": "success", "data": result}
except Exception as e:
print(f"❌ Analysis endpoint failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/applications")
async def applications_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (applications table UPDATE/INSERT).
"""
print(f"πŸ”” Webhook received: {request.type} on {request.table}")
if request.table != "applications":
return {"status": "ignored", "reason": "wrong table"}
record = request.record
# Avoid infinite loop: if this update already contains ai_summary, ignore it
if request.type == "UPDATE" and record.get("ai_summary"):
return {"status": "skipped", "reason": "already analyzed"}
candidate_id = record.get("user_id")
job_id = record.get("job_id")
if not candidate_id or not job_id:
return {"status": "error", "message": "missing ids"}
print(f"▢️ Auto-triggering AI analysis for {candidate_id} / {job_id}")
try:
await perform_candidate_analysis(candidate_id, job_id)
return {"status": "success"}
except Exception as e:
print(f"❌ Webhook analysis failed: {e}")
return {"status": "error", "message": str(e)}
# Run with: uvicorn api:app --reload
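# For deployment, the same app can be served without --reload, e.g.:
#   uvicorn api:app --host 0.0.0.0 --port 8000
# Note that the /webhook/* endpoints only fire if the Supabase Database Webhooks are
# pointed at a URL where this server is publicly reachable.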