cv-buddy-backend / app /services /progress.py
Momal's picture
Deploy cv-buddy backend
366c43e
from __future__ import annotations
import json
from enum import Enum
from typing import Dict, Any, Optional
from redis.asyncio import Redis
from app.core.redis import get_redis
class ProgressStep(str, Enum):
PARSING_RESUME = "parsing_resume"
SCRAPING_JOB = "scraping_job"
CALCULATING_ORIGINAL_SCORE = "calculating_original_score"
CUSTOMIZING = "customizing"
CALCULATING_NEW_SCORE = "calculating_new_score"
FINALIZING = "finalizing"
COMPLETE = "complete"
ERROR = "error"
STEP_PROGRESS: Dict[ProgressStep, int] = {
ProgressStep.PARSING_RESUME: 15,
ProgressStep.SCRAPING_JOB: 30,
ProgressStep.CALCULATING_ORIGINAL_SCORE: 40,
ProgressStep.CUSTOMIZING: 80,
ProgressStep.CALCULATING_NEW_SCORE: 90,
ProgressStep.FINALIZING: 95,
ProgressStep.COMPLETE: 100,
}
class ProgressService:
def __init__(self, task_id: str, redis_client: Optional[Redis] = None):
self.task_id = task_id
self.channel = f"progress:{task_id}"
self._redis = redis_client
async def _get_redis(self) -> Redis:
if self._redis:
return self._redis
return await get_redis()
async def update(self, step: ProgressStep, message: str = "", result_id: str = ""):
redis = await self._get_redis()
percent = STEP_PROGRESS.get(step, 0)
data: Dict[str, Any] = {
"step": step.value,
"percent": percent,
"message": message,
}
if result_id:
data["result_id"] = result_id
# Publish to channel for SSE subscribers
await redis.publish(self.channel, json.dumps(data))
# Store current state for late subscribers
await redis.set(
f"progress_state:{self.task_id}",
json.dumps(data),
ex=3600, # 1 hour TTL
)
async def error(self, code: str, message: str, recoverable: bool = True):
redis = await self._get_redis()
data: Dict[str, Any] = {
"step": ProgressStep.ERROR.value,
"percent": 0,
"error": {
"code": code,
"message": message,
"recoverable": recoverable,
},
}
await redis.publish(self.channel, json.dumps(data))
await redis.set(
f"progress_state:{self.task_id}",
json.dumps(data),
ex=3600,
)