Orchestrator / core /tasks.py
ZENLLC's picture
Create tasks.py
ee2322f verified
import random
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Any
# Recognised priority labels. List position doubles as the sort rank when the
# next task is picked (lower index is served first), so "P0" outranks "P3".
PRIORITIES = ["P0", "P1", "P2", "P3"]
@dataclass
class Task:
    """Record describing one unit of work and its lifecycle state.

    The first eight fields are required and positional; the remaining fields
    describe mutable lifecycle state and all have defaults. Mutable defaults
    use ``default_factory`` so every instance gets its own list/dict.
    """

    # --- identity and classification -------------------------------------
    task_id: str
    title: str
    task_type: str
    priority: str

    # --- timing and sizing -----------------------------------------------
    created_t: int  # simulation time at which the task was created
    sla_due_t: int  # simulation time by which the task is due
    est_effort_min: int  # estimated effort; presumably minutes -- TODO confirm
    value_usd: float  # presumably a value in US dollars -- TODO confirm

    # --- relationships ---------------------------------------------------
    # Ids of other tasks this one depends on.
    dependencies: List[str] = field(default_factory=list)

    # --- lifecycle state -------------------------------------------------
    status: str = "QUEUED"  # QUEUED | IN_PROGRESS | BLOCKED | DONE | REWORK
    owner: Optional[str] = None
    started_t: Optional[int] = None
    completed_t: Optional[int] = None
    attempts: int = 0

    # Free-form per-task metadata.
    notes: Dict[str, Any] = field(default_factory=dict)
class TaskSystem:
    """In-memory registry of ``Task`` objects with simple scheduling helpers.

    Tasks live in ``self.tasks`` keyed by a generated id (``T00001``,
    ``T00002``, ...). The class offers status-based views, a
    priority/due-date picker for the next runnable task, and plain-text /
    dict renderings of the registry.
    """

    def __init__(self, seed: int):
        """Create an empty registry.

        Args:
            seed: Seed for the instance's private ``random.Random`` generator
                (``self.rng``).
        """
        self.rng = random.Random(seed)
        self.tasks: Dict[str, Task] = {}
        self.counter = 0  # number of ids handed out so far

    def _next_id(self) -> str:
        """Advance the counter and return the id ``T`` + zero-padded count."""
        self.counter += 1
        return f"T{self.counter:05d}"

    def create_task(
        self,
        t_sim: int,
        title: str,
        task_type: str,
        priority: str,
        sla_ticks: int,
        est_effort_min: int,
        value_usd: float,
        dependencies: Optional[List[str]] = None,
        urgent: bool = False,
    ) -> Task:
        """Create a task, register it and return it.

        Args:
            t_sim: Current simulation time; stored as the creation time.
            title: Human-readable title.
            task_type: Free-form type label.
            priority: Priority label; overridden to ``"P0"`` when ``urgent``.
            sla_ticks: Ticks from ``t_sim`` until the task is due. Coerced
                with ``int()`` and clamped to at least 1.
            est_effort_min: Estimated effort, stored as given.
            value_usd: Task value, stored as given.
            dependencies: Ids of tasks that must be DONE first. The list is
                copied, so later changes to the caller's list do not affect
                the stored task.
            urgent: When true, force the priority to ``"P0"``.

        Returns:
            The newly registered ``Task``.
        """
        tid = self._next_id()
        # A task is always given at least one tick before it is due.
        sla_due = t_sim + max(1, int(sla_ticks))
        if urgent:
            priority = "P0"
        task = Task(
            task_id=tid,
            title=title,
            task_type=task_type,
            priority=priority,
            created_t=t_sim,
            sla_due_t=sla_due,
            est_effort_min=est_effort_min,
            value_usd=value_usd,
            # Copy so the task never aliases a list owned by the caller.
            dependencies=list(dependencies) if dependencies else [],
        )
        self.tasks[tid] = task
        return task

    def queued_tasks(self) -> List[Task]:
        """Return tasks waiting to be worked on (status QUEUED or REWORK)."""
        return [t for t in self.tasks.values() if t.status in ("QUEUED", "REWORK")]

    def active_tasks(self) -> List[Task]:
        """Return tasks with status IN_PROGRESS or BLOCKED."""
        return [t for t in self.tasks.values() if t.status in ("IN_PROGRESS", "BLOCKED")]

    def done_tasks(self) -> List[Task]:
        """Return tasks with status DONE."""
        return [t for t in self.tasks.values() if t.status == "DONE"]

    def overdue_tasks(self, t_sim: int) -> List[Task]:
        """Return tasks that are not DONE and whose due time is before ``t_sim``.

        A task whose due time equals ``t_sim`` is not yet overdue.
        """
        return [t for t in self.tasks.values() if t.status != "DONE" and t_sim > t.sla_due_t]

    def can_start(self, task: Task) -> bool:
        """Return True when every registered dependency of ``task`` is DONE.

        Dependency ids that are not present in the registry are ignored,
        i.e. they do not block the task.
        """
        return all(
            self.tasks[dep].status == "DONE"
            for dep in task.dependencies
            if dep in self.tasks
        )

    def pick_next_task(self, t_sim: int) -> Optional[Task]:
        """Return the runnable queued task that should be worked on next.

        Candidates are queued/rework tasks whose dependencies are satisfied.
        They are ranked by priority (position in ``PRIORITIES``; unknown
        labels rank after all known ones), then by due time, then by creation
        time. Ties keep registry insertion order.

        Args:
            t_sim: Current simulation time. Accepted for interface
                compatibility; the selection does not depend on it.

        Returns:
            The best candidate, or ``None`` when nothing is runnable.
        """
        rank = {label: pos for pos, label in enumerate(PRIORITIES)}
        unknown_rank = len(PRIORITIES)  # sorts after every known priority
        runnable = (t for t in self.queued_tasks() if self.can_start(t))
        # min() returns the first minimal element, matching a stable sort's
        # head, without sorting the whole queue.
        return min(
            runnable,
            key=lambda t: (rank.get(t.priority, unknown_rank), t.sla_due_t, t.created_t),
            default=None,
        )

    def to_compact_table(self, t_sim: int, limit: int = 14) -> str:
        """Render a pipe-separated text table of tasks for display.

        Open (not DONE) tasks are listed first, each group ordered by due
        time, so that truncation to ``limit`` rows drops finished work before
        it drops outstanding work. A summary line with overdue/total/done
        counts over the whole registry is appended.

        Args:
            t_sim: Current simulation time, used for the overdue count.
            limit: Maximum number of task rows to include.

        Returns:
            The table as a single newline-joined string.
        """
        # False sorts before True, so keying on ``status == "DONE"`` puts
        # open tasks at the top of the table.
        ordered = sorted(self.tasks.values(), key=lambda t: (t.status == "DONE", t.sla_due_t))
        lines = ["id | pri | status | due(t) | owner | title"]
        lines.append("---|-----|--------|--------|-------|------")
        for t in ordered[:limit]:
            due = t.sla_due_t
            # Titles are cut to 42 characters to keep rows compact.
            lines.append(f"{t.task_id} | {t.priority} | {t.status} | {due} | {t.owner or '-'} | {t.title[:42]}")
        overdue = len(self.overdue_tasks(t_sim))
        lines.append(f"\nOverdue: {overdue} | Total: {len(self.tasks)} | Done: {len(self.done_tasks())}")
        return "\n".join(lines)

    def as_dict(self) -> Dict[str, Any]:
        """Return the registry as ``{task_id: dict-of-fields}`` via ``asdict``."""
        return {tid: asdict(task) for tid, task in self.tasks.items()}