# Ai-Resume-Ranking — core/crew_pipeline.py
# CrewAI agents and step wrappers for the resume-ranking pipeline.
import json
import os
from pathlib import Path
from crewai import Agent, Task, Crew, Process
from utils.file_loader import load_text_from_file
from core.crew_tools import (
generate_jd_rubric_tool,
parse_resume_tool,
match_candidate_tool,
build_ranking_tool,
)
# ---------- Helpers ----------
def _ensure_openai_env():
# CrewAI expects OPENAI_API_KEY to exist (you already use this in app.py)
if not os.getenv("OPENAI_API_KEY"):
raise RuntimeError("OPENAI_API_KEY is not set (HF: Settings → Secrets → OPENAI_API_KEY).")
# Optional: pin model for CrewAI if you want consistency
# (CrewAI supports different LLM providers; leaving it unset is fine if defaults work in your env)
os.environ.setdefault("OPENAI_MODEL_NAME", "gpt-4o-mini")
def _json(s: str):
return json.loads(s)
# ---------- Agents ----------
def _jd_agent():
    """Build the JD-analysis agent (JD text -> structured rubric JSON)."""
    spec = dict(
        role="JD Analyst",
        goal="Convert JD text into a structured rubric JSON.",
        backstory=(
            "You are an HR analyst who produces consistent rubric "
            "structures for automated screening."
        ),
        tools=[generate_jd_rubric_tool],
        verbose=False,
    )
    return Agent(**spec)
def _resume_agent():
    """Build the resume-parsing agent (resume text -> candidate JSON)."""
    return Agent(
        verbose=False,
        tools=[parse_resume_tool],
        role="Resume Parser",
        goal="Extract structured candidate profile JSON from resume text.",
        backstory=(
            "You are an ATS-style parser; you output consistent JSON "
            "that downstream scoring depends on."
        ),
    )
def _matcher_agent():
    """Build the matching agent (rubric + candidate -> match JSON)."""
    spec = dict(
        role="JD-Candidate Matcher",
        goal="Create a JD-aligned match JSON for each candidate.",
        backstory="You score alignment and produce structured evidence for ranking.",
        tools=[match_candidate_tool],
        verbose=False,
    )
    return Agent(**spec)
def _ranker_agent():
    """Build the ranking agent (match files -> final Top-K ranking JSON)."""
    return Agent(
        verbose=False,
        tools=[build_ranking_tool],
        role="Ranker",
        goal="Build a final ranking JSON from match files.",
        backstory="You turn match outputs into a clean Top-K ranking.",
    )
# ---------- Crew wrappers (called by app.py) ----------
def crew_step1_generate_jd(jd_text: str, jd_path: str) -> dict:
    """Step 1: turn raw JD text into a rubric JSON and save it to *jd_path*.

    Runs a single-task crew around the JD analyst agent, parses the crew
    output as JSON, writes it to *jd_path* (creating parent directories as
    needed), and returns the rubric dict.

    Args:
        jd_text: Raw job-description text.
        jd_path: Destination path for the rubric JSON file.

    Returns:
        The parsed rubric as a dict.

    Raises:
        RuntimeError: if OPENAI_API_KEY is unset.
        json.JSONDecodeError: if the crew output is not valid JSON.
    """
    _ensure_openai_env()
    jd_path = Path(jd_path)
    jd_path.parent.mkdir(parents=True, exist_ok=True)
    # Bug fix: build the agent ONCE and share it between the task and the
    # crew. The original called _jd_agent() twice, so the Task was bound to
    # a different Agent instance than the one registered with the Crew
    # (the other pipeline steps already reuse a single agent).
    agent = _jd_agent()
    task = Task(
        description=(
            "Use generate_jd_rubric_tool on the provided JD text and return the JSON rubric.\n"
            "Return ONLY the JSON."
        ),
        expected_output="A valid JSON object as a string.",
        agent=agent,
    )
    crew = Crew(
        agents=[agent],
        tasks=[task],
        process=Process.sequential,
        verbose=False,
        planning=False,
    )
    result = crew.kickoff(inputs={"jd_text": jd_text})
    rubric = _json(str(result))
    with open(jd_path, "w", encoding="utf-8") as f:
        json.dump(rubric, f, indent=2, ensure_ascii=False)
    return rubric
def crew_step3_parse_resumes(resume_dir: str, cand_dir: str) -> str:
    """Step 3: parse every uploaded resume into a candidate JSON file.

    Scans *resume_dir* for .pdf/.docx files, runs a one-task crew per resume,
    and writes <stem>.json into *cand_dir*. The batch is best-effort: a
    failing resume is counted and skipped rather than aborting the run.

    Returns:
        A human-readable status string with success/failure counts.
    """
    _ensure_openai_env()
    src = Path(resume_dir)
    dst = Path(cand_dir)
    dst.mkdir(parents=True, exist_ok=True)
    allowed = {".pdf", ".docx"}
    resumes = sorted(p for p in src.iterdir() if p.suffix.lower() in allowed)
    if not resumes:
        return "No resumes found in data/resumes. Upload resumes first."
    parser = _resume_agent()
    ok = failed = 0
    for resume_path in resumes:
        try:
            with open(resume_path, "rb") as fh:
                resume_text = load_text_from_file(fh)
            task = Task(
                description=(
                    f"Parse this resume into candidate JSON.\n"
                    f"Filename: {resume_path.name}\n"
                    "Call parse_resume_tool(resume_text, filename) and return ONLY JSON."
                ),
                expected_output="A valid candidate JSON object as a string.",
                agent=parser,
            )
            crew = Crew(agents=[parser], tasks=[task], process=Process.sequential, verbose=False, planning=False)
            raw = crew.kickoff(inputs={"resume_text": resume_text, "filename": resume_path.name})
            candidate = _json(str(raw))
            target = dst / f"{resume_path.stem}.json"
            with open(target, "w", encoding="utf-8") as out:
                json.dump(candidate, out, indent=2, ensure_ascii=False)
            ok += 1
        except Exception:
            # Deliberate best-effort batch: record the failure and continue.
            failed += 1
    return f"✅ Parsed {ok} resume(s) into {dst} | ⚠️ Failed: {failed}"
def crew_step4_generate_matches(jd_path: str, cand_dir: str, match_dir: str) -> str:
    """Step 4: score each parsed candidate against the JD rubric.

    Loads the rubric from *jd_path* and each candidate JSON from *cand_dir*,
    runs a one-task crew per candidate, and writes <stem>_match.json into
    *match_dir*. Best-effort: a failing candidate is counted and skipped.

    Returns:
        A human-readable status string with success/failure counts.
    """
    _ensure_openai_env()
    rubric_file = Path(jd_path)
    candidates_dir = Path(cand_dir)
    out_dir = Path(match_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    if not rubric_file.exists():
        return "No JD rubric found. Run Step 1 first."
    candidate_files = sorted(p for p in candidates_dir.iterdir() if p.suffix.lower() == ".json")
    if not candidate_files:
        return "No candidates found. Run Step 3 first."
    # Serialize the rubric once; it is the same for every candidate.
    with open(rubric_file, "r", encoding="utf-8") as f:
        jd_rubric_json = json.dumps(json.load(f), ensure_ascii=False)
    matcher = _matcher_agent()
    created = failed = 0
    for cand_file in candidate_files:
        try:
            with open(cand_file, "r", encoding="utf-8") as f:
                candidate_json = json.dumps(json.load(f), ensure_ascii=False)
            task = Task(
                description=(
                    f"Create a JD-aligned match JSON for candidate file {cand_file.name}.\n"
                    "Call match_candidate_tool(jd_rubric_json, candidate_json) and return ONLY JSON."
                ),
                expected_output="A valid match JSON object as a string.",
                agent=matcher,
            )
            crew = Crew(agents=[matcher], tasks=[task], process=Process.sequential, verbose=False, planning=False)
            raw = crew.kickoff(inputs={"jd_rubric_json": jd_rubric_json, "candidate_json": candidate_json})
            match_obj = _json(str(raw))
            target = out_dir / f"{cand_file.stem}_match.json"
            with open(target, "w", encoding="utf-8") as out:
                json.dump(match_obj, out, indent=2, ensure_ascii=False)
            created += 1
        except Exception:
            # Deliberate best-effort batch: record the failure and continue.
            failed += 1
    return f"✅ Created {created} match file(s) in {out_dir} | ⚠️ Failed: {failed}"
def crew_step5_rank(top_k: int, match_dir: str) -> dict:
    """Step 5: build the final Top-K ranking JSON and persist it.

    NOTE(review): *match_dir* is accepted for interface parity but is not
    used here — build_ranking_tool appears to read data/matches internally
    (per the original design comment); confirm against core.crew_tools.

    Returns:
        The parsed ranking dict (also written to data/ranking.json).
    """
    _ensure_openai_env()
    k = int(top_k)
    ranker = _ranker_agent()
    task = Task(
        description=f"Build Top-{k} ranking JSON using build_ranking_tool(top_k). Return ONLY JSON.",
        expected_output="A valid ranking JSON object as a string.",
        agent=ranker,
    )
    crew = Crew(agents=[ranker], tasks=[task], process=Process.sequential, verbose=False, planning=False)
    ranking = _json(str(crew.kickoff(inputs={"top_k": k})))
    # Save alongside the app's existing output location.
    out_path = Path("data/ranking.json")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(ranking, f, indent=2, ensure_ascii=False)
    return ranking