# app.py — Hugging Face Spaces (Gradio) all-in-one translation app
# - Single translate + history table
# - Batch translate (TXT/CSV) + download result CSV
# - Glossary CSV (src,tgt)
# - Model selector (m2m100 / opus-mt / nllb)
# - Safe limits for free CPU Spaces
import os
import csv
import time
import tempfile
from itertools import islice
from typing import Dict, Optional, List, Tuple, Any
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# -------------------------
# Model registry
# -------------------------
# NOTE:
# - "opus-mt" is fast on CPU (recommended for free tier speed)
# - "m2m100-418M" matches your current project
# - "nllb-600M" can be heavier (quality often good, CPU slower)
MODEL_SPECS: Dict[str, Dict[str, Any]] = {
"m2m100-418M (multilingual, your current)": {
"kind": "m2m100",
"name": {"ja-en": "facebook/m2m100_418M", "en-ja": "facebook/m2m100_418M"},
"lang": {"ja": "ja", "en": "en"},
"needs_forced_bos": True,
"supports_src_lang": True,
},
"opus-mt (fast CPU, ja<->en)": {
"kind": "opus",
"name": {"ja-en": "Helsinki-NLP/opus-mt-ja-en", "en-ja": "Helsinki-NLP/opus-mt-en-ja"},
"lang": {"ja": None, "en": None},
"needs_forced_bos": False,
"supports_src_lang": False,
},
"nllb-600M (quality, heavier)": {
"kind": "nllb",
"name": {"ja-en": "facebook/nllb-200-distilled-600M", "en-ja": "facebook/nllb-200-distilled-600M"},
"lang": {"ja": "jpn_Jpan", "en": "eng_Latn"},
"needs_forced_bos": True,
"supports_src_lang": True,
},
}
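# Illustrative lookup, matching how translate_one consumes the registry:
#   spec = MODEL_SPECS["opus-mt (fast CPU, ja<->en)"]
#   spec["name"]["en-ja"] -> "Helsinki-NLP/opus-mt-en-ja"
#   spec["lang"]["ja"]    -> None   (opus-mt models need no language tags)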
# Cache: checkpoint name -> tokenizer / model. m2m100 and nllb reuse a single
# checkpoint for both directions, so keying by name avoids loading it twice.
TOK_CACHE: Dict[str, Any] = {}
MDL_CACHE: Dict[str, Any] = {}
# -------------------------
# Safety limits (public space)
# -------------------------
MAX_SINGLE_CHARS = 4000 # single input max chars
MAX_BATCH_LINES = 200 # batch line cap
MAX_BATCH_CHARS_TOTAL = 20000 # batch total chars cap
DEFAULT_MAX_NEW_TOKENS = 256
# -------------------------
# Helpers
# -------------------------
def detect_direction_by_text(text: str, prefer: str = "ja-en") -> str:
    """Simple heuristic: any Japanese character => ja-en, else en-ja.

    (`prefer` is kept for signature compatibility; the character scan decides.)
    """
    for ch in text:
        if ("\u3040" <= ch <= "\u30ff") or ("\u4e00" <= ch <= "\u9fff"):
            return "ja-en"
    return "en-ja"
def read_glossary_csv(path: Optional[str]) -> Optional[List[List[str]]]:
"""Read glossary CSV (src,tgt). UTF-8. No header assumed."""
if not path:
return None
rows: List[List[str]] = []
    with open(path, "r", encoding="utf-8-sig") as f:  # -sig tolerates a BOM
for r in csv.reader(f):
if len(r) >= 2:
src = (r[0] or "").strip()
tgt = (r[1] or "").strip()
if src:
rows.append([src, tgt])
return rows or None
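# Example glossary.csv accepted above (UTF-8, no header row; these term pairs
# are illustrative, not shipped with the app):
#   機械学習,machine learning
#   トークナイザ,tokenizer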
def apply_glossary(text: str, glossary: Optional[List[List[str]]]) -> str:
if not glossary:
return text
out = text
for src, tgt in glossary:
if src:
out = out.replace(src, tgt)
return out
def gen_kwargs_for_mode(conversation_mode: bool, base_beams: int) -> dict:
"""
Stable defaults for public CPU:
- Normal: deterministic beam search
- Conversation: slightly more colloquial (beam-sampling) but still stable
"""
if conversation_mode:
return dict(
do_sample=True,
temperature=0.75,
top_p=0.85,
top_k=40,
num_beams=max(1, min(2, int(base_beams))), # keep it small for stability
repetition_penalty=1.08,
)
return dict(
do_sample=False,
num_beams=int(base_beams),
repetition_penalty=1.05,
)
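# For reference, the concrete settings these branches produce:
#   gen_kwargs_for_mode(False, 4) -> {"do_sample": False, "num_beams": 4,
#                                     "repetition_penalty": 1.05}
#   gen_kwargs_for_mode(True, 4)  -> sampling (temperature 0.75, top_p 0.85,
#                                    top_k 40) with num_beams clamped to 2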
def _get_forced_bos_id(tokenizer, lang: str) -> Optional[int]:
# M2M100: get_lang_id
if hasattr(tokenizer, "get_lang_id"):
try:
return tokenizer.get_lang_id(lang)
except Exception:
pass
# NLLB: lang_code_to_id
if hasattr(tokenizer, "lang_code_to_id") and isinstance(getattr(tokenizer, "lang_code_to_id"), dict):
if lang in tokenizer.lang_code_to_id:
return tokenizer.lang_code_to_id[lang]
    # Fallback: direct token lookup (guard against the unk id for unknown codes)
    try:
        tid = tokenizer.convert_tokens_to_ids(lang)
        if tid is not None and tid != tokenizer.unk_token_id:
            return tid
    except Exception:
        pass
    return None
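# Language codes handed to this resolver per model family (see MODEL_SPECS):
#   m2m100 -> "ja" / "en"             (resolved via get_lang_id)
#   nllb   -> "jpn_Jpan" / "eng_Latn" (resolved via lang_code_to_id or token id)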
def _load_model(model_key: str, direction: str):
    """Lazy load + cache, keyed by checkpoint name."""
    spec = MODEL_SPECS[model_key]
    model_name = spec["name"][direction]
    if model_name in TOK_CACHE:
        return TOK_CACHE[model_name], MDL_CACHE[model_name]
    tok = AutoTokenizer.from_pretrained(model_name)
    dtype = torch.float16 if DEVICE.type == "cuda" else torch.float32
    mdl = AutoModelForSeq2SeqLM.from_pretrained(
        model_name,
        torch_dtype=dtype,
        low_cpu_mem_usage=True,
    )
    mdl.to(DEVICE).eval()
    TOK_CACHE[model_name] = tok
    MDL_CACHE[model_name] = mdl
    return tok, mdl
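# Memory note: every distinct checkpoint stays resident once loaded. m2m100 and
# nllb each serve both directions from one checkpoint; opus-mt loads a separate
# one per direction. Exercising every registry entry could hold four models in
# RAM at once — likely too much for a free CPU Space, hence the lazy loading.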
@torch.inference_mode()
def translate_one(
model_key: str,
direction: str,
text: str,
max_new_tokens: int,
num_beams: int,
conversation: bool,
) -> str:
tok, mdl = _load_model(model_key, direction)
spec = MODEL_SPECS[model_key]
# language tags (if supported)
src_lang = spec["lang"]["ja" if direction == "ja-en" else "en"]
tgt_lang = spec["lang"]["en" if direction == "ja-en" else "ja"]
if spec.get("supports_src_lang") and hasattr(tok, "src_lang") and src_lang:
tok.src_lang = src_lang
inputs = tok(text, return_tensors="pt", truncation=True, max_length=512).to(DEVICE)
gen_opts = gen_kwargs_for_mode(bool(conversation), int(num_beams))
# forced BOS for multilingual models
forced_id = None
if spec.get("needs_forced_bos") and tgt_lang:
forced_id = _get_forced_bos_id(tok, tgt_lang)
generate_kwargs = dict(
**inputs,
max_new_tokens=int(max_new_tokens),
no_repeat_ngram_size=3,
length_penalty=1.05,
**gen_opts,
)
if forced_id is not None:
generate_kwargs["forced_bos_token_id"] = forced_id
out_ids = mdl.generate(**generate_kwargs)
return tok.batch_decode(out_ids, skip_special_tokens=True)[0]
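# Minimal usage sketch (illustrative; the key must match a MODEL_SPECS entry,
# and the first call downloads/loads the checkpoint, so expect a delay):
#   translate_one("m2m100-418M (multilingual, default)", "ja-en",
#                 "こんにちは", max_new_tokens=64, num_beams=2, conversation=False)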
def _clamp_int(v: Any, lo: int, hi: int, default: int) -> int:
try:
x = int(v)
return max(lo, min(hi, x))
except Exception:
return default
def _history_to_table(history: List[Dict[str, str]]) -> List[List[str]]:
# headers: time, direction, src, dst
rows = []
for item in history[-100:][::-1]: # show latest first, cap 100 rows
rows.append([item["time"], item["direction"], item["src"], item["dst"]])
return rows
def _export_history(history: List[Dict[str, str]], fmt: str) -> str:
tmpdir = tempfile.mkdtemp(prefix="history_")
if fmt == "csv":
path = os.path.join(tmpdir, "history.csv")
with open(path, "w", newline="", encoding="utf-8-sig") as f:
w = csv.writer(f)
w.writerow(["time", "direction", "src", "dst"])
for item in history:
w.writerow([item["time"], item["direction"], item["src"], item["dst"]])
return path
else:
path = os.path.join(tmpdir, "history.txt")
with open(path, "w", encoding="utf-8") as f:
for i, item in enumerate(history, 1):
f.write(f"[{i}] {item['time']} | {item['direction']}\n")
f.write(f"SRC: {item['src']}\n")
f.write(f"DST: {item['dst']}\n")
f.write("\n")
return path
def _read_batch_lines(file_path: str) -> List[str]:
"""
Accept:
- .txt: 1 line = 1 item
      - .csv: first column is the source text (a header-looking first row is skipped)
"""
lower = (file_path or "").lower()
lines: List[str] = []
if lower.endswith(".csv"):
        with open(file_path, "r", encoding="utf-8-sig") as f:  # -sig tolerates a BOM
r = csv.reader(f)
for row in islice(r, MAX_BATCH_LINES + 5):
if not row:
continue
val = (row[0] or "").strip()
if not val:
continue
# naive header skip
if len(lines) == 0 and val.lower() in ("src", "source", "text", "input"):
continue
lines.append(val)
if len(lines) >= MAX_BATCH_LINES:
break
else:
        with open(file_path, "r", encoding="utf-8-sig") as f:
for ln in islice(f, MAX_BATCH_LINES):
ln = ln.rstrip("\n").strip()
if ln:
lines.append(ln)
# total chars guard
total_chars = sum(len(x) for x in lines)
if total_chars > MAX_BATCH_CHARS_TOTAL:
# shrink until safe
kept = []
c = 0
for s in lines:
if c + len(s) > MAX_BATCH_CHARS_TOTAL:
break
kept.append(s)
c += len(s)
lines = kept
return lines
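# Example batch input (illustrative): a batch.txt containing
#   こんにちは
#   Good morning
# yields ["こんにちは", "Good morning"]. For CSV, a first row of "src"/"source"/
# "text"/"input" is treated as a header and skipped.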
# -------------------------
# Gradio handlers
# -------------------------
def warmup(model_key: str) -> str:
t0 = time.time()
try:
_load_model(model_key, "ja-en")
used = time.time() - t0
return f"✅ Warmup OK ({used:.2f}s) — model: {model_key}"
except Exception as e:
return f"❌ Warmup failed: {e}"
def do_translate(
text: str,
model_key: str,
dir_choice: str,
auto_on: bool,
conversation_on: bool,
glossary_path: Optional[str],
max_new_tokens: int,
num_beams: int,
history: List[Dict[str, str]],
):
text = (text or "").strip()
if not text:
return "", "⚠️ テキストを入力してください。", history, _history_to_table(history), gr.update(visible=False), gr.update(visible=False)
if len(text) > MAX_SINGLE_CHARS:
return "", f"⚠️ 入力が長すぎます(最大 {MAX_SINGLE_CHARS} 文字)。", history, _history_to_table(history), gr.update(visible=False), gr.update(visible=False)
direction = detect_direction_by_text(text, prefer=dir_choice) if auto_on else dir_choice
glossary = read_glossary_csv(glossary_path)
src_processed = apply_glossary(text, glossary)
max_new_tokens = _clamp_int(max_new_tokens, 16, 512, DEFAULT_MAX_NEW_TOKENS)
num_beams = _clamp_int(num_beams, 1, 6, 4)
t0 = time.time()
try:
out = translate_one(
model_key=model_key,
direction=direction,
text=src_processed,
max_new_tokens=max_new_tokens,
num_beams=num_beams,
conversation=bool(conversation_on),
)
used = time.time() - t0
item = {
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"direction": direction,
"src": text,
"dst": out,
}
history = (history or []) + [item]
table = _history_to_table(history)
info = f"✅ 完了:{used:.2f}s|model: **{model_key}**|方向:**{direction}**|chars: {len(text)}"
        # show export buttons when history exists; pre-generate the files so
        # the first click of a DownloadButton serves fresh data
        return (
            out, info, history, table,
            gr.update(visible=True, value=_export_history(history, "csv")),
            gr.update(visible=True, value=_export_history(history, "txt")),
        )
except Exception as e:
info = f"❌ 翻訳に失敗しました: {e}"
return "", info, history, _history_to_table(history), gr.update(visible=bool(history)), gr.update(visible=bool(history))
def clear_all(history):
history = []
return (
"", "🧹 クリアしました。",
history, [],
gr.update(visible=False), # dl_hist_csv
gr.update(visible=False), # dl_hist_txt
gr.update(visible=False, value=None), # dl_batch_csv
gr.update(visible=False, value=None), # dl_batch_txt
"", # batch_status
"", # batch_preview
)
def export_history_csv(history: List[Dict[str, str]]):
if not history:
return None
return _export_history(history, "csv")
def export_history_txt(history: List[Dict[str, str]]):
if not history:
return None
return _export_history(history, "txt")
def do_batch(
batch_file_path: Optional[str],
model_key: str,
conversation_on: bool,
glossary_path: Optional[str],
max_new_tokens: int,
num_beams: int,
):
if not batch_file_path:
yield "⚠️ バッチファイル(TXT/CSV)を選択してください。", "", gr.update(visible=False), None
return
lines = _read_batch_lines(batch_file_path)
total = len(lines)
if total == 0:
yield "⚠️ 読み取れる行がありません(空/制限超過の可能性)。", "", gr.update(visible=False), None
return
glossary = read_glossary_csv(glossary_path)
max_new_tokens = _clamp_int(max_new_tokens, 16, 512, DEFAULT_MAX_NEW_TOKENS)
num_beams = _clamp_int(num_beams, 1, 6, 4)
t0 = time.time()
rows: List[Tuple[str, str, str]] = [] # (direction, src, dst)
yield "⏳ バッチ翻訳中… 0/..", "", gr.update(visible=False, value=None), gr.update(visible=False, value=None)
for i, src in enumerate(lines, 1):
direction = detect_direction_by_text(src, prefer="ja-en")
src_processed = apply_glossary(src, glossary)
try:
dst = translate_one(
model_key=model_key,
direction=direction,
text=src_processed,
max_new_tokens=max_new_tokens,
num_beams=num_beams,
conversation=bool(conversation_on),
)
except Exception as e:
dst = f"[ERROR] {e}"
rows.append((direction, src, dst))
if i == 1 or i % 5 == 0 or i == total:
pct = int(i * 100 / total)
yield f"⏳ バッチ翻訳中… {i}/{total} ({pct}%)", "", gr.update(visible=False), None
# Preview (limit)
preview_lines = []
for idx, (direction, s, d) in enumerate(rows[:50], 1):
preview_lines.append(f"**{idx}. ({direction})**\n- SRC: {s}\n- DST: {d}\n")
preview = "\n".join(preview_lines)
if total > 50:
preview += f"\n…(プレビューは先頭50行まで。全{total}行はCSVでダウンロード)"
# Write result CSV
tmpdir = tempfile.mkdtemp(prefix="batch_")
out_csv = os.path.join(tmpdir, "batch_result.csv")
out_txt = os.path.join(tmpdir, "batch_result.txt")
with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
w = csv.writer(f)
w.writerow(["direction", "src", "dst"])
for direction, s, d in rows:
w.writerow([direction, s, d])
with open(out_txt, "w", encoding="utf-8") as f:
for i, (direction, s, d) in enumerate(rows, 1):
f.write(f"[{i}] ({direction})\n")
f.write(f"SRC: {s}\n")
f.write(f"DST: {d}\n\n")
used = time.time() - t0
done_msg = f"✅ バッチ完了:{used:.2f}s|行数:{total}"
yield (
done_msg,
preview,
gr.update(visible=True, value=out_csv),
gr.update(visible=True, value=out_txt),
)
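# NOTE: do_batch is a generator; with queue=True Gradio streams each yield to
# the UI, which is what drives the incremental progress messages above.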
# -------------------------
# UI
# -------------------------
CUSTOM_CSS = """
.gradio-container { max-width: 1100px !important; }
.header-title { font-size: 34px; font-weight: 900; letter-spacing: .4px; margin: 6px 0 4px; }
.subtle { opacity: 0.9; }
.badge { display: inline-block; padding: 2px 10px; border-radius: 999px; border: 1px solid rgba(120,120,120,.35); font-size: 12px; }
"""
# NOTE: Gradio does not execute <script> tags rendered through gr.HTML, so the
# clipboard helper is injected via the Blocks `head=` parameter instead.
COPY_JS = """
<script>
async function copyTextToClipboard(text){
  try{
    await navigator.clipboard.writeText(text || "");
    return "✅ Copied!";
  }catch(e){
    // fallback for older browsers / non-secure contexts
    const ta = document.createElement("textarea");
    ta.value = text || "";
    document.body.appendChild(ta);
    ta.select();
    document.execCommand("copy");
    document.body.removeChild(ta);
    return "✅ Copied!";
  }
}
</script>
"""
with gr.Blocks(title="Linguo Core — Translation Space", css=CUSTOM_CSS, head=COPY_JS) as demo:
    gr.HTML("<div class='header-title'>Linguo Core — Translation</div>")
gr.Markdown(
"<span class='badge'>HF Spaces</span> <span class='badge'>Public-safe</span> "
"<span class='badge'>Glossary CSV</span> <span class='badge'>History</span> <span class='badge'>Batch</span>",
elem_classes=["subtle"],
)
history_state = gr.State([]) # List[Dict]
with gr.Row():
model_key = gr.Dropdown(
choices=list(MODEL_SPECS.keys()),
value="m2m100-418M (multilingual, your current)",
label="Model(無料CPUなら opus-mt が速い)",
)
warm = gr.Button("Warmup(初回ロード)")
warm_info = gr.Markdown("")
with gr.Row():
direction = gr.Radio(["ja-en", "en-ja"], value="ja-en", label="Direction")
auto = gr.Checkbox(value=True, label="Auto detect (日本語が含まれたら ja-en)")
conversation = gr.Checkbox(value=False, label="Conversation mode(口語寄せ)")
info = gr.Markdown("翻訳待機中…")
with gr.Row(equal_height=True):
with gr.Column(scale=1):
src = gr.Textbox(lines=10, label="Input", placeholder="翻訳したい文章を入力…")
with gr.Row():
btn = gr.Button("Translate", variant="primary")
btn_clear = gr.Button("Clear")
with gr.Column(scale=1):
dst = gr.Textbox(lines=10, label="Output")
copy_btn = gr.Button("Copy Output")
copy_status = gr.Markdown("")
with gr.Accordion("Glossary / Advanced / History / Batch", open=False):
file_gloss = gr.File(label="Glossary CSV(src,tgt)", file_count="single", type="filepath")
with gr.Row():
max_len = gr.Slider(16, 512, DEFAULT_MAX_NEW_TOKENS, step=16, label="max_new_tokens")
beams = gr.Slider(1, 6, 4, step=1, label="num_beams(通常モード向け)")
gr.Markdown("### History(直近100件表示 / エクスポート可)")
history_table = gr.Dataframe(
headers=["time", "direction", "src", "dst"],
datatype=["str", "str", "str", "str"],
row_count=0,
column_count=(4, "fixed"),
wrap=True,
interactive=False,
value=[],
label="History",
)
with gr.Row():
btn_clear_history = gr.Button("Clear history")
dl_hist_csv = gr.DownloadButton("Download history CSV", visible=False)
dl_hist_txt = gr.DownloadButton("Download history TXT", visible=False)
gr.Markdown("### Batch(TXT/CSV:1行=1件 / 公開Space保護で最大200行)")
batch_file = gr.File(label="Batch file (TXT/CSV UTF-8)", file_count="single", type="filepath")
btn_batch = gr.Button("Run batch translate")
batch_status = gr.Markdown("")
batch_preview = gr.Markdown("")
dl_batch_csv = gr.DownloadButton("Download batch_result.csv", visible=False)
dl_batch_txt = gr.DownloadButton("Download batch_result.txt", visible=False)
# Events
warm.click(warmup, inputs=[model_key], outputs=[warm_info], queue=True)
btn.click(
do_translate,
inputs=[src, model_key, direction, auto, conversation, file_gloss, max_len, beams, history_state],
outputs=[dst, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
queue=True,
)
    copy_btn.click(
        fn=None,
        inputs=[dst],
        outputs=[],
        js="(text) => copyTextToClipboard(text)",
    ).then(
        fn=lambda: "✅ Copied to clipboard.",
        inputs=None,
        outputs=copy_status,
    )
src.submit(
do_translate,
inputs=[src, model_key, direction, auto, conversation, file_gloss, max_len, beams, history_state],
outputs=[dst, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
queue=True,
)
btn_clear.click(
lambda h: ("", "🧹 入力をクリアしました。", h, _history_to_table(h), gr.update(visible=bool(h)), gr.update(visible=bool(h))),
inputs=[history_state],
outputs=[src, info, history_state, history_table, dl_hist_csv, dl_hist_txt],
queue=False,
)
btn_clear_history.click(
clear_all,
inputs=[history_state],
outputs=[src, info, history_state, history_table, dl_hist_csv, dl_hist_txt, dl_batch_csv, dl_batch_txt, batch_status, batch_preview],
queue=False,
)
dl_hist_csv.click(export_history_csv, inputs=[history_state], outputs=[dl_hist_csv], queue=False)
dl_hist_txt.click(export_history_txt, inputs=[history_state], outputs=[dl_hist_txt], queue=False)
btn_batch.click(
do_batch,
inputs=[batch_file, model_key, conversation, file_gloss, max_len, beams],
outputs=[batch_status, batch_preview, dl_batch_csv, dl_batch_txt],
queue=True,
)
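# Serialize inference (default_concurrency_limit=1) so the single free-tier CPU
# worker never runs two generations at once; max_size=16 bounds waiting requests.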
demo.queue(max_size=16, default_concurrency_limit=1).launch()