voice-tools / src /models /processing_job.py
jcudit's picture
jcudit HF Staff
fix: correct gitignore to only exclude root-level models directory, not src/models package
0456b70
"""
ProcessingJob data model: Batch configuration and execution tracking.
Represents a voice extraction job with configuration and state.
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Literal, Optional
class ExtractionMode(Enum):
    """Selectable extraction modes for audio processing.

    Each member's value is the lowercase string form of the mode; that
    string is what ``ExtractionMode(<str>)`` accepts when looking a member
    up by value.
    """

    SPEECH = "speech"
    NONVERBAL = "nonverbal"
    BOTH = "both"
class JobStatus(Enum):
    """Lifecycle states of a processing job.

    Each member's value is the lowercase string form of the state; that
    string is what ``JobStatus(<str>)`` accepts when looking a member up
    by value.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class ProcessingJob:
    """
    Voice extraction processing job.

    Represents a batch processing job with configuration, state tracking,
    and results collection.

    All timestamps (``created_at``, ``started_at``, ``completed_at``) are
    stored as ISO-8601 strings produced by ``datetime.now().isoformat()``,
    i.e. naive local time.
    """

    # Input configuration
    reference_file: str
    input_files: List[str]
    output_dir: str

    # Processing options
    extraction_mode: ExtractionMode = ExtractionMode.SPEECH
    apply_denoising: bool = False
    vad_threshold: float = 0.5
    quality_threshold_enabled: bool = True

    # Job state
    status: JobStatus = JobStatus.PENDING
    job_id: Optional[str] = None
    created_at: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None

    # Progress tracking
    total_files: int = 0  # always recomputed from input_files in __post_init__
    files_processed: int = 0  # successes + failures
    files_failed: int = 0
    current_file: Optional[str] = None

    # Results
    output_files: List[str] = field(default_factory=list)
    failed_files: List[dict] = field(default_factory=list)  # {file, error}

    # Statistics
    total_input_duration: float = 0.0
    total_extracted_duration: float = 0.0
    total_processing_time: float = 0.0

    def __post_init__(self) -> None:
        """Fill in defaults that depend on creation time or other fields."""
        # Take a single timestamp so job_id and created_at describe the
        # same instant.
        now = datetime.now()
        if self.job_id is None:
            # NOTE: the generated ID has one-second resolution, so two jobs
            # created within the same second receive the same ID. Pass an
            # explicit job_id when uniqueness matters.
            timestamp = now.strftime("%Y%m%d_%H%M%S")
            self.job_id = f"job_{timestamp}"
        if self.created_at is None:
            self.created_at = now.isoformat()
        # Any caller-supplied total_files is overridden by the real count.
        self.total_files = len(self.input_files)

    @property
    def progress_percentage(self) -> float:
        """
        Get job progress as percentage.

        Returns:
            ``files_processed / total_files`` scaled to 0-100, or 0.0 when
            the job has no input files.
        """
        if self.total_files == 0:
            return 0.0
        return (self.files_processed / self.total_files) * 100

    @property
    def success_rate(self) -> float:
        """
        Get success rate for processed files.

        Returns:
            Share of processed files that did not fail, as a percentage
            (0-100); 0.0 when nothing has been processed yet.
        """
        processed = self.files_processed
        if processed == 0:
            return 0.0
        succeeded = processed - self.files_failed
        return (succeeded / processed) * 100

    @property
    def extraction_yield(self) -> float:
        """
        Get extraction yield percentage.

        Returns:
            Extracted duration as a percentage of input duration; 0.0 when
            no input duration has been recorded.
        """
        if self.total_input_duration == 0:
            return 0.0
        return (self.total_extracted_duration / self.total_input_duration) * 100

    @property
    def is_complete(self) -> bool:
        """Check if job is in a terminal state (completed, failed or cancelled)."""
        return self.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)

    @property
    def is_running(self) -> bool:
        """Check if job is currently running."""
        return self.status == JobStatus.RUNNING

    def start(self) -> None:
        """Mark job as started and record the start timestamp."""
        self.status = JobStatus.RUNNING
        self.started_at = datetime.now().isoformat()

    def _finish(self, status: JobStatus) -> None:
        """Move the job into terminal ``status`` and stamp completion data.

        Shared by complete(), fail() and cancel() so every terminal state
        records both ``completed_at`` and ``total_processing_time``.
        """
        finished = datetime.now()
        self.status = status
        self.completed_at = finished.isoformat()
        # Elapsed time is only defined when the job was actually started.
        if self.started_at:
            started = datetime.fromisoformat(self.started_at)
            self.total_processing_time = (finished - started).total_seconds()

    def complete(self) -> None:
        """Mark job as completed and compute total processing time."""
        self._finish(JobStatus.COMPLETED)

    def fail(self, error: str) -> None:
        """
        Mark job as failed.

        Args:
            error: Job-level error message; stored in ``failed_files`` under
                the pseudo file name ``"JOB"``. ``files_failed`` is not
                incremented because no individual file is involved.
        """
        self._finish(JobStatus.FAILED)
        # Add general error to failed files
        self.failed_files.append(
            {
                "file": "JOB",
                "error": error,
            }
        )

    def cancel(self) -> None:
        """Mark job as cancelled and compute total processing time."""
        self._finish(JobStatus.CANCELLED)

    def add_success(
        self, input_file: str, output_file: str, input_duration: float, extracted_duration: float
    ) -> None:
        """
        Record successful file processing.

        Args:
            input_file: Input file path (accepted for symmetry with
                add_failure; not stored)
            output_file: Output file path
            input_duration: Input file duration in seconds
            extracted_duration: Extracted audio duration in seconds
        """
        self.files_processed += 1
        self.output_files.append(output_file)
        self.total_input_duration += input_duration
        self.total_extracted_duration += extracted_duration

    def add_failure(self, input_file: str, error: str) -> None:
        """
        Record failed file processing.

        Args:
            input_file: Input file path that failed
            error: Error message
        """
        # A failed file still counts as processed for progress purposes.
        self.files_processed += 1
        self.files_failed += 1
        self.failed_files.append(
            {
                "file": input_file,
                "error": error,
            }
        )

    def update_progress(self, current_file: str) -> None:
        """
        Update current processing file.

        Args:
            current_file: Currently processing file path
        """
        self.current_file = current_file

    def get_summary(self) -> dict:
        """
        Get job summary statistics.

        Returns:
            Dictionary with identifiers, counters, derived percentages,
            durations and timestamps. Enum fields are given as their
            string values.
        """
        return {
            "job_id": self.job_id,
            "status": self.status.value,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_succeeded": self.files_processed - self.files_failed,
            "files_failed": self.files_failed,
            "progress_percentage": self.progress_percentage,
            "success_rate": self.success_rate,
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "extraction_yield": self.extraction_yield,
            "total_processing_time": self.total_processing_time,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }

    def to_dict(self) -> dict:
        """
        Convert job to dictionary.

        Returns:
            Dictionary with every dataclass field (enums as their string
            values) plus a computed ``"summary"`` entry. The list values
            are the job's own list objects, not copies.
        """
        return {
            "job_id": self.job_id,
            "reference_file": self.reference_file,
            "input_files": self.input_files,
            "output_dir": self.output_dir,
            "extraction_mode": self.extraction_mode.value,
            "apply_denoising": self.apply_denoising,
            "vad_threshold": self.vad_threshold,
            "quality_threshold_enabled": self.quality_threshold_enabled,
            "status": self.status.value,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_files": self.total_files,
            "files_processed": self.files_processed,
            "files_failed": self.files_failed,
            "current_file": self.current_file,
            "output_files": self.output_files,
            "failed_files": self.failed_files,
            "total_input_duration": self.total_input_duration,
            "total_extracted_duration": self.total_extracted_duration,
            "total_processing_time": self.total_processing_time,
            "summary": self.get_summary(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ProcessingJob":
        """
        Create job from dictionary.

        Args:
            data: Mapping in the shape produced by to_dict(). It is
                shallow-copied, so the caller's dict is not modified.

        Returns:
            New ProcessingJob built from ``data``.

        Raises:
            ValueError: If an enum string is not a valid member value.
            TypeError: If ``data`` contains keys that are not fields of
                this class (other than ``"summary"``).
        """
        data = data.copy()
        # Convert enum strings to enums
        if isinstance(data.get("extraction_mode"), str):
            data["extraction_mode"] = ExtractionMode(data["extraction_mode"])
        if isinstance(data.get("status"), str):
            data["status"] = JobStatus(data["status"])
        # Remove computed properties
        data.pop("summary", None)
        return cls(**data)

    def generate_report(self) -> str:
        """
        Generate human-readable job report.

        Returns:
            Formatted multi-line report string. A failure section listing
            at most five entries is included when any failure, per-file or
            job-level, has been recorded.
        """
        report = ["=== Voice Extraction Job Report ===", ""]
        report.append(f"Job ID: {self.job_id}")
        report.append(f"Status: {self.status.value.upper()}")
        report.append(f"Mode: {self.extraction_mode.value}")
        report.append(f"Denoising: {'Enabled' if self.apply_denoising else 'Disabled'}")
        report.append("")
        report.append(f"Files Processed: {self.files_processed}/{self.total_files}")
        report.append(f"Success Rate: {self.success_rate:.1f}%")
        report.append(f"Progress: {self.progress_percentage:.1f}%")
        report.append("")
        report.append(f"Input Duration: {self.total_input_duration / 60:.1f} minutes")
        report.append(f"Extracted Duration: {self.total_extracted_duration / 60:.1f} minutes")
        report.append(f"Extraction Yield: {self.extraction_yield:.1f}%")
        if self.total_processing_time > 0:
            report.append(f"Processing Time: {self.total_processing_time / 60:.1f} minutes")
        # fail() records a "JOB" entry in failed_files without incrementing
        # files_failed, so gate on whichever of the two is larger; otherwise
        # a job-level error would never appear in the report.
        failure_count = max(self.files_failed, len(self.failed_files))
        if failure_count > 0:
            report.append("")
            report.append(f"Failed Files ({failure_count}):")
            for failure in self.failed_files[:5]:  # Show first 5
                report.append(f" - {failure['file']}: {failure['error']}")
            if len(self.failed_files) > 5:
                report.append(f" ... and {len(self.failed_files) - 5} more")
        return "\n".join(report)