from abc import ABC, abstractmethod from typing import Dict, Any, List, Optional from enum import Enum import logging from datetime import datetime import asyncio logger = logging.getLogger(__name__) class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" class TaskResult: def __init__(self, status: TaskStatus, data: Dict[Any, Any] = None, error: str = None): self.status = status self.data = data or {} self.error = error self.timestamp = datetime.now() class BaseTask(ABC): def __init__(self, task_id: str, name: str, dependencies: List[str] = None): self.task_id = task_id self.name = name self.dependencies = dependencies or [] self.status = TaskStatus.PENDING self.result: Optional[TaskResult] = None self.created_at = datetime.now() self.started_at: Optional[datetime] = None self.completed_at: Optional[datetime] = None @abstractmethod async def execute(self, context: Dict[str, Any] = None) -> TaskResult: """Execute the task and return result""" pass def can_execute(self, completed_tasks: List[str]) -> bool: """Check if all dependencies are satisfied""" return all(dep in completed_tasks for dep in self.dependencies) async def run(self, context: Dict[str, Any] = None) -> TaskResult: """Wrapper method that handles status tracking""" try: self.status = TaskStatus.RUNNING self.started_at = datetime.now() logger.info(f"Starting task: {self.name} ({self.task_id})") self.result = await self.execute(context or {}) self.status = self.result.status self.completed_at = datetime.now() duration = (self.completed_at - self.started_at).total_seconds() logger.info(f"Task {self.name} completed with status {self.status.value} in {duration:.2f}s") return self.result except Exception as e: self.status = TaskStatus.FAILED self.completed_at = datetime.now() self.result = TaskResult(TaskStatus.FAILED, error=str(e)) logger.error(f"Task {self.name} failed: {str(e)}") return self.result