File size: 2,420 Bytes
366c43e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from __future__ import annotations
import json
from enum import Enum
from typing import Dict, Any, Optional
from redis.asyncio import Redis
from app.core.redis import get_redis


class ProgressStep(str, Enum):
    """Named stages reported while a task is being processed.

    Members are also ``str`` instances. Each member's value is the string
    written to the ``"step"`` field of the JSON progress payloads built by
    ``ProgressService``.
    """

    # Stages that have a percentage assigned in STEP_PROGRESS, listed here in
    # ascending order of that percentage.
    PARSING_RESUME = "parsing_resume"
    SCRAPING_JOB = "scraping_job"
    CALCULATING_ORIGINAL_SCORE = "calculating_original_score"
    CUSTOMIZING = "customizing"
    CALCULATING_NEW_SCORE = "calculating_new_score"
    FINALIZING = "finalizing"
    # Mapped to 100 percent in STEP_PROGRESS.
    COMPLETE = "complete"
    # Used by ProgressService.error(); has no entry in STEP_PROGRESS.
    ERROR = "error"


# Percentage (0-100) reported for each step in progress payloads.
# ProgressStep.ERROR has no entry here: ProgressService.update() falls back to
# 0 for any step missing from this table, and ProgressService.error() sends 0
# explicitly.
STEP_PROGRESS: Dict[ProgressStep, int] = {
    ProgressStep.PARSING_RESUME: 15,
    ProgressStep.SCRAPING_JOB: 30,
    ProgressStep.CALCULATING_ORIGINAL_SCORE: 40,
    ProgressStep.CUSTOMIZING: 80,
    ProgressStep.CALCULATING_NEW_SCORE: 90,
    ProgressStep.FINALIZING: 95,
    ProgressStep.COMPLETE: 100,
}


class ProgressService:
    """Report the progress of one task through Redis.

    Every event is JSON-encoded, published on the ``progress:<task_id>``
    channel, and also written to the ``progress_state:<task_id>`` key with a
    TTL so that a subscriber arriving after the publish can still read the
    most recent state.
    """

    # Lifetime of the stored state snapshot, in seconds (1 hour).
    STATE_TTL_SECONDS = 3600

    def __init__(self, task_id: str, redis_client: Optional[Redis] = None) -> None:
        """Bind the service to a task.

        Args:
            task_id: Identifier used to build the channel and state-key names.
            redis_client: Client to use for all operations. When omitted, a
                client is obtained from ``get_redis()`` on each call.
        """
        self.task_id = task_id
        self.channel = f"progress:{task_id}"
        self._redis = redis_client

    async def _get_redis(self) -> Redis:
        """Return the injected client, falling back to ``get_redis()``."""
        # Compare against None instead of relying on truthiness: an injected
        # client that happens to evaluate falsy (for example a test double
        # defining __bool__ or __len__) must still be honoured.
        if self._redis is not None:
            return self._redis
        return await get_redis()

    async def _emit(self, data: Dict[str, Any]) -> None:
        """Publish *data* on the task channel and store it as the latest state."""
        redis = await self._get_redis()
        payload = json.dumps(data)

        # Publish to channel for SSE subscribers.
        # NOTE(review): the snapshot is written after the publish, so a client
        # that subscribes between the two calls may read the previous state
        # and miss this event -- confirm against the subscriber's
        # subscribe/read order before reordering.
        await redis.publish(self.channel, payload)

        # Store current state for late subscribers.
        await redis.set(
            f"progress_state:{self.task_id}",
            payload,
            ex=self.STATE_TTL_SECONDS,
        )

    async def update(
        self, step: ProgressStep, message: str = "", result_id: str = ""
    ) -> None:
        """Announce that the task has reached *step*.

        Args:
            step: Stage being reported. Its percentage is looked up in
                ``STEP_PROGRESS``; steps without an entry report 0.
            message: Text placed in the payload's ``"message"`` field.
            result_id: When non-empty, added to the payload as ``"result_id"``.
        """
        data: Dict[str, Any] = {
            "step": step.value,
            "percent": STEP_PROGRESS.get(step, 0),
            "message": message,
        }

        if result_id:
            data["result_id"] = result_id

        await self._emit(data)

    async def error(self, code: str, message: str, recoverable: bool = True) -> None:
        """Announce a failure for the task.

        The payload uses the ``ProgressStep.ERROR`` step, a percentage of 0,
        and an ``"error"`` object holding *code*, *message* and *recoverable*.

        Args:
            code: Value for the ``"code"`` field of the error object.
            message: Value for the ``"message"`` field of the error object.
            recoverable: Value for the ``"recoverable"`` field.
        """
        data: Dict[str, Any] = {
            "step": ProgressStep.ERROR.value,
            "percent": 0,
            "error": {
                "code": code,
                "message": message,
                "recoverable": recoverable,
            },
        }

        await self._emit(data)