Spaces:
Sleeping
Sleeping
File size: 4,746 Bytes
bdc2878 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | """Async batch task model + in-memory store for SSE progress streaming."""
import asyncio
import time
import uuid
from typing import Any, Dict, List, Optional
class AsyncTask:
"""Tracks progress of an async batch operation with fan-out SSE support."""
__slots__ = (
"id", "total", "processed", "ok", "fail", "status",
"warning", "result", "error", "created_at",
"cancelled", "_queues", "_final_event",
)
def __init__(self, total: int) -> None:
self.id = uuid.uuid4().hex
self.total = int(total)
self.processed = 0
self.ok = 0
self.fail = 0
self.status = "running"
self.warning: Optional[str] = None
self.result: Optional[Dict[str, Any]] = None
self.error: Optional[str] = None
self.created_at = time.time()
self.cancelled = False
self._queues: List[asyncio.Queue] = []
self._final_event: Optional[Dict[str, Any]] = None
# -- Fan-out pub/sub ---------------------------------------------------
def _publish(self, event: Dict[str, Any]) -> None:
for q in list(self._queues):
try:
q.put_nowait(event)
except Exception:
pass
def attach(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=200)
self._queues.append(q)
return q
def detach(self, q: asyncio.Queue) -> None:
if q in self._queues:
self._queues.remove(q)
# -- Recording ---------------------------------------------------------
def record(
self,
success: bool,
*,
item: Any = None,
detail: Any = None,
error: str = "",
) -> None:
self.processed += 1
if success:
self.ok += 1
else:
self.fail += 1
event: Dict[str, Any] = {
"type": "progress",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
}
if item is not None:
event["item"] = item
if detail is not None:
event["detail"] = detail
if error:
event["error"] = error
self._publish(event)
def finish(self, result: Dict[str, Any], *, warning: Optional[str] = None) -> None:
self.status = "done"
self.result = result
self.warning = warning
event = {
"type": "done",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"warning": self.warning,
"result": result,
}
self._final_event = event
self._publish(event)
def fail_task(self, msg: str) -> None:
self.status = "error"
self.error = msg
event = {
"type": "error",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"error": msg,
}
self._final_event = event
self._publish(event)
def cancel(self) -> None:
self.cancelled = True
def finish_cancelled(self) -> None:
self.status = "cancelled"
event = {
"type": "cancelled",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
}
self._final_event = event
self._publish(event)
# -- Snapshots ---------------------------------------------------------
def snapshot(self) -> Dict[str, Any]:
return {
"task_id": self.id,
"status": self.status,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"warning": self.warning,
}
def final_event(self) -> Optional[Dict[str, Any]]:
return self._final_event
# ---------------------------------------------------------------------------
# In-memory task store
# ---------------------------------------------------------------------------
_TASKS: Dict[str, AsyncTask] = {}
def create_task(total: int) -> AsyncTask:
task = AsyncTask(total)
_TASKS[task.id] = task
return task
def get_task(task_id: str) -> Optional[AsyncTask]:
return _TASKS.get(task_id)
async def expire_task(task_id: str, ttl_s: int = 300) -> None:
await asyncio.sleep(ttl_s)
_TASKS.pop(task_id, None)
__all__ = ["AsyncTask", "create_task", "get_task", "expire_task"]
|