Spaces:
Paused
Paused
| """In-memory async task manager for captcha solving tasks.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from typing import Any, Protocol | |
| log = logging.getLogger(__name__) | |
| class TaskStatus(str, Enum): | |
| PROCESSING = "processing" | |
| READY = "ready" | |
| FAILED = "failed" | |
| class Task: | |
| id: str | |
| type: str | |
| params: dict[str, Any] | |
| status: TaskStatus = TaskStatus.PROCESSING | |
| solution: dict[str, Any] | None = None | |
| error_code: str | None = None | |
| error_description: str | None = None | |
| created_at: datetime = field(default_factory=datetime.utcnow) | |
| class Solver(Protocol): | |
| async def solve(self, params: dict[str, Any]) -> dict[str, Any]: ... | |
| class TaskManager: | |
| TASK_TTL = timedelta(minutes=10) | |
| def __init__(self) -> None: | |
| self._tasks: dict[str, Task] = {} | |
| self._solvers: dict[str, Solver] = {} | |
| def register_solver(self, task_type: str, solver: Solver) -> None: | |
| self._solvers[task_type] = solver | |
| def create_task(self, task_type: str, params: dict[str, Any]) -> str: | |
| self._cleanup_expired() | |
| task_id = str(uuid.uuid4()) | |
| task = Task(id=task_id, type=task_type, params=params) | |
| self._tasks[task_id] = task | |
| asyncio.create_task(self._process_task(task)) | |
| return task_id | |
| def get_task(self, task_id: str) -> Task | None: | |
| return self._tasks.get(task_id) | |
| def supported_types(self) -> list[str]: | |
| return list(self._solvers.keys()) | |
| async def _process_task(self, task: Task) -> None: | |
| solver = self._solvers.get(task.type) | |
| if not solver: | |
| task.status = TaskStatus.FAILED | |
| task.error_code = "ERROR_TASK_NOT_SUPPORTED" | |
| task.error_description = f"Task type '{task.type}' is not supported" | |
| return | |
| try: | |
| solution = await solver.solve(task.params) | |
| task.solution = solution | |
| task.status = TaskStatus.READY | |
| log.info("Task %s completed successfully", task.id) | |
| except Exception as exc: | |
| task.status = TaskStatus.FAILED | |
| task.error_code = "ERROR_CAPTCHA_UNSOLVABLE" | |
| task.error_description = str(exc) | |
| log.error("Task %s failed: %s", task.id, exc) | |
| def _cleanup_expired(self) -> None: | |
| now = datetime.utcnow() | |
| expired = [ | |
| tid | |
| for tid, t in self._tasks.items() | |
| if now - t.created_at > self.TASK_TTL | |
| ] | |
| for tid in expired: | |
| del self._tasks[tid] | |
| task_manager = TaskManager() | |