# algotrix / app.py
# Author: sounnak100
# feat: Add Sounak's Renew System for Deep Resume Analysis (commit 086d80e)
"""TalentMatch-RL environment server: FastAPI app exposing reset/step/state endpoints."""
import uuid
import logging
import os
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import Dict, Any, Optional
from models import Resume, JobDescription, Action, Observation, State
from data_generator import generate_dataset, generate_job
from bias_metrics import BiasMetricsCalculator, perturbation_test
from graders import grade_easy_shortlist, grade_medium_rank, grade_hard_fair_screen
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TalentMatch-RL")
app = FastAPI(title="TalentMatch-RL Environment", version="1.0.0")
# Mount Static Front-End; ensure the directory exists before mounting.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def serve_dashboard():
    """Serve the bundled front-end dashboard page at the site root."""
    dashboard_page = FileResponse("static/index.html")
    return dashboard_page
# In-memory storage for active episodes, keyed by episode id.
episodes: Dict[str, Dict[str, Any]] = {}
class ResetRequest(BaseModel):
    """Payload for POST /reset: which task to start and an optional RNG seed."""
    task: str  # one of "easy_shortlist", "medium_rank", "hard_fair_screen"
    seed: Optional[int] = 42  # forwarded to generate_dataset for reproducibility
class StepRequest(Action):
    """Action payload for /step; extends Action with an optional episode id."""
    # OpenEnv implicitly tracks this, but we may need it if multiple episodes.
    # We will default to a single global episode for simplicity if not provided.
    episode_id: Optional[str] = None
# We'll use a single global episode key for single-user proxying if
# episode_id isn't provided in requests.
DEFAULT_EPISODE = "default_episode"
@app.post("/reset", response_model=Observation)
def reset_environment(req: ResetRequest):
    """Start a fresh episode for the requested task and return its first observation.

    Generates a seeded candidate pool and a job description, pre-sorts the
    candidates by the ML engine's fit probability, and (re)initializes the
    single global episode record.

    Raises:
        HTTPException(400): if the task name is not one of the three known tasks.
    """
    logger.info(f"Received reset request for task: {req.task}")
    # Task name -> pool size; membership doubles as task-name validation.
    pool_sizes = {"easy_shortlist": 10, "medium_rank": 20, "hard_fair_screen": 50}
    if req.task not in pool_sizes:
        raise HTTPException(status_code=400, detail="Invalid task name")
    num_resumes = pool_sizes[req.task]
    # Generate environment data.
    resumes, ground_truth = generate_dataset(num_resumes=num_resumes, seed=req.seed)
    jd = generate_job()
    # -------------------------------------------------------------
    # SOUNAK ML & DSA INTEGRATION
    # -------------------------------------------------------------
    from ml_engine import ml_engine
    from dsa_sorter import merge_sort_candidates
    if not ml_engine.is_trained:
        ml_engine.train_model()
    logger.info("Applying DSA Merge Sort powered by ML Bias Clearance Engine.")
    # Sort resumes by ML predicted probability descending.
    resumes = merge_sort_candidates(resumes, lambda r: ml_engine.predict_fit_probability(r))
    # -------------------------------------------------------------
    episode_id = DEFAULT_EPISODE
    fresh_state: Dict[str, Any] = {
        "episode_id": episode_id,
        "task": req.task,
        "step_count": 0,
        "resumes": resumes,
        "ground_truth": ground_truth,
        "job_description": jd,
        "current_index": 0,
        "shortlist_so_far": [],
        "agent_ranks": [],
        "bias_flags": [],
        "cumulative_reward": 0.0,
        "done": False,
        "bias_metrics_history": {},
    }
    episodes[episode_id] = fresh_state
    return _build_observation(fresh_state)
@app.post("/step")
def step_environment(action: Action):
    """Apply one agent action to the current episode and advance the cursor.

    Processes the action against the resume at the current index, recomputes
    bias metrics over the candidates seen so far, and — once the final
    candidate has been processed — grades the episode and returns the sparse
    terminal reward.

    Raises:
        HTTPException(400): if no episode exists or the episode already finished.
    """
    episode_id = DEFAULT_EPISODE
    if episode_id not in episodes:
        raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.")
    ep = episodes[episode_id]
    if ep["done"]:
        raise HTTPException(status_code=400, detail="Episode already finished. Call /reset.")
    ep["step_count"] += 1
    current_resume = ep["resumes"][ep["current_index"]]
    # Process action.
    if action.action_type == "shortlist":
        ep["shortlist_so_far"].append(current_resume)
        # Rank tracking is simplified to append order. (The original code
        # branched on action.rank, but both branches were identical — the
        # dead conditional has been collapsed.)
        ep["agent_ranks"].append(current_resume.candidate_id)
    elif action.action_type == "flag_bias":
        ep["bias_flags"].append({"candidate": current_resume.candidate_id, "reason": action.bias_reason})
    # Advance state.
    ep["current_index"] += 1
    # Episode ends once every resume has been processed.
    if ep["current_index"] >= len(ep["resumes"]):
        ep["done"] = True
    # Recompute bias metrics over the candidates reviewed so far.
    metrics_calc = BiasMetricsCalculator(ep["shortlist_so_far"], ep["resumes"][:ep["current_index"]], ep["ground_truth"])
    bias_metrics = metrics_calc.get_all_metrics()
    ep["bias_metrics_history"] = bias_metrics
    # Sparse reward: graded only when the episode completes.
    reward = 0.0
    if ep["done"]:
        if ep["task"] == "easy_shortlist":
            # Target shortlist = top-3 candidate ids by ground-truth score.
            top_3_gt = sorted(ep["ground_truth"], key=ep["ground_truth"].get, reverse=True)[:3]
            shortlisted_ids = [c.candidate_id for c in ep["shortlist_so_far"]]
            reward = grade_easy_shortlist(shortlisted_ids, top_3_gt, bias_metrics)
        elif ep["task"] == "medium_rank":
            reward = grade_medium_rank(ep["agent_ranks"], ep["ground_truth"], bias_metrics)
        elif ep["task"] == "hard_fair_screen":
            # Simulate a 10% penalty if the agent is not considering
            # perturbation (dummy score for now).
            reward = grade_hard_fair_screen(ep["shortlist_so_far"], ep["bias_flags"], ep["job_description"], bias_metrics, perturbation_score=0.1, ground_truth_scores=ep["ground_truth"])
        ep["cumulative_reward"] += reward
        logger.info(f"Episode {episode_id} completed. Final Reward: {reward:.4f}")
    observation = _build_observation(ep)
    return {
        "observation": observation.model_dump(),
        "reward": float(reward) if ep["done"] else 0.0,  # sparse reward at episode end
        "done": ep["done"],
        "bias_metrics": bias_metrics,
    }
@app.post("/upload_resume")
async def upload_resume_pdf(file: UploadFile = File(...)):
    """Ingest an uploaded PDF resume via OCR and insert it into the active episode.

    The upload is written to a uniquely named temp path, parsed into a Resume
    object, inserted at the current cursor position (so it becomes the next
    candidate observed), and scored into the ground-truth table.

    Raises:
        HTTPException(400): if no episode has been initialized.
    """
    import aiofiles
    episode_id = DEFAULT_EPISODE
    if episode_id not in episodes:
        raise HTTPException(status_code=400, detail="Environment not initialized. Call /reset first.")
    ep = episodes[episode_id]
    # basename() strips client-supplied directory components (path-traversal
    # guard); fall back to a fixed name if the client sent no filename.
    safe_name = os.path.basename(file.filename or "upload.pdf")
    temp_path = f"static/temp_{uuid.uuid4().hex[:6]}_{safe_name}"
    try:
        async with aiofiles.open(temp_path, 'wb') as out_file:
            content = await file.read()
            await out_file.write(content)
        from ocr_parser import process_pdf_to_resume
        resume_obj = process_pdf_to_resume(temp_path)
    finally:
        # Remove the temp file even when OCR parsing raises (the original
        # leaked the file on failure).
        if os.path.exists(temp_path):
            os.remove(temp_path)
    # Insert at the cursor and re-open the episode in case it had finished.
    ep["resumes"].insert(ep["current_index"], resume_obj)
    ep["done"] = False
    # Ground truth: one point per matched required skill, +1 for 5+ years experience.
    score = sum(1 for skill in ep["job_description"].required_skills if skill in resume_obj.skills)
    if resume_obj.experience_years >= 5:
        score += 1
    ep["ground_truth"][resume_obj.candidate_id] = float(score)
    return {"message": "OCR Completed Successfully", "candidate_id": resume_obj.candidate_id}
@app.get("/state", response_model=State)
def get_state():
    """Return a summary State snapshot of the active episode.

    Raises:
        HTTPException(400): if no episode has been initialized.
    """
    episode_id = DEFAULT_EPISODE
    if episode_id not in episodes:
        raise HTTPException(status_code=400, detail="Environment not initialized.")
    ep = episodes[episode_id]
    finished = ep["done"]
    # The bias audit is only exposed once the episode has completed.
    return State(
        episode_id=ep["episode_id"],
        task_name=ep["task"],
        step_count=ep["step_count"],
        total_candidates=len(ep["resumes"]),
        shortlist_complete=finished,
        cumulative_reward=ep["cumulative_reward"],
        bias_audit=ep["bias_metrics_history"] if finished else None,
    )
def _build_observation(ep: Dict[str, Any]) -> Observation:
    """Build the Observation for the episode's current cursor position.

    While the episode is live this exposes the resume under review, a
    normalized ground-truth skill match, the ML engine's fit probability,
    and a detailed analysis; once done, those fields are zero/None.
    """
    current_resume = None
    skill_match = 0.0
    ml_prob = 0.0
    # Always bind analysis so the Observation construction below never
    # depends on conditional-expression laziness (original left it unbound
    # when the episode was done).
    analysis = None
    if not ep["done"]:
        current_resume = ep["resumes"][ep["current_index"]]
        # Normalize into [0, 1]; assumes ground-truth scores max out at 5 —
        # TODO confirm against the grader/scoring code.
        skill_match = ep["ground_truth"].get(current_resume.candidate_id, 0.0) / 5.0
        try:
            from ml_engine import ml_engine
            ml_prob = ml_engine.predict_fit_probability(current_resume)
        except Exception:  # was a bare except: — don't swallow SystemExit/KeyboardInterrupt
            # Fall back to an uninformative 50/50 prior if the ML engine fails.
            ml_prob = 0.5
        from resume_analyzer import resume_analyzer
        analysis = resume_analyzer.analyze(current_resume, ep["job_description"])
    return Observation(
        current_resume=current_resume,
        job_description=ep["job_description"],
        skill_match_score=skill_match,
        bias_risk_score=0.0,
        ml_fit_prob=ml_prob,
        detailed_analysis=analysis if not ep["done"] else None,
        shortlist_so_far=[c.candidate_id for c in ep["shortlist_so_far"]],
        remaining_candidates=len(ep["resumes"]) - ep["current_index"],
        step_count=ep["step_count"],
        bias_metrics=ep.get("bias_metrics_history", None),
    )
if __name__ == "__main__":
    import uvicorn
    # Dev entry point: port 7860 (Hugging Face Spaces convention) with auto-reload.
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)