# api.py
import os
from dotenv import load_dotenv
# Load env BEFORE importing modules that depend on it
load_dotenv()
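# A minimal example .env for local development (placeholder values; the key
# names match the os.environ lookups below):
#   SUPABASE_URL=https://<project-ref>.supabase.co
#   SUPABASE_SERVICE_ROLE_KEY=<service-role-key>   # preferred; bypasses RLS
#   SUPABASE_KEY=<anon-or-service-key>             # fallback if no service role key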
from fastapi import FastAPI, HTTPException, UploadFile, Form, File, BackgroundTasks
from pydantic import BaseModel
from supabase import create_client
from fastapi.middleware.cors import CORSMiddleware
from supabase_ingest import process_resume
from src.extraction.job_extractor import process_single_job
from src.services.ats_service import analyze_ats_compatibility
from src.services.analysis import identify_missing_skills, generate_ai_analysis
from src.matching.similarity import calculate_granular_match_score
from src.services.clustering_service import ClusteringService
from typing import Dict, Any, Optional, List
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for dev; restrict in prod
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Setup Supabase Client
SUPABASE_URL = os.environ.get("SUPABASE_URL")
# Use Service Role Key if available to bypass RLS
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")
if not SUPABASE_URL or not SUPABASE_KEY:
raise RuntimeError("SUPABASE_URL and SUPABASE_KEY (or SUPABASE_SERVICE_ROLE_KEY) must be set in .env")
client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Define the data we expect from the frontend
class ResumeRequest(BaseModel):
user_id: str
file_path: str # e.g., "user_123/resume.pdf"
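# Illustrative request body for POST /process-resume (values are placeholders):
#   {
#     "user_id": "user_123",
#     "file_path": "user_123/resume.pdf"
#   }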
@app.post("/process-resume")
async def process_resume_endpoint(request: ResumeRequest):
print(f"π Signal received: Process resume for {request.user_id}")
try:
# Delegate everything to the unified function
extracted_data = process_resume(client, request.user_id, request.file_path)
return {"status": "success", "data": extracted_data}
except Exception as e:
print(f"β Error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ---------------------------------------------------------------------
# WEBHOOK ENDPOINTS (Called by Supabase)
# ---------------------------------------------------------------------
class WebhookRequest(BaseModel):
type: str
table: str
record: Dict[str, Any]
schema: str
old_record: Optional[Dict[str, Any]] = None
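# Supabase Database Webhooks POST a JSON payload shaped like this model.
# Illustrative example for a storage upload (field values are placeholders):
#   {
#     "type": "INSERT",
#     "table": "objects",
#     "schema": "storage",
#     "record": {"name": "user_123/1700000000_resume.pdf", "bucket_id": "resume", ...},
#     "old_record": null
#   }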
@app.post("/webhook/storage")
async def storage_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (storage.objects insert).
"""
print(f"π Webhook received: {request.type} on {request.table}")
# We only care about INSERTs or UPDATEs (overwrites) to the 'resume' bucket
if request.type not in ["INSERT", "UPDATE"] or request.table != "objects":
return {"status": "ignored"}
# Extract file details from the record
# Object path example: "user_123/123456_resume.pdf"
file_path = request.record.get("name")
bucket_id = request.record.get("bucket_id")
# Check bucket
if bucket_id != "resume":
print(f"β οΈ Ignoring upload to bucket: {bucket_id}")
return {"status": "ignored", "reason": "wrong bucket"}
# Extract User ID (assuming folder structure: user_id/filename)
try:
user_id = file_path.split("/")[0]
except Exception:
print(f"β Could not extract user_id from {file_path}")
return {"status": "error", "message": "invalid file path structure"}
print(f"βΆοΈ Triggering processing for {file_path}")
# Call the processing logic
try:
process_resume(client, user_id, file_path)
return {"status": "success"}
except Exception as e:
print(f"β Processing failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/jobs")
async def jobs_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (jobs table UPDATE/INSERT).
"""
print(f"π Webhook received: {request.type} on {request.table}")
if request.table != "jobs":
return {"status": "ignored", "reason": "wrong table"}
# We care about INSERT and UPDATE
# For UPDATE we might want to check whether the description actually changed, but for now we run the extraction unconditionally
new_record = request.record
job_id = new_record.get("id")
description = new_record.get("description")
experience_level = new_record.get("experience_level")
if not job_id:
print("β Webhook missing job_id")
return {"status": "error", "message": "missing id"}
print(f"βΆοΈ Triggering job extraction for Job ID: {job_id}")
try:
# Re-use the module-level Supabase client defined above
process_single_job(client, job_id, description, experience_level)
return {"status": "success"}
except Exception as e:
print(f"β Job processing failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
# --- CLUSTERING ENDPOINT ---
@app.post("/run-clustering")
async def run_clustering_endpoint(background_tasks: BackgroundTasks):
"""
Trigger candidate clustering asynchronously.
Returns immediately while clustering runs in the background.
"""
print("π Starting clustering pipeline...")
def run_clustering():
"""Background task to run clustering."""
try:
service = ClusteringService()
service.run_clustering_pipeline(n_clusters=5)
print("β
Clustering pipeline completed successfully!")
except Exception as e:
print(f"β Clustering failed: {e}")
# Add to background tasks
background_tasks.add_task(run_clustering)
return {
"status": "started",
"message": "Clustering pipeline started. This may take a few minutes. Refresh the page to see updated clusters."
}
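# Note: FastAPI BackgroundTasks run after the response has been sent, so the
# caller gets the "started" reply immediately. A minimal trigger, assuming the
# API is served locally on port 8000:
#   curl -X POST http://localhost:8000/run-clustering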
@app.post("/analyze-ats")
async def analyze_ats_endpoint(
resume: UploadFile = File(...),
job_description: str = Form(...)
):
"""
Real-time ATS analysis endpoint.
Does not save to DB (unless you want to add that logic).
"""
print(f"π Analyzing ATS compatibility for: {resume.filename}")
try:
result = await analyze_ats_compatibility(resume, job_description)
return {"status": "success", "data": result}
except Exception as e:
print(f"β ATS Analysis failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
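# Example multipart request for /analyze-ats (a sketch, assuming a local server
# on port 8000 and a resume PDF on disk; form field names match the endpoint):
#   curl -X POST http://localhost:8000/analyze-ats \
#     -F "resume=@resume.pdf" \
#     -F "job_description=Senior Python developer with FastAPI experience"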
# ---------------------------------------------------------------------
# CORE ANALYSIS LOGIC
# ---------------------------------------------------------------------
async def perform_candidate_analysis(candidate_id: str, job_id: str, force_refresh: bool = False):
"""
Shared logic to analyze a candidate for a job.
Checks for cached results first unless force_refresh is True.
"""
# 0. Check for existing analysis in applications table
if not force_refresh:
app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score, match_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
if app_resp.data and app_resp.data[0].get("ai_summary"):
app_data = app_resp.data[0]
insights = app_data.get("ai_insights") or {}
# Use cached missing_skills if available
if "missing_skills" in insights:
print(f"β
Found fully cached analysis for {candidate_id}")
return {
"summary": app_data.get("ai_summary"),
"strengths": insights.get("strengths") or [],
"weaknesses": insights.get("weaknesses") or [],
"score": app_data.get("AI_score") or 0,
"semantic_score": app_data.get("semantic_score") or 0,
"score_breakdown": insights.get("score_breakdown") or {},
"missing_skills": insights.get("missing_skills") or []
}
# 1. Fetch Candidate Data
prof_resp = client.table("profiles").select("*").eq("id", candidate_id).execute()
if not prof_resp.data:
raise HTTPException(status_code=404, detail="Candidate not found")
# 0.5 Granular Match Score (Vector Similarity)
semantic_result = await calculate_granular_match_score(client, candidate_id, job_id)
profile = prof_resp.data[0]
# 2. Fetch Job Data
job_description = ""
job_skills = []
if job_id:
job_resp = client.table("jobs").select("*").eq("id", job_id).execute()
if job_resp.data:
job = job_resp.data[0]
job_description = job.get("description") or ""
raw_job_skills = job.get("skills") or job.get("technical_skills")
if isinstance(raw_job_skills, str):
job_skills = [s.strip() for s in raw_job_skills.split(",") if s.strip()]
elif isinstance(raw_job_skills, list):
job_skills = raw_job_skills
# 3. Prepare Profile Skills
profile_skills = []
raw_skills = profile.get("skills") or []
if isinstance(raw_skills, str):
profile_skills = [s.strip() for s in raw_skills.split(",") if s.strip()]
else:
profile_skills = raw_skills
raw_tech_skills = profile.get("technical_skills") or []
if isinstance(raw_tech_skills, str):
profile_skills.extend([s.strip() for s in raw_tech_skills.split(",") if s.strip()])
else:
profile_skills.extend(raw_tech_skills)
# 4. Identify Missing Skills (Semantic)
missing = []
if job_skills:
missing = identify_missing_skills(job_skills, profile_skills)
# 4a. Handle half-cached data (summary exists but missing_skills were never stored):
# re-check the cache in case it was only partially filled
app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
if not force_refresh and app_resp.data and app_resp.data[0].get("ai_summary"):
app_data = app_resp.data[0]
insights = app_data.get("ai_insights") or {}
return {
"summary": app_data.get("ai_summary"),
"weaknesses": insights.get("weaknesses") or [],
"score": app_data.get("AI_score") or 0,
"semantic_score": semantic_result.get("total_score"),
"score_breakdown": semantic_result.get("breakdown"),
"missing_skills": missing
}
# 5. Generate fresh AI Insights
profile_summary = f"""
Role: {profile.get('role')}
Headline: {profile.get('headline')}
Summary: {profile.get('summary')}
Experience: {profile.get('experience_years')} years
Work Experience: {profile.get('work_experience')}
Education: {profile.get('education')}
Skills: {", ".join(profile_skills)}
"""
ai_insights = generate_ai_analysis(profile_summary, job_description)
# 6. Persist to Database
try:
breakdown = semantic_result.get("breakdown") or {}
data_to_save = {
"ai_summary": ai_insights.get("summary"),
"ai_insights": {
"weaknesses": ai_insights.get("weaknesses") or [],
"missing_skills": missing,
"score_breakdown": breakdown
},
"AI_score": int(ai_insights.get("score") or 0),
"match_score": int(semantic_result.get("total_score") or 0),
# Granular Scores mapping to table columns
"skills_match": int(breakdown.get("skills", 0)),
"technical_skills_match": int(breakdown.get("technical_skills", 0)),
"work_experience_match": int(breakdown.get("experience", 0)),
"education_match": int(breakdown.get("education", 0)),
"certifications_match": int(breakdown.get("certifications", 0)),
"project_match": int(breakdown.get("projects", 0))
}
client.table("applications").update(data_to_save).eq("user_id", candidate_id).eq("job_id", job_id).execute()
print(f"πΎ Persisted AI analysis and granular scores for candidate {candidate_id}")
except Exception as db_err:
print(f"β οΈ Failed to persist AI analysis: {db_err}")
return {
"summary": ai_insights.get("summary"),
"strengths": ai_insights.get("strengths"),
"weaknesses": ai_insights.get("weaknesses"),
"score": ai_insights.get("score") or 0,
"semantic_score": semantic_result.get("total_score"),
"score_breakdown": semantic_result.get("breakdown"),
"missing_skills": missing
}
# ---------------------------------------------------------------------
# CANDIDATE ANALYSIS ENDPOINT
# ---------------------------------------------------------------------
class AnalysisRequest(BaseModel):
candidate_id: str
job_id: Optional[str] = None
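# Illustrative request body for POST /analyze-candidate (IDs are placeholders;
# note that job_id is required by the endpoint below even though the model
# declares it Optional):
#   {
#     "candidate_id": "user_123",
#     "job_id": "42"
#   }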
@app.post("/analyze-candidate")
async def analyze_candidate_endpoint(request: AnalysisRequest):
"""
Detailed candidate analysis: Summary + Missing Skills.
"""
print(f"π¬ Analyzing candidate {request.candidate_id} for job {request.job_id}")
if not request.job_id:
raise HTTPException(status_code=400, detail="job_id is required for analysis")
try:
result = await perform_candidate_analysis(request.candidate_id, request.job_id)
return {"status": "success", "data": result}
except Exception as e:
print(f"β Analysis endpoint failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/applications")
async def applications_webhook(request: WebhookRequest):
"""
Handles Database Webhooks from Supabase (applications table UPDATE/INSERT).
"""
print(f"π Webhook received: {request.type} on {request.table}")
if request.table != "applications":
return {"status": "ignored", "reason": "wrong table"}
record = request.record
# Avoid infinite loop: if this update already contains ai_summary, ignore it
if request.type == "UPDATE" and record.get("ai_summary"):
return {"status": "skipped", "reason": "already analyzed"}
candidate_id = record.get("user_id")
job_id = record.get("job_id")
if not candidate_id or not job_id:
return {"status": "error", "message": "missing ids"}
print(f"βΆοΈ Auto-triggering AI analysis for {candidate_id} / {job_id}")
try:
await perform_candidate_analysis(candidate_id, job_id)
return {"status": "success"}
except Exception as e:
print(f"β Webhook analysis failed: {e}")
return {"status": "error", "message": str(e)}
# Run with: uvicorn api:app --reload