gk / app /core /batch_tasks.py
nanoppa's picture
Upload 89 files
a1e98ae verified
"""
Batch task manager for admin batch operations (SSE progress).
"""
from __future__ import annotations
import asyncio
import time
import uuid
from typing import Any, Dict, List, Optional
class BatchTask:
def __init__(self, total: int):
self.id = uuid.uuid4().hex
self.total = int(total)
self.processed = 0
self.ok = 0
self.fail = 0
self.status = "running"
self.warning: Optional[str] = None
self.result: Optional[Dict[str, Any]] = None
self.error: Optional[str] = None
self.created_at = time.time()
self._queues: List[asyncio.Queue] = []
self._final_event: Optional[Dict[str, Any]] = None
self.cancelled = False
def snapshot(self) -> Dict[str, Any]:
return {
"task_id": self.id,
"status": self.status,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"warning": self.warning,
}
def attach(self) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=200)
self._queues.append(q)
return q
def detach(self, q: asyncio.Queue) -> None:
if q in self._queues:
self._queues.remove(q)
def _publish(self, event: Dict[str, Any]) -> None:
for q in list(self._queues):
try:
q.put_nowait(event)
except Exception:
# Drop if queue is full or closed
pass
def record(
self, ok: bool, *, item: Any = None, detail: Any = None, error: str = ""
) -> None:
self.processed += 1
if ok:
self.ok += 1
else:
self.fail += 1
event: Dict[str, Any] = {
"type": "progress",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
}
if item is not None:
event["item"] = item
if detail is not None:
event["detail"] = detail
if error:
event["error"] = error
self._publish(event)
def finish(self, result: Dict[str, Any], *, warning: Optional[str] = None) -> None:
self.status = "done"
self.result = result
self.warning = warning
event = {
"type": "done",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"warning": self.warning,
"result": result,
}
self._final_event = event
self._publish(event)
def fail_task(self, error: str) -> None:
self.status = "error"
self.error = error
event = {
"type": "error",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
"error": error,
}
self._final_event = event
self._publish(event)
def cancel(self) -> None:
self.cancelled = True
def finish_cancelled(self) -> None:
self.status = "cancelled"
event = {
"type": "cancelled",
"task_id": self.id,
"total": self.total,
"processed": self.processed,
"ok": self.ok,
"fail": self.fail,
}
self._final_event = event
self._publish(event)
def final_event(self) -> Optional[Dict[str, Any]]:
return self._final_event
_TASKS: Dict[str, BatchTask] = {}
def create_task(total: int) -> BatchTask:
task = BatchTask(total)
_TASKS[task.id] = task
return task
def get_task(task_id: str) -> Optional[BatchTask]:
return _TASKS.get(task_id)
def delete_task(task_id: str) -> None:
_TASKS.pop(task_id, None)
async def expire_task(task_id: str, delay: int = 300) -> None:
await asyncio.sleep(delay)
delete_task(task_id)