| from __future__ import annotations |
|
|
| import asyncio |
| from dataclasses import dataclass |
| from datetime import datetime, timezone |
| from typing import Any |
|
|
| from .models import RunnerTaskResult, TaskStatus |
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    The value is truncated to whole seconds, e.g. ``2024-01-31T12:34:56Z``.
    """
    now = datetime.now(timezone.utc)
    # timespec="seconds" drops the sub-second part (truncation, not rounding).
    stamp = now.isoformat(timespec="seconds")
    # isoformat() renders the UTC offset as "+00:00"; emit the "Z" form instead.
    return stamp.replace("+00:00", "Z")
|
|
|
|
@dataclass
class TaskState:
    """Mutable record describing one task tracked by the task store.

    The first eight fields are required; ``result``, ``resume_token`` and
    ``paused`` default to "nothing yet" values.
    """

    # Identity of the task and of the question it refers to.
    task_id: str
    site: str
    question_id: str
    question_text: str
    profile_id: str
    # Timestamps are stored as strings.
    # NOTE(review): presumably in the format produced by utc_now_iso() — confirm.
    created_at: str
    updated_at: str
    status: TaskStatus
    # Optional fields, unset on a newly constructed record.
    result: RunnerTaskResult | None = None
    resume_token: str | None = None
    paused: bool = False
|
|
|
|
class InMemoryTaskStore:
    """Dictionary-backed store of ``TaskState`` objects keyed by ``task_id``.

    Every operation runs while holding a single ``asyncio.Lock``. Stored
    objects are handed out by reference, not copied.
    """

    def __init__(self) -> None:
        """Create an empty store with its own lock."""
        self._lock = asyncio.Lock()
        self._tasks: dict[str, TaskState] = {}

    async def create(self, task: TaskState) -> None:
        """Store *task* under ``task.task_id``.

        An existing entry with the same id is replaced.
        """
        async with self._lock:
            self._tasks[task.task_id] = task

    async def get(self, task_id: str) -> TaskState | None:
        """Return the stored task for *task_id*, or ``None`` if there is none.

        The returned object is the stored instance itself, not a copy.
        """
        async with self._lock:
            return self._tasks.get(task_id)

    async def update(self, task_id: str, **kwargs: Any) -> TaskState | None:
        """Set attributes on the stored task and refresh its ``updated_at``.

        Args:
            task_id: Id of the task to modify.
            **kwargs: Attribute names and their new values. Names are not
                validated; each one is applied with ``setattr``.

        Returns:
            The modified task (the stored instance), or ``None`` when no task
            with *task_id* exists.
        """
        async with self._lock:
            state = self._tasks.get(task_id)
            if state is None:
                return None
            for attr_name, value in kwargs.items():
                setattr(state, attr_name, value)
            # Stamped last, so a caller-supplied ``updated_at`` is overwritten.
            state.updated_at = utc_now_iso()
            return state
|
|
|
|