# File size: 2,395 Bytes
# 021b7e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from enum import Enum
import logging
from datetime import datetime
import asyncio

# Module-level logger named after this module; BaseTask.run writes its
# start / completion / failure messages through it.
logger = logging.getLogger(__name__)

class TaskStatus(Enum):
    """Lifecycle states of a task; each member's value is its lowercase name."""
    # Initial status assigned in BaseTask.__init__.
    PENDING = "pending"
    # Set by BaseTask.run right before execute() is awaited.
    RUNNING = "running"
    # Not assigned anywhere in this module; presumably reported by
    # execute() implementations through their TaskResult -- confirm.
    COMPLETED = "completed"
    # Set by BaseTask.run when execute() raises an Exception.
    FAILED = "failed"
    # Not assigned anywhere in this module; presumably used by execute()
    # implementations or a scheduler -- confirm.
    SKIPPED = "skipped"

class TaskResult:
    """Outcome of a single task execution.

    Attributes:
        status: The status the task finished with.
        data: Payload produced by the task; an empty dict when omitted.
        error: Error description, or ``None`` when none was given.
        timestamp: Naive local ``datetime`` captured at construction.
    """

    def __init__(
        self,
        status: "TaskStatus",
        data: Optional[Dict[Any, Any]] = None,
        error: Optional[str] = None,
    ):
        """Store the outcome and stamp it with the current time.

        Args:
            status: Final status to report.
            data: Result payload. ``None`` yields a fresh empty dict; any
                object that is passed in (even an empty dict) is stored
                as-is.
            error: Optional error message.
        """
        self.status = status
        # Compare against None rather than truthiness: ``data or {}`` would
        # silently swap a caller-supplied empty dict for a new one, breaking
        # callers that keep a reference and fill it in afterwards.
        self.data = {} if data is None else data
        self.error = error
        self.timestamp = datetime.now()

    def __repr__(self) -> str:
        """Return a debugging representation showing status, data and error."""
        return (
            f"{type(self).__name__}(status={self.status!r}, "
            f"data={self.data!r}, error={self.error!r})"
        )

class BaseTask(ABC):
    """Abstract unit of work with dependency and status tracking.

    Subclasses implement :meth:`execute`. Callers invoke :meth:`run`, which
    wraps ``execute`` with status updates, timing and error capture.
    """

    def __init__(self, task_id: str, name: str, dependencies: Optional[List[str]] = None):
        """Create a task in the ``PENDING`` state.

        Args:
            task_id: Identifier of this task.
            name: Human-readable name used in log messages.
            dependencies: IDs of tasks that must be completed before this
                one may run. ``None`` means no dependencies.
        """
        self.task_id = task_id
        self.name = name
        # Copy the list so that later mutation of the caller's list cannot
        # silently change this task's dependencies.
        self.dependencies: List[str] = list(dependencies) if dependencies else []
        self.status = TaskStatus.PENDING
        self.result: Optional[TaskResult] = None
        self.created_at = datetime.now()
        # Both stay None until run() is called.
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None

    @abstractmethod
    async def execute(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Execute the task and return its result.

        Args:
            context: Mapping handed over by :meth:`run`; ``run`` always
                passes a dict (an empty one when it received ``None``).

        Returns:
            The ``TaskResult`` whose ``status`` becomes the task's status.
        """

    def can_execute(self, completed_tasks: List[str]) -> bool:
        """Return True when every dependency ID appears in *completed_tasks*.

        A task without dependencies can always execute.
        """
        return all(dep in completed_tasks for dep in self.dependencies)

    async def run(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Run :meth:`execute` with status tracking, timing and error capture.

        Args:
            context: Optional mapping forwarded to ``execute``; ``None`` (or
                any falsy value) is replaced by an empty dict.

        Returns:
            The result returned by ``execute``, or a ``FAILED`` result
            carrying the error message when an ``Exception`` was raised.
            Only ``Exception`` subclasses are captured; on Python 3.8+
            ``asyncio.CancelledError`` derives from ``BaseException`` and
            therefore propagates to the caller.
        """
        try:
            self.status = TaskStatus.RUNNING
            self.started_at = datetime.now()
            logger.info(f"Starting task: {self.name} ({self.task_id})")

            self.result = await self.execute(context or {})
            # The task's final status is whatever the result reports.
            self.status = self.result.status
            self.completed_at = datetime.now()

            duration = (self.completed_at - self.started_at).total_seconds()
            logger.info(f"Task {self.name} completed with status {self.status.value} in {duration:.2f}s")

            return self.result

        except Exception as exc:
            # Some exceptions (e.g. a bare TimeoutError()) stringify to "";
            # fall back to the class name so a failure never carries an
            # empty, falsy error message.
            message = str(exc) or type(exc).__name__
            self.status = TaskStatus.FAILED
            self.completed_at = datetime.now()
            self.result = TaskResult(TaskStatus.FAILED, error=message)
            # logger.exception logs at ERROR level and attaches the traceback.
            logger.exception(f"Task {self.name} failed: {message}")
            return self.result