# api.py
import os
from dotenv import load_dotenv

# Load env BEFORE importing modules that depend on it
load_dotenv()

from fastapi import FastAPI, HTTPException, UploadFile, Form, File, BackgroundTasks
from pydantic import BaseModel, Field
from supabase import create_client
from fastapi.middleware.cors import CORSMiddleware
from supabase_ingest import process_resume
from src.extraction.job_extractor import process_single_job
from src.services.ats_service import analyze_ats_compatibility
from src.services.analysis import identify_missing_skills, generate_ai_analysis
from src.matching.similarity import calculate_granular_match_score
from src.services.clustering_service import ClusteringService
from typing import Dict, Any, Optional, List


app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # Allow all origins for dev; restrict in prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
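
# Note: "*" origins are convenient for local dev; in production you would
# typically pin an explicit allowlist instead, e.g.:
#   allow_origins=["https://app.example.com"]  # hypothetical frontend domain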

# Setup Supabase Client
SUPABASE_URL = os.environ.get("SUPABASE_URL")
# Use Service Role Key if available to bypass RLS
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")

if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY (or SUPABASE_SERVICE_ROLE_KEY) must be set in .env")

client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Define the data we expect from the frontend
class ResumeRequest(BaseModel):
    user_id: str
    file_path: str  # e.g., "user_123/resume.pdf"

@app.post("/process-resume")
async def process_resume_endpoint(request: ResumeRequest):
    print(f"πŸ”” Signal received: Process resume for {request.user_id}")

    try:
        # Delegate everything to the unified function
        extracted_data = process_resume(client, request.user_id, request.file_path)
        return {"status": "success", "data": extracted_data}

    except Exception as e:
        print(f"❌ Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
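
# Example call (a sketch; assumes the API is served on localhost:8000):
#   curl -X POST http://localhost:8000/process-resume \
#        -H "Content-Type: application/json" \
#        -d '{"user_id": "user_123", "file_path": "user_123/resume.pdf"}'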

# ---------------------------------------------------------------------
# WEBHOOK ENDPOINTS (Called by Supabase)
# ---------------------------------------------------------------------

class WebhookRequest(BaseModel):
    type: str
    table: str
    record: Dict[str, Any]
    # "schema" shadows BaseModel.schema, which Pydantic rejects as a field
    # name; accept the incoming "schema" key through an alias instead.
    db_schema: str = Field(alias="schema")
    old_record: Optional[Dict[str, Any]] = None
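
# Representative payload shape (a sketch based on Supabase's Database Webhook
# format; real payloads may carry additional keys):
# {
#   "type": "INSERT",
#   "table": "objects",
#   "schema": "storage",
#   "record": {"name": "user_123/resume.pdf", "bucket_id": "resume"},
#   "old_record": null
# }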

@app.post("/webhook/storage")
async def storage_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (storage.objects insert).
    """
    print(f"πŸ”” Webhook received: {request.type} on {request.table}")

    # We only care about INSERTs or UPDATEs (overwrites) to the 'resume' bucket
    if request.type not in ["INSERT", "UPDATE"] or request.table != "objects":
        return {"status": "ignored"}
    
    # Extract file details from the record
    # Object path example: "user_123/123456_resume.pdf"
    file_path = request.record.get("name")
    bucket_id = request.record.get("bucket_id")
    
    # Check bucket
    if bucket_id != "resume":
        print(f"⚠️ Ignoring upload to bucket: {bucket_id}")
        return {"status": "ignored", "reason": "wrong bucket"}

    # Extract User ID (assuming folder structure: user_id/filename).
    # str.split never raises on a string, so the old try/except could not
    # catch a malformed path; validate explicitly instead.
    if not file_path or "/" not in file_path:
        print(f"❌ Could not extract user_id from {file_path}")
        return {"status": "error", "message": "invalid file path structure"}
    user_id = file_path.split("/")[0]

    print(f"▢️ Triggering processing for {file_path}")
    
    # Call the processing logic
    try:
        process_resume(client, user_id, file_path)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/webhook/jobs")
async def jobs_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (jobs table UPDATE/INSERT).
    """
    print(f"πŸ”” Webhook received: {request.type} on {request.table}")

    if request.table != "jobs":
        return {"status": "ignored", "reason": "wrong table"}
    
    # We care about INSERT and UPDATE.
    # For UPDATE, we could check whether the description actually changed,
    # but for now we run extraction unconditionally.
    
    new_record = request.record
    job_id = new_record.get("id")
    description = new_record.get("description")
    experience_level = new_record.get("experience_level")
    
    if not job_id:
        print("❌ Webhook missing job_id")
        return {"status": "error", "message": "missing id"}

    print(f"▢️ Triggering job extraction for Job ID: {job_id}")

    try:
        # Re-use the module-level Supabase client
        process_single_job(client, job_id, description, experience_level)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Job processing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# --- CLUSTERING ENDPOINT ---
@app.post("/run-clustering")
async def run_clustering_endpoint(background_tasks: BackgroundTasks):
    """
    Trigger candidate clustering asynchronously.
    Returns immediately while clustering runs in the background.
    """
    print("πŸš€ Starting clustering pipeline...")
    
    def run_clustering():
        """Background task to run clustering."""
        try:
            service = ClusteringService()
            service.run_clustering_pipeline(n_clusters=5)
            print("βœ… Clustering pipeline completed successfully!")
        except Exception as e:
            print(f"❌ Clustering failed: {e}")
    
    # Add to background tasks
    background_tasks.add_task(run_clustering)
    
    return {
        "status": "started",
        "message": "Clustering pipeline started. This may take a few minutes. Refresh the page to see updated clusters."
    }
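
# Example trigger (a sketch; assumes localhost:8000):
#   curl -X POST http://localhost:8000/run-clustering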

@app.post("/analyze-ats")
async def analyze_ats_endpoint(
    resume: UploadFile = File(...), 
    job_description: str = Form(...)
):
    """
    Real-time ATS analysis endpoint. 
    Does not save to DB (unless you want to add that logic).
    """
    print(f"πŸ” Analyzing ATS compatibility for: {resume.filename}")
    try:
        result = await analyze_ats_compatibility(resume, job_description)
        return {"status": "success", "data": result}
    except Exception as e:
        print(f"❌ ATS Analysis failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
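
# Example call (a sketch; multipart form fields matching the signature above):
#   curl -X POST http://localhost:8000/analyze-ats \
#        -F "resume=@resume.pdf" \
#        -F "job_description=Senior Python developer, FastAPI, Postgres"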

# ---------------------------------------------------------------------
# CANDIDATE ANALYSIS ENDPOINT
# ---------------------------------------------------------------------

# ---------------------------------------------------------------------
# CORE ANALYSIS LOGIC
# ---------------------------------------------------------------------

async def perform_candidate_analysis(candidate_id: str, job_id: str, force_refresh: bool = False):
    """
    Shared logic to analyze a candidate for a job.
    Checks for cached results first unless force_refresh is True.
    """
    # 0. Check for existing analysis in applications table
    if not force_refresh:
        app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
        
        if app_resp.data and app_resp.data[0].get("ai_summary"):
            app_data = app_resp.data[0]
            insights = app_data.get("ai_insights") or {}
            
            # Use cached missing_skills if available
            if "missing_skills" in insights:
                print(f"βœ… Found fully cached analysis for {candidate_id}")
                return {
                    "summary": app_data.get("ai_summary"),
                    "strengths": insights.get("strengths") or [],
                    "weaknesses": insights.get("weaknesses") or [],
                    "score": app_data.get("AI_score") or 0,
                    "semantic_score": app_data.get("semantic_score") or 0,
                    "score_breakdown": insights.get("score_breakdown") or {},
                    "missing_skills": insights.get("missing_skills") or []
                }

    # 1. Fetch Candidate Data
    prof_resp = client.table("profiles").select("*").eq("id", candidate_id).execute()
    if not prof_resp.data:
        raise HTTPException(status_code=404, detail="Candidate not found")
    
    # 1b. Granular Match Score (Vector Similarity)
    semantic_result = await calculate_granular_match_score(client, candidate_id, job_id)
    
    profile = prof_resp.data[0]
    
    # 2. Fetch Job Data
    job_description = ""
    job_skills = []
    
    if job_id:
        job_resp = client.table("jobs").select("*").eq("id", job_id).execute()
        if job_resp.data:
            job = job_resp.data[0]
            job_description = job.get("description") or ""
            raw_job_skills = job.get("skills") or job.get("technical_skills")
            if isinstance(raw_job_skills, str):
                job_skills = [s.strip() for s in raw_job_skills.split(",") if s.strip()]
            elif isinstance(raw_job_skills, list):
                job_skills = raw_job_skills

    # 3. Prepare Profile Skills
    profile_skills = []
    raw_skills = profile.get("skills") or []
    if isinstance(raw_skills, str):
        profile_skills = [s.strip() for s in raw_skills.split(",") if s.strip()]
    else:
        profile_skills = raw_skills
        
    raw_tech_skills = profile.get("technical_skills") or []
    if isinstance(raw_tech_skills, str):
        profile_skills.extend([s.strip() for s in raw_tech_skills.split(",") if s.strip()])
    else:
        profile_skills.extend(raw_tech_skills)

    # 4. Identify Missing Skills (Semantic)
    missing = []
    if job_skills:
        missing = identify_missing_skills(job_skills, profile_skills)

    # 4a. Handle half-cached rows (an ai_summary exists but missing_skills
    # were never stored): re-check the cache in case it was partially filled
    app_resp = client.table("applications").select("ai_summary, ai_insights, AI_score").eq("user_id", candidate_id).eq("job_id", job_id).execute()
    if not force_refresh and app_resp.data and app_resp.data[0].get("ai_summary"):
        app_data = app_resp.data[0]
        insights = app_data.get("ai_insights") or {}
        return {
            "summary": app_data.get("ai_summary"),
            "strengths": insights.get("strengths") or [],
            "weaknesses": insights.get("weaknesses") or [],
            "score": app_data.get("AI_score") or 0,
            "semantic_score": semantic_result.get("total_score"),
            "score_breakdown": semantic_result.get("breakdown"),
            "missing_skills": missing
        }

    # 5. Generate fresh AI Insights
    profile_summary = f"""
    Role: {profile.get('role')}
    Headline: {profile.get('headline')}
    Summary: {profile.get('summary')}
    Experience: {profile.get('experience_years')} years
    Work Experience: {profile.get('work_experience')}
    Education: {profile.get('education')}
    Skills: {", ".join(profile_skills)}
    """
    
    ai_insights = generate_ai_analysis(profile_summary, job_description)
    
    # 6. Persist to Database
    try:
        breakdown = semantic_result.get("breakdown") or {}
        data_to_save = {
            "ai_summary": ai_insights.get("summary"),
            "ai_insights": {
                "weaknesses": ai_insights.get("weaknesses") or [],
                "missing_skills": missing,
                "score_breakdown": breakdown
            },
            "AI_score": int(ai_insights.get("score") or 0),
            "match_score": int(semantic_result.get("total_score") or 0),
            # Granular Scores mapping to table columns
            "skills_match": int(breakdown.get("skills", 0)),
            "technical_skills_match": int(breakdown.get("technical_skills", 0)),
            "work_experience_match": int(breakdown.get("experience", 0)),
            "education_match": int(breakdown.get("education", 0)),
            "certifications_match": int(breakdown.get("certifications", 0)),
            "project_match": int(breakdown.get("projects", 0))
        }
        client.table("applications").update(data_to_save).eq("user_id", candidate_id).eq("job_id", job_id).execute()
        print(f"πŸ’Ύ Persisted AI analysis and granular scores for candidate {candidate_id}")
    except Exception as db_err:
        print(f"⚠️ Failed to persist AI analysis: {db_err}")

    return {
        "summary": ai_insights.get("summary"),
        "strengths": ai_insights.get("strengths"),
        "weaknesses": ai_insights.get("weaknesses"),
        "score": ai_insights.get("score") or 0,
        "semantic_score": semantic_result.get("total_score"),
        "score_breakdown": semantic_result.get("breakdown"),
        "missing_skills": missing
    }

# ---------------------------------------------------------------------
# CANDIDATE ANALYSIS ENDPOINT
# ---------------------------------------------------------------------

class AnalysisRequest(BaseModel):
    candidate_id: str
    job_id: Optional[str] = None

@app.post("/analyze-candidate")
async def analyze_candidate_endpoint(request: AnalysisRequest):
    """
    Detailed candidate analysis: Summary + Missing Skills.
    """
    print(f"πŸ”¬ Analyzing candidate {request.candidate_id} for job {request.job_id}")
    
    if not request.job_id:
        raise HTTPException(status_code=400, detail="job_id is required for analysis")

    try:
        result = await perform_candidate_analysis(request.candidate_id, request.job_id)
        return {"status": "success", "data": result}
    except Exception as e:
        print(f"❌ Analysis endpoint failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
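
# Example call (a sketch; IDs are placeholders):
#   curl -X POST http://localhost:8000/analyze-candidate \
#        -H "Content-Type: application/json" \
#        -d '{"candidate_id": "user_123", "job_id": "job_456"}'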

@app.post("/webhook/applications")
async def applications_webhook(request: WebhookRequest):
    """
    Handles Database Webhooks from Supabase (applications table UPDATE/INSERT).
    """
    print(f"πŸ”” Webhook received: {request.type} on {request.table}")
    
    if request.table != "applications":
        return {"status": "ignored", "reason": "wrong table"}
    
    record = request.record
    # Avoid infinite loop: if this update already contains ai_summary, ignore it
    if request.type == "UPDATE" and record.get("ai_summary"):
        return {"status": "skipped", "reason": "already analyzed"}

    candidate_id = record.get("user_id")
    job_id = record.get("job_id")

    if not candidate_id or not job_id:
        return {"status": "error", "message": "missing ids"}

    print(f"▢️ Auto-triggering AI analysis for {candidate_id} / {job_id}")
    try:
        await perform_candidate_analysis(candidate_id, job_id)
        return {"status": "success"}
    except Exception as e:
        print(f"❌ Webhook analysis failed: {e}")
        return {"status": "error", "message": str(e)}

# Run with: uvicorn api:app --reload
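
# A minimal sketch for running the app directly (assumes uvicorn is installed
# alongside fastapi); equivalent to `uvicorn api:app --reload`:
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)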