""" ChatterboxTTS Progress Tracking & Performance Monitoring Module ============================================================== OVERVIEW: This module provides comprehensive progress tracking, performance monitoring, and logging for ChatterboxTTS audiobook generation. It handles real-time ETA calculations, VRAM usage monitoring, and detailed logging for debugging and optimization. MAIN COMPONENTS: 1. LOGGING SYSTEM: File + console logging with color-coded output 2. PROGRESS TRACKING: Real-time progress display with ETA calculations 3. PERFORMANCE MONITORING: GPU memory usage, processing times, realtime factors 4. BATCH PROGRESS: Multi-chapter audiobook progress aggregation 5. SYSTEM MONITORING: VRAM safety thresholds and memory optimization KEY FEATURES: - Real-time ETA updates during TTS processing - VRAM usage monitoring with automatic cleanup - Performance metrics (realtime factor, chunks/minute) - Color-coded console output for different message types - Detailed logging for troubleshooting and optimization - Memory-safe processing with configurable thresholds PERFORMANCE ENHANCEMENTS: - Added producer-consumer pipeline progress tracking - Enhanced ETA calculations for more accurate estimates - GPU memory monitoring prevents VRAM exhaustion - Automatic memory cleanup and garbage collection - Processing speed metrics for performance optimization USAGE: Called by TTS engine during audiobook generation to provide user feedback and monitor system resource usage for safe, efficient processing. """ import time import sys import logging from datetime import timedelta from pathlib import Path from config.config import * # ============================================================================ # LOGGING SETUP # ============================================================================ def setup_logging(log_dir): """Setup logging configuration""" log_file = log_dir / "chunk_validation.log" # Clear existing log open(log_file, 'w').close() logging.basicConfig( filename=str(log_file), level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", filemode='w' # Overwrite existing log ) # Also log to console for important messages console_handler = logging.StreamHandler() console_handler.setLevel(logging.WARNING) formatter = logging.Formatter('%(levelname)s - %(message)s') console_handler.setFormatter(formatter) logging.getLogger().addHandler(console_handler) def log_console(message, color=None): """Log to both console and file with optional color""" color_codes = { "RED": RED, "GREEN": GREEN, "YELLOW": YELLOW, "CYAN": CYAN, "BOLD": BOLD, "RESET": RESET } prefix = color_codes.get(color, "") suffix = RESET if color else "" print(f"{prefix}{message}{suffix}") logging.info(message) def log_run(message, log_path): """Log to run file""" with open(log_path, "a", encoding="utf-8") as logf: logf.write(message + "\n") # ============================================================================ # PROGRESS TRACKING # ============================================================================ def log_chunk_progress(i, total_chunks, start_time, total_audio_duration=0.0): """Enhanced progress logging with accurate realtime factor""" elapsed = time.time() - start_time avg_time = elapsed / (i + 1) eta = avg_time * total_chunks remaining = eta - elapsed def fmt(seconds): return str(timedelta(seconds=int(seconds))) # Show VRAM usage in progress allocated, _ = monitor_vram_usage("chunk_progress") # Calculate ACCURATE realtime factor using actual audio duration if total_audio_duration > 0 and elapsed > 0: actual_realtime = total_audio_duration / elapsed realtime_str = f"{GREEN}{actual_realtime:.2f}x{RESET}" audio_str = f" | Audio: {GREEN}{fmt(total_audio_duration)}{RESET}" else: actual_realtime = 0.0 # Default value when calculating realtime_str = f"{YELLOW}Calculating...{RESET}" audio_str = "" # Force immediate output with explicit flushing progress_msg = (f"\nšŸŒ€ Chunk {i+1}/{total_chunks} | ā± Elapsed: {CYAN}{fmt(elapsed)}{RESET} | " f"ETA: {CYAN}{fmt(eta)}{RESET} | Remaining: {YELLOW}{fmt(remaining)}{RESET} | " f"Realtime: {realtime_str} | VRAM: {GREEN}{allocated:.1f}GB{RESET}{audio_str}") print(progress_msg) sys.stdout.flush() # Force immediate output # Create clean status message for GUI (without ANSI color codes) realtime_display = f"{actual_realtime:.2f}x" if actual_realtime > 0 else "Calculating..." clean_status = (f"Elapsed: {fmt(elapsed)} | ETA: {fmt(eta)} | Remaining: {fmt(remaining)} | " f"Realtime: {realtime_display} | VRAM: {allocated:.1f}GB" + (f" | Audio: {fmt(total_audio_duration)}" if total_audio_duration > 0 else "")) # Emit status to GUI if callback is available if hasattr(log_chunk_progress, '_status_callback') and log_chunk_progress._status_callback: log_chunk_progress._status_callback(clean_status) # Also log to file for debugging realtime_log = f"{actual_realtime:.2f}x" if actual_realtime > 0 else "N/A" logging.info(f"Progress: Chunk {i+1}/{total_chunks}, Elapsed: {fmt(elapsed)}, " f"ETA: {fmt(eta)}, Realtime: {realtime_log}, " f"Audio Duration: {fmt(total_audio_duration)}, VRAM: {allocated:.1f}GB") def display_batch_progress(batch_start, batch_end, total_chunks): """Display batch processing progress""" batch_progress = (batch_end / total_chunks) * 100 print(f"\nšŸ“Š Batch Progress: {batch_start+1}-{batch_end}/{total_chunks} ({batch_progress:.1f}%)") def display_final_summary(elapsed_time, audio_duration, chunk_count, realtime_factor): """Display final processing summary""" elapsed_td = timedelta(seconds=int(elapsed_time)) audio_td = timedelta(seconds=int(audio_duration)) print(f"\nšŸŽ‰ {GREEN}Processing Complete!{RESET}") print(f"šŸ“Š Final Statistics:") print(f" ā±ļø Processing Time: {CYAN}{elapsed_td}{RESET}") print(f" šŸŽµ Audio Duration: {GREEN}{audio_td}{RESET}") print(f" šŸ“¦ Total Chunks: {YELLOW}{chunk_count}{RESET}") print(f" šŸš€ Realtime Factor: {BOLD}{realtime_factor:.2f}x{RESET}") print(f" šŸ’¾ Memory Efficiency: {GREEN}Optimized{RESET}") # ============================================================================ # VRAM AND PERFORMANCE MONITORING # ============================================================================ def monitor_vram_usage(operation_name=""): """Real-time VRAM monitoring with threshold warnings""" import torch if not torch.cuda.is_available(): return 0, 0 allocated = torch.cuda.memory_allocated() / 1024**3 reserved = torch.cuda.memory_reserved() / 1024**3 if allocated > VRAM_SAFETY_THRESHOLD: logging.warning(f"āš ļø High VRAM usage during {operation_name}: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved") # Trigger memory optimization if available optimize_memory_if_needed() return allocated, reserved def monitor_gpu_utilization(): """Monitor GPU utilization if pynvml is available""" try: import pynvml pynvml.nvmlInit() handle = pynvml.nvmlDeviceGetHandleByIndex(0) util = pynvml.nvmlDeviceGetUtilizationRates(handle) temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU) return { "gpu_util": util.gpu, "memory_util": util.memory, "temperature": temp } except: return {"gpu_util": "N/A", "memory_util": "N/A", "temperature": "N/A"} def optimize_memory_if_needed(): """Trigger memory optimization when thresholds are exceeded""" try: # Try to use the enhanced CUDA memory optimization if available from modules.tts_engine import optimize_cuda_memory_usage optimize_cuda_memory_usage() except ImportError: # Fallback to basic optimization import torch import gc torch.cuda.empty_cache() gc.collect() if torch.cuda.is_available(): torch.cuda.ipc_collect() def display_system_info(): """Display system information at startup""" import torch print(f"\nšŸ–„ļø {CYAN}System Information:{RESET}") # CUDA info if torch.cuda.is_available(): gpu_name = torch.cuda.get_device_name(0) total_vram = torch.cuda.get_device_properties(0).total_memory / 1024**3 print(f" GPU: {GREEN}{gpu_name}{RESET}") print(f" VRAM: {GREEN}{total_vram:.1f}GB{RESET}") print(f" CUDA Version: {GREEN}{torch.version.cuda}{RESET}") else: print(f" GPU: {RED}Not Available{RESET}") # Memory threshold print(f" VRAM Safety Threshold: {YELLOW}{VRAM_SAFETY_THRESHOLD}GB{RESET}") # Worker configuration print(f" Max Workers: {YELLOW}{MAX_WORKERS}{RESET}") print(f" Dynamic Workers: {YELLOW}{USE_DYNAMIC_WORKERS}{RESET}") # ============================================================================ # PERFORMANCE TRACKING # ============================================================================ class PerformanceTracker: """ PERFORMANCE TRACKING CLASS - Core metrics collection and analysis =============================================================== PURPOSE: This class provides comprehensive performance monitoring for TTS processing, tracking timing, memory usage, and generating detailed performance reports for optimization and debugging. TRACKED METRICS: - Individual chunk processing times - VRAM usage per chunk (allocated vs reserved memory) - Batch processing times for multi-chapter books - Overall processing statistics and trends - Real-time factor calculations (audio time vs processing time) USAGE FLOW: 1. Initialize at start of TTS session 2. Log chunk completions during processing 3. Track batch completions for multi-part books 4. Generate final performance report BENEFITS: - Identifies processing bottlenecks - Monitors memory usage patterns - Provides user feedback on progress - Enables performance optimization - Helps debug processing issues """ def __init__(self): """Initialize performance tracking with baseline metrics""" self.start_time = time.time() # Session start timestamp self.chunk_times = [] # Individual chunk processing times self.vram_usage = [] # VRAM usage snapshots (chunk_id, allocated, reserved) self.batch_times = [] # Batch processing times for multi-chapter books def log_chunk_completion(self, chunk_index, audio_duration): """Log individual chunk completion""" current_time = time.time() chunk_time = current_time - (self.start_time + sum(self.chunk_times)) self.chunk_times.append(chunk_time) # Track VRAM allocated, reserved = monitor_vram_usage() self.vram_usage.append((chunk_index, allocated, reserved)) def log_batch_completion(self, batch_size): """Log batch completion""" if len(self.chunk_times) >= batch_size: batch_time = sum(self.chunk_times[-batch_size:]) self.batch_times.append(batch_time) def get_performance_summary(self): """Get comprehensive performance summary""" total_time = time.time() - self.start_time avg_chunk_time = sum(self.chunk_times) / len(self.chunk_times) if self.chunk_times else 0 vram_peak = max([usage[1] for usage in self.vram_usage]) if self.vram_usage else 0 vram_avg = sum([usage[1] for usage in self.vram_usage]) / len(self.vram_usage) if self.vram_usage else 0 return { "total_time": total_time, "avg_chunk_time": avg_chunk_time, "total_chunks": len(self.chunk_times), "vram_peak": vram_peak, "vram_average": vram_avg, "batch_count": len(self.batch_times) } # ============================================================================ # ERROR AND WARNING TRACKING # ============================================================================ def log_processing_error(chunk_id, error_message, error_type="GENERAL"): """Log processing errors with categorization""" timestamp = time.strftime('%Y-%m-%d %H:%M:%S') error_log = f"[{timestamp}] {error_type} ERROR - Chunk {chunk_id}: {error_message}" logging.error(error_log) print(f"{RED}āŒ Error in chunk {chunk_id}: {error_message}{RESET}") def log_processing_warning(chunk_id, warning_message, warning_type="GENERAL"): """Log processing warnings with categorization""" timestamp = time.strftime('%Y-%m-%d %H:%M:%S') warning_log = f"[{timestamp}] {warning_type} WARNING - Chunk {chunk_id}: {warning_message}" logging.warning(warning_log) print(f"{YELLOW}āš ļø Warning in chunk {chunk_id}: {warning_message}{RESET}") # ============================================================================ # REAL-TIME STATUS DISPLAY # ============================================================================ def create_status_line(current_chunk, total_chunks, elapsed_time, realtime_factor, vram_usage): """Create a single-line status for real-time updates""" progress_percent = (current_chunk / total_chunks) * 100 elapsed_str = str(timedelta(seconds=int(elapsed_time))) status = (f"šŸ”„ {current_chunk}/{total_chunks} ({progress_percent:.1f}%) | " f"ā±ļø {elapsed_str} | šŸš€ {realtime_factor:.2f}x | šŸ’¾ {vram_usage:.1f}GB") return status def update_status_line(status_message): """Update status line in place""" print(f"\r{status_message}", end='', flush=True) # ============================================================================ # EXPORT FUNCTIONS # ============================================================================ def export_performance_report(output_dir, performance_data): """Export detailed performance report""" report_path = output_dir / "performance_report.txt" with open(report_path, 'w', encoding='utf-8') as f: f.write("GenTTS Performance Report\n") f.write("=" * 50 + "\n\n") f.write(f"Processing Summary:\n") f.write(f" Total Processing Time: {timedelta(seconds=int(performance_data['total_time']))}\n") f.write(f" Average Chunk Time: {performance_data['avg_chunk_time']:.2f}s\n") f.write(f" Total Chunks Processed: {performance_data['total_chunks']}\n") f.write(f" Peak VRAM Usage: {performance_data['vram_peak']:.2f}GB\n") f.write(f" Average VRAM Usage: {performance_data['vram_average']:.2f}GB\n") f.write(f" Batch Count: {performance_data['batch_count']}\n") return report_path