# TEST-FRANKO/scheduler/tasks/base_task.py
# Franko Fišter
# Added scheduling to data collection script
# 021b7e0
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from enum import Enum
import logging
from datetime import datetime
import asyncio
# Module-level logger named after this module; used for task lifecycle messages.
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
    """Lifecycle states of a task.

    Every member's value is the lowercase string form of its name.
    """

    # Initial state assigned when a task object is constructed.
    PENDING = "pending"
    # Set by BaseTask.run() just before execute() is awaited.
    RUNNING = "running"
    # Not assigned directly in this module; presumably reported by
    # execute() implementations through their TaskResult.
    COMPLETED = "completed"
    # Set by BaseTask.run() when execute() raises an exception.
    FAILED = "failed"
    # Not assigned directly in this module; presumably reported by
    # execute() implementations that decide not to do any work.
    SKIPPED = "skipped"
class TaskResult:
    """Outcome of a single task execution.

    Attributes:
        status: Status reported for the task.
        data: Payload produced by the task; an empty dict when none was given.
        error: Error description, or ``None`` when none was supplied.
        timestamp: Naive local time at which this result object was created.
    """

    def __init__(
        self,
        status: TaskStatus,
        data: Optional[Dict[Any, Any]] = None,
        error: Optional[str] = None,
    ):
        """Store the outcome and stamp it with the current time.

        Args:
            status: Status to record for the task.
            data: Optional payload. The dict is stored as-is (not copied).
            error: Optional error description.
        """
        self.status = status
        # Test against None instead of truthiness: ``data or {}`` would
        # replace a caller-supplied *empty* dict with a new one, so later
        # writes by the caller to their dict would never show up here.
        self.data = data if data is not None else {}
        self.error = error
        self.timestamp = datetime.now()

    def __repr__(self) -> str:
        """Return a debugging representation showing status, data and error."""
        return (
            f"{type(self).__name__}(status={self.status!r}, "
            f"data={self.data!r}, error={self.error!r})"
        )
class BaseTask(ABC):
    """Abstract base class for tasks with dependency and status tracking.

    Subclasses implement :meth:`execute`. Callers invoke :meth:`run`, which
    wraps ``execute`` with status bookkeeping, timing, logging and error
    capture.

    Attributes:
        task_id: Identifier of the task.
        name: Human-readable name used in log messages.
        dependencies: Task ids that must be completed before this task runs.
        status: Current :class:`TaskStatus`; starts as ``PENDING``.
        result: The :class:`TaskResult` of the last run, or ``None``.
        created_at: Time the task object was constructed.
        started_at: Time the last run began, or ``None`` if never run.
        completed_at: Time the last run finished or failed, or ``None``.
    """

    def __init__(self, task_id: str, name: str, dependencies: Optional[List[str]] = None):
        """Initialise a pending task.

        Args:
            task_id: Identifier of the task.
            name: Human-readable name used in log messages.
            dependencies: Ids of tasks this task depends on; defaults to none.
        """
        self.task_id = task_id
        self.name = name
        # Copy the caller's sequence so that mutating it afterwards cannot
        # silently change this task's dependencies.
        self.dependencies = list(dependencies) if dependencies is not None else []
        self.status = TaskStatus.PENDING
        self.result: Optional[TaskResult] = None
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None

    @abstractmethod
    async def execute(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Execute the task and return result"""

    def can_execute(self, completed_tasks: List[str]) -> bool:
        """Check if all dependencies are satisfied.

        Args:
            completed_tasks: Ids of the tasks that have already completed.

        Returns:
            ``True`` when every dependency is in ``completed_tasks`` (always
            ``True`` for a task without dependencies), otherwise ``False``.
        """
        return all(dep in completed_tasks for dep in self.dependencies)

    async def run(self, context: Optional[Dict[str, Any]] = None) -> TaskResult:
        """Run :meth:`execute` with status tracking, timing and error capture.

        Exceptions derived from ``Exception`` raised by ``execute`` are not
        propagated; they are logged with their traceback and converted into
        a ``FAILED`` :class:`TaskResult`.

        Args:
            context: Mapping handed to ``execute``. The very same object is
                passed through, so anything ``execute`` writes into it is
                visible to the caller. A new dict is used only when ``None``.

        Returns:
            The result returned by ``execute``, or a ``FAILED`` result
            carrying the error text when ``execute`` raised.
        """
        # Only substitute a fresh dict when no context was given at all.
        # ``context or {}`` would also throw away a caller's *empty* dict,
        # losing everything the task writes into a shared context.
        if context is None:
            context = {}

        # NOTE(review): on Python 3.8+ asyncio.CancelledError derives from
        # BaseException, so cancellation is not caught below and leaves
        # ``status`` as RUNNING - confirm whether the scheduler needs that
        # case handled.
        try:
            self.status = TaskStatus.RUNNING
            self.started_at = datetime.now()
            logger.info("Starting task: %s (%s)", self.name, self.task_id)

            result = await self.execute(context)
            if not hasattr(result, "status"):
                # Fail with a clear message instead of an opaque
                # AttributeError when execute() returns e.g. None.
                raise TypeError(
                    f"execute() must return a TaskResult, got {type(result).__name__}"
                )

            self.result = result
            self.status = result.status
            self.completed_at = datetime.now()

            duration = (self.completed_at - self.started_at).total_seconds()
            logger.info(
                "Task %s completed with status %s in %.2fs",
                self.name,
                self.status.value,
                duration,
            )
            return self.result
        except Exception as e:
            self.status = TaskStatus.FAILED
            self.completed_at = datetime.now()
            self.result = TaskResult(TaskStatus.FAILED, error=str(e))
            # logger.exception logs at ERROR level and appends the traceback,
            # which a plain logger.error call would drop.
            logger.exception("Task %s failed: %s", self.name, e)
            return self.result