# Deploy cv-buddy backend (commit 366c43e)
from __future__ import annotations
import asyncio
import json
import uuid
from typing import Optional, Dict, Any
from app.workers.celery_app import celery_app
from app.core.redis import get_redis_for_worker
from app.services.progress import ProgressService, ProgressStep
from app.services.resume_parser import ResumeParser
from app.services.job_scraper import JobScraper
from app.services.resume_customizer import ResumeCustomizer
from app.services.layout_scanner import LayoutScanner
from app.models.customization import Intensity
from app.models.analysis import SafetyScan
def run_async(coro):
    """Run *coro* to completion from synchronous (Celery worker) code.

    Uses ``asyncio.run()``, which creates a fresh event loop, runs the
    coroutine, then cancels any lingering tasks and shuts down async
    generators before closing the loop. The previous hand-rolled
    ``new_event_loop()`` / ``run_until_complete()`` / ``close()`` sequence
    skipped that cleanup and also left a *closed* loop installed as the
    thread's current event loop via ``set_event_loop``.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns; exceptions propagate unchanged.
    """
    return asyncio.run(coro)
@celery_app.task(bind=True, max_retries=3)
def analyze_and_customize(
    self,
    session_id: str,
    job_url: Optional[str],
    job_text: Optional[str],
    intensity: str,
) -> Dict[str, Any]:
    """Main task chain for resume customization.

    Loads the previously-parsed resume from the Redis session, optionally
    runs an ATS layout safety scan on the raw uploaded file, scrapes (or
    parses) the job posting, runs the customization + scoring pipeline, and
    stores the combined result in Redis under a fresh UUID. Progress updates
    are published via ProgressService keyed by this Celery task's id.

    Args:
        session_id: Redis session key prefix under which the resume JSON
            (and optionally the raw file + content type) was stored earlier.
        job_url: URL of the job posting to scrape. Takes precedence over
            ``job_text`` when both are provided.
        job_text: Raw job-posting text, used when no URL is available.
        intensity: Customization intensity; must be a valid ``Intensity``
            enum value (``Intensity(intensity)`` raises ValueError otherwise).

    Returns:
        ``{"result_id": <uuid str>}`` identifying the stored result payload.

    Raises:
        ValueError: If the session expired / resume is missing, or neither
            ``job_url`` nor ``job_text`` was supplied.
    """
    task_id = self.request.id

    async def _run():
        # One Redis connection scoped to the whole pipeline; closed by the
        # context manager even when a step fails.
        async with get_redis_for_worker() as redis:
            progress = ProgressService(task_id, redis)
            try:
                # Step 1: Get resume data from session
                await progress.update(ProgressStep.PARSING_RESUME, "Loading your resume...")
                resume_data = await redis.get(f"session:{session_id}:resume")
                if not resume_data:
                    raise ValueError("Session expired or resume not found")
                # Function-scope import — presumably to avoid a circular
                # import at module load time; TODO confirm.
                from app.models.resume import ResumeData
                resume = ResumeData(**json.loads(resume_data))
                # Scan layout for ATS compatibility issues (optional feature).
                # Defaults to an empty SafetyScan and keeps going if the raw
                # file is absent or the scan blows up.
                safety_scan = SafetyScan()
                try:
                    raw_file = await redis.get(f"session:{session_id}:file")
                    content_type_bytes = await redis.get(f"session:{session_id}:content_type")
                    if raw_file and content_type_bytes:
                        # Decode content_type from bytes to string (Redis may
                        # return either depending on client decode settings)
                        content_type = content_type_bytes.decode('utf-8') if isinstance(content_type_bytes, bytes) else content_type_bytes
                        scanner = LayoutScanner()
                        safety_scan = scanner.scan(raw_file, content_type)
                except Exception as scan_error:
                    # Best-effort: layout scanning must never fail the task.
                    # NOTE(review): logs via the root logger — consider a
                    # module-level logging.getLogger(__name__) instead.
                    import logging
                    logging.warning(f"Layout scan failed (non-critical): {scan_error}")
                # Step 2: Scrape job posting (URL wins over pasted text)
                await progress.update(ProgressStep.SCRAPING_JOB, "Analyzing job posting...")
                scraper = JobScraper()
                if job_url:
                    job = await scraper.scrape(job_url)
                elif job_text:
                    job = await scraper.parse_text(job_text)
                else:
                    raise ValueError("No job URL or text provided")
                # Step 3-5: Customize resume (includes scoring)
                await progress.update(ProgressStep.CUSTOMIZING, "Customizing your resume...")
                customizer = ResumeCustomizer()
                result = await customizer.customize(
                    resume=resume,
                    job=job,
                    intensity=Intensity(intensity),
                )
                # Step 6: Store result — dump every model (pydantic-style
                # model_dump()) to plain dicts so the payload survives JSON.
                await progress.update(ProgressStep.FINALIZING, "Preparing results...")
                result_id = str(uuid.uuid4())
                result_data = {
                    "original": result.original.model_dump(),
                    "customized": result.customized.model_dump(),
                    "changes": [c.model_dump() for c in result.changes],
                    "original_score": result.original_score.model_dump(),
                    "customized_score": result.customized_score.model_dump(),
                    "job": job.model_dump(),
                    # Enhanced analysis fields
                    "bullet_analysis": [b.model_dump() for b in result.bullet_analysis],
                    "safety_scan": safety_scan.model_dump(),
                    "keyword_quality": [k.model_dump() for k in result.keyword_quality],
                }
                from app.core.config import settings
                # Result expires on the same TTL as the session.
                await redis.set(
                    f"result:{result_id}",
                    json.dumps(result_data),
                    ex=settings.session_ttl_seconds,
                )
                await progress.update(ProgressStep.COMPLETE, "Done!", result_id=result_id)
                return {"result_id": result_id}
            except Exception as e:
                # Surface the failure on the progress channel, then re-raise
                # so Celery marks the task failed (eligible for retry up to
                # max_retries=3).
                await progress.error(
                    code="PROCESSING_ERROR",
                    message=str(e),
                    recoverable=True,
                )
                raise

    return run_async(_run())