Spaces:
Sleeping
Sleeping
| """ | |
| Real-time processing status tracker for MorphGuard. | |
| This module provides a status tracking system for long-running operations, | |
| allowing clients to receive updates on processing progress. | |
| """ | |
| import time | |
| import uuid | |
| import threading | |
| import json | |
| import os | |
| from typing import Dict, List, Any, Optional, Callable | |
| from enum import Enum | |
| import logging | |
class JobStatus(str, Enum):
    """Lifecycle states of a processing job.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value (for example ``JobStatus.PENDING == "pending"``).
    """

    # States a job can still leave.
    PENDING = "pending"
    RUNNING = "running"

    # States that end a job.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class JobType(str, Enum):
    """Kinds of processing job that can be tracked.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value (for example ``JobType.DETECTION == "detection"``).
    """

    DETECTION = "detection"
    DEMORPHING = "demorphing"
    VERIFICATION = "verification"
    LIVENESS = "liveness"
    BATCH_PROCESS = "batch_process"
    IMPORT = "import"
    EXPORT = "export"
    TRAINING = "training"
    OTHER = "other"
class StatusTracker:
    """
    Tracks the status of long-running processing jobs.

    Jobs are kept in memory as plain dictionaries keyed by job ID and, when a
    storage directory is configured, mirrored to one ``<job_id>.json`` file
    each. Provides methods to create, update, and query job status, as well
    as register callbacks for status changes.

    The ``jobs`` and ``callbacks`` mappings are read and written while
    holding ``self.lock`` (a re-entrant lock). Callbacks themselves are
    invoked after the lock has been released.
    """

    # Statuses from which a job may still be completed, failed or cancelled.
    _ACTIVE_STATUSES = (JobStatus.PENDING, JobStatus.RUNNING)
    # Statuses that mark a finished job; only these are eligible for cleanup.
    _TERMINAL_STATUSES = (
        JobStatus.COMPLETED,
        JobStatus.FAILED,
        JobStatus.CANCELLED,
    )

    def __init__(self, storage_dir: Optional[str] = None):
        """
        Initialize the status tracker.

        Args:
            storage_dir: Directory to store job status data (optional). The
                directory is created if it does not exist and any job files
                already present in it are loaded.
        """
        self.jobs: Dict[str, Dict[str, Any]] = {}
        self.callbacks: Dict[str, List[Callable]] = {}
        self.lock = threading.RLock()
        self.logger = logging.getLogger("StatusTracker")

        # Configure storage
        self.storage_dir = storage_dir
        if storage_dir:
            os.makedirs(storage_dir, exist_ok=True)
            self._load_jobs()

    def _load_jobs(self) -> None:
        """
        Load jobs from the storage directory.

        Every ``*.json`` file is read as one job whose ID is the file name
        without its extension. A file that cannot be read or parsed, or that
        does not contain a JSON object, is logged and skipped so that it
        cannot prevent the remaining jobs from loading.
        """
        if not self.storage_dir:
            return

        try:
            filenames = os.listdir(self.storage_dir)
        except OSError as e:
            self.logger.error(f"Failed to load jobs from storage: {e}")
            return

        for filename in filenames:
            if not filename.endswith(".json"):
                continue

            job_id = filename[:-5]  # Remove .json extension
            file_path = os.path.join(self.storage_dir, filename)
            try:
                with open(file_path, "r") as f:
                    job_data = json.load(f)
            except Exception as e:
                # Best effort: a bad file must never stop the tracker from
                # starting or hide the other jobs.
                self.logger.error(f"Failed to load job file {file_path}: {e}")
                continue

            if not isinstance(job_data, dict):
                # Every accessor treats a job as a dict; anything else would
                # only fail later, far away from the cause.
                self.logger.error(
                    f"Failed to load job file {file_path}: not a JSON object"
                )
                continue

            with self.lock:
                self.jobs[job_id] = job_data

    def _save_job(self, job_id: str) -> None:
        """
        Save job data to storage.

        The data is written to a temporary file that then replaces the real
        one, so a failed write (for example non-serializable metadata) never
        leaves a truncated job file behind. Failures are logged, not raised.

        Args:
            job_id: ID of the job to save
        """
        if not self.storage_dir:
            return

        file_path = os.path.join(self.storage_dir, f"{job_id}.json")
        # The suffix keeps the temporary file out of the "*.json" scan done
        # by _load_jobs.
        temp_path = f"{file_path}.tmp"
        try:
            with self.lock:
                if job_id not in self.jobs:
                    return
                with open(temp_path, "w") as f:
                    json.dump(self.jobs[job_id], f)
                os.replace(temp_path, file_path)
        except Exception as e:
            self.logger.error(f"Failed to save job {job_id}: {e}")
            try:
                os.remove(temp_path)
            except OSError:
                # Nothing to clean up (or it cannot be removed); the failure
                # itself has already been logged above.
                pass

    def _require_job(self, job_id: str) -> Dict[str, Any]:
        """
        Return the live (not copied) record of a job.

        Callers are expected to hold ``self.lock``.

        Args:
            job_id: ID of the job to look up

        Raises:
            ValueError: If job is not found
        """
        try:
            return self.jobs[job_id]
        except KeyError:
            raise ValueError(f"Job {job_id} not found") from None

    @staticmethod
    def _status_label(status: Any) -> str:
        """
        Return the plain string value of a status for use in messages.

        Formatting an enum member directly gives different text depending on
        the Python version, and jobs loaded from storage hold plain strings,
        so messages are built from the underlying value instead.
        """
        return str(getattr(status, "value", status))

    def create_job(
        self,
        job_type: JobType,
        description: str,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new processing job.

        The job starts in pending status with progress 0.0; it is saved to
        storage and callbacks registered for its ID are notified.

        Args:
            job_type: Type of job
            description: Human-readable description of the job
            user_id: ID of the user who created the job (optional)
            metadata: Additional metadata for the job (optional)

        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())
        job_data = {
            "id": job_id,
            "type": job_type,
            "description": description,
            "status": JobStatus.PENDING,
            "progress": 0.0,
            "created_at": time.time(),
            "started_at": None,
            "completed_at": None,
            "message": "Job created",
            "user_id": user_id,
            "metadata": metadata or {},
            "results": None,
            "error": None
        }

        with self.lock:
            self.jobs[job_id] = job_data
            self._save_job(job_id)

        self._notify_status_change(job_id, JobStatus.PENDING, 0.0, "Job created")
        return job_id

    def start_job(self, job_id: str) -> None:
        """
        Mark a job as running.

        Args:
            job_id: ID of the job to start

        Raises:
            ValueError: If job is not found or is not in pending status
        """
        with self.lock:
            job = self._require_job(job_id)
            if job["status"] != JobStatus.PENDING:
                raise ValueError(f"Job {job_id} is not in pending status")

            job["status"] = JobStatus.RUNNING
            job["started_at"] = time.time()
            job["message"] = "Job started"
            self._save_job(job_id)

        self._notify_status_change(job_id, JobStatus.RUNNING, 0.0, "Job started")

    def update_progress(
        self,
        job_id: str,
        progress: float,
        message: Optional[str] = None
    ) -> None:
        """
        Update the progress of a job.

        Args:
            job_id: ID of the job to update
            progress: Progress value between 0.0 and 1.0
            message: Optional status message; when omitted the job keeps its
                previous message

        Raises:
            ValueError: If progress is outside 0.0-1.0 (or NaN), the job is
                not found, or the job is not in running status
        """
        # Written as a chained comparison so that NaN, for which every
        # comparison is false, is rejected as well.
        if not 0.0 <= progress <= 1.0:
            raise ValueError("Progress must be between 0.0 and 1.0")

        with self.lock:
            job = self._require_job(job_id)
            if job["status"] != JobStatus.RUNNING:
                raise ValueError(f"Job {job_id} is not in running status")

            job["progress"] = progress
            if message:
                job["message"] = message
            self._save_job(job_id)

        self._notify_status_change(job_id, JobStatus.RUNNING, progress, message)

    def complete_job(
        self,
        job_id: str,
        results: Optional[Dict[str, Any]] = None,
        message: Optional[str] = None
    ) -> None:
        """
        Mark a job as completed.

        Progress is set to 1.0 and the completion time is recorded.

        Args:
            job_id: ID of the job to complete
            results: Optional results data
            message: Optional completion message

        Raises:
            ValueError: If job is not found or is neither pending nor running
        """
        final_message = message or "Job completed successfully"

        with self.lock:
            job = self._require_job(job_id)
            if job["status"] not in self._ACTIVE_STATUSES:
                raise ValueError(
                    f"Job {job_id} cannot be completed from "
                    f"{self._status_label(job['status'])} status"
                )

            job["status"] = JobStatus.COMPLETED
            job["progress"] = 1.0
            job["completed_at"] = time.time()
            job["results"] = results
            job["message"] = final_message
            self._save_job(job_id)

        self._notify_status_change(
            job_id, JobStatus.COMPLETED, 1.0, final_message, results
        )

    def fail_job(
        self,
        job_id: str,
        error: str,
        error_details: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Mark a job as failed.

        The job keeps the progress it had reached; the completion time and
        the error information are recorded.

        Args:
            job_id: ID of the job that failed
            error: Error message
            error_details: Optional detailed error information

        Raises:
            ValueError: If job is not found or is neither pending nor running
        """
        failure_message = f"Job failed: {error}"

        with self.lock:
            job = self._require_job(job_id)
            if job["status"] not in self._ACTIVE_STATUSES:
                raise ValueError(
                    f"Job {job_id} cannot be failed from "
                    f"{self._status_label(job['status'])} status"
                )

            job["status"] = JobStatus.FAILED
            job["completed_at"] = time.time()
            job["message"] = failure_message
            job["error"] = {
                "message": error,
                "details": error_details or {}
            }
            progress = job["progress"]
            self._save_job(job_id)

        # The notification gets its own dict so that a callback changing it
        # does not alter the stored job record.
        self._notify_status_change(
            job_id,
            JobStatus.FAILED,
            progress,
            failure_message,
            None,
            {"message": error, "details": error_details or {}}
        )

    def cancel_job(self, job_id: str, reason: Optional[str] = None) -> None:
        """
        Cancel a job.

        The job keeps the progress it had reached and the completion time is
        recorded. Callbacks receive the same message that is stored on the
        job.

        Args:
            job_id: ID of the job to cancel
            reason: Optional reason for cancellation

        Raises:
            ValueError: If job is not found or is neither pending nor running
        """
        if reason:
            cancel_message = f"Job cancelled: {reason}"
        else:
            cancel_message = "Job cancelled by user"

        with self.lock:
            job = self._require_job(job_id)
            if job["status"] not in self._ACTIVE_STATUSES:
                raise ValueError(
                    f"Job {job_id} cannot be cancelled from "
                    f"{self._status_label(job['status'])} status"
                )

            job["status"] = JobStatus.CANCELLED
            job["completed_at"] = time.time()
            job["message"] = cancel_message
            progress = job["progress"]
            self._save_job(job_id)

        self._notify_status_change(
            job_id, JobStatus.CANCELLED, progress, cancel_message
        )

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        Get the current status of a job.

        Args:
            job_id: ID of the job

        Returns:
            Shallow copy of the job status data

        Raises:
            ValueError: If job is not found
        """
        with self.lock:
            return self._require_job(job_id).copy()

    def get_jobs_by_status(self, status: JobStatus) -> List[Dict[str, Any]]:
        """
        Get all jobs with a specific status.

        Args:
            status: Status to filter by

        Returns:
            List of shallow copies of the matching job data
        """
        with self.lock:
            return [
                job.copy()
                for job in self.jobs.values()
                if job.get("status") == status
            ]

    def get_jobs_by_user(self, user_id: str) -> List[Dict[str, Any]]:
        """
        Get all jobs for a specific user.

        Args:
            user_id: User ID to filter by

        Returns:
            List of shallow copies of the matching job data
        """
        with self.lock:
            # .get() tolerates records loaded from storage without the key.
            return [
                job.copy()
                for job in self.jobs.values()
                if job.get("user_id") == user_id
            ]

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """
        Get all jobs.

        Returns:
            List of shallow copies of all job data
        """
        with self.lock:
            return [job.copy() for job in self.jobs.values()]

    def register_callback(self, job_id: str, callback: Callable) -> None:
        """
        Register a callback for job status changes.

        The callback is called with a single notification dictionary (see
        ``_notify_status_change``).

        Args:
            job_id: ID of the job to watch
            callback: Callback function to call on status change
        """
        with self.lock:
            self.callbacks.setdefault(job_id, []).append(callback)

    def unregister_callback(self, job_id: str, callback: Callable) -> None:
        """
        Remove a callback for job status changes.

        Does nothing if the callback is not registered for the job.

        Args:
            job_id: ID of the job
            callback: Callback function to remove
        """
        with self.lock:
            registered = self.callbacks.get(job_id)
            if registered is None:
                return
            if callback in registered:
                registered.remove(callback)
            if not registered:
                # Drop the empty list so the mapping does not accumulate
                # entries for jobs nobody is watching.
                del self.callbacks[job_id]

    def _notify_status_change(
        self,
        job_id: str,
        status: JobStatus,
        progress: float,
        message: Optional[str] = None,
        results: Optional[Dict[str, Any]] = None,
        error: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Notify all registered callbacks about a job status change.

        Each callback receives one dictionary with the keys ``job_id``,
        ``status``, ``progress``, ``message`` and ``timestamp``, plus
        ``results`` and ``error`` when those are provided. An exception
        raised by a callback is logged and does not stop the remaining
        callbacks.

        Args:
            job_id: ID of the job
            status: New status
            progress: Current progress
            message: Status message
            results: Job results (for completed jobs)
            error: Error information (for failed jobs)
        """
        notification = {
            "job_id": job_id,
            "status": status,
            "progress": progress,
            "message": message,
            "timestamp": time.time()
        }
        if results is not None:
            notification["results"] = results
        if error is not None:
            notification["error"] = error

        with self.lock:
            # Snapshot the list so callbacks may register or unregister
            # while the notification is being delivered.
            callbacks = list(self.callbacks.get(job_id, ()))

        for callback in callbacks:
            try:
                callback(notification)
            except Exception as e:
                self.logger.error(f"Error in status callback for job {job_id}: {e}")

    def cleanup_old_jobs(self, max_age_days: int = 7) -> int:
        """
        Remove old completed, failed, or cancelled jobs.

        A job is removed when its completion time is more than
        ``max_age_days`` in the past. Its storage file and any callbacks
        still registered for it are removed as well. Finished jobs without a
        recorded completion time are left alone.

        Args:
            max_age_days: Maximum age of jobs to keep in days

        Returns:
            Number of jobs removed
        """
        max_age_seconds = max_age_days * 24 * 60 * 60
        current_time = time.time()

        with self.lock:
            expired_ids = [
                job_id
                for job_id, job in self.jobs.items()
                if job.get("status") in self._TERMINAL_STATUSES
                and job.get("completed_at") is not None
                and current_time - job["completed_at"] > max_age_seconds
            ]

            for job_id in expired_ids:
                del self.jobs[job_id]
                # Callbacks of a removed job can never fire again.
                self.callbacks.pop(job_id, None)

                # Remove storage file if it exists
                if self.storage_dir:
                    file_path = os.path.join(self.storage_dir, f"{job_id}.json")
                    try:
                        os.remove(file_path)
                    except FileNotFoundError:
                        pass
                    except OSError as e:
                        self.logger.error(f"Failed to remove job file {file_path}: {e}")

        return len(expired_ids)
# Create a singleton instance
_instance = None
# Serializes first-time construction of the singleton across threads.
_instance_lock = threading.Lock()


def get_tracker(storage_dir: Optional[str] = None) -> StatusTracker:
    """
    Get the global status tracker instance.

    The instance is created on the first call and returned unchanged by
    every later call. Creation is guarded by a lock so that concurrent first
    calls cannot each build their own tracker.

    Args:
        storage_dir: Optional storage directory (only used if instance doesn't exist yet)

    Returns:
        StatusTracker instance
    """
    global _instance
    if _instance is None:
        with _instance_lock:
            # Check again under the lock: another thread may have created
            # the instance while this one was waiting.
            if _instance is None:
                _instance = StatusTracker(storage_dir)
    return _instance