File size: 11,299 Bytes
754d92a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import asyncio
import logging
import threading
import time
from asyncio import CancelledError
from collections.abc import Callable
from typing import Optional

logger = logging.getLogger(__name__)


class ProgressMonitor:
    def __init__(
        self,
        stages: list[tuple[str, float]],
        progress_change_callback: Callable | None = None,
        finish_callback: Callable | None = None,
        report_interval: float = 0.1,
        finish_event: asyncio.Event | None = None,
        cancel_event: threading.Event | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        parent_monitor: Optional["ProgressMonitor"] = None,
        part_index: int | None = 0,
        total_parts: int | None = 1,
    ):
        self.lock = threading.Lock()
        self.parent_monitor = parent_monitor
        self.part_index = part_index
        self.total_parts = total_parts
        self.raw_stages = stages
        self.part_results = {}

        # Convert stages list to dict with name and weight
        self.stage = {}
        total_weight = sum(weight for _, weight in stages)
        for name, weight in stages:
            normalized_weight = weight / total_weight
            self.stage[name] = TranslationStage(
                name,
                0,
                self,
                normalized_weight,
                self.lock,
            )

        self.progress_change_callback = progress_change_callback
        self.finish_callback = finish_callback
        self.report_interval = report_interval
        logger.debug(f"report_interval: {self.report_interval}")
        self.last_report_time = 0
        self.finish_stage_count = 0
        self.finish_event = finish_event
        self.cancel_event = cancel_event
        self.loop = loop
        self.disable = False
        if finish_event and not loop:
            raise ValueError("finish_event requires a loop")
        if self.progress_change_callback:
            self.progress_change_callback(
                type="stage_summary",
                stages=[
                    {
                        "name": name,
                        "percent": self.stage[name].weight,
                    }
                    for name, _ in stages
                ],
                part_index=self.part_index,
                total_parts=self.total_parts,
            )

    def create_part_monitor(
        self, part_index: int, total_parts: int
    ) -> "ProgressMonitor":
        """Create a new progress monitor for a document part"""
        return ProgressMonitor(
            stages=self.raw_stages,
            progress_change_callback=self._handle_part_progress,
            finish_callback=self._handle_part_finish,
            report_interval=self.report_interval,
            cancel_event=self.cancel_event,
            loop=self.loop,
            parent_monitor=self,
            part_index=part_index,
            total_parts=total_parts,
        )

    def _handle_part_progress(self, **kwargs):
        """Handle progress updates from part monitors"""
        if self.progress_change_callback and not self.disable:
            # Add part information to progress update
            kwargs["part_index"] = kwargs.get("part_index")
            kwargs["total_parts"] = kwargs.get("total_parts")
            self.progress_change_callback(**kwargs)

    def _handle_part_finish(self, **kwargs):
        """Handle completion of a part translation"""
        if kwargs["type"] == "error":
            logger.info(f"progress_monitor handle_part_finish: {kwargs['error']}")
            self.finish_callback(type="error", error=kwargs["error"])
            return
        if "translate_result" in kwargs:
            part_index = kwargs.get("part_index")
            if part_index is not None:
                self.part_results[part_index] = kwargs["translate_result"]

        # if self.finish_callback and not self.disable:
        #     self.finish_callback(**kwargs)

    def stage_start(self, stage_name: str, total: int):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return DummyTranslationStage(stage_name, total, self, 0)
        stage = self.stage[stage_name]
        stage.run_time += 1
        stage.name = stage_name
        stage.display_name = f"{stage_name}" if stage.run_time > 1 else stage_name
        stage.current = 0
        stage.total = total
        if self.progress_change_callback:
            self.progress_change_callback(
                type="progress_start",
                stage=stage.display_name,
                stage_progress=0.0,
                stage_current=0,
                stage_total=total,
                overall_progress=self.calculate_current_progress(),
                part_index=self.part_index + 1,
                total_parts=self.total_parts,
            )
        self.last_report_time = 0.0
        return stage

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logger.debug("ProgressMonitor __exit__")

    def on_finish(self):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        if self.cancel_event:
            self.cancel_event.set()
        if self.finish_event and self.loop:
            self.loop.call_soon_threadsafe(self.finish_event.set)
        if self.cancel_event and self.cancel_event.is_set():
            self.finish_callback(type="error", error=CancelledError)

    def stage_done(self, stage):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        self.last_report_time = 0.0
        self.finish_stage_count += 1
        if (
            stage.current != stage.total
            and self.cancel_event is not None
            and not self.cancel_event.is_set()
        ):
            logger.warning(
                f"Stage {stage.name} completed with {stage.current}/{stage.total} items",
            )
            return
        if self.progress_change_callback:
            self.progress_change_callback(
                type="progress_end",
                stage=stage.display_name,
                stage_progress=100.0,
                stage_current=stage.total,
                stage_total=stage.total,
                overall_progress=self.calculate_current_progress(),
                part_index=self.part_index + 1,
                total_parts=self.total_parts,
            )

    def calculate_current_progress(self, stage=None):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return 100
        part_weight = 1 / self.total_parts
        if self.parent_monitor:
            part_offset = self.part_index * part_weight
        else:
            part_offset = len(self.part_results) * part_weight
        part_offset *= 100
        progress = self._calculate_current_progress(stage) * part_weight + part_offset
        return progress

    def _calculate_current_progress(self, stage=None):
        """Calculate overall progress including part progress"""
        # Count completed stages
        completed_stages = sum(
            1 for s in self.stage.values() if s.run_time > 0 and s.current == s.total
        )

        # If all stages are complete, return exactly 100
        if completed_stages == len(self.stage):
            return 100

        # Calculate progress based on weights
        progress = sum(
            s.weight * 100
            for s in self.stage.values()
            if s.run_time > 0 and s.current == s.total
        )
        if stage is not None and 0 < stage.total != stage.current:
            progress += stage.weight * stage.current * 100 / stage.total

        # If this is a part monitor (has parent_monitor), return the progress as is
        if hasattr(self, "parent_monitor") and self.parent_monitor:
            return progress

        # Otherwise return the standard progress
        return progress

    def stage_update(self, stage, n: int):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        report_time_delta = time.time() - self.last_report_time
        if report_time_delta < self.report_interval and stage.total > 3:
            return
        if self.progress_change_callback:
            if stage.total != 0:
                stage_progress = stage.current * 100 / stage.total
            else:
                stage_progress = 100
            self.progress_change_callback(
                type="progress_update",
                stage=stage.display_name,
                stage_progress=stage_progress,
                stage_current=stage.current,
                stage_total=stage.total,
                overall_progress=self.calculate_current_progress(stage),
                part_index=self.part_index + 1,
                total_parts=self.total_parts,
            )
            self.last_report_time = time.time()

    def translate_done(self, translate_result):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        if self.finish_callback:
            self.finish_callback(type="finish", translate_result=translate_result)

    def translate_error(self, error):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        if self.finish_callback:
            logger.info(f"progress_monitor handle translate_error: {error}")
            self.finish_callback(type="error", error=error)

    def raise_if_cancelled(self):
        if self.cancel_event and self.cancel_event.is_set():
            raise asyncio.CancelledError

    def cancel(self):
        if self.disable or self.parent_monitor and self.parent_monitor.disable:
            return
        if self.cancel_event:
            logger.info("Translation canceled")
            self.cancel_event.set()


class TranslationStage:
    def __init__(
        self,
        name: str,
        total: int,
        pm: ProgressMonitor,
        weight: float,
        lock: threading.Lock,
    ):
        self.name = name
        self.display_name = name
        self.current = 0
        self.total = total
        self.pm = pm
        self.run_time = 0
        self.weight = weight
        self.lock = lock

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        with self.lock:
            diff = self.total - self.current
            if diff > 0:
                logger.info(
                    f"Stage {self.name} completed with {self.current}/{self.total} items"
                )
            self.pm.stage_update(self, diff)
            self.current = self.total
            self.pm.stage_done(self)

    def advance(self, n: int = 1):
        with self.lock:
            self.current += n
            self.pm.stage_update(self, n)


class DummyTranslationStage:
    def __init__(self, name: str, total: int, pm: ProgressMonitor, weight: float):
        self.name = name
        self.display_name = name
        self.current = 0
        self.total = total
        self.pm = pm

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def advance(self, n: int = 1):
        pass