| | """ |
| | Progress Tracking Module |
| | Handles progress monitoring, ETA calculations, and performance statistics |
| | """ |
| |
|
| | import time |
| | import logging |
| | from typing import Optional, Callable, Dict, Any, List |
| | from dataclasses import dataclass, field |
| | from collections import deque |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
@dataclass
class ProgressSnapshot:
    """Immutable-by-convention record of tracker state at one instant.

    Attributes:
        timestamp: Time value supplied by the creator of the snapshot.
        frame_number: Frame index reported when the snapshot was taken.
        stage: Name of the processing stage active at that moment.
        fps: Frames-per-second figure reported with the snapshot.
        memory_usage_mb: Memory usage in MB, or ``None`` when not supplied.
        custom_metrics: Free-form extra metrics; every instance gets its
            own dict so snapshots never share state.
    """

    # Required fields (positional order is part of the public interface).
    timestamp: float
    frame_number: int
    stage: str
    fps: float

    # Optional fields.
    memory_usage_mb: Optional[float] = field(default=None)
    custom_metrics: Dict[str, Any] = field(default_factory=dict)
| |
|
class ProgressTracker:
    """
    Enhanced progress tracking with detailed statistics and ETA calculations.

    The tracker is fed through :meth:`update` (typically once per frame or
    once per batch of frames).  It maintains a sliding window of per-frame
    durations, an exponentially smoothed FPS used for the ETA, per-stage
    wall-clock durations and, optionally, a list of :class:`ProgressSnapshot`
    records that can be exported for offline analysis.
    """

    def __init__(self, total_frames: int, callback: Optional[Callable] = None,
                 track_performance: bool = True):
        """
        Create a tracker and start its clock.

        Args:
            total_frames: Number of frames the job is expected to process.
            callback: Optional callable invoked on every update as
                ``callback(progress_fraction, message)``.
            track_performance: When True, a ProgressSnapshot is stored for
                every update.
        """
        self.total_frames = total_frames
        self.callback = callback
        self.track_performance = track_performance

        # Timing state.
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0
        # Time at which the frame counter last moved; the anchor used to
        # derive per-frame durations (see _record_frame_time).
        self._last_frame_time = self.start_time

        # Sliding windows and history.
        self.frame_times = deque(maxlen=100)
        self.fps_history = deque(maxlen=50)
        self.snapshots: List[ProgressSnapshot] = []

        # Stage bookkeeping.
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed = []
        # True once the *current* stage already has an entry in
        # stages_completed, so repeated completion refreshes that entry
        # instead of appending duplicates.
        self._stage_recorded = False

        self.stats = self._initial_stats()

        # ETA smoothing (exponential moving average over FPS samples).
        self.eta_smoothing_factor = 0.2
        self.smoothed_fps = 0.0

        logger.debug(f"ProgressTracker initialized for {total_frames} frames")

    @staticmethod
    def _initial_stats() -> Dict[str, Any]:
        """Return a fresh statistics dict with every counter at its start value."""
        return {
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'peak_fps': 0.0,
            'slowest_fps': float('inf'),
            'frames_per_second_variance': 0.0,
            'estimated_completion_accuracy': 0.0,
            'stage_times': {},
            'memory_peak_mb': 0.0,
        }

    def update(self, frame_number: int, stage: str = "",
               custom_metrics: Optional[Dict[str, Any]] = None,
               memory_usage_mb: Optional[float] = None):
        """
        Update progress with comprehensive tracking.

        Args:
            frame_number: Current frame being processed
            stage: Current processing stage description; a non-empty value
                different from the current stage closes the current stage
                and opens the new one
            custom_metrics: Additional metrics to track
            memory_usage_mb: Current memory usage in MB
        """
        current_time = time.time()

        if stage and stage != self.current_stage:
            self._begin_stage(stage, current_time)

        self._record_frame_time(frame_number, current_time)
        self.processed_frames = frame_number
        self.last_update_time = current_time

        elapsed_time = current_time - self.start_time
        current_fps = self._calculate_current_fps()

        if current_fps > 0:
            self.fps_history.append(current_fps)
            self._update_smoothed_fps(current_fps)

        eta_seconds = self._calculate_eta()
        progress_pct = (
            self.processed_frames / self.total_frames
            if self.total_frames > 0 else 0
        )

        self._update_statistics(current_fps, memory_usage_mb)

        if self.track_performance:
            self.snapshots.append(ProgressSnapshot(
                timestamp=current_time,
                frame_number=frame_number,
                stage=self.current_stage,
                fps=current_fps,
                memory_usage_mb=memory_usage_mb,
                custom_metrics=custom_metrics or {},
            ))

        message = self._generate_progress_message(
            elapsed_time, current_fps, eta_seconds, stage
        )

        if self.callback:
            # Reporting is best-effort: a failing callback must never abort
            # the processing loop that drives this tracker.
            try:
                self.callback(progress_pct, message)
            except Exception as e:
                logger.warning(f"Progress callback failed: {e}")

        if frame_number % 50 == 0 or frame_number == self.total_frames:
            self._log_detailed_progress(progress_pct, current_fps, eta_seconds)

    def _record_frame_time(self, frame_number: int, now: float):
        """
        Append the average per-frame duration since the last frame advance.

        The interval is divided by the number of frames advanced so that
        callers reporting in batches (e.g. every 10 frames) still get a true
        frames-per-second figure.  Updates that do not move the frame counter
        record nothing and leave the anchor untouched, so their time is
        attributed to the next frame that completes.  The very first advance
        only sets the anchor, keeping start-up latency out of the FPS window.
        """
        frames_advanced = frame_number - self.processed_frames
        if frames_advanced == 0:
            return
        if frames_advanced > 0 and self.processed_frames > 0:
            self.frame_times.append(
                (now - self._last_frame_time) / frames_advanced
            )
        # Re-anchor on any movement (including a backwards jump).
        self._last_frame_time = now

    def _calculate_current_fps(self) -> float:
        """Calculate current FPS from the 10 most recent frame durations."""
        if not self.frame_times:
            return 0.0

        recent_frame_times = list(self.frame_times)[-10:]
        avg_frame_time = sum(recent_frame_times) / len(recent_frame_times)

        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0.0

    def _update_smoothed_fps(self, current_fps: float):
        """Update smoothed FPS using exponential smoothing."""
        if self.smoothed_fps == 0.0:
            # First sample seeds the moving average.
            self.smoothed_fps = current_fps
        else:
            self.smoothed_fps = (
                self.eta_smoothing_factor * current_fps +
                (1 - self.eta_smoothing_factor) * self.smoothed_fps
            )

    def _calculate_eta(self) -> float:
        """
        Calculate estimated seconds to completion.

        Returns 0.0 when no estimate is available yet, and never a negative
        value even if more frames than ``total_frames`` were reported.
        """
        if self.processed_frames <= 0 or self.smoothed_fps <= 0:
            return 0.0

        remaining_frames = max(0, self.total_frames - self.processed_frames)
        return remaining_frames / self.smoothed_fps

    def _update_statistics(self, current_fps: float, memory_usage_mb: Optional[float]):
        """
        Refresh aggregate statistics from the FPS history and memory reading.

        ``current_fps`` is accepted for interface compatibility; the
        aggregates are derived from ``fps_history``.
        """
        self.stats['total_processing_time'] = time.time() - self.start_time

        if self.fps_history:
            fps_list = list(self.fps_history)
            avg_fps = sum(fps_list) / len(fps_list)
            self.stats['average_fps'] = avg_fps
            self.stats['peak_fps'] = max(fps_list)
            self.stats['slowest_fps'] = min(fps_list)
            # Population variance over the retained history window.
            self.stats['frames_per_second_variance'] = (
                sum((fps - avg_fps) ** 2 for fps in fps_list) / len(fps_list)
            )

        if memory_usage_mb is not None and memory_usage_mb > self.stats['memory_peak_mb']:
            self.stats['memory_peak_mb'] = memory_usage_mb

    def _begin_stage(self, stage: str, now: float):
        """Close the current stage and open ``stage`` starting at ``now``."""
        self._complete_stage()
        self.current_stage = stage
        self.stage_start_time = now
        self._stage_recorded = False

    def _complete_stage(self):
        """
        Record the duration of the current stage.

        Safe to call repeatedly: the first call for a stage appends an entry
        to ``stages_completed``; later calls for the same, still-open stage
        replace that entry with refreshed numbers instead of appending a
        duplicate.
        """
        if not self.current_stage:
            return

        stage_duration = time.time() - self.stage_start_time
        self.stats['stage_times'][self.current_stage] = stage_duration
        entry = {
            'stage': self.current_stage,
            'duration': stage_duration,
            'frames_processed': self.processed_frames,
        }
        if self._stage_recorded and self.stages_completed:
            # Replace rather than mutate so previously returned summaries
            # keep the values they were created with.
            self.stages_completed[-1] = entry
        else:
            self.stages_completed.append(entry)
            self._stage_recorded = True
        logger.debug(f"Completed stage '{self.current_stage}' in {stage_duration:.2f}s")

    def _generate_progress_message(self, elapsed_time: float, current_fps: float,
                                   eta_seconds: float, stage: str) -> str:
        """
        Build the human-readable progress line passed to the callback.

        The ETA is included only when positive, the stage prefix only when
        ``stage`` is non-empty, and a trend arrow only when the current FPS
        deviates more than 20% from the average of the last 10 samples.
        """
        message = (
            f"Frame {self.processed_frames}/{self.total_frames} | "
            f"Elapsed: {self._format_time(elapsed_time)} | "
            f"Speed: {current_fps:.1f} fps"
        )

        if eta_seconds > 0:
            message += f" | ETA: {self._format_time(eta_seconds)}"

        if stage:
            message = f"{stage} | {message}"

        if len(self.fps_history) >= 10:
            # fps_history only ever holds positive samples, so recent_avg > 0.
            recent_avg = sum(list(self.fps_history)[-10:]) / 10
            if abs(current_fps - recent_avg) / recent_avg > 0.2:
                trend = "↗" if current_fps > recent_avg else "↘"
                message += f" {trend}"

        return message

    def _format_time(self, seconds: float) -> str:
        """Format a duration as ``Ns``, ``Nm Ns`` or ``Nh Nm``."""
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def _log_detailed_progress(self, progress_pct: float, current_fps: float, eta_seconds: float):
        """Log detailed progress information at INFO level."""
        logger.info(
            f"Progress: {progress_pct*100:.1f}% | "
            f"FPS: {current_fps:.1f} (avg: {self.stats['average_fps']:.1f}) | "
            f"ETA: {self._format_time(eta_seconds)} | "
            f"Stage: {self.current_stage}"
        )

    def set_stage(self, stage: str):
        """Manually set the current processing stage."""
        if stage != self.current_stage:
            self._begin_stage(stage, time.time())
            logger.debug(f"Stage changed to: {stage}")

    def add_custom_metric(self, key: str, value: Any):
        """Attach a custom metric to the most recent snapshot, if any exists."""
        if self.snapshots:
            self.snapshots[-1].custom_metrics[key] = value

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive performance summary.

        The current stage's duration is brought up to date first; calling
        this method repeatedly does not add duplicate stage entries.

        Returns:
            Dict with frame counts, timing, a copy of the statistics, the
            per-stage breakdown, ``stage_percentages`` (when stage times
            exist) and ``performance_analysis`` (when FPS samples exist).
        """
        self._complete_stage()

        total_time = time.time() - self.start_time

        summary = {
            'total_frames': self.total_frames,
            'processed_frames': self.processed_frames,
            'completion_percentage': (self.processed_frames / self.total_frames * 100) if self.total_frames > 0 else 0,
            'total_processing_time': total_time,
            'overall_fps': self.processed_frames / total_time if total_time > 0 else 0,
            'stages_completed': len(self.stages_completed),
            'current_stage': self.current_stage,
            'statistics': self.stats.copy(),
            'stage_breakdown': self.stages_completed.copy()
        }

        stage_times = self.stats['stage_times']
        if stage_times:
            total_stage_time = sum(stage_times.values())
            # On coarse clocks every recorded duration can be 0.0; report 0%
            # shares instead of dividing by zero.
            summary['stage_percentages'] = {
                stage: (duration / total_stage_time * 100) if total_stage_time > 0 else 0.0
                for stage, duration in stage_times.items()
            }

        if self.fps_history:
            summary['performance_analysis'] = {
                'fps_stability': self._calculate_fps_stability(),
                'performance_trend': self._analyze_performance_trend(),
                'bottleneck_detection': self._detect_bottlenecks()
            }

        return summary

    def _calculate_fps_stability(self) -> str:
        """Classify FPS stability from the coefficient of variation."""
        if len(self.fps_history) < 10:
            return "insufficient_data"

        variance = self.stats['frames_per_second_variance']
        avg_fps = self.stats['average_fps']

        if avg_fps == 0:
            return "unstable"

        coefficient_of_variation = (variance ** 0.5) / avg_fps

        if coefficient_of_variation < 0.1:
            return "very_stable"
        elif coefficient_of_variation < 0.2:
            return "stable"
        elif coefficient_of_variation < 0.4:
            return "moderate"
        else:
            return "unstable"

    def _analyze_performance_trend(self) -> str:
        """Compare the first and last quarter of the FPS history."""
        if len(self.fps_history) < 20:
            return "insufficient_data"

        fps_list = list(self.fps_history)
        quartile_size = len(fps_list) // 4

        first_quartile_avg = sum(fps_list[:quartile_size]) / quartile_size
        last_quartile_avg = sum(fps_list[-quartile_size:]) / quartile_size

        # fps_history holds only positive samples, so the divisor is > 0.
        change_percent = ((last_quartile_avg - first_quartile_avg) / first_quartile_avg) * 100

        if change_percent > 10:
            return "improving"
        elif change_percent < -10:
            return "degrading"
        else:
            return "stable"

    def _detect_bottlenecks(self) -> List[str]:
        """Return labels for the heuristic bottleneck checks that fire."""
        bottlenecks = []

        if self.stats['average_fps'] < 0.5:
            bottlenecks.append("very_low_fps")

        # Standard deviation above half of the mean FPS.
        if self.stats['frames_per_second_variance'] > (self.stats['average_fps'] * 0.5) ** 2:
            bottlenecks.append("inconsistent_performance")

        if self.stats['memory_peak_mb'] > 8000:
            bottlenecks.append("high_memory_usage")

        if self.stats['stage_times']:
            stage_times = list(self.stats['stage_times'].values())
            max_time = max(stage_times)
            avg_time = sum(stage_times) / len(stage_times)

            # One stage taking more than 3x the mean stage duration.
            if max_time > avg_time * 3:
                bottlenecks.append("stage_imbalance")

        return bottlenecks

    def export_performance_data(self) -> Dict[str, Any]:
        """
        Export detailed performance data for analysis.

        ``statistics`` and ``stages`` are the tracker's live objects (not
        copies); ``performance_summary`` is computed first so both already
        include the up-to-date duration of the current stage.
        """
        performance_summary = self.get_performance_summary()
        return {
            'metadata': {
                'total_frames': self.total_frames,
                'tracking_enabled': self.track_performance,
                'start_time': self.start_time,
                'export_time': time.time()
            },
            'snapshots': [
                {
                    'timestamp': snap.timestamp,
                    'frame_number': snap.frame_number,
                    'stage': snap.stage,
                    'fps': snap.fps,
                    'memory_usage_mb': snap.memory_usage_mb,
                    'custom_metrics': snap.custom_metrics
                }
                for snap in self.snapshots
            ],
            'statistics': self.stats,
            'stages': self.stages_completed,
            'performance_summary': performance_summary
        }

    def reset(self, new_total_frames: Optional[int] = None):
        """
        Reset tracker for a new processing session.

        Args:
            new_total_frames: New expected frame count; the previous total is
                kept when omitted.
        """
        if new_total_frames is not None:
            self.total_frames = new_total_frames

        self.start_time = time.time()
        self.last_update_time = self.start_time
        self._last_frame_time = self.start_time
        self.processed_frames = 0
        self.frame_times.clear()
        self.fps_history.clear()
        self.snapshots.clear()
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed.clear()
        self._stage_recorded = False
        self.smoothed_fps = 0.0

        self.stats = self._initial_stats()

        logger.debug("ProgressTracker reset")

    def finalize(self) -> Dict[str, Any]:
        """Finalize tracking, log a one-line summary and return the results."""
        # get_performance_summary() closes the current stage itself.
        final_summary = self.get_performance_summary()

        logger.info(
            f"Processing completed: {self.processed_frames}/{self.total_frames} frames "
            f"in {self._format_time(final_summary['total_processing_time'])} "
            f"(avg: {final_summary['overall_fps']:.1f} fps)"
        )

        return final_summary