Spaces:
Sleeping
Sleeping
File size: 14,839 Bytes
629d435 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
import asyncio
import os
from typing import Callable, Optional

import instructor
import pymupdf4llm
from diskcache import Cache

from resumer.prompts.resume_prompt import RESUME_DETAILS_EXTRACTOR, JOB_DETAILS_EXTRACTOR
from resumer.schemas.job_details_schema import JobDetails
from resumer.schemas.sections_schemas import ResumeSchema
from resumer.utils.latex_ops import json_to_latex_pdf
from resumer.utils.scraper import scrape_job_details
from resumer.variables import section_mapping
class ResumeTailorPipeline:
    """Tailor a resume PDF to a specific job posting using LLM calls.

    Pipeline: read the resume PDF -> extract structured resume and job data
    via the LLM -> tailor each resume section to the job (concurrently,
    bounded by a semaphore) -> render the result to LaTeX/PDF.

    Args:
        aclient: Async instructor client used to make LLM calls.
        model_name: Model name passed to every completion call.
        resume_path: Path to the original resume (PDF file location).
        output_dir: Folder where the final tailored resume is stored.
        log_callback: Optional callable invoked with every log message,
            in addition to printing to stdout.
        max_concurrent_sections: Upper bound on simultaneous per-section
            LLM calls during ``resume_builder``.
    """

    def __init__(
        self,
        aclient: instructor.AsyncInstructor,
        model_name: str,
        resume_path: str,
        output_dir: str,
        log_callback: Optional[Callable[[str], None]] = None,
        max_concurrent_sections: int = 3
    ):
        self.aclient = aclient
        self.model_name = model_name
        self.resume_path = resume_path
        self.output_dir = output_dir
        self.log_callback = log_callback
        self.max_concurrent_sections = max_concurrent_sections
        # Intermediate pipeline state, populated as the steps run.
        self.resume_md: Optional[str] = None       # raw markdown of the resume PDF
        self.resume_info = None                    # structured resume (ResumeSchema)
        self.job_info = None                       # structured job details (was unset before _extract_job_json ran)
        self.resume_details = None                 # tailored resume as a plain dict
        self.tailored_resume_path: Optional[str] = None
        self.tailored_resume_tex_path: Optional[str] = None
        # make the output_dir
        os.makedirs(self.output_dir, exist_ok=True)
        # Disk cache lives in a subfolder of output_dir so repeated runs on
        # the same resume skip the expensive resume-extraction LLM call.
        cache_dir = os.path.join(self.output_dir, ".resume_cache")
        self.cache = Cache(cache_dir)

    def _log(self, message: str) -> None:
        """Print *message* and forward it to the log callback, if any."""
        print(message)
        if self.log_callback:
            self.log_callback(message)

    def _read_resume_pdf(self) -> str:
        """Convert the resume PDF to markdown once and memoize the result."""
        self._log("📄 Reading resume PDF...")
        if not self.resume_md:
            self._log("📝 Converting PDF to markdown...")
            self.resume_md = pymupdf4llm.to_markdown(self.resume_path)
            self._log(f"✅ Successfully converted resume ({len(self.resume_md)} characters)")
        return self.resume_md

    async def _extract_resume_json(self):
        """Extract structured resume data, using the disk cache when possible.

        Returns:
            The validated ResumeSchema instance (also stored on ``self.resume_info``).
        """
        self._log("🔍 Starting resume extraction...")
        resume_md = self._read_resume_pdf()
        # The full markdown string is the cache key: any change to the PDF
        # content invalidates the cached extraction.
        cached_json = self.cache.get(resume_md)
        if cached_json:
            self._log("⚡ Loading resume info from disk cache...")
            self.resume_info = ResumeSchema.model_validate(cached_json)
            self._log("✅ Resume loaded from cache")
            return self.resume_info
        self._log("🤖 Cache miss: Extracting resume via LLM...(this may take a while)")
        self.resume_info = await self.aclient.chat.completions.create(
            model=self.model_name,
            response_model=ResumeSchema,
            messages=[
                {"role": "system", "content": RESUME_DETAILS_EXTRACTOR},
                {"role": "user", "content": resume_md},
            ],
        )
        self._log("✅ Resume structure extracted successfully")
        # Store the plain dict (not the pydantic object) so it serializes safely.
        self.cache.set(resume_md, self.resume_info.model_dump())
        return self.resume_info

    async def _extract_job_json(self, url: Optional[str] = None, job_site_content: Optional[str] = None):
        """Scrape (if needed) and structure job details.

        Args:
            url: Job-posting URL to scrape when raw content is not supplied.
            job_site_content: Raw job-page text; skips scraping when given.

        Returns:
            The structured job details (also stored on ``self.job_info``).

        Raises:
            ValueError: If neither argument is provided, or the LLM classifies
                the content as noise (ads/login walls) rather than a job post.
        """
        if not url and not job_site_content:
            raise ValueError("You must provide either a URL or raw job content.")
        if not job_site_content:
            self._log(f"🌐 Scraping job details from: {url}")
            job_site_content = scrape_job_details(url)
            self._log(f"✅ Job page scraped ({len(job_site_content)} characters)")
        self._log("🤖 Extracting job info via LLM...")
        self.job_info = await self.aclient.chat.completions.create(
            model=self.model_name,
            response_model=JobDetails,
            messages=[
                {"role": "system", "content": JOB_DETAILS_EXTRACTOR},
                {"role": "user", "content": job_site_content},
            ],
        )
        self._log("✅ Job structure extracted successfully")
        # Guard against pages where scraping only captured boilerplate.
        if getattr(self.job_info, "is_noise_only", False):
            self._log("⚠️ Warning: Content identified as noise")
            raise ValueError("LLM identified the content as noise (ads/login walls) rather than a job post.")
        # Return the 'data' field if it exists, otherwise the whole object
        self.job_info = getattr(self.job_info, "data", self.job_info)
        return self.job_info

    def _get_all_sections(self):
        """Return (standard section names, custom section display names)."""
        sections = list(self.resume_info.model_dump().keys())
        if "custom_sections" in sections:
            sections.remove("custom_sections")
        custom_sections = []
        if getattr(self.resume_info, "custom_sections"):
            for section in getattr(self.resume_info, "custom_sections"):
                custom_sections.append(section.section_name.plain_text)
        return sections, custom_sections

    async def _process_section(self, section_title: str, section_data: str, mapping_key: str):
        """Tailor a single resume section to the job with one LLM call.

        Args:
            section_title: The name of the section (used for XML tags).
            section_data: The content of the section, serialized as a string.
            mapping_key: The key to look up prompt and schema in section_mapping.

        Returns:
            The tailored section content, or None when the LLM marks the
            section as not relevant to the job.
        """
        self._log(f"🔧 Processing section: {section_title}")
        section_system_prompt = section_mapping.get(mapping_key).get("prompt")
        section_schema = section_mapping.get(mapping_key).get("schema")
        section_user_prompt = f"""
<{section_title.upper()}>
{section_data}
</{section_title.upper()}>
<JOB_DESCRIPTION>
{self.job_info.model_dump_json()}
</JOB_DESCRIPTION>
"""
        # make a llm call to get the section
        section_info = await self.aclient.chat.completions.create(
            model=self.model_name,
            response_model=section_schema,
            messages=[
                {"role": "system", "content": section_system_prompt},
                {"role": "user", "content": section_user_prompt},
            ],
        )
        section_info = section_info.model_dump()
        # first check if this section is relevant
        if section_info.get("is_relevant", False):
            self._log(f"✅ {section_title}: Tailored and included")
            return section_info.get("data", None)
        self._log(f"⏭️ {section_title}: Not relevant to job, skipping")
        return None

    async def _gather_named(self, named_tasks):
        """Await a list of (name, coroutine) pairs concurrently.

        Exceptions are logged and dropped; only tasks that succeed with a
        truthy result are kept.

        Returns:
            List of (name, result) pairs for successful, relevant sections.
        """
        if not named_tasks:
            return []
        results = await asyncio.gather(
            *(coro for _, coro in named_tasks), return_exceptions=True
        )
        kept = []
        for (name, _), result in zip(named_tasks, results):
            if isinstance(result, Exception):
                self._log(f"❌ Error processing {name}: {str(result)}")
            elif result:
                kept.append((name, result))
        return kept

    async def resume_builder(self):
        """Build the tailored resume from all sections with parallel processing.

        Returns:
            Dict mapping section names to tailored content; custom sections
            are nested under the "custom_sections" key.
        """
        self._log("🏗️ Starting resume builder...")
        section_names, custom_section_names = self._get_all_sections()
        # Keywords feed the tailoring prompts but are never rendered themselves.
        if "keywords" in section_names:
            section_names.remove("keywords")
        resume_details = dict()
        # Personal info is copied verbatim; it never needs tailoring.
        self._log("👤 Adding personal information...")
        resume_details["personal_info"] = getattr(self.resume_info, "personal_info").model_dump()
        if "personal_info" in section_names:
            section_names.remove("personal_info")
        # Create a semaphore to limit concurrent LLM calls
        semaphore = asyncio.Semaphore(self.max_concurrent_sections)

        async def process_section_with_semaphore(section_name, section_data, mapping_key):
            """Wrapper to limit concurrent calls"""
            async with semaphore:
                return await self._process_section(section_name, section_data, mapping_key)

        # Process standard sections in parallel
        self._log(f"📑 Processing {len(section_names)} standard sections (max {self.max_concurrent_sections} concurrent)...")
        tasks = []
        for section_name in section_names:
            if getattr(self.resume_info, section_name) is None:
                continue
            if section_name == "summary":
                # The summary is rewritten from the WHOLE resume, not just itself.
                _section_data = self.resume_info.model_dump_json()
            else:
                _section_data = self.resume_info.model_dump_json(include={section_name})
            tasks.append(
                (section_name, process_section_with_semaphore(section_name, _section_data, section_name))
            )
        for section_name, result in await self._gather_named(tasks):
            resume_details[section_name] = result
        # Process custom sections in parallel
        resume_details["custom_sections"] = {}
        if getattr(self.resume_info, "custom_sections") is not None:
            self._log(f"📑 Processing {len(custom_section_names)} custom sections (max {self.max_concurrent_sections} concurrent)...")
            custom_tasks = []
            for csection in getattr(self.resume_info, "custom_sections"):
                section_name = csection.section_name.plain_text
                _section_data = str(csection.model_dump()["section_detail"])
                custom_tasks.append(
                    (section_name, process_section_with_semaphore(section_name, _section_data, "custom_sections"))
                )
            for section_name, result in await self._gather_named(custom_tasks):
                resume_details["custom_sections"][section_name] = result
        self.resume_details = resume_details
        self._log("✅ Resume building complete")
        return self.resume_details

    async def generate_tailored_resume(self, job_url: Optional[str] = None, job_site_content: Optional[str] = None):
        """Run the full pipeline end to end.

        Args:
            job_url: URL of the job posting to scrape.
            job_site_content: Raw job-page text, used instead of scraping.

        Returns:
            Tuple of (tailored PDF path, tailored .tex path).
        """
        self._log("=" * 50)
        self._log("🚀 Starting Resume Tailoring Pipeline")
        self._log("=" * 50)
        try:
            # Step 1: Extract job details
            self._log("\n📋 STEP 1: Extract Job Details")
            await self._extract_job_json(job_url, job_site_content)
            # Step 2: Extract resume details
            self._log("\n📋 STEP 2: Extract Resume Details")
            await self._extract_resume_json()
            self._log("\n✅ Successfully extracted both Resume and Job data")
            # Step 3: Build tailored resume
            self._log("\n📋 STEP 3: Build Tailored Resume")
            await self.resume_builder()
            # Step 4: Generate PDF
            self._log("\n📋 STEP 4: Generate PDF")
            self._log("📄 Converting to LaTeX and generating PDF...")
            self.tailored_resume_path, self.tailored_resume_tex_path = json_to_latex_pdf(
                self.resume_details,
                os.path.join(self.output_dir, "tailored_resume.pdf")
            )
            self._log(f"✅ PDF generated at: {self.tailored_resume_path}")
            self._log("\n" + "=" * 50)
            self._log("🎉 Resume Tailoring Complete!")
            self._log("=" * 50)
            return self.tailored_resume_path, self.tailored_resume_tex_path
        except Exception as e:
            # Log for the UI callback, then re-raise so callers see the failure.
            self._log(f"\n❌ Error during pipeline execution: {str(e)}")
            raise

    def close_cache(self):
        """Cleanly close the cache connection."""
        self.cache.close()
|