kioess / task_manager.py
kines9661's picture
Upload 20 files
f301753 verified
"""
任務管理系統 - 支持提交-輪詢模式
"""
import uuid
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from enum import Enum
from dataclasses import dataclass, asdict
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
"""任務狀態"""
PENDING = "pending" # 等待處理
PROCESSING = "processing" # 處理中
READY = "ready" # 完成
FAILED = "failed" # 失敗
@dataclass
class CaptchaTask:
"""驗證碼任務"""
task_id: str
url: str
sitekey: str
status: str = TaskStatus.PENDING.value
token: Optional[str] = None
error: Optional[str] = None
created_at: str = None
updated_at: str = None
expires_at: str = None
timeout: int = 30000
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow().isoformat()
if self.updated_at is None:
self.updated_at = datetime.utcnow().isoformat()
if self.expires_at is None:
# 默認 30 分鐘過期
self.expires_at = (datetime.utcnow() + timedelta(minutes=30)).isoformat()
def to_dict(self) -> Dict[str, Any]:
"""轉換為字典"""
return asdict(self)
def is_expired(self) -> bool:
"""檢查是否過期"""
return datetime.fromisoformat(self.expires_at) < datetime.utcnow()
def set_processing(self):
"""設置為處理中"""
self.status = TaskStatus.PROCESSING.value
self.updated_at = datetime.utcnow().isoformat()
def set_ready(self, token: str):
"""設置為完成"""
self.status = TaskStatus.READY.value
self.token = token
self.updated_at = datetime.utcnow().isoformat()
def set_failed(self, error: str):
"""設置為失敗"""
self.status = TaskStatus.FAILED.value
self.error = error
self.updated_at = datetime.utcnow().isoformat()
class TaskManager:
"""任務管理器"""
def __init__(self, cleanup_interval: int = 300):
"""
初始化任務管理器
參數:
- cleanup_interval: 清理過期任務的間隔(秒)
"""
self.tasks: Dict[str, CaptchaTask] = {}
self.cleanup_interval = cleanup_interval
self._cleanup_task = None
async def start(self):
"""啟動任務管理器"""
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
logger.info("任務管理器已啟動")
async def stop(self):
"""停止任務管理器"""
if self._cleanup_task:
self._cleanup_task.cancel()
logger.info("任務管理器已停止")
async def _cleanup_loop(self):
"""定期清理過期任務"""
while True:
try:
await asyncio.sleep(self.cleanup_interval)
self._cleanup_expired_tasks()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"清理任務失敗: {str(e)}")
def _cleanup_expired_tasks(self):
"""清理過期任務"""
expired_ids = [
task_id for task_id, task in self.tasks.items()
if task.is_expired()
]
for task_id in expired_ids:
del self.tasks[task_id]
logger.info(f"已清理過期任務: {task_id}")
def create_task(self, url: str, sitekey: str, timeout: int = 30000) -> str:
"""
創建新任務
參數:
- url: 網頁 URL
- sitekey: Turnstile sitekey
- timeout: 超時時間(毫秒)
返回:
- task_id: 任務 ID
"""
task_id = str(uuid.uuid4())
task = CaptchaTask(
task_id=task_id,
url=url,
sitekey=sitekey,
timeout=timeout
)
self.tasks[task_id] = task
logger.info(f"已創建任務: {task_id}")
return task_id
def get_task(self, task_id: str) -> Optional[CaptchaTask]:
"""
獲取任務
參數:
- task_id: 任務 ID
返回:
- 任務對象或 None
"""
return self.tasks.get(task_id)
def update_task_status(self, task_id: str, status: TaskStatus, **kwargs):
"""
更新任務狀態
參數:
- task_id: 任務 ID
- status: 新狀態
- kwargs: 其他更新字段(token, error 等)
"""
task = self.get_task(task_id)
if not task:
logger.warning(f"任務不存在: {task_id}")
return
task.status = status.value
task.updated_at = datetime.utcnow().isoformat()
if status == TaskStatus.READY and 'token' in kwargs:
task.token = kwargs['token']
elif status == TaskStatus.FAILED and 'error' in kwargs:
task.error = kwargs['error']
logger.info(f"已更新任務狀態: {task_id} -> {status.value}")
def get_pending_tasks(self, limit: int = 10) -> list:
"""
獲取待處理任務
參數:
- limit: 最多返回任務數
返回:
- 待處理任務列表
"""
pending = [
task for task in self.tasks.values()
if task.status == TaskStatus.PENDING.value and not task.is_expired()
]
return pending[:limit]
def get_task_stats(self) -> Dict[str, Any]:
"""
獲取任務統計信息
返回:
- 統計信息字典
"""
stats = {
"total": len(self.tasks),
"pending": sum(1 for t in self.tasks.values() if t.status == TaskStatus.PENDING.value),
"processing": sum(1 for t in self.tasks.values() if t.status == TaskStatus.PROCESSING.value),
"ready": sum(1 for t in self.tasks.values() if t.status == TaskStatus.READY.value),
"failed": sum(1 for t in self.tasks.values() if t.status == TaskStatus.FAILED.value),
}
return stats
def delete_task(self, task_id: str) -> bool:
"""
刪除任務
參數:
- task_id: 任務 ID
返回:
- 是否成功刪除
"""
if task_id in self.tasks:
del self.tasks[task_id]
logger.info(f"已刪除任務: {task_id}")
return True
return False
# 全局任務管理器實例
task_manager = TaskManager()