lea-GEO / runner_service /state.py
hsmm's picture
Initial commit for HF Space
35bdde1
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from .models import RunnerTaskResult, TaskStatus
def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
@dataclass
class TaskState:
task_id: str
site: str
question_id: str
question_text: str
profile_id: str
created_at: str
updated_at: str
status: TaskStatus
result: RunnerTaskResult | None = None
resume_token: str | None = None
paused: bool = False
class InMemoryTaskStore:
def __init__(self) -> None:
self._lock = asyncio.Lock()
self._tasks: dict[str, TaskState] = {}
async def create(self, task: TaskState) -> None:
async with self._lock:
self._tasks[task.task_id] = task
async def get(self, task_id: str) -> TaskState | None:
async with self._lock:
return self._tasks.get(task_id)
async def update(self, task_id: str, **kwargs: Any) -> TaskState | None:
async with self._lock:
t = self._tasks.get(task_id)
if not t:
return None
for k, v in kwargs.items():
setattr(t, k, v)
t.updated_at = utc_now_iso()
return t