File size: 4,428 Bytes
334c1a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Progress monitoring utilities for tracking parallel processing.
"""
import time
import threading
import logging
from typing import Dict, List, Any, Set

logger = logging.getLogger(__name__)


class ProgressTracker:
    """
    Thread-safe tracker for progress of parallel document processing tasks.

    Worker threads call :meth:`mark_started` / :meth:`mark_completed`; all
    counters are guarded by a lock. An optional background thread logs a
    status line every ``update_interval`` seconds until all documents are
    accounted for or :meth:`stop` is called.
    """

    def __init__(self, total_documents: int, update_interval: float = 5):
        """
        Initialize the progress tracker.

        Args:
            total_documents: Total number of documents to process.
            update_interval: How often to log periodic updates (in seconds).
        """
        self.total = total_documents
        self.completed = 0    # documents finished successfully
        self.failed = 0       # documents finished with an error
        self.in_progress = 0  # started but not yet completed
        # Filenames already counted, so duplicate completions are not
        # double-counted in completed/failed.
        self.processed_files = set()
        self.update_interval = update_interval
        self.lock = threading.Lock()
        self.start_time = time.time()
        self.monitor_thread = None
        self.stop_monitoring = threading.Event()

    def mark_started(self, filename: str) -> None:
        """Mark a document as being processed."""
        with self.lock:
            self.in_progress += 1
            # BUG FIX: the original logged a literal placeholder instead of
            # the filename; lazy %-style args avoid formatting when the
            # log level is disabled.
            logger.info("Started processing: %s", filename)

    def mark_completed(self, filename: str, success: bool = True) -> None:
        """Mark a document as completed.

        Args:
            filename: The document that finished.
            success: Whether processing succeeded (counts toward
                ``completed``) or failed (counts toward ``failed``).

        Note:
            A filename seen before is not re-counted in completed/failed,
            but ``in_progress`` is still decremented to balance a matching
            ``mark_started`` call (e.g. on retries).
        """
        with self.lock:
            self.in_progress -= 1
            if filename not in self.processed_files:
                self.processed_files.add(filename)
                if success:
                    self.completed += 1
                else:
                    self.failed += 1

    def get_stats(self) -> dict:
        """Return a snapshot of current processing statistics.

        Returns:
            Dict with keys ``total``, ``completed``, ``failed``,
            ``in_progress``, ``remaining``, ``elapsed_seconds`` and
            ``estimated_remaining_seconds`` (``None`` until at least one
            document has completed successfully).
        """
        with self.lock:
            elapsed = time.time() - self.start_time
            remaining = self.total - (self.completed + self.failed)

            # Estimate remaining time from the average pace of successful
            # completions; undefined before the first success.
            if self.completed > 0:
                avg_time_per_doc = elapsed / self.completed
                est_remaining = avg_time_per_doc * remaining
            else:
                est_remaining = None

            return {
                'total': self.total,
                'completed': self.completed,
                'failed': self.failed,
                'in_progress': self.in_progress,
                'remaining': remaining,
                'elapsed_seconds': elapsed,
                'estimated_remaining_seconds': est_remaining
            }

    def _format_time(self, seconds) -> str:
        """Format a duration in seconds as HH:MM:SS ("unknown" for None)."""
        if seconds is None:
            return "unknown"

        hours, remainder = divmod(int(seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def _monitor_progress(self) -> None:
        """Log progress periodically until done or stopped (thread target)."""
        while not self.stop_monitoring.is_set():
            stats = self.get_stats()

            logger.info(
                "Progress: %s/%s completed, %s failed, %s in progress | "
                "Elapsed: %s | Est. remaining: %s",
                stats['completed'], stats['total'],
                stats['failed'], stats['in_progress'],
                self._format_time(stats['elapsed_seconds']),
                self._format_time(stats['estimated_remaining_seconds']),
            )

            # Exit once every document is accounted for.
            if stats['completed'] + stats['failed'] >= stats['total']:
                logger.info("All documents processed!")
                break

            # Sleep until the next update, waking early if stop() is called.
            self.stop_monitoring.wait(self.update_interval)

    def start_monitoring(self) -> None:
        """Start the background monitoring thread (no-op if already running)."""
        # ROBUSTNESS: the original would happily spawn a second monitor
        # thread; also clear the stop flag so the tracker can be restarted.
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self.stop_monitoring.clear()
        self.monitor_thread = threading.Thread(
            target=self._monitor_progress, daemon=True
        )
        self.monitor_thread.start()

    def stop(self) -> dict:
        """Stop the monitoring thread and log final results.

        Returns:
            The final statistics dict (same shape as :meth:`get_stats`).
        """
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.stop_monitoring.set()
            # Bounded join: never hang shutdown on a stuck monitor thread.
            self.monitor_thread.join(timeout=2.0)

        # Log final statistics.
        stats = self.get_stats()
        logger.info(
            "Final results: %s/%s completed, %s failed | Total time: %s",
            stats['completed'], stats['total'], stats['failed'],
            self._format_time(stats['elapsed_seconds']),
        )

        success_rate = (stats['completed'] / stats['total']) * 100 if stats['total'] > 0 else 0
        logger.info("Success rate: %.2f%%", success_rate)

        return stats