# aya-summarization / app / utils / progress_tracker.py
# Alphin Jain
# First commit
# 334c1a6
"""
Progress monitoring utilities for tracking parallel processing.
"""
import time
import threading
import logging
from typing import Dict, List, Any, Set
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ProgressTracker:
    """
    Tracks progress of parallel document processing tasks.

    Workers report through ``mark_started`` / ``mark_completed``; an optional
    background thread (``start_monitoring``) periodically logs a summary.
    All counters are read and written under ``self.lock``.
    """

    def __init__(self, total_documents: int, update_interval: float = 5):
        """
        Initialize the progress tracker.

        Args:
            total_documents: Total number of documents to process
            update_interval: How often to log updates (in seconds)
        """
        self.total = total_documents
        self.completed = 0
        self.failed = 0
        self.in_progress = 0
        # Filenames already counted, so a repeated report is not counted twice.
        self.processed_files: Set[str] = set()
        self.update_interval = update_interval
        self.lock = threading.Lock()
        self.start_time = time.time()
        # Background monitor thread; None until start_monitoring() is called.
        self.monitor_thread = None
        self.stop_monitoring = threading.Event()

    def mark_started(self, filename: str) -> None:
        """Mark a document as being processed."""
        with self.lock:
            self.in_progress += 1
        # Log outside the lock so handler I/O never blocks other workers.
        logger.info(f"Started processing: {filename}")

    def mark_completed(self, filename: str, success: bool = True) -> None:
        """
        Mark a document as completed.

        ``in_progress`` is decremented on every call, but each distinct
        filename is counted towards ``completed``/``failed`` only once.

        Args:
            filename: Identifier of the finished document
            success: True to count it as completed, False as failed
        """
        with self.lock:
            self.in_progress -= 1
            if filename in self.processed_files:
                return
            self.processed_files.add(filename)
            if success:
                self.completed += 1
            else:
                self.failed += 1

    def get_stats(self) -> Dict[str, Any]:
        """
        Get current processing statistics.

        Returns:
            Dict with keys ``total``, ``completed``, ``failed``,
            ``in_progress``, ``remaining``, ``elapsed_seconds`` and
            ``estimated_remaining_seconds`` (None until at least one
            document has finished, successfully or not).
        """
        with self.lock:
            elapsed = time.time() - self.start_time
            processed = self.completed + self.failed
            # Never report a negative backlog if more documents were
            # reported than ``total`` announced.
            remaining = max(0, self.total - processed)

            # Failed documents consumed wall-clock time too, so throughput
            # is measured over every finished document, not only successes.
            if processed > 0:
                est_remaining = (elapsed / processed) * remaining
            else:
                est_remaining = None

            return {
                'total': self.total,
                'completed': self.completed,
                'failed': self.failed,
                'in_progress': self.in_progress,
                'remaining': remaining,
                'elapsed_seconds': elapsed,
                'estimated_remaining_seconds': est_remaining,
            }

    def _format_time(self, seconds) -> str:
        """
        Format seconds as HH:MM:SS.

        Args:
            seconds: Duration in seconds, or None when no estimate exists

        Returns:
            "unknown" for None, otherwise a zero-padded HH:MM:SS string.
            Negative durations are rendered as 00:00:00.
        """
        if seconds is None:
            return "unknown"
        whole_seconds = max(0, int(seconds))
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def _monitor_progress(self) -> None:
        """Monitor and log progress periodically until done or stopped."""
        while not self.stop_monitoring.is_set():
            stats = self.get_stats()
            logger.info(
                f"Progress: {stats['completed']}/{stats['total']} completed, "
                f"{stats['failed']} failed, {stats['in_progress']} in progress | "
                f"Elapsed: {self._format_time(stats['elapsed_seconds'])} | "
                f"Est. remaining: {self._format_time(stats['estimated_remaining_seconds'])}"
            )

            # Check if we're done
            if stats['completed'] + stats['failed'] >= stats['total']:
                logger.info("All documents processed!")
                break

            # Wait for next update; returns early when stop() sets the event.
            self.stop_monitoring.wait(self.update_interval)

    def start_monitoring(self) -> None:
        """
        Start background monitoring thread.

        Does nothing if a monitor thread is already running. Clears any
        earlier stop request so monitoring can be restarted after ``stop``.
        """
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self.stop_monitoring.clear()
        self.monitor_thread = threading.Thread(
            target=self._monitor_progress,
            daemon=True,
        )
        self.monitor_thread.start()

    def stop(self) -> Dict[str, Any]:
        """
        Stop the monitoring thread and report final results.

        Returns:
            The final statistics dict, as produced by ``get_stats``.
        """
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.stop_monitoring.set()
            # Bounded wait so a slow log handler cannot hang the caller.
            self.monitor_thread.join(timeout=2.0)

        # Log final statistics
        stats = self.get_stats()
        logger.info(
            f"Final results: {stats['completed']}/{stats['total']} completed, "
            f"{stats['failed']} failed | "
            f"Total time: {self._format_time(stats['elapsed_seconds'])}"
        )
        success_rate = (stats['completed'] / stats['total']) * 100 if stats['total'] > 0 else 0
        logger.info(f"Success rate: {success_rate:.2f}%")
        return stats