"""
ProcessingJob data model: batch configuration and execution tracking.

Represents a voice extraction job with its configuration and state.
"""

from dataclasses import dataclass, field, fields
from datetime import datetime
from enum import Enum
from typing import List, Literal, Optional


class ExtractionMode(Enum):
    """Extraction mode for audio processing."""

    SPEECH = "speech"
    NONVERBAL = "nonverbal"
    BOTH = "both"


class JobStatus(Enum):
    """Processing job status."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Maximum number of individual failures listed by ProcessingJob.generate_report().
_MAX_REPORTED_FAILURES = 5


@dataclass
class ProcessingJob:
    """
    Voice extraction processing job.

    Represents a batch processing job with configuration, state tracking,
    and results collection. Timestamps are stored as ISO-8601 strings
    produced by ``datetime.now().isoformat()`` (naive local time).
    """

    # Input configuration
    reference_file: str
    input_files: List[str]
    output_dir: str

    # Processing options
    extraction_mode: ExtractionMode = ExtractionMode.SPEECH
    apply_denoising: bool = False
    vad_threshold: float = 0.5
    quality_threshold_enabled: bool = True

    # Job state
    status: JobStatus = JobStatus.PENDING
    job_id: Optional[str] = None
    created_at: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None

    # Progress tracking
    total_files: int = 0  # always recomputed from input_files in __post_init__
    files_processed: int = 0
    files_failed: int = 0
    current_file: Optional[str] = None

    # Results
    output_files: List[str] = field(default_factory=list)
    failed_files: List[dict] = field(default_factory=list)  # {file, error}

    # Statistics (durations in seconds)
    total_input_duration: float = 0.0
    total_extracted_duration: float = 0.0
    total_processing_time: float = 0.0

    def __post_init__(self) -> None:
        """Fill in job ID, creation timestamp, and total file count."""
        # Read the clock once so job_id and created_at describe the same instant.
        now = datetime.now()
        if self.job_id is None:
            # NOTE(review): second-resolution IDs can collide for jobs created
            # within the same second; the format is kept because callers may
            # depend on it.
            self.job_id = f"job_{now.strftime('%Y%m%d_%H%M%S')}"
        if self.created_at is None:
            self.created_at = now.isoformat()
        self.total_files = len(self.input_files)

    @property
    def progress_percentage(self) -> float:
        """
        Get job progress as percentage.

        Returns:
            Progress percentage (0-100); 0.0 when the job has no input files.
        """
        if self.total_files == 0:
            return 0.0
        return (self.files_processed / self.total_files) * 100

    @property
    def success_rate(self) -> float:
        """
        Get success rate for processed files.

        Returns:
            Success rate as percentage (0-100); 0.0 when nothing has been
            processed yet.
        """
        processed = self.files_processed
        if processed == 0:
            return 0.0
        succeeded = processed - self.files_failed
        return (succeeded / processed) * 100

    @property
    def extraction_yield(self) -> float:
        """
        Get extraction yield percentage.

        Returns:
            Extracted duration as a percentage of input duration; 0.0 when
            no input duration has been recorded.
        """
        if self.total_input_duration == 0:
            return 0.0
        return (self.total_extracted_duration / self.total_input_duration) * 100

    @property
    def is_complete(self) -> bool:
        """Check if job reached a terminal state (completed, failed, cancelled)."""
        return self.status in (
            JobStatus.COMPLETED,
            JobStatus.FAILED,
            JobStatus.CANCELLED,
        )

    @property
    def is_running(self) -> bool:
        """Check if job is currently running."""
        return self.status == JobStatus.RUNNING

    def start(self) -> None:
        """Mark job as started and record the start timestamp."""
        self.status = JobStatus.RUNNING
        self.started_at = datetime.now().isoformat()

    def _finish(self, status: JobStatus) -> None:
        """Move the job to a terminal status and record its timing."""
        finished = datetime.now()
        self.status = status
        self.completed_at = finished.isoformat()
        # Elapsed time is only known when the job was actually started.
        if self.started_at:
            started = datetime.fromisoformat(self.started_at)
            self.total_processing_time = (finished - started).total_seconds()

    def complete(self) -> None:
        """Mark job as completed and compute total processing time."""
        self._finish(JobStatus.COMPLETED)

    def fail(self, error: str) -> None:
        """
        Mark job as failed.

        The error is stored in ``failed_files`` under the pseudo-file
        ``"JOB"``; ``files_failed`` is not incremented because no individual
        file is involved.

        Args:
            error: Job-level error message
        """
        self._finish(JobStatus.FAILED)
        # Add general error to failed files
        self.failed_files.append(
            {
                "file": "JOB",
                "error": error,
            }
        )

    def cancel(self) -> None:
        """Mark job as cancelled and compute total processing time."""
        self._finish(JobStatus.CANCELLED)

    def add_success(
        self, input_file: str, output_file: str, input_duration: float, extracted_duration: float
    ) -> None:
        """
        Record successful file processing.

        Args:
            input_file: Input file path (accepted for symmetry with
                add_failure; not stored)
            output_file: Output file path
            input_duration: Input file duration in seconds
            extracted_duration: Extracted audio duration in seconds
        """
        self.files_processed += 1
        self.output_files.append(output_file)
        self.total_input_duration += input_duration
        self.total_extracted_duration += extracted_duration

    def add_failure(self, input_file: str, error: str) -> None:
        """
        Record failed file processing.

        Args:
            input_file: Input file path that failed
            error: Error message
        """
        self.files_processed += 1
        self.files_failed += 1
        self.failed_files.append(
            {
                "file": input_file,
                "error": error,
            }
        )

    def update_progress(self, current_file: str) -> None:
        """
        Update current processing file.

        Args:
            current_file: Currently processing file path
        """
        self.current_file = current_file

    def get_summary(self) -> dict:
        """
        Get job summary statistics.

        Returns:
            Dictionary with summary information
        """
        return {
            "job_id": self.job_id,
            "status": self.status.value,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_succeeded": self.files_processed - self.files_failed,
            "files_failed": self.files_failed,
            "progress_percentage": self.progress_percentage,
            "success_rate": self.success_rate,
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "extraction_yield": self.extraction_yield,
            "total_processing_time": self.total_processing_time,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }

    def to_dict(self) -> dict:
        """
        Convert job to dictionary.

        Enum fields are emitted as their string values and a derived
        ``"summary"`` entry is included. List fields are copied so that
        mutating the returned dictionary does not alter the job.
        """
        return {
            "job_id": self.job_id,
            "reference_file": self.reference_file,
            "input_files": list(self.input_files),
            "output_dir": self.output_dir,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "vad_threshold": self.vad_threshold,
            "quality_threshold_enabled": self.quality_threshold_enabled,
            "status": self.status.value,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_failed": self.files_failed,
            "current_file": self.current_file,
            "output_files": list(self.output_files),
            "failed_files": [
                dict(entry) if isinstance(entry, dict) else entry
                for entry in self.failed_files
            ],
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "total_processing_time": self.total_processing_time,
            "summary": self.get_summary(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ProcessingJob":
        """
        Create job from dictionary.

        Accepts the output of ``to_dict()``. Keys that are not dataclass
        fields (such as the derived ``"summary"``) are ignored, enum fields
        may be given as their string values, and list fields are copied so
        the new job does not share mutable state with ``data``.

        Args:
            data: Mapping of field names to values

        Returns:
            New ProcessingJob instance

        Raises:
            TypeError: If a required field is missing
            ValueError: If an enum string is not a valid member value
        """
        accepted = {f.name for f in fields(cls) if f.init}
        kwargs = {key: value for key, value in data.items() if key in accepted}

        # Convert enum strings to enums
        if isinstance(kwargs.get("extraction_mode"), str):
            kwargs["extraction_mode"] = ExtractionMode(kwargs["extraction_mode"])
        if isinstance(kwargs.get("status"), str):
            kwargs["status"] = JobStatus(kwargs["status"])

        # Detach mutable containers from the caller's dictionary.
        for key in ("input_files", "output_files"):
            if isinstance(kwargs.get(key), list):
                kwargs[key] = list(kwargs[key])
        if isinstance(kwargs.get("failed_files"), list):
            kwargs["failed_files"] = [
                dict(entry) if isinstance(entry, dict) else entry
                for entry in kwargs["failed_files"]
            ]

        return cls(**kwargs)

    def generate_report(self) -> str:
        """
        Generate human-readable job report.

        The failure section is shown whenever any failure was recorded,
        including a job-level error added by ``fail()``; at most the first
        five entries are listed.

        Returns:
            Formatted report string
        """
        report = ["=== Voice Extraction Job Report ===", ""]
        report.append(f"Job ID: {self.job_id}")
        report.append(f"Status: {self.status.value.upper()}")
        report.append(f"Mode: {self.extraction_mode.value}")
        report.append(f"Denoising: {'Enabled' if self.apply_denoising else 'Disabled'}")
        report.append("")
        report.append(f"Files Processed: {self.files_processed}/{self.total_files}")
        report.append(f"Success Rate: {self.success_rate:.1f}%")
        report.append(f"Progress: {self.progress_percentage:.1f}%")
        report.append("")
        report.append(f"Input Duration: {self.total_input_duration / 60:.1f} minutes")
        report.append(f"Extracted Duration: {self.total_extracted_duration / 60:.1f} minutes")
        report.append(f"Extraction Yield: {self.extraction_yield:.1f}%")

        if self.total_processing_time > 0:
            report.append(f"Processing Time: {self.total_processing_time / 60:.1f} minutes")

        # fail() records a job-level entry without bumping files_failed, so
        # look at both the counter and the list to avoid hiding that error.
        if self.files_failed > 0 or self.failed_files:
            failure_count = max(self.files_failed, len(self.failed_files))
            report.append("")
            report.append(f"Failed Files ({failure_count}):")
            for failure in self.failed_files[:_MAX_REPORTED_FAILURES]:
                report.append(f" - {failure['file']}: {failure['error']}")
            hidden = len(self.failed_files) - _MAX_REPORTED_FAILURES
            if hidden > 0:
                report.append(f" ... and {hidden} more")

        return "\n".join(report)