""" 任務管理系統 - 支持提交-輪詢模式 """ import uuid import asyncio import logging from datetime import datetime, timedelta from typing import Optional, Dict, Any from enum import Enum from dataclasses import dataclass, asdict logger = logging.getLogger(__name__) class TaskStatus(Enum): """任務狀態""" PENDING = "pending" # 等待處理 PROCESSING = "processing" # 處理中 READY = "ready" # 完成 FAILED = "failed" # 失敗 @dataclass class CaptchaTask: """驗證碼任務""" task_id: str url: str sitekey: str status: str = TaskStatus.PENDING.value token: Optional[str] = None error: Optional[str] = None created_at: str = None updated_at: str = None expires_at: str = None timeout: int = 30000 def __post_init__(self): if self.created_at is None: self.created_at = datetime.utcnow().isoformat() if self.updated_at is None: self.updated_at = datetime.utcnow().isoformat() if self.expires_at is None: # 默認 30 分鐘過期 self.expires_at = (datetime.utcnow() + timedelta(minutes=30)).isoformat() def to_dict(self) -> Dict[str, Any]: """轉換為字典""" return asdict(self) def is_expired(self) -> bool: """檢查是否過期""" return datetime.fromisoformat(self.expires_at) < datetime.utcnow() def set_processing(self): """設置為處理中""" self.status = TaskStatus.PROCESSING.value self.updated_at = datetime.utcnow().isoformat() def set_ready(self, token: str): """設置為完成""" self.status = TaskStatus.READY.value self.token = token self.updated_at = datetime.utcnow().isoformat() def set_failed(self, error: str): """設置為失敗""" self.status = TaskStatus.FAILED.value self.error = error self.updated_at = datetime.utcnow().isoformat() class TaskManager: """任務管理器""" def __init__(self, cleanup_interval: int = 300): """ 初始化任務管理器 參數: - cleanup_interval: 清理過期任務的間隔(秒) """ self.tasks: Dict[str, CaptchaTask] = {} self.cleanup_interval = cleanup_interval self._cleanup_task = None async def start(self): """啟動任務管理器""" self._cleanup_task = asyncio.create_task(self._cleanup_loop()) logger.info("任務管理器已啟動") async def stop(self): """停止任務管理器""" if self._cleanup_task: self._cleanup_task.cancel() logger.info("任務管理器已停止") async def _cleanup_loop(self): """定期清理過期任務""" while True: try: await asyncio.sleep(self.cleanup_interval) self._cleanup_expired_tasks() except asyncio.CancelledError: break except Exception as e: logger.error(f"清理任務失敗: {str(e)}") def _cleanup_expired_tasks(self): """清理過期任務""" expired_ids = [ task_id for task_id, task in self.tasks.items() if task.is_expired() ] for task_id in expired_ids: del self.tasks[task_id] logger.info(f"已清理過期任務: {task_id}") def create_task(self, url: str, sitekey: str, timeout: int = 30000) -> str: """ 創建新任務 參數: - url: 網頁 URL - sitekey: Turnstile sitekey - timeout: 超時時間(毫秒) 返回: - task_id: 任務 ID """ task_id = str(uuid.uuid4()) task = CaptchaTask( task_id=task_id, url=url, sitekey=sitekey, timeout=timeout ) self.tasks[task_id] = task logger.info(f"已創建任務: {task_id}") return task_id def get_task(self, task_id: str) -> Optional[CaptchaTask]: """ 獲取任務 參數: - task_id: 任務 ID 返回: - 任務對象或 None """ return self.tasks.get(task_id) def update_task_status(self, task_id: str, status: TaskStatus, **kwargs): """ 更新任務狀態 參數: - task_id: 任務 ID - status: 新狀態 - kwargs: 其他更新字段(token, error 等) """ task = self.get_task(task_id) if not task: logger.warning(f"任務不存在: {task_id}") return task.status = status.value task.updated_at = datetime.utcnow().isoformat() if status == TaskStatus.READY and 'token' in kwargs: task.token = kwargs['token'] elif status == TaskStatus.FAILED and 'error' in kwargs: task.error = kwargs['error'] logger.info(f"已更新任務狀態: {task_id} -> {status.value}") def get_pending_tasks(self, limit: int = 10) -> list: """ 獲取待處理任務 參數: - limit: 最多返回任務數 返回: - 待處理任務列表 """ pending = [ task for task in self.tasks.values() if task.status == TaskStatus.PENDING.value and not task.is_expired() ] return pending[:limit] def get_task_stats(self) -> Dict[str, Any]: """ 獲取任務統計信息 返回: - 統計信息字典 """ stats = { "total": len(self.tasks), "pending": sum(1 for t in self.tasks.values() if t.status == TaskStatus.PENDING.value), "processing": sum(1 for t in self.tasks.values() if t.status == TaskStatus.PROCESSING.value), "ready": sum(1 for t in self.tasks.values() if t.status == TaskStatus.READY.value), "failed": sum(1 for t in self.tasks.values() if t.status == TaskStatus.FAILED.value), } return stats def delete_task(self, task_id: str) -> bool: """ 刪除任務 參數: - task_id: 任務 ID 返回: - 是否成功刪除 """ if task_id in self.tasks: del self.tasks[task_id] logger.info(f"已刪除任務: {task_id}") return True return False # 全局任務管理器實例 task_manager = TaskManager()