Spaces:
Running
on
Zero
Running
on
Zero
"""
ProcessingJob data model: batch configuration and execution tracking.

Represents a voice extraction job with configuration and state.
"""
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from enum import Enum | |
| from typing import List, Literal, Optional | |
class ExtractionMode(Enum):
    """Selects what a processing job extracts from the audio.

    Each member's value is a lowercase string, so a member can be rebuilt
    from its serialized form with ``ExtractionMode("speech")``.
    """

    SPEECH = "speech"
    NONVERBAL = "nonverbal"
    BOTH = "both"
class JobStatus(Enum):
    """Lifecycle state of a processing job.

    Each member's value is a lowercase string, so a member can be rebuilt
    from its serialized form with ``JobStatus("running")``.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class ProcessingJob:
    """
    Voice extraction processing job.

    Represents a batch processing job with configuration, state tracking,
    and results collection.

    Timestamps (``created_at``, ``started_at``, ``completed_at``) are stored
    as ISO 8601 strings produced by ``datetime.now().isoformat()``, i.e. naive
    local time.
    """

    # Input configuration
    reference_file: str
    input_files: List[str]
    output_dir: str

    # Processing options
    extraction_mode: ExtractionMode = ExtractionMode.SPEECH
    apply_denoising: bool = False
    vad_threshold: float = 0.5
    quality_threshold_enabled: bool = True

    # Job state
    status: JobStatus = JobStatus.PENDING
    job_id: Optional[str] = None
    created_at: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None

    # Progress tracking
    total_files: int = 0
    files_processed: int = 0  # successes + failures
    files_failed: int = 0
    current_file: Optional[str] = None

    # Results
    output_files: List[str] = field(default_factory=list)
    failed_files: List[dict] = field(default_factory=list)  # {"file": ..., "error": ...}

    # Statistics (durations and processing time are in seconds)
    total_input_duration: float = 0.0
    total_extracted_duration: float = 0.0
    total_processing_time: float = 0.0

    def __post_init__(self):
        """
        Fill in derived fields after dataclass initialisation.

        Generates ``job_id`` and ``created_at`` when they were not supplied,
        and always recomputes ``total_files`` from ``input_files`` (any value
        passed for ``total_files`` is overwritten).
        """
        if self.job_id is None:
            # Second-resolution timestamp: two jobs created within the same
            # second receive the same ID.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.job_id = f"job_{timestamp}"

        if self.created_at is None:
            self.created_at = datetime.now().isoformat()

        self.total_files = len(self.input_files)

    @property
    def progress_percentage(self) -> float:
        """
        Get job progress as percentage.

        Returns:
            ``files_processed / total_files`` as a percentage, or 0.0 when
            the job has no input files.
        """
        if self.total_files == 0:
            return 0.0
        return (self.files_processed / self.total_files) * 100

    @property
    def success_rate(self) -> float:
        """
        Get success rate for processed files.

        Returns:
            Share of processed files that did not fail, as a percentage,
            or 0.0 when nothing has been processed yet.
        """
        processed = self.files_processed
        if processed == 0:
            return 0.0
        succeeded = processed - self.files_failed
        return (succeeded / processed) * 100

    @property
    def extraction_yield(self) -> float:
        """
        Get extraction yield percentage.

        Returns:
            ``total_extracted_duration / total_input_duration`` as a
            percentage, or 0.0 when no input duration has been recorded.
        """
        if self.total_input_duration == 0:
            return 0.0
        return (self.total_extracted_duration / self.total_input_duration) * 100

    # NOTE(review): is_complete / is_running are kept as plain methods, as in
    # the visible source. Confirm callers invoke them (``job.is_complete()``)
    # rather than reading them as attributes.
    def is_complete(self) -> bool:
        """Check if job is in a terminal state (completed, failed or cancelled)."""
        return self.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)

    def is_running(self) -> bool:
        """Check if job is currently running."""
        return self.status == JobStatus.RUNNING

    def start(self):
        """Mark job as started and record the start timestamp."""
        self.status = JobStatus.RUNNING
        self.started_at = datetime.now().isoformat()

    def complete(self):
        """
        Mark job as completed.

        Records ``completed_at`` and, when the job was started, sets
        ``total_processing_time`` to the seconds elapsed since ``started_at``.
        """
        finished = datetime.now()
        self.status = JobStatus.COMPLETED
        self.completed_at = finished.isoformat()

        # Without a start timestamp there is nothing to measure against.
        if self.started_at:
            started = datetime.fromisoformat(self.started_at)
            self.total_processing_time = (finished - started).total_seconds()

    def fail(self, error: str):
        """
        Mark job as failed.

        The job-level error is appended to ``failed_files`` under the pseudo
        file name ``"JOB"``; ``files_failed`` is not incremented.

        Args:
            error: Error message describing why the job failed
        """
        self.status = JobStatus.FAILED
        self.completed_at = datetime.now().isoformat()
        self.failed_files.append({"file": "JOB", "error": error})

    def cancel(self):
        """Mark job as cancelled and record the completion timestamp."""
        self.status = JobStatus.CANCELLED
        self.completed_at = datetime.now().isoformat()

    def add_success(
        self, input_file: str, output_file: str, input_duration: float, extracted_duration: float
    ):
        """
        Record successful file processing.

        Args:
            input_file: Input file path (not stored)
            output_file: Output file path
            input_duration: Input file duration in seconds
            extracted_duration: Extracted audio duration in seconds
        """
        self.files_processed += 1
        self.output_files.append(output_file)
        self.total_input_duration += input_duration
        self.total_extracted_duration += extracted_duration

    def add_failure(self, input_file: str, error: str):
        """
        Record failed file processing.

        Args:
            input_file: Input file path that failed
            error: Error message
        """
        self.files_processed += 1
        self.files_failed += 1
        self.failed_files.append({"file": input_file, "error": error})

    def update_progress(self, current_file: str):
        """
        Update current processing file.

        Args:
            current_file: Currently processing file path
        """
        self.current_file = current_file

    def get_summary(self) -> dict:
        """
        Get job summary statistics.

        Returns:
            Dictionary with summary information; enum fields are given as
            their string values.
        """
        return {
            "job_id": self.job_id,
            "status": self.status.value,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_succeeded": self.files_processed - self.files_failed,
            "files_failed": self.files_failed,
            "progress_percentage": self.progress_percentage,
            "success_rate": self.success_rate,
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "extraction_yield": self.extraction_yield,
            "total_processing_time": self.total_processing_time,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }

    def to_dict(self) -> dict:
        """
        Convert job to dictionary.

        Returns:
            Dictionary with every field (enums as their string values) plus a
            computed ``"summary"`` entry. List fields are shallow copies, so
            mutating the result does not alter the job.
        """
        return {
            "job_id": self.job_id,
            "reference_file": self.reference_file,
            "input_files": list(self.input_files),
            "output_dir": self.output_dir,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "vad_threshold": self.vad_threshold,
            "quality_threshold_enabled": self.quality_threshold_enabled,
            "status": self.status.value,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_failed": self.files_failed,
            "current_file": self.current_file,
            "output_files": list(self.output_files),
            "failed_files": list(self.failed_files),
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "total_processing_time": self.total_processing_time,
            "summary": self.get_summary(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ProcessingJob":
        """
        Create job from dictionary.

        Accepts the output of ``to_dict``: enum fields given as strings are
        converted back to enum members and the computed ``"summary"`` entry
        is dropped. The input dictionary itself is not modified.

        Args:
            data: Mapping of field names to values

        Returns:
            New ProcessingJob instance

        Raises:
            ValueError: If an enum string is not a valid member value
            TypeError: If required fields are missing or unknown keys remain
        """
        data = data.copy()

        # Convert enum strings to enums
        if isinstance(data.get("extraction_mode"), str):
            data["extraction_mode"] = ExtractionMode(data["extraction_mode"])
        if isinstance(data.get("status"), str):
            data["status"] = JobStatus(data["status"])

        # Remove computed properties
        data.pop("summary", None)

        return cls(**data)

    def generate_report(self) -> str:
        """
        Generate human-readable job report.

        Returns:
            Formatted report string; at most the first five failure entries
            are listed individually.
        """
        report = ["=== Voice Extraction Job Report ===", ""]
        report.append(f"Job ID: {self.job_id}")
        report.append(f"Status: {self.status.value.upper()}")
        report.append(f"Mode: {self.extraction_mode.value}")
        report.append(f"Denoising: {'Enabled' if self.apply_denoising else 'Disabled'}")
        report.append("")
        report.append(f"Files Processed: {self.files_processed}/{self.total_files}")
        report.append(f"Success Rate: {self.success_rate:.1f}%")
        report.append(f"Progress: {self.progress_percentage:.1f}%")
        report.append("")
        report.append(f"Input Duration: {self.total_input_duration / 60:.1f} minutes")
        report.append(f"Extracted Duration: {self.total_extracted_duration / 60:.1f} minutes")
        report.append(f"Extraction Yield: {self.extraction_yield:.1f}%")

        if self.total_processing_time > 0:
            report.append(f"Processing Time: {self.total_processing_time / 60:.1f} minutes")

        # Gate on the recorded entries rather than files_failed: fail() adds a
        # job-level "JOB" entry without touching files_failed, and that error
        # must still show up in the report.
        if self.failed_files:
            report.append("")
            report.append(f"Failed Files ({len(self.failed_files)}):")
            for failure in self.failed_files[:5]:  # Show first 5
                report.append(f"  - {failure['file']}: {failure['error']}")
            if len(self.failed_files) > 5:
                report.append(f"  ... and {len(self.failed_files) - 5} more")

        return "\n".join(report)