Spaces:
Sleeping
Sleeping
| import uuid | |
| import logging | |
| import os | |
| from fastapi import FastAPI, HTTPException, UploadFile, File | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from typing import Dict, Any, Optional | |
| from models import Resume, JobDescription, Action, Observation, State | |
| from data_generator import generate_dataset, generate_job | |
| from bias_metrics import BiasMetricsCalculator, perturbation_test | |
| from graders import grade_easy_shortlist, grade_medium_rank, grade_hard_fair_screen | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TalentMatch-RL")

app = FastAPI(title="TalentMatch-RL Environment", version="1.0.0")

# Mount Static Front-End
# makedirs first: StaticFiles(directory=...) requires the directory to exist.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
def serve_dashboard():
    """Serve the single-page dashboard bundled in the static folder.

    NOTE(review): the FastAPI route decorator appears stripped in this
    extraction — confirm the @app.get binding against the original file.
    """
    dashboard_page = FileResponse("static/index.html")
    return dashboard_page
# In-memory storage for active episodes.
# NOTE(review): single-process, volatile state — a restart wipes all episodes.
episodes: Dict[str, Dict[str, Any]] = {}
class ResetRequest(BaseModel):
    """Request payload for starting a fresh episode."""
    # Task name; validated by the reset handler against the supported task list.
    task: str
    # RNG seed for dataset generation; fixed default keeps runs reproducible.
    seed: Optional[int] = 42
class StepRequest(Action):
    """Action payload for a single environment step."""
    # OpenEnv implicitly tracks this, but we may need it for multiple episodes.
    # We default to a single global episode for simplicity if not provided.
    episode_id: Optional[str] = None

# Single global episode key used when requests do not carry an episode_id
# (single-user proxying).
DEFAULT_EPISODE = "default_episode"
def reset_environment(req: ResetRequest):
    """Start a new episode for the requested task.

    Generates a synthetic resume dataset and job description, pre-sorts the
    candidates by ML-predicted fit, and stores fresh episode state under
    DEFAULT_EPISODE.

    Args:
        req: task name plus RNG seed for reproducible data generation.

    Returns:
        The initial Observation for the new episode.

    Raises:
        HTTPException: 400 if the task name is not supported.
    """
    logger.info(f"Received reset request for task: {req.task}")
    # Single source of truth: task name -> number of resumes to generate.
    # (Previously the valid-task list and the size ternary duplicated the names.)
    task_sizes = {"easy_shortlist": 10, "medium_rank": 20, "hard_fair_screen": 50}
    if req.task not in task_sizes:
        raise HTTPException(status_code=400, detail="Invalid task name")
    num_resumes = task_sizes[req.task]

    # Generate Environment Data
    resumes, ground_truth = generate_dataset(num_resumes=num_resumes, seed=req.seed)
    jd = generate_job()

    # -------------------------------------------------------------
    # SOUNAK ML & DSA INTEGRATION
    # Lazy imports keep the heavy ML/DSA modules off the module import
    # path — presumably also avoids an import cycle; confirm before hoisting.
    # -------------------------------------------------------------
    from ml_engine import ml_engine
    from dsa_sorter import merge_sort_candidates

    if not ml_engine.is_trained:
        ml_engine.train_model()
    logger.info("Applying DSA Merge Sort powered by ML Bias Clearance Engine.")
    # Sort resumes by ML predicted probability descending
    resumes = merge_sort_candidates(resumes, lambda r: ml_engine.predict_fit_probability(r))
    # -------------------------------------------------------------

    episode_id = DEFAULT_EPISODE
    episodes[episode_id] = {
        "episode_id": episode_id,
        "task": req.task,
        "step_count": 0,
        "resumes": resumes,
        "ground_truth": ground_truth,       # candidate_id -> float score
        "job_description": jd,
        "current_index": 0,                 # index of the candidate shown next
        "shortlist_so_far": [],
        "agent_ranks": [],
        "bias_flags": [],
        "cumulative_reward": 0.0,
        "done": False,
        "bias_metrics_history": {},
    }
    return _build_observation(episodes[episode_id])
def step_environment(action: Action):
    """Advance the default episode by one candidate decision.

    Applies the agent's action to the current candidate, advances the cursor,
    recomputes bias metrics over the candidates seen so far, and — on the
    final candidate — grades the whole episode (sparse terminal reward).

    Args:
        action: the agent's decision for the current candidate.

    Returns:
        Dict with the next observation (serialized), reward, done flag,
        and the latest bias metrics.

    Raises:
        HTTPException: 400 if no episode exists or it already finished.
    """
    episode_id = DEFAULT_EPISODE
    if episode_id not in episodes:
        raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.")
    ep = episodes[episode_id]
    if ep["done"]:
        raise HTTPException(status_code=400, detail="Episode already finished. Call /reset.")

    ep["step_count"] += 1
    current_resume = ep["resumes"][ep["current_index"]]

    # Process action
    if action.action_type == "shortlist":
        ep["shortlist_so_far"].append(current_resume)
        # Bug fix: the original if/else on action.rank appended the same value
        # in both branches. The candidate id is recorded unconditionally; the
        # agent's explicit rank value is currently ignored.
        ep["agent_ranks"].append(current_resume.candidate_id)
    elif action.action_type == "flag_bias":
        ep["bias_flags"].append({"candidate": current_resume.candidate_id, "reason": action.bias_reason})

    # Advance state
    ep["current_index"] += 1
    if ep["current_index"] >= len(ep["resumes"]):
        ep["done"] = True

    # Per-step bias metrics over the candidates processed so far.
    metrics_calc = BiasMetricsCalculator(ep["shortlist_so_far"], ep["resumes"][:ep["current_index"]], ep["ground_truth"])
    bias_metrics = metrics_calc.get_all_metrics()
    ep["bias_metrics_history"] = bias_metrics

    # Sparse reward: graded only once the episode finishes.
    reward = 0.0
    if ep["done"]:
        if ep["task"] == "easy_shortlist":
            # Ground-truth top 3 candidate ids by score.
            top_3_gt = sorted(ep["ground_truth"], key=ep["ground_truth"].get, reverse=True)[:3]
            shortlisted_ids = [c.candidate_id for c in ep["shortlist_so_far"]]
            reward = grade_easy_shortlist(shortlisted_ids, top_3_gt, bias_metrics)
        elif ep["task"] == "medium_rank":
            reward = grade_medium_rank(ep["agent_ranks"], ep["ground_truth"], bias_metrics)
        elif ep["task"] == "hard_fair_screen":
            # Simulate a 10% penalty if the agent is not considering
            # perturbation (dummy perturbation_score for now).
            reward = grade_hard_fair_screen(ep["shortlist_so_far"], ep["bias_flags"], ep["job_description"], bias_metrics, perturbation_score=0.1, ground_truth_scores=ep["ground_truth"])
        ep["cumulative_reward"] += reward
        logger.info(f"Episode {episode_id} completed. Final Reward: {reward:.4f}")

    observation = _build_observation(ep)
    return {
        "observation": observation.model_dump(),
        # Sparse reward at end of episode for simplicity.
        "reward": float(reward) if ep["done"] else 0.0,
        "done": ep["done"],
        "bias_metrics": bias_metrics,
    }
async def upload_resume_pdf(file: UploadFile = File(...)):
    """OCR an uploaded PDF resume and inject it into the running episode.

    Saves the upload to a temp file, parses it into a Resume, inserts the
    candidate at the current cursor position, re-opens the episode, and
    records a heuristic ground-truth score for the new candidate.

    Args:
        file: the uploaded PDF.

    Returns:
        Dict with a success message and the new candidate's id.

    Raises:
        HTTPException: 400 if no episode has been initialized.
    """
    import aiofiles
    episode_id = DEFAULT_EPISODE
    if episode_id not in episodes:
        raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.")
    ep = episodes[episode_id]

    # Security fix: file.filename is client-controlled — basename() strips any
    # path components so the temp file cannot escape the static/ directory.
    safe_name = os.path.basename(file.filename or "")
    temp_path = f"static/temp_{uuid.uuid4().hex[:6]}_{safe_name}"
    try:
        async with aiofiles.open(temp_path, 'wb') as out_file:
            content = await file.read()
            await out_file.write(content)
        from ocr_parser import process_pdf_to_resume
        resume_obj = process_pdf_to_resume(temp_path)
    finally:
        # Bug fix: always remove the temp file, even when OCR raises
        # (previously a parse failure leaked the file).
        if os.path.exists(temp_path):
            os.remove(temp_path)

    ep["resumes"].insert(ep["current_index"], resume_obj)
    ep["done"] = False  # a new candidate re-opens a finished episode

    # Compute GT scoring: one point per matched required skill,
    # plus one point for 5+ years of experience.
    score = sum(1 for req in ep["job_description"].required_skills if req in resume_obj.skills)
    if resume_obj.experience_years >= 5:
        score += 1
    ep["ground_truth"][resume_obj.candidate_id] = float(score)
    return {"message": "OCR Completed Successfully", "candidate_id": resume_obj.candidate_id}
def get_state():
    """Return a State snapshot of the default episode.

    The bias audit is only included once the episode has finished.

    Raises:
        HTTPException: 400 if no episode has been initialized.
    """
    ep = episodes.get(DEFAULT_EPISODE)
    if ep is None:
        raise HTTPException(status_code=400, detail="Environment not initialized.")
    audit = ep["bias_metrics_history"] if ep["done"] else None
    return State(
        episode_id=ep["episode_id"],
        task_name=ep["task"],
        step_count=ep["step_count"],
        total_candidates=len(ep["resumes"]),
        shortlist_complete=ep["done"],
        cumulative_reward=ep["cumulative_reward"],
        bias_audit=audit,
    )
def _build_observation(ep: Dict[str, Any]) -> Observation:
    """Build the Observation for the episode's current candidate.

    When the episode is done there is no current candidate: the resume,
    analysis, and scores fall back to neutral defaults.

    Args:
        ep: the mutable episode-state dict stored in `episodes`.

    Returns:
        Observation describing the current candidate and episode progress.
    """
    current_resume = None
    skill_match = 0.0
    ml_prob = 0.0
    # Bug fix: initialize so the reference below cannot raise NameError when
    # the episode is done (it was previously only bound inside the branch).
    analysis = None
    if not ep["done"]:
        current_resume = ep["resumes"][ep["current_index"]]
        # Normalize the raw ground-truth score into [0, 1] (max raw score 5).
        skill_match = ep["ground_truth"].get(current_resume.candidate_id, 0.0) / 5.0
        try:
            from ml_engine import ml_engine
            ml_prob = ml_engine.predict_fit_probability(current_resume)
        except Exception:
            # Bug fix: was a bare `except:`, which also swallowed SystemExit /
            # KeyboardInterrupt. Fall back to a neutral probability.
            ml_prob = 0.5
        from resume_analyzer import resume_analyzer
        analysis = resume_analyzer.analyze(current_resume, ep["job_description"])
    return Observation(
        current_resume=current_resume,
        job_description=ep["job_description"],
        skill_match_score=skill_match,
        bias_risk_score=0.0,
        ml_fit_prob=ml_prob,
        detailed_analysis=analysis if not ep["done"] else None,
        shortlist_so_far=[c.candidate_id for c in ep["shortlist_so_far"]],
        remaining_candidates=len(ep["resumes"]) - ep["current_index"],
        step_count=ep["step_count"],
        bias_metrics=ep.get("bias_metrics_history", None),
    )
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True) | |