| """In-memory async task manager for captcha solving tasks.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import uuid |
| from dataclasses import dataclass, field |
| from datetime import datetime, timedelta |
| from enum import Enum |
| from typing import Any, Protocol |
|
|
| log = logging.getLogger(__name__) |
|
|
|
|
| class TaskStatus(str, Enum): |
| PROCESSING = "processing" |
| READY = "ready" |
| FAILED = "failed" |
|
|
|
|
| @dataclass |
| class Task: |
| id: str |
| type: str |
| params: dict[str, Any] |
| status: TaskStatus = TaskStatus.PROCESSING |
| solution: dict[str, Any] | None = None |
| error_code: str | None = None |
| error_description: str | None = None |
| created_at: datetime = field(default_factory=datetime.utcnow) |
|
|
|
|
| class Solver(Protocol): |
| async def solve(self, params: dict[str, Any]) -> dict[str, Any]: ... |
|
|
|
|
| class TaskManager: |
| TASK_TTL = timedelta(minutes=10) |
|
|
| def __init__(self) -> None: |
| self._tasks: dict[str, Task] = {} |
| self._solvers: dict[str, Solver] = {} |
|
|
| def register_solver(self, task_type: str, solver: Solver) -> None: |
| self._solvers[task_type] = solver |
|
|
| def create_task(self, task_type: str, params: dict[str, Any]) -> str: |
| self._cleanup_expired() |
| task_id = str(uuid.uuid4()) |
| task = Task(id=task_id, type=task_type, params=params) |
| self._tasks[task_id] = task |
| asyncio.create_task(self._process_task(task)) |
| return task_id |
|
|
| def get_task(self, task_id: str) -> Task | None: |
| return self._tasks.get(task_id) |
|
|
| def supported_types(self) -> list[str]: |
| return list(self._solvers.keys()) |
|
|
| async def _process_task(self, task: Task) -> None: |
| solver = self._solvers.get(task.type) |
| if not solver: |
| task.status = TaskStatus.FAILED |
| task.error_code = "ERROR_TASK_NOT_SUPPORTED" |
| task.error_description = f"Task type '{task.type}' is not supported" |
| return |
|
|
| try: |
| solution = await solver.solve(task.params) |
| task.solution = solution |
| task.status = TaskStatus.READY |
| log.info("Task %s completed successfully", task.id) |
| except Exception as exc: |
| task.status = TaskStatus.FAILED |
| task.error_code = "ERROR_CAPTCHA_UNSOLVABLE" |
| task.error_description = str(exc) |
| log.error("Task %s failed: %s", task.id, exc) |
|
|
| def _cleanup_expired(self) -> None: |
| now = datetime.utcnow() |
| expired = [ |
| tid |
| for tid, t in self._tasks.items() |
| if now - t.created_at > self.TASK_TTL |
| ] |
| for tid in expired: |
| del self._tasks[tid] |
|
|
|
|
| task_manager = TaskManager() |
|
|