"""Task progress reporting over Redis.

Each progress event is a small JSON document that is both stored under a
per-task key (so late subscribers can read the latest state) and published on
a per-task pub/sub channel (so live subscribers, e.g. SSE handlers, receive it
immediately).
"""

from __future__ import annotations

import json
from enum import Enum
from typing import Dict, Any, Optional

from redis.asyncio import Redis

from app.core.redis import get_redis


class ProgressStep(str, Enum):
    """Named stages reported while a task runs; values are the wire strings."""

    PARSING_RESUME = "parsing_resume"
    SCRAPING_JOB = "scraping_job"
    CALCULATING_ORIGINAL_SCORE = "calculating_original_score"
    CUSTOMIZING = "customizing"
    CALCULATING_NEW_SCORE = "calculating_new_score"
    FINALIZING = "finalizing"
    COMPLETE = "complete"
    ERROR = "error"


# Percentage reported for each step. ERROR is intentionally absent: lookups
# for steps missing from this table fall back to 0 in ProgressService.update().
STEP_PROGRESS: Dict[ProgressStep, int] = {
    ProgressStep.PARSING_RESUME: 15,
    ProgressStep.SCRAPING_JOB: 30,
    ProgressStep.CALCULATING_ORIGINAL_SCORE: 40,
    ProgressStep.CUSTOMIZING: 80,
    ProgressStep.CALCULATING_NEW_SCORE: 90,
    ProgressStep.FINALIZING: 95,
    ProgressStep.COMPLETE: 100,
}

# Lifetime (seconds) of the stored "latest state" snapshot: 1 hour.
PROGRESS_STATE_TTL_SECONDS = 3600


class ProgressService:
    """Emit progress events for a single task.

    Events are published on the channel ``progress:{task_id}`` and the most
    recent event is kept under the key ``progress_state:{task_id}`` with a
    one-hour TTL.
    """

    def __init__(self, task_id: str, redis_client: Optional[Redis] = None) -> None:
        """Bind the service to *task_id*.

        Args:
            task_id: Identifier used to derive the channel and state key.
            redis_client: Client to use for all operations. When omitted, one
                is obtained from ``get_redis()`` on each emit.
        """
        self.task_id = task_id
        self.channel = f"progress:{task_id}"
        self._redis = redis_client

    async def _get_redis(self) -> Redis:
        """Return the injected client, or fall back to ``get_redis()``."""
        # Identity check rather than truthiness: only "no client supplied"
        # should trigger the fallback.
        if self._redis is not None:
            return self._redis
        return await get_redis()

    async def _emit(self, data: Dict[str, Any]) -> None:
        """Store *data* as the task's latest state, then publish it.

        The snapshot is written BEFORE publishing. A subscriber that
        subscribes and then reads the snapshot therefore either sees this
        event in the snapshot or receives it on the channel; with the reverse
        order it could read a stale snapshot after the publish had already
        gone out and miss the event entirely.
        """
        redis = await self._get_redis()
        payload = json.dumps(data)  # serialize once, reuse for both writes

        # Store current state for late subscribers.
        await redis.set(
            f"progress_state:{self.task_id}",
            payload,
            ex=PROGRESS_STATE_TTL_SECONDS,
        )
        # Publish to channel for SSE subscribers.
        await redis.publish(self.channel, payload)

    async def update(
        self, step: ProgressStep, message: str = "", result_id: str = ""
    ) -> None:
        """Report that the task has reached *step*.

        Args:
            step: Stage reached; its percentage comes from ``STEP_PROGRESS``
                (0 when the step has no entry).
            message: Optional human-readable text included in the event.
            result_id: When non-empty, included in the event as ``result_id``.
        """
        data: Dict[str, Any] = {
            "step": step.value,
            "percent": STEP_PROGRESS.get(step, 0),
            "message": message,
        }
        if result_id:
            data["result_id"] = result_id
        await self._emit(data)

    async def error(self, code: str, message: str, recoverable: bool = True) -> None:
        """Report a failure for the task.

        The event uses the ``error`` step with ``percent`` 0 and carries an
        ``error`` object holding *code*, *message* and *recoverable*.
        """
        data: Dict[str, Any] = {
            "step": ProgressStep.ERROR.value,
            "percent": 0,
            "error": {
                "code": code,
                "message": message,
                "recoverable": recoverable,
            },
        }
        await self._emit(data)