# MorphGuard / src / status_tracker.py
# juanquy's picture
# Initial clean commit of modular MorphGuard
# 2978bba
# Raw
# History Blame Contribute Delete
# 15.6 kB
"""
Real-time processing status tracker for MorphGuard.
This module provides a status tracking system for long-running operations,
allowing clients to receive updates on processing progress.
"""
import time
import uuid
import threading
import json
import os
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
import logging
class JobStatus(str, Enum):
    """Enum representing the status of a processing job.

    Members are also ``str`` instances, so they compare equal to their plain
    string values and serialise to JSON as those values.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    def __str__(self) -> str:
        """Return the raw value (e.g. ``"pending"``).

        Without this override, ``str()`` and f-string rendering of a
        ``str``/``Enum`` mixin differ between Python versions
        (``"JobStatus.PENDING"`` vs ``"pending"``), which makes log and
        error messages that interpolate a status inconsistent.
        """
        return str(self.value)
class JobType(str, Enum):
    """Enum representing types of processing jobs.

    Members are also ``str`` instances, so they compare equal to their plain
    string values and serialise to JSON as those values.
    """

    DETECTION = "detection"
    DEMORPHING = "demorphing"
    VERIFICATION = "verification"
    LIVENESS = "liveness"
    BATCH_PROCESS = "batch_process"
    IMPORT = "import"
    EXPORT = "export"
    TRAINING = "training"
    OTHER = "other"

    def __str__(self) -> str:
        """Return the raw value (e.g. ``"detection"``).

        Without this override, ``str()`` and f-string rendering of a
        ``str``/``Enum`` mixin differ between Python versions, so any text
        that interpolates a job type would be version-dependent.
        """
        return str(self.value)
class StatusTracker:
    """
    Tracks the status of long-running processing jobs.

    Provides methods to create, update, and query job status, as well as
    register callbacks for status changes.

    Job records are plain dicts held in ``self.jobs``. When a storage
    directory is configured, each record is also written to
    ``<storage_dir>/<job_id>.json`` and reloaded on construction.

    State changes are made while holding ``self.lock`` (a re-entrant lock);
    callbacks are invoked after the lock has been released.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """
        Initialize the status tracker.

        Args:
            storage_dir: Directory to store job status data (optional)
        """
        self.jobs: Dict[str, Dict[str, Any]] = {}
        self.callbacks: Dict[str, List[Callable]] = {}
        self.lock = threading.RLock()
        self.logger = logging.getLogger("StatusTracker")
        # Configure storage
        self.storage_dir = storage_dir
        if storage_dir:
            os.makedirs(storage_dir, exist_ok=True)
            self._load_jobs()

    def _load_jobs(self) -> None:
        """
        Load jobs from storage directory.

        Every ``*.json`` file is read independently: an unreadable or
        malformed file is logged and skipped so that it cannot prevent the
        remaining jobs from being loaded.
        """
        if not self.storage_dir:
            return
        try:
            filenames = os.listdir(self.storage_dir)
        except OSError as e:
            self.logger.error(f"Failed to load jobs from storage: {e}")
            return
        for filename in filenames:
            if not filename.endswith(".json"):
                continue
            job_id = filename[:-5]  # Remove .json extension
            file_path = os.path.join(self.storage_dir, filename)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    job_data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers both JSON decode and text decode errors.
                self.logger.error(f"Failed to load job file {file_path}: {e}")
                continue
            if not isinstance(job_data, dict):
                # Every other method indexes job records as dicts.
                self.logger.error(
                    f"Failed to load job file {file_path}: not a JSON object"
                )
                continue
            self.jobs[job_id] = job_data

    def _save_job(self, job_id: str) -> None:
        """
        Save job data to storage.

        The record is serialised to a temporary file which then replaces the
        real one, so a failed write (for example a non-JSON-serialisable
        result) leaves the previously saved file intact. Failures are logged,
        not raised.

        Args:
            job_id: ID of the job to save
        """
        if not self.storage_dir:
            return
        file_path = os.path.join(self.storage_dir, f"{job_id}.json")
        # Does not end in ".json", so _load_jobs ignores a stray leftover.
        tmp_path = f"{file_path}.tmp"
        try:
            with self.lock:
                job = self.jobs.get(job_id)
                if job is None:
                    return
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(job, f)
                os.replace(tmp_path, file_path)
        except Exception as e:
            self.logger.error(f"Failed to save job {job_id}: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass

    def _require_job(self, job_id: str) -> Dict[str, Any]:
        """Return the live record for *job_id*; the caller must hold the lock.

        Raises:
            ValueError: If job is not found
        """
        if job_id not in self.jobs:
            raise ValueError(f"Job {job_id} not found")
        return self.jobs[job_id]

    def _require_active_job(self, job_id: str, action: str) -> Dict[str, Any]:
        """Return the record for *job_id* if it is pending or running.

        The caller must hold the lock. *action* is the past participle used
        in the error message ("completed", "failed", "cancelled").

        Raises:
            ValueError: If job is not found or is already in a final status
        """
        job = self._require_job(job_id)
        if job["status"] not in (JobStatus.PENDING, JobStatus.RUNNING):
            raise ValueError(
                f"Job {job_id} cannot be {action} from {job['status']} status"
            )
        return job

    def create_job(
        self,
        job_type: "JobType",
        description: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new processing job.

        Args:
            job_type: Type of job
            description: Human-readable description of the job
            user_id: ID of the user who created the job (optional)
            metadata: Additional metadata for the job (optional)

        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())
        job_data = {
            "id": job_id,
            "type": job_type,
            "description": description,
            "status": JobStatus.PENDING,
            "progress": 0.0,
            "created_at": time.time(),
            "started_at": None,
            "completed_at": None,
            "message": "Job created",
            "user_id": user_id,
            "metadata": metadata or {},
            "results": None,
            "error": None
        }
        with self.lock:
            self.jobs[job_id] = job_data
            self._save_job(job_id)
        self._notify_status_change(job_id, JobStatus.PENDING, 0.0, "Job created")
        return job_id

    def start_job(self, job_id: str) -> None:
        """
        Mark a job as running.

        Args:
            job_id: ID of the job to start

        Raises:
            ValueError: If job is not found or is not pending
        """
        with self.lock:
            job = self._require_job(job_id)
            if job["status"] != JobStatus.PENDING:
                raise ValueError(f"Job {job_id} is not in pending status")
            job["status"] = JobStatus.RUNNING
            job["started_at"] = time.time()
            job["message"] = "Job started"
            self._save_job(job_id)
        self._notify_status_change(job_id, JobStatus.RUNNING, 0.0, "Job started")

    def update_progress(
        self,
        job_id: str,
        progress: float,
        message: Optional[str] = None
    ) -> None:
        """
        Update the progress of a job.

        Args:
            job_id: ID of the job to update
            progress: Progress value between 0.0 and 1.0
            message: Optional status message

        Raises:
            ValueError: If progress is out of range (or NaN), or the job is
                not found or not running
        """
        # Chained comparison so that NaN, which fails every comparison,
        # is rejected rather than stored.
        if not 0.0 <= progress <= 1.0:
            raise ValueError("Progress must be between 0.0 and 1.0")
        with self.lock:
            job = self._require_job(job_id)
            if job["status"] != JobStatus.RUNNING:
                raise ValueError(f"Job {job_id} is not in running status")
            job["progress"] = progress
            if message:
                job["message"] = message
            self._save_job(job_id)
        self._notify_status_change(job_id, JobStatus.RUNNING, progress, message)

    def complete_job(
        self,
        job_id: str,
        results: Optional[Dict[str, Any]] = None,
        message: Optional[str] = None
    ) -> None:
        """
        Mark a job as completed.

        Args:
            job_id: ID of the job to complete
            results: Optional results data
            message: Optional completion message

        Raises:
            ValueError: If job is not found or is already in a final status
        """
        final_message = message or "Job completed successfully"
        with self.lock:
            job = self._require_active_job(job_id, "completed")
            job["status"] = JobStatus.COMPLETED
            job["progress"] = 1.0
            job["completed_at"] = time.time()
            job["results"] = results
            job["message"] = final_message
            self._save_job(job_id)
        self._notify_status_change(
            job_id, JobStatus.COMPLETED, 1.0, final_message, results
        )

    def fail_job(
        self,
        job_id: str,
        error: str,
        error_details: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Mark a job as failed.

        Args:
            job_id: ID of the job that failed
            error: Error message
            error_details: Optional detailed error information

        Raises:
            ValueError: If job is not found or is already in a final status
        """
        failure_message = f"Job failed: {error}"
        with self.lock:
            job = self._require_active_job(job_id, "failed")
            job["status"] = JobStatus.FAILED
            job["completed_at"] = time.time()
            job["message"] = failure_message
            job["error"] = {
                "message": error,
                "details": error_details or {}
            }
            progress = job["progress"]
            self._save_job(job_id)
        self._notify_status_change(
            job_id,
            JobStatus.FAILED,
            progress,
            failure_message,
            None,
            # Separate dict from the stored one so a callback cannot
            # replace keys on the job record.
            {"message": error, "details": error_details or {}}
        )

    def cancel_job(self, job_id: str, reason: Optional[str] = None) -> None:
        """
        Cancel a job.

        Callbacks receive the same message that is stored on the job
        ("Job cancelled: <reason>" or "Job cancelled by user").

        Args:
            job_id: ID of the job to cancel
            reason: Optional reason for cancellation

        Raises:
            ValueError: If job is not found or is already in a final status
        """
        if reason:
            cancel_message = f"Job cancelled: {reason}"
        else:
            cancel_message = "Job cancelled by user"
        with self.lock:
            job = self._require_active_job(job_id, "cancelled")
            job["status"] = JobStatus.CANCELLED
            job["completed_at"] = time.time()
            job["message"] = cancel_message
            progress = job["progress"]
            self._save_job(job_id)
        self._notify_status_change(
            job_id,
            JobStatus.CANCELLED,
            progress,
            cancel_message
        )

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the current status of a job.

        The returned dict is a shallow copy: nested values such as
        ``metadata``, ``results`` and ``error`` are shared with the tracker.

        Args:
            job_id: ID of the job

        Returns:
            Job status data

        Raises:
            ValueError: If job is not found
        """
        with self.lock:
            return self._require_job(job_id).copy()

    def get_jobs_by_status(self, status: "JobStatus") -> List[Dict[str, Any]]:
        """
        Get all jobs with a specific status.

        Args:
            status: Status to filter by

        Returns:
            List of job data (shallow copies)
        """
        with self.lock:
            # .get: records loaded from storage are not guaranteed to carry
            # every key.
            return [
                job.copy()
                for job in self.jobs.values()
                if job.get("status") == status
            ]

    def get_jobs_by_user(self, user_id: str) -> List[Dict[str, Any]]:
        """
        Get all jobs for a specific user.

        Args:
            user_id: User ID to filter by

        Returns:
            List of job data (shallow copies)
        """
        with self.lock:
            return [
                job.copy()
                for job in self.jobs.values()
                if job.get("user_id") == user_id
            ]

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """
        Get all jobs.

        Returns:
            List of all job data (shallow copies)
        """
        with self.lock:
            return [job.copy() for job in self.jobs.values()]

    def register_callback(self, job_id: str, callback: Callable) -> None:
        """
        Register a callback for job status changes.

        The callback is called with a single notification dict.

        Args:
            job_id: ID of the job to watch
            callback: Callback function to call on status change
        """
        with self.lock:
            self.callbacks.setdefault(job_id, []).append(callback)

    def unregister_callback(self, job_id: str, callback: Callable) -> None:
        """
        Remove a callback for job status changes.

        Does nothing if the callback was not registered for the job.

        Args:
            job_id: ID of the job
            callback: Callback function to remove
        """
        with self.lock:
            registered = self.callbacks.get(job_id)
            if registered is None:
                return
            if callback in registered:
                registered.remove(callback)
            if not registered:
                del self.callbacks[job_id]

    def _notify_status_change(
        self,
        job_id: str,
        status: "JobStatus",
        progress: float,
        message: Optional[str] = None,
        results: Optional[Dict[str, Any]] = None,
        error: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Notify all registered callbacks about a job status change.

        An exception raised by one callback is logged and does not stop the
        remaining callbacks from running.

        Args:
            job_id: ID of the job
            status: New status
            progress: Current progress
            message: Status message
            results: Job results (for completed jobs)
            error: Error information (for failed jobs)
        """
        notification = {
            "job_id": job_id,
            "status": status,
            "progress": progress,
            "message": message,
            "timestamp": time.time()
        }
        if results is not None:
            notification["results"] = results
        if error is not None:
            notification["error"] = error
        with self.lock:
            # Snapshot so callbacks run without the lock and may
            # (un)register callbacks themselves.
            callbacks = list(self.callbacks.get(job_id, ()))
        for callback in callbacks:
            try:
                callback(notification)
            except Exception as e:
                self.logger.error(f"Error in status callback for job {job_id}: {e}")

    def cleanup_old_jobs(self, max_age_days: int = 7) -> int:
        """
        Remove old completed, failed, or cancelled jobs.

        A job is removed from memory, its registered callbacks are dropped,
        and its storage file (if any) is deleted. Jobs in a final status
        that have no ``completed_at`` timestamp are left alone because their
        age cannot be determined.

        Args:
            max_age_days: Maximum age of jobs to keep in days

        Returns:
            Number of jobs removed
        """
        max_age_seconds = max_age_days * 24 * 60 * 60
        current_time = time.time()
        final_statuses = (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)
        with self.lock:
            expired_ids = [
                job_id
                for job_id, job in self.jobs.items()
                if job.get("status") in final_statuses
                and job.get("completed_at") is not None
                and current_time - job["completed_at"] > max_age_seconds
            ]
            for job_id in expired_ids:
                del self.jobs[job_id]
                # Otherwise callbacks for removed jobs would accumulate forever.
                self.callbacks.pop(job_id, None)
                # Remove storage file if it exists
                if self.storage_dir:
                    file_path = os.path.join(self.storage_dir, f"{job_id}.json")
                    try:
                        os.remove(file_path)
                    except FileNotFoundError:
                        # Job was never persisted, or the file is already gone.
                        pass
                    except OSError as e:
                        self.logger.error(f"Failed to remove job file {file_path}: {e}")
        return len(expired_ids)
# Process-wide singleton instance, created lazily by get_tracker().
_instance = None
# Guards creation of the singleton so concurrent first calls cannot each
# build their own tracker (which would split jobs across instances).
_instance_lock = threading.Lock()


def get_tracker(storage_dir: Optional[str] = None) -> StatusTracker:
    """
    Get the global status tracker instance.

    The instance is created on the first call; later calls return the same
    object and ignore ``storage_dir``.

    Args:
        storage_dir: Optional storage directory (only used if instance doesn't exist yet)

    Returns:
        StatusTracker instance
    """
    global _instance
    if _instance is None:
        with _instance_lock:
            # Re-check under the lock: another thread may have created the
            # instance between the first check and acquiring the lock.
            if _instance is None:
                _instance = StatusTracker(storage_dir)
    return _instance