File size: 2,793 Bytes
"""In-memory async task manager for captcha solving tasks."""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Protocol
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class TaskStatus(str, Enum):
    """Lifecycle states of a captcha-solving task.

    The ``str`` mix-in makes each member compare equal to its plain string
    value (e.g. ``TaskStatus.READY == "ready"``).
    """

    # Initial state: the task has been created and is not finished yet.
    PROCESSING = "processing"
    # The solver returned a solution.
    READY = "ready"
    # No solver was available for the task type, or the solver raised.
    FAILED = "failed"
@dataclass
class Task:
    """State record for a single captcha-solving request.

    Only ``id``, ``type`` and ``params`` are required; the remaining fields
    start from their defaults and are filled in as the task is processed.
    """

    # Unique identifier of the task (the manager generates a UUID4 string).
    id: str
    # Task type name; used as the key to look up the registered solver.
    type: str
    # Solver-specific parameters, handed to the solver unchanged.
    params: dict[str, Any]
    # Current lifecycle state; every task starts out as PROCESSING.
    status: TaskStatus = TaskStatus.PROCESSING
    # Result mapping returned by the solver; stays None until the task is READY.
    solution: dict[str, Any] | None = None
    # Machine-readable error code; set only when the task FAILED.
    error_code: str | None = None
    # Human-readable failure reason; set only when the task FAILED.
    error_description: str | None = None
    # Creation time as a naive UTC datetime; compared against the task TTL.
    created_at: datetime = field(default_factory=datetime.utcnow)
class Solver(Protocol):
    """Structural interface for objects that can solve one type of task.

    Any object exposing a matching async ``solve`` method satisfies this
    protocol; explicit inheritance is not required.
    """

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve a task described by *params* and return the solution mapping."""
        ...
class TaskManager:
    """In-memory registry that runs captcha-solving tasks in the background.

    Solvers are registered per task type. ``create_task`` stores a new
    ``Task``, schedules its solver on the running event loop and returns the
    task id immediately; callers poll ``get_task`` to observe the outcome.
    Tasks older than ``TASK_TTL`` are evicted lazily, each time a new task is
    created.
    """

    # Maximum age of a stored task before it becomes eligible for eviction.
    TASK_TTL = timedelta(minutes=10)

    def __init__(self) -> None:
        """Create an empty manager with no solvers and no tasks."""
        self._tasks: dict[str, Task] = {}
        self._solvers: dict[str, Solver] = {}
        # The event loop only keeps weak references to tasks, so an otherwise
        # unreferenced task may be garbage-collected before it finishes.
        # Strong references are held here until each task completes.
        self._background_tasks: set[asyncio.Task[None]] = set()

    def register_solver(self, task_type: str, solver: Solver) -> None:
        """Register *solver* for *task_type*, replacing any previous one."""
        self._solvers[task_type] = solver

    def create_task(self, task_type: str, params: dict[str, Any]) -> str:
        """Store a new task, start processing it and return its id.

        Args:
            task_type: Name of the task type; selects the solver to run.
            params: Solver-specific parameters, passed through unchanged.

        Returns:
            The id (UUID4 string) of the newly created task.

        Raises:
            RuntimeError: If called without a running event loop.
        """
        # Resolve the loop first: without one we fail before registering a
        # task that could never leave the PROCESSING state.
        loop = asyncio.get_running_loop()

        self._cleanup_expired()
        task_id = str(uuid.uuid4())
        task = Task(id=task_id, type=task_type, params=params)
        self._tasks[task_id] = task

        runner = loop.create_task(self._process_task(task))
        self._background_tasks.add(runner)
        # Drop the strong reference as soon as the background task is done.
        runner.add_done_callback(self._background_tasks.discard)
        return task_id

    def get_task(self, task_id: str) -> Task | None:
        """Return the task stored under *task_id*, or ``None`` if unknown."""
        return self._tasks.get(task_id)

    def supported_types(self) -> list[str]:
        """Return the task types that currently have a registered solver."""
        return list(self._solvers)

    async def _process_task(self, task: Task) -> None:
        """Run the solver for *task* and record the outcome on the task.

        Solver errors are never propagated: an unknown task type or an
        exception raised by the solver marks the task as FAILED with an
        error code and description instead.
        """
        solver = self._solvers.get(task.type)
        # Compare against None explicitly so that a registered solver object
        # which happens to be falsy is not mistaken for "no solver".
        if solver is None:
            task.status = TaskStatus.FAILED
            task.error_code = "ERROR_TASK_NOT_SUPPORTED"
            task.error_description = f"Task type '{task.type}' is not supported"
            return

        try:
            solution = await solver.solve(task.params)
        except Exception as exc:
            # Boundary of the background task: record the failure on the task
            # (there is no caller to re-raise to) and log it with traceback.
            task.status = TaskStatus.FAILED
            task.error_code = "ERROR_CAPTCHA_UNSOLVABLE"
            task.error_description = str(exc)
            log.exception("Task %s failed: %s", task.id, exc)
        else:
            task.solution = solution
            task.status = TaskStatus.READY
            log.info("Task %s completed successfully", task.id)

    def _cleanup_expired(self) -> None:
        """Remove every stored task that is older than ``TASK_TTL``."""
        # NOTE: kept as naive UTC so it stays comparable with the naive UTC
        # timestamps that Task.created_at is populated with.
        now = datetime.utcnow()
        # Collect ids first; deleting while iterating the dict would raise.
        expired = [
            tid
            for tid, stored in self._tasks.items()
            if now - stored.created_at > self.TASK_TTL
        ]
        for tid in expired:
            del self._tasks[tid]
# Shared module-level TaskManager instance, created when the module is imported.
task_manager = TaskManager()
|