""" Progress monitoring utilities for tracking parallel processing. """ import time import threading import logging from typing import Dict, List, Any, Set logger = logging.getLogger(__name__) class ProgressTracker: """ Tracks progress of parallel document processing tasks. """ def __init__(self, total_documents, update_interval=5): """ Initialize the progress tracker. Args: total_documents: Total number of documents to process update_interval: How often to log updates (in seconds) """ self.total = total_documents self.completed = 0 self.failed = 0 self.in_progress = 0 self.processed_files = set() self.update_interval = update_interval self.lock = threading.Lock() self.start_time = time.time() self.monitor_thread = None self.stop_monitoring = threading.Event() def mark_started(self, filename): """Mark a document as being processed.""" with self.lock: self.in_progress += 1 logger.info(f"Started processing: {filename}") def mark_completed(self, filename, success=True): """Mark a document as completed.""" with self.lock: self.in_progress -= 1 if filename not in self.processed_files: self.processed_files.add(filename) if success: self.completed += 1 else: self.failed += 1 def get_stats(self): """Get current processing statistics.""" with self.lock: elapsed = time.time() - self.start_time remaining = self.total - (self.completed + self.failed) # Calculate estimated time remaining if self.completed > 0: avg_time_per_doc = elapsed / self.completed est_remaining = avg_time_per_doc * remaining else: est_remaining = None return { 'total': self.total, 'completed': self.completed, 'failed': self.failed, 'in_progress': self.in_progress, 'remaining': remaining, 'elapsed_seconds': elapsed, 'estimated_remaining_seconds': est_remaining } def _format_time(self, seconds): """Format seconds as HH:MM:SS.""" if seconds is None: return "unknown" hours, remainder = divmod(int(seconds), 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def _monitor_progress(self): """Monitor and log progress periodically.""" while not self.stop_monitoring.is_set(): stats = self.get_stats() logger.info( f"Progress: {stats['completed']}/{stats['total']} completed, " f"{stats['failed']} failed, {stats['in_progress']} in progress | " f"Elapsed: {self._format_time(stats['elapsed_seconds'])} | " f"Est. remaining: {self._format_time(stats['estimated_remaining_seconds'])}" ) # Check if we're done if stats['completed'] + stats['failed'] >= stats['total']: logger.info("All documents processed!") break # Wait for next update self.stop_monitoring.wait(self.update_interval) def start_monitoring(self): """Start background monitoring thread.""" self.monitor_thread = threading.Thread(target=self._monitor_progress) self.monitor_thread.daemon = True self.monitor_thread.start() def stop(self): """Stop the monitoring thread and report final results.""" if self.monitor_thread and self.monitor_thread.is_alive(): self.stop_monitoring.set() self.monitor_thread.join(timeout=2.0) # Log final statistics stats = self.get_stats() logger.info( f"Final results: {stats['completed']}/{stats['total']} completed, " f"{stats['failed']} failed | " f"Total time: {self._format_time(stats['elapsed_seconds'])}" ) success_rate = (stats['completed'] / stats['total']) * 100 if stats['total'] > 0 else 0 logger.info(f"Success rate: {success_rate:.2f}%") return stats