fromozu committed on
Commit
7c5eff0
·
verified ·
1 Parent(s): 1479093

Upload hf_backend/translator_runner.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. hf_backend/translator_runner.py +303 -0
hf_backend/translator_runner.py ADDED
@@ -0,0 +1,303 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import os
4
+ import re
5
+ from queue import Empty, Queue
6
+ import shutil
7
+ import subprocess
8
+ import sys
9
+ from pathlib import Path
10
+ from threading import Thread
11
+
12
+ from hf_backend.config import AppConfig
13
+ from hf_backend.ebook_convert import normalize_epub_for_translation
14
+
15
+
16
# File suffixes (lower-case, with the leading dot) accepted for translation.
SUPPORTED_EXTENSIONS = {".epub", ".txt", ".srt"}

# Matches a tqdm-style progress bar fragment such as "45%|####  | 9/20 [".
# The adjacent raw strings concatenate into one pattern.
TQDM_PROGRESS_RE = re.compile(
    r"(?P<percent>\d{1,3})%"  # percentage, 1-3 digits
    r"\|.*?\|"  # the bar itself, between two pipes
    r"\s*(?P<current>\d+)/(?P<total>\d+)"  # "current/total" counters
    r"\s*\["  # opening bracket of the timing section
)

# Marker lines looked for in the translator output; everything after "::" on a
# backmatter marker is captured as the title, and the digits after "::" on a
# checkpoint marker as the count.
BACKMATTER_MARKER_RE = re.compile(r"BBM_BACKMATTER_SKIPPED::(?P<title>.+)")
CHECKPOINT_MARKER_RE = re.compile(r"BBM_CHECKPOINT_SAVED::(?P<count>\d+)")
22
+
23
+
24
+ def _key_args(config: AppConfig, model: str) -> list[str]:
25
+ if model == "claude" and config.claude_key:
26
+ return ["--claude_key", config.claude_key]
27
+ if model == "deepl" and config.deepl_key:
28
+ return ["--deepl_key", config.deepl_key]
29
+ if model == "caiyun" and config.caiyun_key:
30
+ return ["--caiyun_key", config.caiyun_key]
31
+ if model == "gemini" and config.gemini_key:
32
+ return ["--gemini_key", config.gemini_key]
33
+ if model == "customapi" and config.custom_api:
34
+ return ["--custom_api", config.custom_api]
35
+ if model in {"chatgptapi", "gpt4"} and config.openai_key:
36
+ return ["--openai_key", config.openai_key]
37
+ return []
38
+
39
+
40
def extract_progress_update(line: str) -> dict[str, int] | None:
    """Parse a tqdm-style progress bar out of one line of output.

    Escape sequences of the form ``ESC [ ... <letter>`` are removed and the
    line is trimmed before it is searched with ``TQDM_PROGRESS_RE``.

    Returns:
        A dict with integer ``percent``, ``current`` and ``total`` entries,
        or ``None`` when the line contains no progress bar.
    """
    without_escapes = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", line)
    found = TQDM_PROGRESS_RE.search(without_escapes.strip())
    if found is None:
        return None

    return {
        field: int(found.group(field))
        for field in ("percent", "current", "total")
    }
54
+
55
+
56
def extract_backmatter_event(line: str) -> dict[str, str | bool] | None:
    """Turn a backmatter-skipped marker line into a progress event.

    Escape sequences of the form ``ESC [ ... <letter>`` are removed and the
    line is trimmed before it is searched with ``BACKMATTER_MARKER_RE``.

    Returns:
        ``None`` when the marker is absent; otherwise an event dict with
        ``stage``, a user-facing ``message`` that embeds the captured title,
        ``backmatter_skipped`` set to ``True`` and ``backmatter_title``.
    """
    sanitized = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", line).strip()
    hit = BACKMATTER_MARKER_RE.search(sanitized)
    if hit is None:
        return None

    chapter_title = hit.group("title").strip()
    return dict(
        stage="translating",
        message=f"检测到尾部附录章节,后续内容保留原文:{chapter_title}",
        backmatter_skipped=True,
        backmatter_title=chapter_title,
    )
69
+
70
+
71
def extract_checkpoint_event(line: str) -> dict[str, str | bool | int] | None:
    """Turn a checkpoint-saved marker line into a progress event.

    Escape sequences of the form ``ESC [ ... <letter>`` are removed and the
    line is trimmed before it is searched with ``CHECKPOINT_MARKER_RE``.

    Returns:
        ``None`` when the marker is absent; otherwise an event dict with
        ``stage``, a user-facing ``message`` that embeds the count,
        ``checkpoint_saved`` set to ``True`` and the integer count under
        ``checkpoint_progress_current``.
    """
    sanitized = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", line).strip()
    hit = CHECKPOINT_MARKER_RE.search(sanitized)
    if hit is None:
        return None

    saved_count = int(hit.group("count"))
    return dict(
        stage="translating",
        message=f"已保存断点:{saved_count}",
        checkpoint_saved=True,
        checkpoint_progress_current=saved_count,
    )
84
+
85
+
86
+ def _read_stream_chunks(stream, stream_name: str, queue: Queue) -> None:
87
+ if stream is None:
88
+ return
89
+
90
+ if hasattr(stream, "read"):
91
+ buffer = ""
92
+ while True:
93
+ chunk = stream.read(1)
94
+ if not chunk:
95
+ if buffer:
96
+ queue.put((stream_name, buffer))
97
+ return
98
+ buffer += chunk
99
+ if chunk in {"\n", "\r"}:
100
+ queue.put((stream_name, buffer))
101
+ buffer = ""
102
+ else:
103
+ while True:
104
+ chunk = stream.readline()
105
+ if not chunk:
106
+ return
107
+ queue.put((stream_name, chunk))
108
+
109
+
110
def _run_streaming_translation(
    command: list[str],
    *,
    config: AppConfig,
    progress_callback=None,
) -> str:
    """Run ``command`` as a subprocess and relay its output as it is produced.

    The child is started in ``config.repo_root`` with ``PYTHONIOENCODING`` set
    to UTF-8 and ``PYTHONUNBUFFERED`` enabled. Two daemon threads drain stdout
    and stderr into a single queue. Every non-empty chunk is recorded and,
    when ``progress_callback`` is truthy, scanned for backmatter markers,
    checkpoint markers and tqdm progress. A progress event is forwarded only
    when it differs from the previously forwarded one.

    If the callback (or anything else in the loop, e.g. an interrupt) raises,
    the child is killed and reaped rather than left running, and each pipe is
    closed once its reader thread has finished.

    Args:
        command: Argument vector to execute.
        config: Supplies ``repo_root``, used as the working directory.
        progress_callback: Optional callable receiving event dicts.

    Returns:
        All non-empty output chunks (stdout and stderr interleaved in arrival
        order, each stripped) joined with newlines.

    Raises:
        RuntimeError: If the command exits with a non-zero status; the message
            carries the exit code and the last 3000 characters of output.
    """
    env = os.environ.copy()
    env["PYTHONIOENCODING"] = "utf-8"
    env["PYTHONUNBUFFERED"] = "1"
    process = subprocess.Popen(
        command,
        cwd=config.repo_root,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        encoding="utf-8",
        errors="replace",
        bufsize=1,
        env=env,
    )

    output_queue: Queue = Queue()
    streams = (("stdout", process.stdout), ("stderr", process.stderr))
    readers = [
        Thread(
            target=_read_stream_chunks,
            args=(stream, stream_name, output_queue),
            daemon=True,
        )
        for stream_name, stream in streams
    ]

    combined_parts: list[str] = []
    last_progress_signature: tuple[int, int, int] | None = None

    try:
        for reader in readers:
            reader.start()

        while True:
            # Finished only when the child has exited, both readers have hit
            # end of stream, and every queued chunk has been consumed.
            if (
                process.poll() is not None
                and not any(reader.is_alive() for reader in readers)
                and output_queue.empty()
            ):
                break
            try:
                _stream_name, chunk = output_queue.get(timeout=0.1)
            except Empty:
                continue

            text = chunk.strip()
            if not text:
                continue
            combined_parts.append(text)

            # Nothing to notify: skip the marker/progress parsing entirely.
            if not progress_callback:
                continue

            backmatter_event = extract_backmatter_event(text)
            if backmatter_event:
                progress_callback(backmatter_event)

            checkpoint_event = extract_checkpoint_event(text)
            if checkpoint_event:
                progress_callback(checkpoint_event)

            progress = extract_progress_update(text)
            if progress:
                signature = (
                    progress["percent"],
                    progress["current"],
                    progress["total"],
                )
                # tqdm redraws the same bar repeatedly; forward changes only.
                if signature != last_progress_signature:
                    progress_callback(
                        {
                            "stage": "translating",
                            "message": f"正在翻译:{progress['current']} / {progress['total']}",
                            **progress,
                        }
                    )
                    last_progress_signature = signature
    except BaseException:
        # Aborted mid-run: do not leave the translator running in the
        # background with nobody draining or watching it.
        if process.poll() is None:
            process.kill()
            process.wait()
        raise
    finally:
        for reader, (_name, stream) in zip(readers, streams):
            if reader.is_alive():
                reader.join(timeout=1)
            # Only close a pipe whose reader is done; closing while a reader
            # is still blocked in read() could stall this thread.
            if stream is not None and not reader.is_alive():
                stream.close()

    combined_output = "\n".join(combined_parts)
    if process.wait() != 0:
        raise RuntimeError(
            f"translation command failed with code {process.returncode}\n{combined_output[-3000:]}"
        )

    return combined_output
194
+
195
+
196
# run_translation_job: copy the source file into a per-job work directory, run the
# translator script on it as a subprocess, and return (result_path, combined_output).
#
# NOTE(review): this view of the file has lost its indentation, so the nesting of the
# `if` statements below cannot be confirmed here -- in particular whether the checkpoint
# seeding, `--resume` and the backmatter flags sit inside the `suffix == ".epub"` branches.
#
# Parameters:
#   config: AppConfig supplying work_dir, repo_root, make_book_script, API keys/base and
#       the optional epub_* / batch_size tuning values read below.
#   job: dict with "job_id" and "filename"; "model" and "language" are optional and are
#       passed through config.resolve_model / config.resolve_language.
#   source_path: file to translate; its lower-cased suffix must be in SUPPORTED_EXTENSIONS.
#   progress_callback: when truthy, the command runs through _run_streaming_translation;
#       otherwise it runs through a single blocking subprocess.run call.
#   checkpoint_source_path: optional checkpoint file, copied next to the translation
#       source as ".<stem>.temp.bin".
# Returns: tuple of (path named "<stem>_bilingual<suffix>" derived from the local copy,
#   combined stdout/stderr text of the command).
# Raises: RuntimeError for an unsupported suffix, a non-zero exit code, or a missing
#   translated output file.
+ def run_translation_job(
197
+ config: AppConfig,
198
+ job: dict,
199
+ source_path: Path,
200
+ *,
201
+ progress_callback=None,
202
+ checkpoint_source_path: Path | None = None,
203
+ ) -> tuple[Path, str]:
204
+ suffix = source_path.suffix.lower()
205
+ if suffix not in SUPPORTED_EXTENSIONS:
206
+ raise RuntimeError(f"unsupported file type: {suffix}")
207
+
208
+ work_dir = config.work_dir / job["job_id"]
209
+ work_dir.mkdir(parents=True, exist_ok=True)
210
+
# NOTE(review): job["job_id"] and job["filename"] are joined onto the work directory
# unchanged -- confirm callers sanitise them (no path separators or "..").
211
+ local_source = work_dir / job["filename"]
212
+ shutil.copy2(source_path, local_source)
213
+
214
+ translation_source = local_source
215
+ if suffix == ".epub":
216
+ translation_source = normalize_epub_for_translation(config, local_source, work_dir)
# Seed the checkpoint beside the translation source as ".<stem>.temp.bin".
217
+ if checkpoint_source_path is not None:
218
+ seeded_checkpoint = translation_source.parent / f".{translation_source.stem}.temp.bin"
219
+ shutil.copy2(checkpoint_source_path, seeded_checkpoint)
220
+
221
+ model = config.resolve_model(job.get("model"))
222
+ language = config.resolve_language(job.get("language"))
# Base command: the current interpreter running config.make_book_script.
223
+ command = [
224
+ sys.executable,
225
+ str(config.make_book_script),
226
+ "--book_name",
227
+ str(translation_source),
228
+ "--model",
229
+ model,
230
+ "--language",
231
+ language,
232
+ ]
233
+
234
+ command.extend(_key_args(config, model))
235
+ if config.api_base:
236
+ command.extend(["--api_base", config.api_base])
# The epub_* options are read with getattr defaults, so configs lacking them still work.
237
+ if suffix == ".epub" and getattr(config, "epub_accumulated_num", 1) > 1:
238
+ command.extend(["--accumulated_num", str(config.epub_accumulated_num)])
239
+ if suffix == ".epub":
240
+ command.extend(
241
+ [
242
+ "--checkpoint_interval",
243
+ str(getattr(config, "epub_checkpoint_interval", 50)),
244
+ ]
245
+ )
246
+ if checkpoint_source_path is not None:
247
+ command.append("--resume")
248
+ if getattr(config, "epub_backmatter_skip_after_percent", 0) > 0:
249
+ command.extend(
250
+ [
251
+ "--skip_backmatter_after_percent",
252
+ str(config.epub_backmatter_skip_after_percent),
253
+ ]
254
+ )
255
+ if getattr(config, "epub_backmatter_titles", "").strip():
256
+ command.extend(
257
+ [
258
+ "--skip_backmatter_titles",
259
+ config.epub_backmatter_titles,
260
+ ]
261
+ )
262
+ if config.batch_size > 0 and suffix == ".txt":
263
+ command.extend(["--batch_size", str(config.batch_size)])
264
+
265
+ if progress_callback:
266
+ combined_output = _run_streaming_translation(
267
+ command,
268
+ config=config,
269
+ progress_callback=progress_callback,
270
+ )
271
+ else:
# NOTE(review): unlike _run_streaming_translation, this call passes no env, so
# PYTHONIOENCODING=utf-8 is not forced on the child even though its output is decoded
# as UTF-8 here -- confirm whether that difference is intended.
272
+ completed = subprocess.run(
273
+ command,
274
+ cwd=config.repo_root,
275
+ check=False,
276
+ capture_output=True,
277
+ text=True,
278
+ encoding="utf-8",
279
+ errors="replace",
280
+ )
281
+ combined_output = "\n".join(
282
+ part for part in [completed.stdout.strip(), completed.stderr.strip()] if part
283
+ )
284
+
285
+ if completed.returncode != 0:
286
+ raise RuntimeError(
287
+ f"translation command failed with code {completed.returncode}\n{combined_output[-3000:]}"
288
+ )
289
+
# The translated file is expected at "<stem>_bilingual<suffix>" beside the translation source.
290
+ result_path = translation_source.with_name(
291
+ f"{translation_source.stem}_bilingual{translation_source.suffix}"
292
+ )
293
+ if not result_path.exists():
294
+ raise RuntimeError(f"expected translated file not found: {result_path.name}")
295
+
# translation_source can differ from local_source after EPUB normalization; in that case
# the result is copied to the name derived from the local copy and that path is returned.
296
+ canonical_result_path = local_source.with_name(
297
+ f"{local_source.stem}_bilingual{local_source.suffix}"
298
+ )
299
+ if canonical_result_path != result_path:
300
+ shutil.copy2(result_path, canonical_result_path)
301
+ result_path = canonical_result_path
302
+
303
+ return result_path, combined_output