| """In-memory async task manager for captcha solving tasks."""
|
|
|
| from __future__ import annotations
|
|
|
| import asyncio
|
| import logging
|
| import uuid
|
| from dataclasses import dataclass, field
|
| from datetime import datetime, timedelta
|
| from enum import Enum
|
| from typing import Any, Protocol
|
|
|
| log = logging.getLogger(__name__)
|
|
|
|
|
| class TaskStatus(str, Enum):
|
| PROCESSING = "processing"
|
| READY = "ready"
|
| FAILED = "failed"
|
|
|
|
|
| @dataclass
|
| class Task:
|
| id: str
|
| type: str
|
| params: dict[str, Any]
|
| status: TaskStatus = TaskStatus.PROCESSING
|
| solution: dict[str, Any] | None = None
|
| error_code: str | None = None
|
| error_description: str | None = None
|
| created_at: datetime = field(default_factory=datetime.utcnow)
|
|
|
|
|
| class Solver(Protocol):
|
| async def solve(self, params: dict[str, Any]) -> dict[str, Any]: ...
|
|
|
|
|
| class TaskManager:
|
| TASK_TTL = timedelta(minutes=10)
|
|
|
| def __init__(self) -> None:
|
| self._tasks: dict[str, Task] = {}
|
| self._solvers: dict[str, Solver] = {}
|
|
|
| def register_solver(self, task_type: str, solver: Solver) -> None:
|
| self._solvers[task_type] = solver
|
|
|
| def create_task(self, task_type: str, params: dict[str, Any]) -> str:
|
| self._cleanup_expired()
|
| task_id = str(uuid.uuid4())
|
| task = Task(id=task_id, type=task_type, params=params)
|
| self._tasks[task_id] = task
|
| asyncio.create_task(self._process_task(task))
|
| return task_id
|
|
|
| def get_task(self, task_id: str) -> Task | None:
|
| return self._tasks.get(task_id)
|
|
|
| def supported_types(self) -> list[str]:
|
| return list(self._solvers.keys())
|
|
|
| async def _process_task(self, task: Task) -> None:
|
| solver = self._solvers.get(task.type)
|
| if not solver:
|
| task.status = TaskStatus.FAILED
|
| task.error_code = "ERROR_TASK_NOT_SUPPORTED"
|
| task.error_description = f"Task type '{task.type}' is not supported"
|
| return
|
|
|
| try:
|
| solution = await solver.solve(task.params)
|
| task.solution = solution
|
| task.status = TaskStatus.READY
|
| log.info("Task %s completed successfully", task.id)
|
| except Exception as exc:
|
| task.status = TaskStatus.FAILED
|
| task.error_code = "ERROR_CAPTCHA_UNSOLVABLE"
|
| task.error_description = str(exc)
|
| log.error("Task %s failed: %s", task.id, exc)
|
|
|
| def _cleanup_expired(self) -> None:
|
| now = datetime.utcnow()
|
| expired = [
|
| tid
|
| for tid, t in self._tasks.items()
|
| if now - t.created_at > self.TASK_TTL
|
| ]
|
| for tid in expired:
|
| del self._tasks[tid]
|
|
|
|
|
| task_manager = TaskManager()
|
|
|