Spaces:
Runtime error
Runtime error
File size: 2,395 Bytes
021b7e0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from enum import Enum
import logging
from datetime import datetime
import asyncio
# Module-level logger named after this module; BaseTask.run writes its
# start/completion/failure messages through it.
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
    """Enumeration of the states a task can be in.

    Each member's value is the lowercase form of its name, so a status can
    be rebuilt from its string value with ``TaskStatus("pending")`` etc.
    """

    # Initial state assigned when a task object is constructed.
    PENDING = "pending"
    # Set while a task's run() wrapper is awaiting execute().
    RUNNING = "running"
    COMPLETED = "completed"
    # Set when execute() raises, or when a result reports failure.
    FAILED = "failed"
    SKIPPED = "skipped"
class TaskResult:
    """Outcome of a single task execution.

    Attributes:
        status: Status reported for the task.
        data: Payload produced by the task; an empty dict when none is given.
        error: Error message, or ``None`` when no error was reported.
        timestamp: Naive local time at which this result object was created.
    """

    def __init__(
        self,
        status: TaskStatus,
        data: Optional[Dict[Any, Any]] = None,
        error: Optional[str] = None,
    ):
        """Store the status, payload and error, and stamp the creation time.

        Args:
            status: Status to record for the task.
            data: Optional payload dict. The caller's object is kept as-is
                (not copied) whenever it is not ``None``.
            error: Optional human-readable error description.
        """
        self.status = status
        # Substitute a fresh dict only when no payload was passed at all.
        # ``data or {}`` would also swap out a caller-supplied *empty* dict,
        # silently breaking the link to an object the caller may fill later.
        self.data = data if data is not None else {}
        self.error = error
        self.timestamp = datetime.now()

    def __repr__(self) -> str:
        """Return a compact debugging representation (status and error only)."""
        return (
            f"{type(self).__name__}"
            f"(status={self.status!r}, error={self.error!r})"
        )
class BaseTask(ABC):
    """Abstract unit of work with dependency and status tracking.

    Subclasses implement :meth:`execute`. Callers invoke :meth:`run`, which
    records status and timestamps around ``execute`` and converts any
    ``Exception`` it raises into a failed :class:`TaskResult`.

    Attributes:
        task_id: Identifier of this task.
        name: Display name used in log messages.
        dependencies: Task IDs that must be completed before this task runs.
        status: Current :class:`TaskStatus`; starts as ``PENDING``.
        result: Result of the last :meth:`run`, or ``None`` if never run.
        created_at: Naive local time at which the task object was created.
        started_at: Time the last :meth:`run` began, or ``None``.
        completed_at: Time the last :meth:`run` finished, or ``None``.
    """

    def __init__(
        self,
        task_id: str,
        name: str,
        dependencies: Optional[List[str]] = None,
    ):
        """Initialise the task in the ``PENDING`` state.

        Args:
            task_id: Identifier of this task.
            name: Display name used in log messages.
            dependencies: IDs of tasks that must complete first; defaults to
                no dependencies.
        """
        self.task_id = task_id
        self.name = name
        # Explicit None check so a caller-supplied empty list is kept as-is.
        self.dependencies = dependencies if dependencies is not None else []
        self.status = TaskStatus.PENDING
        self.result: Optional[TaskResult] = None
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None

    @abstractmethod
    async def execute(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Execute the task and return result"""

    def can_execute(self, completed_tasks: List[str]) -> bool:
        """Check if all dependencies are satisfied.

        Args:
            completed_tasks: IDs of the tasks that have already completed.

        Returns:
            ``True`` when every entry of ``self.dependencies`` is present in
            ``completed_tasks`` (trivially ``True`` with no dependencies).
        """
        if not self.dependencies:
            return True
        # Build the lookup set once instead of scanning the list once per
        # dependency.
        completed = set(completed_tasks)
        return all(dep in completed for dep in self.dependencies)

    async def run(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Wrapper method that handles status tracking.

        Marks the task ``RUNNING``, awaits :meth:`execute`, then copies the
        returned result's status onto the task. Any ``Exception`` raised
        along the way is logged with its traceback and turned into a
        ``FAILED`` result instead of propagating.

        Args:
            context: Dict handed to :meth:`execute`. A new empty dict is used
                only when ``None`` is passed.

        Returns:
            The :class:`TaskResult` also stored in ``self.result``.
        """
        # Keep the caller's dict even when it is empty: ``context or {}``
        # would replace it, and anything execute() wrote into the context
        # would then be invisible to the caller.
        if context is None:
            context = {}
        try:
            self.status = TaskStatus.RUNNING
            self.started_at = datetime.now()
            logger.info("Starting task: %s (%s)", self.name, self.task_id)
            self.result = await self.execute(context)
            self.status = self.result.status
            self.completed_at = datetime.now()
            duration = (self.completed_at - self.started_at).total_seconds()
            logger.info(
                "Task %s completed with status %s in %.2fs",
                self.name,
                self.status.value,
                duration,
            )
            return self.result
        except Exception as exc:
            # Boundary handler: a failing task must not crash its caller, so
            # the error is captured in the result rather than re-raised.
            self.status = TaskStatus.FAILED
            self.completed_at = datetime.now()
            self.result = TaskResult(TaskStatus.FAILED, error=str(exc))
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Task %s failed: %s", self.name, exc)
            return self.result