# (HF Spaces page chrome — "Spaces / Sleeping" status text — removed; not part of the app code.)
| # app.py — Hugging Face Spaces (Gradio) "全部入り" 翻訳アプリ | |
| # - Single translate + history table | |
| # - Batch translate (TXT/CSV) + download result CSV | |
| # - Glossary CSV (src,tgt) | |
| # - Model selector (m2m100 / opus-mt / nllb) | |
| # - Safe limits for free CPU Spaces | |
| import os | |
| import io | |
| import csv | |
| import time | |
| import json | |
| import tempfile | |
| from itertools import islice | |
| from typing import Dict, Optional, List, Tuple, Any | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
# Silence the HF tokenizers fork/parallelism warning in server contexts.
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
# Prefer GPU when present; free HF Spaces usually run CPU-only.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# -------------------------
# Model registry
# -------------------------
# NOTE:
# - "opus-mt" is fast on CPU (recommended for free tier speed)
# - "m2m100-418M" matches your current project
# - "nllb-600M" can be heavier (quality often good, CPU slower)
#
# Spec fields:
#   kind              -- model family tag ("m2m100" / "opus" / "nllb"), informational
#   name              -- direction key ("ja-en"/"en-ja") -> HF checkpoint id
#   lang              -- "ja"/"en" -> model-specific language code (None when unused)
#   needs_forced_bos  -- generate() needs forced_bos_token_id for the target language
#   supports_src_lang -- tokenizer accepts a src_lang attribute
MODEL_SPECS: Dict[str, Dict[str, Any]] = {
    "m2m100-418M (multilingual, your current)": {
        "kind": "m2m100",
        "name": {"ja-en": "facebook/m2m100_418M", "en-ja": "facebook/m2m100_418M"},
        "lang": {"ja": "ja", "en": "en"},
        "needs_forced_bos": True,
        "supports_src_lang": True,
    },
    "opus-mt (fast CPU, ja<->en)": {
        "kind": "opus",
        "name": {"ja-en": "Helsinki-NLP/opus-mt-ja-en", "en-ja": "Helsinki-NLP/opus-mt-en-ja"},
        "lang": {"ja": None, "en": None},
        "needs_forced_bos": False,
        "supports_src_lang": False,
    },
    "nllb-600M (quality, heavier)": {
        "kind": "nllb",
        "name": {"ja-en": "facebook/nllb-200-distilled-600M", "en-ja": "facebook/nllb-200-distilled-600M"},
        "lang": {"ja": "jpn_Jpan", "en": "eng_Latn"},
        "needs_forced_bos": True,
        "supports_src_lang": True,
    },
}
# Lazy-load caches populated by _load_model().
# Cache: (model_key, direction) -> (tokenizer, model)
TOK_CACHE: Dict[Tuple[str, str], Any] = {}
MDL_CACHE: Dict[Tuple[str, str], Any] = {}
# -------------------------
# Safety limits (public space)
# -------------------------
MAX_SINGLE_CHARS = 4000        # single input max chars
MAX_BATCH_LINES = 200          # batch line cap
MAX_BATCH_CHARS_TOTAL = 20000  # batch total chars cap
DEFAULT_MAX_NEW_TOKENS = 256   # default generation length cap
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def detect_direction_by_text(text: str, prefer: str = "ja-en") -> str:
    """Guess the translation direction from the text content.

    Heuristic: if the text contains any Japanese character (hiragana,
    katakana, or a CJK unified ideograph), translate Japanese -> English;
    otherwise the text is assumed to be English and we translate
    English -> Japanese.

    Args:
        text: Input text to inspect.
        prefer: Kept for backward compatibility with existing callers; the
            detection result no longer depends on it.

    Returns:
        "ja-en" when Japanese characters are present, else "en-ja".
    """
    for ch in text:
        # U+3040-U+30FF: hiragana + katakana; U+4E00-U+9FFF: CJK ideographs.
        if ("\u3040" <= ch <= "\u30ff") or ("\u4e00" <= ch <= "\u9fff"):
            return "ja-en"
    # BUG FIX: the old code returned "ja-en" for non-Japanese text whenever
    # prefer == "en-ja", routing English input through the ja->en model.
    # Text without Japanese characters is always translated en -> ja.
    return "en-ja"
def read_glossary_csv(path: Optional[str]) -> Optional[List[List[str]]]:
    """Load a UTF-8 glossary CSV of (src, tgt) rows; no header expected.

    Rows with fewer than two columns or an empty source term are skipped.
    Returns None when the path is falsy or no usable rows were found.
    """
    if not path:
        return None
    pairs: List[List[str]] = []
    with open(path, "r", encoding="utf-8") as handle:
        for record in csv.reader(handle):
            if len(record) < 2:
                continue
            source = (record[0] or "").strip()
            target = (record[1] or "").strip()
            if source:
                pairs.append([source, target])
    return pairs if pairs else None
def apply_glossary(text: str, glossary: Optional[List[List[str]]]) -> str:
    """Apply literal src -> tgt substitutions from the glossary, in order."""
    if not glossary:
        return text
    result = text
    for source, target in glossary:
        if not source:
            # Empty source would match everywhere; skip it.
            continue
        result = result.replace(source, target)
    return result
def gen_kwargs_for_mode(conversation_mode: bool, base_beams: int) -> dict:
    """Build generate() keyword arguments for the selected mode.

    Stable defaults for a public CPU Space:
      - Normal mode: deterministic beam search.
      - Conversation mode: beam-sampling for a slightly more colloquial
        tone, with beams clamped to <= 2 for stability.
    """
    beams = int(base_beams)
    if not conversation_mode:
        return {
            "do_sample": False,
            "num_beams": beams,
            "repetition_penalty": 1.05,
        }
    return {
        "do_sample": True,
        "temperature": 0.75,
        "top_p": 0.85,
        "top_k": 40,
        # Keep the beam count tiny when sampling, for stability.
        "num_beams": max(1, min(2, beams)),
        "repetition_penalty": 1.08,
    }
| def _get_forced_bos_id(tokenizer, lang: str) -> Optional[int]: | |
| # M2M100: get_lang_id | |
| if hasattr(tokenizer, "get_lang_id"): | |
| try: | |
| return tokenizer.get_lang_id(lang) | |
| except Exception: | |
| pass | |
| # NLLB: lang_code_to_id | |
| if hasattr(tokenizer, "lang_code_to_id") and isinstance(getattr(tokenizer, "lang_code_to_id"), dict): | |
| if lang in tokenizer.lang_code_to_id: | |
| return tokenizer.lang_code_to_id[lang] | |
| # Fallback: token id | |
| try: | |
| return tokenizer.convert_tokens_to_ids(lang) | |
| except Exception: | |
| return None | |
def _load_model(model_key: str, direction: str):
    """Lazily load and cache (tokenizer, model) for a model/direction pair.

    Cache hits return immediately; otherwise the checkpoint named in
    MODEL_SPECS is downloaded, moved to DEVICE, and put in eval mode.
    """
    key = (model_key, direction)
    if key in TOK_CACHE:
        return TOK_CACHE[key], MDL_CACHE[key]

    checkpoint = MODEL_SPECS[model_key]["name"][direction]
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    # fp16 only makes sense on GPU; CPU inference needs fp32.
    model = AutoModelForSeq2SeqLM.from_pretrained(
        checkpoint,
        torch_dtype=torch.float16 if DEVICE.type == "cuda" else torch.float32,
        low_cpu_mem_usage=True,
    )
    model.to(DEVICE).eval()

    TOK_CACHE[key] = tokenizer
    MDL_CACHE[key] = model
    return tokenizer, model
def translate_one(
    model_key: str,
    direction: str,
    text: str,
    max_new_tokens: int,
    num_beams: int,
    conversation: bool,
) -> str:
    """Translate one string with the chosen model and direction.

    Handles per-model language tagging (src_lang / forced_bos_token_id)
    and delegates decoding strategy to gen_kwargs_for_mode().
    """
    tok, mdl = _load_model(model_key, direction)
    spec = MODEL_SPECS[model_key]

    # Resolve model-specific source/target language codes.
    if direction == "ja-en":
        src_lang, tgt_lang = spec["lang"]["ja"], spec["lang"]["en"]
    else:
        src_lang, tgt_lang = spec["lang"]["en"], spec["lang"]["ja"]

    if spec.get("supports_src_lang") and hasattr(tok, "src_lang") and src_lang:
        tok.src_lang = src_lang

    encoded = tok(text, return_tensors="pt", truncation=True, max_length=512).to(DEVICE)

    kwargs = dict(encoded)
    kwargs.update(
        max_new_tokens=int(max_new_tokens),
        no_repeat_ngram_size=3,
        length_penalty=1.05,
    )
    kwargs.update(gen_kwargs_for_mode(bool(conversation), int(num_beams)))

    # Multilingual models must be told which language to emit.
    if spec.get("needs_forced_bos") and tgt_lang:
        forced = _get_forced_bos_id(tok, tgt_lang)
        if forced is not None:
            kwargs["forced_bos_token_id"] = forced

    output_ids = mdl.generate(**kwargs)
    return tok.batch_decode(output_ids, skip_special_tokens=True)[0]
| def _clamp_int(v: Any, lo: int, hi: int, default: int) -> int: | |
| try: | |
| x = int(v) | |
| return max(lo, min(hi, x)) | |
| except Exception: | |
| return default | |
| def _history_to_table(history: List[Dict[str, str]]) -> List[List[str]]: | |
| # headers: time, direction, src, dst | |
| rows = [] | |
| for item in history[-100:][::-1]: # show latest first, cap 100 rows | |
| rows.append([item["time"], item["direction"], item["src"], item["dst"]]) | |
| return rows | |
| def _export_history(history: List[Dict[str, str]], fmt: str) -> str: | |
| tmpdir = tempfile.mkdtemp(prefix="history_") | |
| if fmt == "csv": | |
| path = os.path.join(tmpdir, "history.csv") | |
| with open(path, "w", newline="", encoding="utf-8-sig") as f: | |
| w = csv.writer(f) | |
| w.writerow(["time", "direction", "src", "dst"]) | |
| for item in history: | |
| w.writerow([item["time"], item["direction"], item["src"], item["dst"]]) | |
| return path | |
| else: | |
| path = os.path.join(tmpdir, "history.txt") | |
| with open(path, "w", encoding="utf-8") as f: | |
| for i, item in enumerate(history, 1): | |
| f.write(f"[{i}] {item['time']} | {item['direction']}\n") | |
| f.write(f"SRC: {item['src']}\n") | |
| f.write(f"DST: {item['dst']}\n") | |
| f.write("\n") | |
| return path | |
def _read_batch_lines(file_path: str) -> List[str]:
    """
    Read batch input items from a file.

    Accept:
      - .txt: 1 line = 1 item
      - .csv: use first column as src (ignores header if it looks like header)

    Results are capped at MAX_BATCH_LINES items and MAX_BATCH_CHARS_TOTAL
    total characters (a prefix of the lines is kept when over budget).
    """
    collected: List[str] = []
    if (file_path or "").lower().endswith(".csv"):
        with open(file_path, "r", encoding="utf-8") as fh:
            reader = csv.reader(fh)
            # Read a few extra rows so a skipped header doesn't cost capacity.
            for row in islice(reader, MAX_BATCH_LINES + 5):
                cell = (row[0] or "").strip() if row else ""
                if not cell:
                    continue
                # Naive header skip: only on the very first non-empty value.
                if not collected and cell.lower() in ("src", "source", "text", "input"):
                    continue
                collected.append(cell)
                if len(collected) >= MAX_BATCH_LINES:
                    break
    else:
        with open(file_path, "r", encoding="utf-8") as fh:
            for raw in islice(fh, MAX_BATCH_LINES):
                stripped = raw.rstrip("\n").strip()
                if stripped:
                    collected.append(stripped)
    # Enforce the total character budget by keeping a prefix of the lines.
    if sum(len(item) for item in collected) > MAX_BATCH_CHARS_TOTAL:
        trimmed: List[str] = []
        budget = 0
        for item in collected:
            if budget + len(item) > MAX_BATCH_CHARS_TOTAL:
                break
            trimmed.append(item)
            budget += len(item)
        collected = trimmed
    return collected
| # ------------------------- | |
| # Gradio handlers | |
| # ------------------------- | |
def warmup(model_key: str) -> str:
    """Pre-load the ja-en model so the first real request is fast."""
    started = time.time()
    try:
        _load_model(model_key, "ja-en")
    except Exception as e:
        return f"❌ Warmup failed: {e}"
    elapsed = time.time() - started
    return f"✅ Warmup OK ({elapsed:.2f}s) — model: {model_key}"
def do_translate(
    text: str,
    model_key: str,
    dir_choice: str,
    auto_on: bool,
    conversation_on: bool,
    glossary_path: Optional[str],
    max_new_tokens: int,
    num_beams: int,
    history: List[Dict[str, str]],
):
    """Single-text translate handler.

    Returns a 6-tuple feeding (dst, info, history_state, history_table,
    dl_hist_csv update, dl_hist_txt update).
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return (
            "",
            "⚠️ テキストを入力してください。",
            history,
            _history_to_table(history),
            gr.update(visible=False),
            gr.update(visible=False),
        )
    if len(cleaned) > MAX_SINGLE_CHARS:
        return (
            "",
            f"⚠️ 入力が長すぎます(最大 {MAX_SINGLE_CHARS} 文字)。",
            history,
            _history_to_table(history),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    direction = (
        detect_direction_by_text(cleaned, prefer=dir_choice) if auto_on else dir_choice
    )
    # Glossary substitutions happen before translation.
    processed = apply_glossary(cleaned, read_glossary_csv(glossary_path))
    # Clamp user-supplied generation knobs into safe ranges.
    tokens_cap = _clamp_int(max_new_tokens, 16, 512, DEFAULT_MAX_NEW_TOKENS)
    beams = _clamp_int(num_beams, 1, 6, 4)

    started = time.time()
    try:
        translated = translate_one(
            model_key=model_key,
            direction=direction,
            text=processed,
            max_new_tokens=tokens_cap,
            num_beams=beams,
            conversation=bool(conversation_on),
        )
    except Exception as e:
        # Keep the existing history; export buttons stay visible only if it is non-empty.
        return (
            "",
            f"❌ 翻訳に失敗しました: {e}",
            history,
            _history_to_table(history),
            gr.update(visible=bool(history)),
            gr.update(visible=bool(history)),
        )

    elapsed = time.time() - started
    entry = {
        "time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "direction": direction,
        "src": cleaned,
        "dst": translated,
    }
    updated = (history or []) + [entry]
    status = f"✅ 完了:{elapsed:.2f}s|model: **{model_key}**|方向:**{direction}**|chars: {len(cleaned)}"
    # Reveal the export buttons once at least one history entry exists.
    return (
        translated,
        status,
        updated,
        _history_to_table(updated),
        gr.update(visible=True),
        gr.update(visible=True),
    )
def clear_all(history):
    """Reset input, output, history, and batch results; hide download buttons."""
    return (
        "",                                    # src
        "🧹 クリアしました。",                   # info
        [],                                    # history_state (wiped)
        [],                                    # history_table
        gr.update(visible=False),              # dl_hist_csv
        gr.update(visible=False),              # dl_hist_txt
        gr.update(visible=False, value=None),  # dl_batch_csv
        gr.update(visible=False, value=None),  # dl_batch_txt
        "",                                    # batch_status
        "",                                    # batch_preview
    )
def export_history_csv(history: List[Dict[str, str]]):
    """Return a CSV export path for the DownloadButton, or None when empty."""
    return _export_history(history, "csv") if history else None
def export_history_txt(history: List[Dict[str, str]]):
    """Return a TXT export path for the DownloadButton, or None when empty."""
    return _export_history(history, "txt") if history else None
def do_batch(
    batch_file_path: Optional[str],
    model_key: str,
    conversation_on: bool,
    glossary_path: Optional[str],
    max_new_tokens: int,
    num_beams: int,
):
    """Batch-translate a TXT/CSV file, streaming progress updates.

    Generator handler for Gradio: every ``yield`` is a 4-tuple mapped onto
    (batch_status, batch_preview, dl_batch_csv, dl_batch_txt). Direction is
    auto-detected per line, and a failing line is recorded as "[ERROR] ..."
    rather than aborting the whole batch. The final yield writes CSV and TXT
    result files to a fresh temp dir and reveals both download buttons.
    """
    # Guard: no file selected.
    if not batch_file_path:
        yield "⚠️ バッチファイル(TXT/CSV)を選択してください。", "", gr.update(visible=False), None
        return
    # _read_batch_lines enforces the MAX_BATCH_LINES / MAX_BATCH_CHARS_TOTAL caps.
    lines = _read_batch_lines(batch_file_path)
    total = len(lines)
    if total == 0:
        yield "⚠️ 読み取れる行がありません(空/制限超過の可能性)。", "", gr.update(visible=False), None
        return
    glossary = read_glossary_csv(glossary_path)
    # Clamp user-supplied generation knobs into safe ranges.
    max_new_tokens = _clamp_int(max_new_tokens, 16, 512, DEFAULT_MAX_NEW_TOKENS)
    num_beams = _clamp_int(num_beams, 1, 6, 4)
    t0 = time.time()
    rows: List[Tuple[str, str, str]] = []  # (direction, src, dst)
    # Initial progress tick; also hides/clears any previous download buttons.
    yield "⏳ バッチ翻訳中… 0/..", "", gr.update(visible=False, value=None), gr.update(visible=False, value=None)
    for i, src in enumerate(lines, 1):
        direction = detect_direction_by_text(src, prefer="ja-en")
        src_processed = apply_glossary(src, glossary)
        try:
            dst = translate_one(
                model_key=model_key,
                direction=direction,
                text=src_processed,
                max_new_tokens=max_new_tokens,
                num_beams=num_beams,
                conversation=bool(conversation_on),
            )
        except Exception as e:
            # Keep going: one bad line must not kill the whole batch.
            dst = f"[ERROR] {e}"
        rows.append((direction, src, dst))
        # Progress update on the first line, every 5th line, and the last line.
        if i == 1 or i % 5 == 0 or i == total:
            pct = int(i * 100 / total)
            yield f"⏳ バッチ翻訳中… {i}/{total} ({pct}%)", "", gr.update(visible=False), None
    # Preview (limit)
    preview_lines = []
    for idx, (direction, s, d) in enumerate(rows[:50], 1):
        preview_lines.append(f"**{idx}. ({direction})**\n- SRC: {s}\n- DST: {d}\n")
    preview = "\n".join(preview_lines)
    if total > 50:
        preview += f"\n…(プレビューは先頭50行まで。全{total}行はCSVでダウンロード)"
    # Write result CSV
    tmpdir = tempfile.mkdtemp(prefix="batch_")
    out_csv = os.path.join(tmpdir, "batch_result.csv")
    out_txt = os.path.join(tmpdir, "batch_result.txt")
    # utf-8-sig BOM so Excel detects the encoding for Japanese text.
    with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
        w = csv.writer(f)
        w.writerow(["direction", "src", "dst"])
        for direction, s, d in rows:
            w.writerow([direction, s, d])
    with open(out_txt, "w", encoding="utf-8") as f:
        for i, (direction, s, d) in enumerate(rows, 1):
            f.write(f"[{i}] ({direction})\n")
            f.write(f"SRC: {s}\n")
            f.write(f"DST: {d}\n\n")
    used = time.time() - t0
    done_msg = f"✅ バッチ完了:{used:.2f}s|行数:{total}"
    # Final yield: reveal both download buttons with their file paths.
    yield (
        done_msg,
        preview,
        gr.update(visible=True, value=out_csv),
        gr.update(visible=True, value=out_txt),
    )
# -------------------------
# UI
# -------------------------
# Custom stylesheet for the Blocks layout (container width, header, badges).
# NOTE(review): this constant only takes effect if passed as css=CUSTOM_CSS to
# gr.Blocks(...) — confirm the Blocks constructor actually receives it.
CUSTOM_CSS = """
.gradio-container { max-width: 1100px !important; }
.header-title { font-size: 34px; font-weight: 900; letter-spacing: .4px; margin: 6px 0 4px; }
.subtle { opacity: 0.9; }
.badge { display: inline-block; padding: 2px 10px; border-radius: 999px; border: 1px solid rgba(120,120,120,.35); font-size: 12px; }
"""
# FIX: CUSTOM_CSS was defined but never passed to gr.Blocks, so the stylesheet
# was dead weight; apply it here.
with gr.Blocks(title="Linguo Core — Translation Space", css=CUSTOM_CSS) as demo:
    gr.HTML("<div class='header-title'>Linguo Core — Translation</div>")
    # NOTE(review): Gradio sanitizes <script> tags inside gr.HTML, so this
    # helper is likely stripped at render time. The copy button below therefore
    # uses a self-contained inline js handler instead of calling it.
    gr.HTML("""
    <script>
    async function copyTextToClipboard(text){
      try{
        await navigator.clipboard.writeText(text || "");
        return "✅ Copied!";
      }catch(e){
        // fallback: older browsers
        const ta = document.createElement("textarea");
        ta.value = text || "";
        document.body.appendChild(ta);
        ta.select();
        document.execCommand("copy");
        document.body.removeChild(ta);
        return "✅ Copied!";
      }
    }
    </script>
    """)
    gr.Markdown(
        "<span class='badge'>HF Spaces</span> <span class='badge'>Public-safe</span> "
        "<span class='badge'>Glossary CSV</span> <span class='badge'>History</span> <span class='badge'>Batch</span>",
        elem_classes=["subtle"],
    )
    # Per-session translation history: List[Dict] with time/direction/src/dst keys.
    history_state = gr.State([])
    with gr.Row():
        model_key = gr.Dropdown(
            choices=list(MODEL_SPECS.keys()),
            value="m2m100-418M (multilingual, your current)",
            label="Model(無料CPUなら opus-mt が速い)",
        )
        warm = gr.Button("Warmup(初回ロード)")
    warm_info = gr.Markdown("")
    with gr.Row():
        direction = gr.Radio(["ja-en", "en-ja"], value="ja-en", label="Direction")
        auto = gr.Checkbox(value=True, label="Auto detect (日本語が含まれたら ja-en)")
        conversation = gr.Checkbox(value=False, label="Conversation mode(口語寄せ)")
    info = gr.Markdown("翻訳待機中…")
    with gr.Row(equal_height=True):
        with gr.Column(scale=1):
            src = gr.Textbox(lines=10, label="Input", placeholder="翻訳したい文章を入力…")
            with gr.Row():
                btn = gr.Button("Translate", variant="primary")
                btn_clear = gr.Button("Clear")
        with gr.Column(scale=1):
            dst = gr.Textbox(lines=10, label="Output")
            copy_btn = gr.Button("Copy Output")
            copy_status = gr.Markdown("")
    with gr.Accordion("Glossary / Advanced / History / Batch", open=False):
        file_gloss = gr.File(label="Glossary CSV(src,tgt)", file_count="single", type="filepath")
        with gr.Row():
            max_len = gr.Slider(16, 512, DEFAULT_MAX_NEW_TOKENS, step=16, label="max_new_tokens")
            beams = gr.Slider(1, 6, 4, step=1, label="num_beams(通常モード向け)")
        gr.Markdown("### History(直近100件表示 / エクスポート可)")
        history_table = gr.Dataframe(
            headers=["time", "direction", "src", "dst"],
            datatype=["str", "str", "str", "str"],
            row_count=0,
            column_count=(4, "fixed"),
            wrap=True,
            interactive=False,
            value=[],
            label="History",
        )
        with gr.Row():
            btn_clear_history = gr.Button("Clear history")
            dl_hist_csv = gr.DownloadButton("Download history CSV", visible=False)
            dl_hist_txt = gr.DownloadButton("Download history TXT", visible=False)
        gr.Markdown("### Batch(TXT/CSV:1行=1件 / 公開Space保護で最大200行)")
        batch_file = gr.File(label="Batch file (TXT/CSV UTF-8)", file_count="single", type="filepath")
        btn_batch = gr.Button("Run batch translate")
        batch_status = gr.Markdown("")
        batch_preview = gr.Markdown("")
        dl_batch_csv = gr.DownloadButton("Download batch_result.csv", visible=False)
        dl_batch_txt = gr.DownloadButton("Download batch_result.txt", visible=False)

    # ---- Events ----
    warm.click(warmup, inputs=[model_key], outputs=[warm_info], queue=True)
    btn.click(
        do_translate,
        inputs=[src, model_key, direction, auto, conversation, file_gloss, max_len, beams, history_state],
        outputs=[dst, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
        queue=True,
    )

    def _noop(x):
        # Server-side pass-through; the actual copy happens in the js handler.
        return x

    copy_btn.click(
        fn=_noop,
        inputs=[dst],
        outputs=[],
        # FIX: self-contained inline JS. The <script> helper above is sanitized
        # out of gr.HTML, so "(text) => copyTextToClipboard(text)" would raise
        # ReferenceError in the browser. Inline the clipboard write instead.
        js="async (text) => { try { await navigator.clipboard.writeText(text || ''); } catch (e) {} }",
    ).then(
        fn=lambda: "✅ Copied to clipboard.",
        inputs=None,
        outputs=copy_status,
    )
    # Enter key in the input box behaves like the Translate button.
    src.submit(
        do_translate,
        inputs=[src, model_key, direction, auto, conversation, file_gloss, max_len, beams, history_state],
        outputs=[dst, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
        queue=True,
    )
    # Clear only the input; history is kept (export buttons stay if it is non-empty).
    btn_clear.click(
        lambda h: ("", "🧹 入力をクリアしました。", h, _history_to_table(h), gr.update(visible=bool(h)), gr.update(visible=bool(h))),
        inputs=[history_state],
        outputs=[src, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
        queue=False,
    )
    btn_clear_history.click(
        clear_all,
        inputs=[history_state],
        outputs=[src, info, history_state, history_table, dl_hist_csv, dl_hist_txt, dl_batch_csv, dl_batch_txt, batch_status, batch_preview],
        queue=False,
    )
    # DownloadButton pattern: the click handler returns the file path to serve.
    dl_hist_csv.click(export_history_csv, inputs=[history_state], outputs=[dl_hist_csv], queue=False)
    dl_hist_txt.click(export_history_txt, inputs=[history_state], outputs=[dl_hist_txt], queue=False)
    btn_batch.click(
        do_batch,
        inputs=[batch_file, model_key, conversation, file_gloss, max_len, beams],
        outputs=[batch_status, batch_preview, dl_batch_csv, dl_batch_txt],
        queue=True,
    )

# Single worker + a small queue keeps the free CPU Space responsive.
demo.queue(max_size=16, default_concurrency_limit=1).launch()