Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| from enum import Enum | |
| from typing import Dict, Any, Optional | |
| from redis.asyncio import Redis | |
| from app.core.redis import get_redis | |
| class ProgressStep(str, Enum): | |
| PARSING_RESUME = "parsing_resume" | |
| SCRAPING_JOB = "scraping_job" | |
| CALCULATING_ORIGINAL_SCORE = "calculating_original_score" | |
| CUSTOMIZING = "customizing" | |
| CALCULATING_NEW_SCORE = "calculating_new_score" | |
| FINALIZING = "finalizing" | |
| COMPLETE = "complete" | |
| ERROR = "error" | |
| STEP_PROGRESS: Dict[ProgressStep, int] = { | |
| ProgressStep.PARSING_RESUME: 15, | |
| ProgressStep.SCRAPING_JOB: 30, | |
| ProgressStep.CALCULATING_ORIGINAL_SCORE: 40, | |
| ProgressStep.CUSTOMIZING: 80, | |
| ProgressStep.CALCULATING_NEW_SCORE: 90, | |
| ProgressStep.FINALIZING: 95, | |
| ProgressStep.COMPLETE: 100, | |
| } | |
| class ProgressService: | |
| def __init__(self, task_id: str, redis_client: Optional[Redis] = None): | |
| self.task_id = task_id | |
| self.channel = f"progress:{task_id}" | |
| self._redis = redis_client | |
| async def _get_redis(self) -> Redis: | |
| if self._redis: | |
| return self._redis | |
| return await get_redis() | |
| async def update(self, step: ProgressStep, message: str = "", result_id: str = ""): | |
| redis = await self._get_redis() | |
| percent = STEP_PROGRESS.get(step, 0) | |
| data: Dict[str, Any] = { | |
| "step": step.value, | |
| "percent": percent, | |
| "message": message, | |
| } | |
| if result_id: | |
| data["result_id"] = result_id | |
| # Publish to channel for SSE subscribers | |
| await redis.publish(self.channel, json.dumps(data)) | |
| # Store current state for late subscribers | |
| await redis.set( | |
| f"progress_state:{self.task_id}", | |
| json.dumps(data), | |
| ex=3600, # 1 hour TTL | |
| ) | |
| async def error(self, code: str, message: str, recoverable: bool = True): | |
| redis = await self._get_redis() | |
| data: Dict[str, Any] = { | |
| "step": ProgressStep.ERROR.value, | |
| "percent": 0, | |
| "error": { | |
| "code": code, | |
| "message": message, | |
| "recoverable": recoverable, | |
| }, | |
| } | |
| await redis.publish(self.channel, json.dumps(data)) | |
| await redis.set( | |
| f"progress_state:{self.task_id}", | |
| json.dumps(data), | |
| ex=3600, | |
| ) | |