"""
Task management system supporting the submit-and-poll workflow.
"""
|
|
import asyncio
import logging
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional, Dict, Any
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
class TaskStatus(Enum):
    """Lifecycle states of a task.

    Every member's value is the lowercase string form of its name; that
    string is what gets stored in a task's ``status`` field.
    """

    # Waiting to be picked up.
    PENDING = "pending"
    # Currently being worked on.
    PROCESSING = "processing"
    # Finished successfully.
    READY = "ready"
    # Finished with an error.
    FAILED = "failed"
|
|
|
|
@dataclass
class CaptchaTask:
    """A single captcha-solving task tracked through the submit/poll workflow.

    All timestamps are ISO-8601 strings holding naive UTC times. Any of the
    three timestamp fields left as ``None`` is filled in by
    ``__post_init__``.

    Attributes:
        task_id: Unique identifier of the task.
        url: URL of the page the captcha belongs to.
        sitekey: Sitekey of the captcha widget.
        status: Current state; one of the ``TaskStatus`` values.
        token: Solution token, set by ``set_ready``.
        error: Failure description, set by ``set_failed``.
        created_at: Creation time.
        updated_at: Time of the most recent state change.
        expires_at: Time after which ``is_expired`` reports ``True``.
        timeout: Timeout in milliseconds.
    """

    task_id: str
    url: str
    sitekey: str
    status: str = TaskStatus.PENDING.value
    token: Optional[str] = None
    error: Optional[str] = None
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    expires_at: Optional[str] = None
    timeout: int = 30000

    # Lifetime granted to a task created without an explicit ``expires_at``.
    # Deliberately unannotated so the dataclass machinery does not treat it
    # as a field (it must not appear in ``__init__`` or ``asdict``).
    _DEFAULT_TTL = timedelta(minutes=30)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Reading an
        aware clock and dropping the tzinfo yields the same naive value, so
        the stored ISO strings keep their original format (no offset).
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def __post_init__(self):
        """Fill in any timestamp the caller did not supply."""
        # Read the clock once so all defaulted timestamps of a fresh task
        # are derived from the same instant.
        now = self._utcnow()
        if self.created_at is None:
            self.created_at = now.isoformat()
        if self.updated_at is None:
            self.updated_at = now.isoformat()
        if self.expires_at is None:
            self.expires_at = (now + self._DEFAULT_TTL).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return the task's fields as a plain dictionary."""
        return asdict(self)

    def is_expired(self) -> bool:
        """Return ``True`` when ``expires_at`` lies in the past.

        Raises:
            ValueError: If ``expires_at`` is not a valid ISO-8601 string.
        """
        deadline = datetime.fromisoformat(self.expires_at)
        if deadline.tzinfo is not None:
            # A caller-supplied value may carry a UTC offset; normalise it
            # to naive UTC so the comparison below cannot raise TypeError.
            deadline = deadline.astimezone(timezone.utc).replace(tzinfo=None)
        return deadline < self._utcnow()

    def _touch(self) -> None:
        """Record the current time as the moment of the last state change."""
        self.updated_at = self._utcnow().isoformat()

    def set_processing(self):
        """Mark the task as being processed."""
        self.status = TaskStatus.PROCESSING.value
        self._touch()

    def set_ready(self, token: str):
        """Mark the task as completed and store its solution token.

        Args:
            token: The solution token to store.
        """
        self.status = TaskStatus.READY.value
        self.token = token
        self._touch()

    def set_failed(self, error: str):
        """Mark the task as failed and store the failure description.

        Args:
            error: Description of what went wrong.
        """
        self.status = TaskStatus.FAILED.value
        self.error = error
        self._touch()
|
|
|
|
class TaskManager:
    """In-memory registry of captcha tasks with periodic expiry cleanup."""

    def __init__(self, cleanup_interval: int = 300):
        """Create an empty manager.

        Args:
            cleanup_interval: Seconds to wait between sweeps that remove
                expired tasks.
        """
        self.tasks: Dict[str, CaptchaTask] = {}
        self.cleanup_interval = cleanup_interval
        self._cleanup_task: Optional[asyncio.Task] = None

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current naive-UTC time as an ISO-8601 string.

        Replaces the deprecated ``datetime.utcnow()`` while producing the
        same offset-free string format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def start(self):
        """Start the background cleanup loop.

        Calling this while the loop is already running is a no-op, so a
        repeated call cannot orphan the first task or run two loops.
        """
        if self._cleanup_task is not None and not self._cleanup_task.done():
            return
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
        logger.info("任務管理器已啟動")

    async def stop(self):
        """Stop the background cleanup loop and wait for it to finish."""
        # Clear the handle first so a later start() creates a fresh loop.
        cleanup_task, self._cleanup_task = self._cleanup_task, None
        if cleanup_task is not None and not cleanup_task.done():
            cleanup_task.cancel()
            try:
                # Await the task so the cancellation is fully processed
                # before stop() returns.
                await cleanup_task
            except asyncio.CancelledError:
                pass
        logger.info("任務管理器已停止")

    async def _cleanup_loop(self):
        """Remove expired tasks every ``cleanup_interval`` seconds.

        Runs until cancelled. A failing sweep is logged and the loop goes
        on to the next interval.
        """
        while True:
            # The sleep stays outside the try block: if it raised (for
            # example on an invalid interval) inside a broad handler, the
            # loop would spin without ever yielding to the event loop.
            # Cancellation is delivered here and propagates to end the task.
            await asyncio.sleep(self.cleanup_interval)
            try:
                self._cleanup_expired_tasks()
            except Exception as e:
                # logger.exception keeps the traceback with the message.
                logger.exception(f"清理任務失敗: {str(e)}")

    def _cleanup_expired_tasks(self):
        """Delete every task whose ``is_expired()`` returns ``True``."""
        # Collect the ids first: the dict must not change size while it is
        # being iterated.
        expired_ids = [
            task_id for task_id, task in self.tasks.items()
            if task.is_expired()
        ]

        for task_id in expired_ids:
            del self.tasks[task_id]
            logger.info(f"已清理過期任務: {task_id}")

    def create_task(self, url: str, sitekey: str, timeout: int = 30000) -> str:
        """Create and register a new task.

        Args:
            url: Page URL.
            sitekey: Turnstile sitekey.
            timeout: Timeout in milliseconds.

        Returns:
            The id of the newly created task.
        """
        task_id = str(uuid.uuid4())
        self.tasks[task_id] = CaptchaTask(
            task_id=task_id,
            url=url,
            sitekey=sitekey,
            timeout=timeout,
        )
        logger.info(f"已創建任務: {task_id}")
        return task_id

    def get_task(self, task_id: str) -> Optional[CaptchaTask]:
        """Look up a task by id.

        Args:
            task_id: Id of the task.

        Returns:
            The task object, or ``None`` when the id is unknown.
        """
        return self.tasks.get(task_id)

    def update_task_status(self, task_id: str, status: TaskStatus, **kwargs):
        """Set a task's status and refresh its ``updated_at`` timestamp.

        An unknown ``task_id`` is logged as a warning and otherwise ignored.

        Args:
            task_id: Id of the task.
            status: The new status.
            **kwargs: Optional extra fields. ``token`` is stored when the
                new status is ``READY``; ``error`` is stored when it is
                ``FAILED``. Anything else is ignored.
        """
        task = self.get_task(task_id)
        if task is None:
            logger.warning(f"任務不存在: {task_id}")
            return

        task.status = status.value
        task.updated_at = self._utc_timestamp()

        if status == TaskStatus.READY and 'token' in kwargs:
            task.token = kwargs['token']
        elif status == TaskStatus.FAILED and 'error' in kwargs:
            task.error = kwargs['error']

        logger.info(f"已更新任務狀態: {task_id} -> {status.value}")

    def get_pending_tasks(self, limit: int = 10) -> list:
        """Return up to ``limit`` pending tasks that have not expired.

        Tasks are returned in the dict's iteration order.

        Args:
            limit: Maximum number of tasks to return. Zero or a negative
                value yields an empty list.

        Returns:
            A list of pending tasks.
        """
        pending = []
        if limit <= 0:
            return pending

        pending_value = TaskStatus.PENDING.value
        for task in self.tasks.values():
            if task.status == pending_value and not task.is_expired():
                pending.append(task)
                # Stop scanning as soon as enough tasks were collected.
                if len(pending) >= limit:
                    break
        return pending

    def get_task_stats(self) -> Dict[str, Any]:
        """Count the registered tasks, in total and per status.

        Returns:
            A dict with the keys ``total``, ``pending``, ``processing``,
            ``ready`` and ``failed``. A task whose status is none of the
            four known values is counted in ``total`` only.
        """
        stats = {
            "total": len(self.tasks),
            TaskStatus.PENDING.value: 0,
            TaskStatus.PROCESSING.value: 0,
            TaskStatus.READY.value: 0,
            TaskStatus.FAILED.value: 0,
        }
        # Single pass over the tasks instead of one pass per status.
        for task in self.tasks.values():
            if task.status != "total" and task.status in stats:
                stats[task.status] += 1
        return stats

    def delete_task(self, task_id: str) -> bool:
        """Remove a task from the registry.

        Args:
            task_id: Id of the task.

        Returns:
            ``True`` when the task existed and was removed, else ``False``.
        """
        if self.tasks.pop(task_id, None) is None:
            return False
        logger.info(f"已刪除任務: {task_id}")
        return True
|
|
|
|
| |
# Module-level TaskManager instance created with the default cleanup interval.
# NOTE(review): presumably the shared instance other modules import — confirm.
task_manager = TaskManager()
|
|