# Hugging Face Space: German -> Sutsilvan Romansh translator.
# (Hub page residue removed: "Npc5's picture / Upload folder using
# huggingface_hub / 9b8c45d verified" was web-UI text, not source code.)
import os
def _ensure_valid_omp_threads() -> None:
# Some Spaces images set OMP_NUM_THREADS to invalid values; libgomp will error.
v = (os.environ.get("OMP_NUM_THREADS") or "").strip()
try:
if int(v) <= 0:
raise ValueError
except Exception:
os.environ["OMP_NUM_THREADS"] = "1"
_ensure_valid_omp_threads()
import re
import torch
import spaces
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel
# 4-bit quantized Qwen 2.5 base model and the Sutsilvan fine-tuned adapter.
MODEL_ID = "unsloth/Qwen2.5-7B-Instruct-bnb-4bit"
ADAPTER_ID = "Npc5/sutsilvan-qwen-2.5-7b"
# Global variables for model and tokenizer, lazily populated by load_model().
tokenizer = None
model = None
# User-turn prompt template for the German -> Sutsilvan translation task.
USER_DE_TO_RM = "Translate the following German text to Sutsilvan Romansh:\n{text}"
# Pre-compiled regexes: blank-line paragraph breaks, sentence-final
# punctuation boundaries, and runs of whitespace.
_RE_PARAS = re.compile(r"\n\s*\n+")
_RE_SENTENCE_SPLIT = re.compile(r"(?<=[.!?…])\s+")
_RE_WS = re.compile(r"\s+")
# NOTE(review): this is an exact duplicate of _ensure_valid_omp_threads
# defined (and already invoked) near the top of the file. This second
# definition merely rebinds the same name and is never called afterwards —
# it is dead code and safe to delete.
def _ensure_valid_omp_threads() -> None:
    # Some Spaces images set OMP_NUM_THREADS to invalid values; libgomp will error.
    v = (os.environ.get("OMP_NUM_THREADS") or "").strip()
    try:
        if int(v) <= 0:
            raise ValueError
    except Exception:
        os.environ["OMP_NUM_THREADS"] = "1"
def _normalize(text: str) -> str:
    """Flatten CR/LF to spaces, collapse whitespace runs, and trim *text*."""
    flattened = (text or "").replace("\r", " ").replace("\n", " ")
    return _RE_WS.sub(" ", flattened).strip()
def _split_paragraphs(text: str) -> list[str]:
    """Split *text* on blank-line boundaries into non-empty paragraphs."""
    unified = (text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
    if not unified:
        return []
    paragraphs: list[str] = []
    for piece in _RE_PARAS.split(unified):
        piece = piece.strip()
        if piece:
            paragraphs.append(piece)
    return paragraphs
def _split_sentences(text: str) -> list[str]:
    """Split normalized *text* into sentences at terminal punctuation."""
    cleaned = _normalize(text)
    if not cleaned:
        return []
    return [piece.strip() for piece in _RE_SENTENCE_SPLIT.split(cleaned) if piece.strip()]
def _build_input_text(tokenizer, text: str) -> str:
    """Wrap *text* in the translation prompt and render the chat template."""
    chat = [{"role": "user", "content": USER_DE_TO_RM.format(text=text)}]
    return tokenizer.apply_chat_template(
        chat, tokenize=False, add_generation_prompt=True
    )
def _count_prompt_tokens(tokenizer, text: str) -> int:
    """Return how many tokens the full chat-templated prompt for *text* uses."""
    rendered = _build_input_text(tokenizer, text)
    token_ids = tokenizer(rendered, add_special_tokens=False).input_ids
    return len(token_ids)
def _chunk_paragraph(tokenizer, paragraph: str, max_prompt_tokens: int) -> list[str]:
    """Split one paragraph into chunks whose full prompt fits the token budget.

    Strategy: keep the paragraph whole if its templated prompt fits;
    otherwise greedily pack sentences into chunks; a single sentence that is
    itself over budget is further split greedily on spaces.
    """
    paragraph = paragraph.strip()
    if not paragraph:
        return []
    # Fast path: the whole paragraph already fits the prompt budget.
    if _count_prompt_tokens(tokenizer, paragraph) <= max_prompt_tokens:
        return [paragraph]
    sentences = _split_sentences(paragraph)
    if not sentences:
        # No sentence boundaries found; emit the paragraph as-is even though
        # it exceeds the budget (caller's generation may truncate).
        return [paragraph]
    chunks: list[str] = []
    current = ""
    def flush() -> None:
        # Emit the sentence accumulator (if non-empty) and reset it.
        nonlocal current
        if current:
            chunks.append(current)
        current = ""
    for sentence in sentences:
        # Try to extend the current chunk with this sentence.
        candidate = sentence if not current else f"{current} {sentence}"
        if _count_prompt_tokens(tokenizer, candidate) <= max_prompt_tokens:
            current = candidate
            continue
        # Doesn't fit: close the current chunk, then retry the sentence alone.
        flush()
        if _count_prompt_tokens(tokenizer, sentence) <= max_prompt_tokens:
            current = sentence
            continue
        # Fallback: split an extremely long sentence by words.
        words = sentence.split(" ")
        piece = ""
        for w in words:
            candidate2 = w if not piece else f"{piece} {w}"
            if _count_prompt_tokens(tokenizer, candidate2) <= max_prompt_tokens:
                piece = candidate2
                continue
            if piece:
                chunks.append(piece)
            piece = w
        # Trailing word-run of the over-long sentence.
        if piece:
            chunks.append(piece)
    # Emit whatever sentences remain in the accumulator.
    flush()
    return chunks
def _chunk_text(tokenizer, text: str, max_prompt_tokens: int) -> list[list[str]]:
    """Split *text* into paragraphs and chunk each one to the prompt budget.

    Returns one list of chunks per paragraph (empty list for empty input).
    """
    per_paragraph: list[list[str]] = []
    for paragraph in _split_paragraphs(text):
        per_paragraph.append(_chunk_paragraph(tokenizer, paragraph, max_prompt_tokens))
    return per_paragraph
def load_model():
    """Lazily load the 4-bit base model with the Sutsilvan LoRA adapter.

    Returns:
        (model, tokenizer) tuple. Results are cached in module-level globals,
        so repeated calls (one per request) reuse the first load.
    """
    global tokenizer, model
    # Already loaded: return the cached pair.
    if model is not None:
        return model, tokenizer
    print("Loading model and tokenizer...")
    # HF_TOKEN is only needed if the model/adapter repos are gated or private.
    token = os.environ.get("HF_TOKEN")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=token)
    # NF4 double quantization with bf16 compute: standard bitsandbytes 4-bit setup.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )
    base_model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        quantization_config=bnb_config,
        device_map="auto",
        token=token
    )
    # Attach the fine-tuned Sutsilvan adapter on top of the quantized base.
    model = PeftModel.from_pretrained(base_model, ADAPTER_ID, token=token)
    model.eval()
    return model, tokenizer
@spaces.GPU
def translate(text, max_new_tokens=128, temperature=0.1):
    """Translate German *text* to Sutsilvan Romansh, streaming partial output.

    Long inputs are split into paragraph/sentence chunks sized to fit the
    model context; each chunk is translated separately and the accumulated
    output is yielded after every chunk so the UI stays responsive.

    Args:
        text: German source text (short or long; auto-chunked).
        max_new_tokens: generation budget per chunk.
        temperature: sampling temperature; 0 means greedy decoding.

    Yields:
        (translated_text, status_message) tuples.

    BUGFIX: this function contains `yield`, so it is a generator. The
    original early error paths used `return "", "message"`, which in a
    generator puts the tuple into StopIteration.value — Gradio never sees
    it and the error messages were silently dropped. Error states are now
    yielded, followed by a bare `return`.
    """
    m, t = load_model()
    text = (text or "").strip()
    if not text:
        yield "", "Enter German text to translate."
        return
    # Keep inference stable + fast by capping the working context window.
    env_ctx = int((os.environ.get("MAX_CONTEXT_TOKENS") or "4096").strip() or "4096")
    model_ctx = int(getattr(getattr(m, "config", None), "max_position_embeddings", env_ctx) or env_ctx)
    context_limit = max(512, min(env_ctx, model_ctx))
    max_new_tokens = int(max_new_tokens)
    safety = 64       # headroom for template/special tokens
    min_prompt = 256  # always leave at least this much room for input
    if max_new_tokens > context_limit - safety - min_prompt:
        max_new_tokens = max(16, context_limit - safety - min_prompt)
    # Chunk sizing: besides the context window, also cap input size relative to max_new_tokens
    # so users don't get truncated translations when max_new_tokens is small.
    prompt_overhead = _count_prompt_tokens(t, "")
    ratio = float((os.environ.get("CHUNK_INPUT_TO_OUTPUT_RATIO") or "0.95").strip() or "0.95")
    ratio = max(0.2, min(1.0, ratio))
    max_prompt_tokens_context = context_limit - safety - max_new_tokens
    max_prompt_tokens_output = prompt_overhead + int(max_new_tokens * ratio)
    max_prompt_tokens = max(64, min(max_prompt_tokens_context, max_prompt_tokens_output))
    para_chunks = _chunk_text(t, text, max_prompt_tokens=max_prompt_tokens)
    if not para_chunks:
        yield "", "No translatable text found."
        return
    # Hard limit to avoid runaway multi-minute requests.
    flat_chunks = sum(len(chs) for chs in para_chunks)
    max_chunks = int((os.environ.get("MAX_CHUNKS") or "400").strip() or "400")
    if flat_chunks > max_chunks:
        yield (
            "",
            f"Text is too long ({flat_chunks} chunks). Increase MAX_CHUNKS or split the text.",
        )
        return
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    do_sample = float(temperature) > 0

    def translate_chunk(chunk_text: str) -> str:
        # Translate one chunk; decode only the tokens after the prompt.
        input_text = _build_input_text(t, chunk_text)
        inputs = t(input_text, return_tensors="pt").to(device)
        gen_kwargs = {
            "max_new_tokens": max_new_tokens,
            "do_sample": do_sample,
            "pad_token_id": t.eos_token_id,
        }
        if do_sample:
            # Only pass temperature when sampling; transformers warns (and
            # ignores it) when do_sample=False.
            gen_kwargs["temperature"] = float(temperature)
        with torch.no_grad():
            outputs = m.generate(**inputs, **gen_kwargs)
        out = t.decode(outputs[0][len(inputs["input_ids"][0]):], skip_special_tokens=True)
        return out.strip()

    done = 0
    translated_paras: list[str] = []
    for para_index, chunks in enumerate(para_chunks, start=1):
        translated_chunks: list[str] = []
        for chunk in chunks:
            chunk = chunk.strip()
            if not chunk:
                continue
            translated_chunks.append(translate_chunk(chunk))
            done += 1
            # Stream partial output so long texts feel responsive.
            partial_paras = translated_paras + [" ".join(c for c in translated_chunks if c).strip()]
            out_text = "\n\n".join(p for p in partial_paras if p).strip()
            info = (
                f"Chunk {done}/{flat_chunks} | Paragraph {para_index}/{len(para_chunks)} | "
                f"Ctx: {context_limit} | Prompt cap: {max_prompt_tokens} | New/chunk: {max_new_tokens}"
            )
            yield out_text, info
        translated_paras.append(" ".join(c for c in translated_chunks if c).strip())
    out_text = "\n\n".join(p for p in translated_paras if p).strip()
    info = (
        f"Done | Chunks: {flat_chunks} | Context cap: {context_limit} | "
        f"Prompt cap: {max_prompt_tokens} | Max new tokens/chunk: {max_new_tokens}"
    )
    yield out_text, info
# Build UI: two-column layout — German input + controls on the left,
# streaming Sutsilvan output + status line on the right.
with gr.Blocks(title="Qwen Sutsilvan Translator") as demo:
    gr.Markdown("# Qwen 2.5 Sutsilvan Translator")
    gr.Markdown("Fine-tuned German -> Sutsilvan Romansh translation.")
    with gr.Row():
        with gr.Column():
            input_text = gr.Textbox(
                label="German",
                lines=12,
                placeholder="Paste German text here (short or long). The app will auto-chunk long inputs.",
            )
            # Per-chunk generation budget; translate() also chunks input to match it.
            max_tokens = gr.Slider(16, 2048, value=512, step=16, label="Max New Tokens (per chunk)")
            temp = gr.Slider(0.0, 1.0, value=0.1, step=0.05, label="Temperature")
            btn = gr.Button("Translate", variant="primary")
        with gr.Column():
            output_text = gr.Textbox(label="Sutsilvan", lines=12, interactive=False)
            status = gr.Markdown()
    # translate is a generator, so outputs stream as chunks finish;
    # concurrency_limit=1 serializes GPU jobs.
    btn.click(
        fn=translate,
        inputs=[input_text, max_tokens, temp],
        outputs=[output_text, status],
        concurrency_limit=1,
    )
if __name__ == "__main__":
    # queue() is required for streaming (generator) event handlers.
    demo.queue()
    demo.launch(max_threads=1)