from __future__ import annotations import asyncio import json import uuid from typing import Optional, Dict, Any from app.workers.celery_app import celery_app from app.core.redis import get_redis_for_worker from app.services.progress import ProgressService, ProgressStep from app.services.resume_parser import ResumeParser from app.services.job_scraper import JobScraper from app.services.resume_customizer import ResumeCustomizer from app.services.layout_scanner import LayoutScanner from app.models.customization import Intensity from app.models.analysis import SafetyScan def run_async(coro): """Run async function in sync context.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() @celery_app.task(bind=True, max_retries=3) def analyze_and_customize( self, session_id: str, job_url: Optional[str], job_text: Optional[str], intensity: str, ) -> Dict[str, Any]: """Main task chain for resume customization.""" task_id = self.request.id async def _run(): async with get_redis_for_worker() as redis: progress = ProgressService(task_id, redis) try: # Step 1: Get resume data from session await progress.update(ProgressStep.PARSING_RESUME, "Loading your resume...") resume_data = await redis.get(f"session:{session_id}:resume") if not resume_data: raise ValueError("Session expired or resume not found") from app.models.resume import ResumeData resume = ResumeData(**json.loads(resume_data)) # Scan layout for ATS compatibility issues (optional feature) safety_scan = SafetyScan() try: raw_file = await redis.get(f"session:{session_id}:file") content_type_bytes = await redis.get(f"session:{session_id}:content_type") if raw_file and content_type_bytes: # Decode content_type from bytes to string content_type = content_type_bytes.decode('utf-8') if isinstance(content_type_bytes, bytes) else content_type_bytes scanner = LayoutScanner() safety_scan = scanner.scan(raw_file, content_type) except Exception as scan_error: import logging logging.warning(f"Layout scan failed (non-critical): {scan_error}") # Step 2: Scrape job posting await progress.update(ProgressStep.SCRAPING_JOB, "Analyzing job posting...") scraper = JobScraper() if job_url: job = await scraper.scrape(job_url) elif job_text: job = await scraper.parse_text(job_text) else: raise ValueError("No job URL or text provided") # Step 3-5: Customize resume (includes scoring) await progress.update(ProgressStep.CUSTOMIZING, "Customizing your resume...") customizer = ResumeCustomizer() result = await customizer.customize( resume=resume, job=job, intensity=Intensity(intensity), ) # Step 6: Store result await progress.update(ProgressStep.FINALIZING, "Preparing results...") result_id = str(uuid.uuid4()) result_data = { "original": result.original.model_dump(), "customized": result.customized.model_dump(), "changes": [c.model_dump() for c in result.changes], "original_score": result.original_score.model_dump(), "customized_score": result.customized_score.model_dump(), "job": job.model_dump(), # Enhanced analysis fields "bullet_analysis": [b.model_dump() for b in result.bullet_analysis], "safety_scan": safety_scan.model_dump(), "keyword_quality": [k.model_dump() for k in result.keyword_quality], } from app.core.config import settings await redis.set( f"result:{result_id}", json.dumps(result_data), ex=settings.session_ttl_seconds, ) await progress.update(ProgressStep.COMPLETE, "Done!", result_id=result_id) return {"result_id": result_id} except Exception as e: await progress.error( code="PROCESSING_ERROR", message=str(e), recoverable=True, ) raise return run_async(_run())