Spaces:
Sleeping
Sleeping
| """Job status value object.""" | |
| from enum import Enum | |
class JobStatus(str, Enum):
    """Lifecycle states of a job.

    Every member is also a ``str``, so it compares equal to its lowercase
    string value (for example ``JobStatus.PENDING == "pending"``).
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    def is_terminal(self) -> bool:
        """Return True for COMPLETED, FAILED or CANCELLED.

        These are the states from which ``can_transition_to`` never
        allows a further move.
        """
        end_states = (
            JobStatus.COMPLETED,
            JobStatus.FAILED,
            JobStatus.CANCELLED,
        )
        return self in end_states

    def is_active(self) -> bool:
        """Return True only while the status is PROCESSING."""
        return self == JobStatus.PROCESSING

    def can_transition_to(self, new_status: "JobStatus") -> bool:
        """Tell whether moving from this status to ``new_status`` is allowed.

        Allowed moves:
            PENDING    -> PROCESSING, CANCELLED
            PROCESSING -> COMPLETED, FAILED, CANCELLED

        Any other source status (the terminal ones) permits no move.

        Args:
            new_status: The status the job would move to.

        Returns:
            True if the move is permitted, False otherwise.
        """
        if self == JobStatus.PENDING:
            allowed = (JobStatus.PROCESSING, JobStatus.CANCELLED)
        elif self == JobStatus.PROCESSING:
            allowed = (
                JobStatus.COMPLETED,
                JobStatus.FAILED,
                JobStatus.CANCELLED,
            )
        else:
            # COMPLETED, FAILED and CANCELLED are dead ends.
            return False
        return new_status in allowed