File size: 5,276 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from __future__ import annotations

import time
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, model_validator


class TaskStatus(str, Enum):
    queued = "queued"
    running = "running"
    retrying = "retrying"
    fallback_running = "fallback_running"
    waiting_rpa = "waiting_rpa"
    rpa_running = "rpa_running"
    rpa_imported = "rpa_imported"
    rpa_failed = "rpa_failed"
    risk_paused = "risk_paused"
    succeeded = "succeeded"
    failed = "failed"


IN_PROGRESS_STATUSES = frozenset(
    (
        TaskStatus.running,
        TaskStatus.retrying,
        TaskStatus.fallback_running,
        TaskStatus.waiting_rpa,
        TaskStatus.rpa_running,
        TaskStatus.risk_paused,
    )
)

TERMINAL_STATUSES = frozenset(
    (
        TaskStatus.succeeded,
        TaskStatus.failed,
        TaskStatus.rpa_imported,
        TaskStatus.rpa_failed,
    )
)


def canonical_task_status_value(value: Any) -> str | None:
    if value is None:
        return None
    if hasattr(value, "value"):
        value = value.value
    raw = str(value).strip().lower()
    if raw == "":
        return None
    aliases = {
        "pending": TaskStatus.queued.value,
        "waiting": TaskStatus.queued.value,
        "in_progress": TaskStatus.running.value,
        "processing": TaskStatus.running.value,
        "done": TaskStatus.succeeded.value,
        "success": TaskStatus.succeeded.value,
        "error": TaskStatus.failed.value,
        "failed": TaskStatus.failed.value,
        "running": TaskStatus.running.value,
        "queued": TaskStatus.queued.value,
    }
    if raw in aliases:
        raw = aliases[raw]
    valid = {s.value for s in TaskStatus}
    if raw in valid:
        return raw
    return None



class CallbackStatus(str, Enum):
    pending = "pending"
    succeeded = "succeeded"
    failed = "failed"


class CallbackState(BaseModel):
    status: CallbackStatus = CallbackStatus.pending
    callback_url: Optional[str] = None
    idempotency_key: Optional[str] = None
    attempts: int = 0
    last_attempt_at: Optional[float] = None
    last_http_status: Optional[int] = None
    last_error: Optional[str] = None
    next_retry_at: Optional[float] = None


class TaskRecord(BaseModel):
    id: str
    status: TaskStatus
    task_type: str
    target: str = ""
    payload: Dict[str, Any] = Field(default_factory=dict)
    engine: Optional[str] = None
    callback: Optional[CallbackState] = None
    created: float = Field(default_factory=lambda: time.time())
    started: float | None = None
    finished: float | None = None
    retry_count: int = 0
    error: Dict[str, Any] | None = None

    @model_validator(mode="before")
    @classmethod
    def _coerce_legacy_fields(cls, data: Any) -> Any:
        if not isinstance(data, dict):
            return data

        status_raw = data.get("status")
        status_val = canonical_task_status_value(status_raw)
        if status_val is None:
            finished_any = data.get("finished") or data.get("finished_at")
            started_any = data.get("started") or data.get("started_at")
            if finished_any or data.get("error") not in (None, "", {}):
                data["status"] = TaskStatus.failed.value
            elif started_any:
                data["status"] = TaskStatus.running.value
            else:
                data["status"] = TaskStatus.queued.value
        else:
            data["status"] = status_val

        if "created" not in data and "created_at" in data:
            data["created"] = data.get("created_at")
        if "started" not in data and "started_at" in data:
            data["started"] = data.get("started_at")
        if "finished" not in data and "finished_at" in data:
            data["finished"] = data.get("finished_at")

        retry_val = data.get("retry_count")
        if retry_val is None:
            retry_val = data.get("retries")
        if retry_val is not None and "retry_count" not in data:
            data["retry_count"] = retry_val

        err = data.get("error")
        if isinstance(err, str) and err.strip() != "":
            data["error"] = {"message": err}

        payload = data.get("payload")
        if payload is None:
            data["payload"] = {}
        elif not isinstance(payload, dict):
            data["payload"] = {}

        if not data.get("target"):
            task_type = str(data.get("task_type") or "").strip().lower()
            payload_obj = data.get("payload") if isinstance(data.get("payload"), dict) else {}
            inferred = ""
            if task_type == "note_url":
                inferred = str(payload_obj.get("note_url") or payload_obj.get("url") or "")
            elif task_type == "search":
                inferred = str(payload_obj.get("query") or payload_obj.get("keyword") or "")
            elif task_type == "user_profile":
                inferred = str(
                    payload_obj.get("user_id")
                    or payload_obj.get("uid")
                    or payload_obj.get("user_url")
                    or payload_obj.get("url")
                    or ""
                )
            data["target"] = inferred

        return data


def now_ts() -> float:
    return time.time()