Spaces:
Runtime error
Runtime error
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, List, Optional | |
| from enum import Enum | |
| import logging | |
| from datetime import datetime | |
| import asyncio | |
| logger = logging.getLogger(__name__) | |
class TaskStatus(Enum):
    """Closed set of states a task can report.

    Every member's value is a lowercase string, so a member can be looked
    up from its serialized form with ``TaskStatus("failed")``.
    """

    # Not started yet.
    PENDING = "pending"
    # Execution is in progress.
    RUNNING = "running"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
class TaskResult:
    """Outcome of one task execution: a status plus optional payload/error."""

    def __init__(
        self,
        status: TaskStatus,
        data: Optional[Dict[Any, Any]] = None,
        error: Optional[str] = None,
    ):
        """Record the result and stamp it with the creation time.

        Args:
            status: Final status reported for the task.
            data: Payload produced by the task. ``None`` (the default) is
                replaced by a fresh empty dict; any dict passed in, even an
                empty one, is stored as-is and not copied.
            error: Error description, or ``None`` when there is none.
        """
        self.status = status
        # Test against None instead of truthiness: ``data or {}`` would
        # silently swap a caller-supplied empty dict for a different object,
        # so later writes through the caller's reference would be lost.
        self.data = data if data is not None else {}
        self.error = error
        # Naive local time at construction.
        self.timestamp = datetime.now()
class BaseTask(ABC):
    """Abstract unit of work with dependency and lifecycle tracking.

    Subclasses implement :meth:`execute`. Callers invoke :meth:`run`, which
    wraps ``execute`` with status updates, timestamps and logging.
    """

    def __init__(self, task_id: str, name: str, dependencies: Optional[List[str]] = None):
        """Create a task in the ``PENDING`` state.

        Args:
            task_id: Identifier of this task.
            name: Human-readable name used in log messages.
            dependencies: Identifiers that must appear in the list given to
                :meth:`can_execute` before this task may run (presumably the
                ``task_id`` values of other tasks -- confirm with the
                scheduler). ``None`` means no dependencies.
        """
        self.task_id = task_id
        self.name = name
        # Explicit None check so a caller-supplied (even empty) list is kept.
        self.dependencies = dependencies if dependencies is not None else []
        self.status = TaskStatus.PENDING
        self.result: Optional[TaskResult] = None
        # Naive local timestamps; started_at/completed_at are set by run().
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None

    @abstractmethod
    async def execute(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Execute the task and return result.

        Must be overridden by subclasses. :meth:`run` always passes a dict
        (an empty one when the caller supplied nothing).
        """

    def can_execute(self, completed_tasks: List[str]) -> bool:
        """Check if all dependencies are satisfied.

        Returns:
            ``True`` when every entry of ``self.dependencies`` is contained
            in ``completed_tasks`` (trivially ``True`` with no dependencies).
        """
        return all(dep in completed_tasks for dep in self.dependencies)

    async def run(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Run :meth:`execute` with status tracking, timing and logging.

        Args:
            context: Mapping forwarded to ``execute``; a falsy value is
                replaced by a new empty dict.

        Returns:
            The ``TaskResult`` from ``execute`` on success. If anything
            derived from ``Exception`` is raised along the way, a ``FAILED``
            result carrying ``str(exc)`` is returned instead of raising.
            Exceptions not derived from ``Exception`` propagate.
        """
        try:
            self.status = TaskStatus.RUNNING
            self.started_at = datetime.now()
            logger.info(f"Starting task: {self.name} ({self.task_id})")

            outcome = await self.execute(context or {})
            if outcome is None:
                # Fail with an explicit message rather than the opaque
                # "'NoneType' object has no attribute 'status'" below.
                raise TypeError(
                    f"{type(self).__name__}.execute() returned None instead of a TaskResult"
                )

            self.result = outcome
            self.status = outcome.status
            self.completed_at = datetime.now()
            duration = (self.completed_at - self.started_at).total_seconds()
            logger.info(
                f"Task {self.name} completed with status {self.status.value} in {duration:.2f}s"
            )
            return self.result
        except Exception as e:
            self.status = TaskStatus.FAILED
            self.completed_at = datetime.now()
            self.result = TaskResult(TaskStatus.FAILED, error=str(e))
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception(f"Task {self.name} failed: {str(e)}")
            return self.result