# File: mycaptcha/src/services/task_manager.py
# Repository page residue: uploaded by dragg2, commit message "Upload 18 files",
# commit 3a04f21 (verified).
"""In-memory async task manager for captcha solving tasks."""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Protocol
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class TaskStatus(str, Enum):
    """Lifecycle state of a captcha-solving task.

    The ``str`` mixin makes every member compare equal to its plain string
    value (``TaskStatus.READY == "ready"``).
    """

    # The solver has not produced a result yet.
    PROCESSING = "processing"

    # The solver returned a solution.
    READY = "ready"

    # The task could not be solved, or its type is not supported.
    FAILED = "failed"
@dataclass
class Task:
    """Mutable record describing one captcha-solving job and its outcome.

    ``id``, ``type`` and ``params`` are required; every other field has a
    default so a freshly created task starts out in the PROCESSING state with
    no solution and no error information.
    """

    # Identifier of the task.
    id: str

    # Task type name; used to look up the solver that handles the task.
    type: str

    # Free-form parameters handed to the solver unchanged.
    params: dict[str, Any]

    # Current lifecycle state.
    status: TaskStatus = field(default=TaskStatus.PROCESSING)

    # Solver result; stays None until a solution has been stored.
    solution: dict[str, Any] | None = field(default=None)

    # Machine-readable error code; stays None unless the task failed.
    error_code: str | None = field(default=None)

    # Human-readable error text; stays None unless the task failed.
    error_description: str | None = field(default=None)

    # Creation time as a *naive* UTC datetime (no tzinfo attached).
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12; switching
    # to an aware datetime must be done together with every place that
    # compares against this field.
    created_at: datetime = field(default_factory=datetime.utcnow)
class Solver(Protocol):
    """Structural interface for objects that can solve one kind of task.

    Any object exposing a compatible ``async def solve`` satisfies this
    protocol; explicit inheritance is not required.
    """

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve a task described by *params* and return the solution dict."""
class TaskManager:
    """In-memory registry that runs captcha-solving tasks in the background.

    Solvers are registered per task type. ``create_task`` stores a new
    ``Task`` record and schedules its processing on the running asyncio event
    loop; callers poll the outcome with ``get_task``. Records older than
    ``TASK_TTL`` are purged (whatever their status) each time a new task is
    created.
    """

    # Maximum age of a task record before it becomes eligible for removal.
    TASK_TTL = timedelta(minutes=10)

    def __init__(self) -> None:
        """Create an empty manager with no tasks and no registered solvers."""
        self._tasks: dict[str, Task] = {}
        self._solvers: dict[str, Solver] = {}
        # Strong references to in-flight asyncio tasks. The event loop keeps
        # only weak references to tasks, so a task nobody else references can
        # be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task[None]] = set()

    def register_solver(self, task_type: str, solver: Solver) -> None:
        """Register *solver* as the handler for *task_type*.

        A solver previously registered for the same type is replaced.
        """
        self._solvers[task_type] = solver

    def create_task(self, task_type: str, params: dict[str, Any]) -> str:
        """Create a task, schedule it for background processing, return its id.

        Args:
            task_type: Name used to look up the registered solver. An unknown
                type is not rejected here; the task is marked FAILED once it
                is processed.
            params: Parameters passed to the solver unchanged.

        Returns:
            The new task's id (a UUID4 string).

        Raises:
            RuntimeError: If no asyncio event loop is running in the current
                thread.
        """
        # Fail before touching any state: otherwise a record that can never
        # be processed would sit in the registry until it expires.
        loop = asyncio.get_running_loop()

        self._cleanup_expired()

        task_id = str(uuid.uuid4())
        task = Task(id=task_id, type=task_type, params=params)
        self._tasks[task_id] = task

        background = loop.create_task(self._process_task(task))
        # Keep the task alive until it completes, then drop the reference.
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)
        return task_id

    def get_task(self, task_id: str) -> Task | None:
        """Return the task stored under *task_id*, or None if there is none."""
        return self._tasks.get(task_id)

    def supported_types(self) -> list[str]:
        """Return the task type names that currently have a solver."""
        return list(self._solvers)

    async def _process_task(self, task: Task) -> None:
        """Run the solver for *task* and record the outcome on the task.

        The task ends up READY with ``solution`` set, or FAILED with
        ``error_code`` and ``error_description`` set. Exceptions derived from
        ``Exception`` are captured on the task rather than propagated.
        """
        solver = self._solvers.get(task.type)
        # Compare against None explicitly so a solver object that happens to
        # be falsy is still used.
        if solver is None:
            task.status = TaskStatus.FAILED
            task.error_code = "ERROR_TASK_NOT_SUPPORTED"
            task.error_description = f"Task type '{task.type}' is not supported"
            return

        try:
            solution = await solver.solve(task.params)
        except Exception as exc:
            task.status = TaskStatus.FAILED
            task.error_code = "ERROR_CAPTCHA_UNSOLVABLE"
            # Some exceptions carry no message; fall back to the class name
            # so the description is never an empty string.
            task.error_description = str(exc) or type(exc).__name__
            log.error("Task %s failed: %s", task.id, exc)
        else:
            task.solution = solution
            task.status = TaskStatus.READY
            log.info("Task %s completed successfully", task.id)

    def _cleanup_expired(self) -> None:
        """Delete every task record older than ``TASK_TTL``."""
        # Naive UTC "now", matching the naive UTC default of Task.created_at.
        now = datetime.utcnow()
        # Collect ids first: a dict must not change size while being iterated.
        expired = [
            task_id
            for task_id, task in self._tasks.items()
            if now - task.created_at > self.TASK_TTL
        ]
        for task_id in expired:
            del self._tasks[task_id]
# Module-level TaskManager instance, created at import time.
# NOTE(review): presumably the shared instance other modules import — confirm
# against callers.
task_manager = TaskManager()