Hugging Face Spaces deployment log: the Space failed to start with a runtime error (reported twice). The application source is reconstructed below.
| import os | |
| def _ensure_valid_omp_threads() -> None: | |
| # Some Spaces images set OMP_NUM_THREADS to invalid values; libgomp will error. | |
| v = (os.environ.get("OMP_NUM_THREADS") or "").strip() | |
| try: | |
| if int(v) <= 0: | |
| raise ValueError | |
| except Exception: | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| _ensure_valid_omp_threads() | |
| import re | |
| import torch | |
| import spaces | |
| import gradio as gr | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig | |
| from peft import PeftModel | |
# Base model (4-bit quantized Qwen 2.5 7B Instruct) and the fine-tuned
# adapter for German -> Sutsilvan Romansh translation.
MODEL_ID = "unsloth/Qwen2.5-7B-Instruct-bnb-4bit"
ADAPTER_ID = "Npc5/sutsilvan-qwen-2.5-7b"

# Global variables for model and tokenizer (populated lazily by load_model()).
tokenizer = None
model = None

# Prompt template for the single supported translation direction.
USER_DE_TO_RM = "Translate the following German text to Sutsilvan Romansh:\n{text}"

# Pre-compiled patterns: blank-line paragraph breaks, splits after
# sentence-final punctuation, and generic whitespace runs.
_RE_PARAS = re.compile(r"\n\s*\n+")
_RE_SENTENCE_SPLIT = re.compile(r"(?<=[.!?…])\s+")
_RE_WS = re.compile(r"\s+")
| def _ensure_valid_omp_threads() -> None: | |
| # Some Spaces images set OMP_NUM_THREADS to invalid values; libgomp will error. | |
| v = (os.environ.get("OMP_NUM_THREADS") or "").strip() | |
| try: | |
| if int(v) <= 0: | |
| raise ValueError | |
| except Exception: | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
def _normalize(text: str) -> str:
    """Collapse *text* onto one line: CR/LF become spaces, whitespace runs shrink to one space."""
    one_line = (text or "").replace("\r", " ").replace("\n", " ")
    return _RE_WS.sub(" ", one_line).strip()
def _split_paragraphs(text: str) -> list[str]:
    """Split *text* on blank lines and return the non-empty paragraphs."""
    unified = (text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
    if not unified:
        return []
    stripped = (part.strip() for part in _RE_PARAS.split(unified))
    return [part for part in stripped if part]
def _split_sentences(text: str) -> list[str]:
    """Split whitespace-normalized *text* after sentence-final punctuation."""
    flat = _normalize(text)
    if not flat:
        return []
    return [piece.strip() for piece in _RE_SENTENCE_SPLIT.split(flat) if piece.strip()]
def _build_input_text(tokenizer, text: str) -> str:
    """Render *text* into the tokenizer's chat template as a single user turn."""
    user_turn = {"role": "user", "content": USER_DE_TO_RM.format(text=text)}
    return tokenizer.apply_chat_template(
        [user_turn], tokenize=False, add_generation_prompt=True
    )
def _count_prompt_tokens(tokenizer, text: str) -> int:
    """Return how many tokens the full chat-formatted prompt for *text* occupies."""
    rendered = _build_input_text(tokenizer, text)
    token_ids = tokenizer(rendered, add_special_tokens=False).input_ids
    return len(token_ids)
def _chunk_paragraph(tokenizer, paragraph: str, max_prompt_tokens: int) -> list[str]:
    """Split *paragraph* into chunks whose chat-formatted prompt fits max_prompt_tokens.

    Strategy: return the paragraph whole if it fits; otherwise greedily pack
    sentences into chunks, and as a last resort split an oversized sentence by
    words. Word pieces that still exceed the cap are emitted as-is.
    """
    paragraph = paragraph.strip()
    if not paragraph:
        return []
    # Fast path: the whole paragraph already fits within the prompt budget.
    if _count_prompt_tokens(tokenizer, paragraph) <= max_prompt_tokens:
        return [paragraph]
    sentences = _split_sentences(paragraph)
    if not sentences:
        # No sentence boundaries found; fall back to the raw paragraph.
        return [paragraph]
    chunks: list[str] = []
    current = ""  # sentences accumulated for the chunk being built
    def flush() -> None:
        # Emit the in-progress chunk (if any) and reset the accumulator.
        nonlocal current
        if current:
            chunks.append(current)
            current = ""
    for sentence in sentences:
        # Try to extend the current chunk with this sentence.
        candidate = sentence if not current else f"{current} {sentence}"
        if _count_prompt_tokens(tokenizer, candidate) <= max_prompt_tokens:
            current = candidate
            continue
        # Too big together — close the current chunk and retry the sentence alone.
        flush()
        if _count_prompt_tokens(tokenizer, sentence) <= max_prompt_tokens:
            current = sentence
            continue
        # Fallback: split an extremely long sentence by words.
        words = sentence.split(" ")
        piece = ""
        for w in words:
            candidate2 = w if not piece else f"{piece} {w}"
            if _count_prompt_tokens(tokenizer, candidate2) <= max_prompt_tokens:
                piece = candidate2
                continue
            if piece:
                chunks.append(piece)
            # Start a fresh piece with this word (even a single over-budget
            # word is kept; it will be appended below).
            piece = w
        if piece:
            chunks.append(piece)
    # Emit whatever remains in the accumulator.
    flush()
    return chunks
def _chunk_text(tokenizer, text: str, max_prompt_tokens: int) -> list[list[str]]:
    """Chunk *text* paragraph by paragraph; returns one chunk list per paragraph."""
    per_paragraph: list[list[str]] = []
    for paragraph in _split_paragraphs(text):
        per_paragraph.append(_chunk_paragraph(tokenizer, paragraph, max_prompt_tokens))
    return per_paragraph
def load_model():
    """Load (once) and cache the quantized base model, LoRA adapter and tokenizer.

    Returns the (model, tokenizer) pair; subsequent calls reuse the module
    globals instead of reloading.
    """
    global tokenizer, model
    if model is not None:
        return model, tokenizer
    print("Loading model and tokenizer...")
    hf_token = os.environ.get("HF_TOKEN")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, token=hf_token)
    # NF4 double-quantized 4-bit weights with bfloat16 compute.
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    base = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        quantization_config=quant_cfg,
        device_map="auto",
        token=hf_token,
    )
    # Attach the fine-tuned Sutsilvan adapter on top of the quantized base.
    model = PeftModel.from_pretrained(base, ADAPTER_ID, token=hf_token)
    model.eval()
    return model, tokenizer
def translate(text, max_new_tokens=128, temperature=0.1):
    """Translate German *text* to Sutsilvan Romansh, yielding (output, status) pairs.

    This is a generator: Gradio streams each yielded (translation-so-far,
    status markdown) tuple. Long inputs are chunked by paragraph/sentence so
    every prompt fits the capped context window.

    Parameters:
        text: German source text (may be empty or None).
        max_new_tokens: generation budget per chunk (clamped to the context cap).
        temperature: sampling temperature; 0 means greedy decoding.
    """
    m, t = load_model()
    text = (text or "").strip()
    if not text:
        # BUG FIX: inside a generator, `return value` is swallowed by
        # StopIteration and Gradio never shows it — yield the message instead.
        yield "", "Enter German text to translate."
        return
    # Keep inference stable + fast by capping the working context window.
    env_ctx = int((os.environ.get("MAX_CONTEXT_TOKENS") or "4096").strip() or "4096")
    model_ctx = int(getattr(getattr(m, "config", None), "max_position_embeddings", env_ctx) or env_ctx)
    context_limit = max(512, min(env_ctx, model_ctx))
    max_new_tokens = int(max_new_tokens)
    safety = 64       # head-room for special tokens / template overhead
    min_prompt = 256  # never squeeze the prompt budget below this
    if max_new_tokens > context_limit - safety - min_prompt:
        max_new_tokens = max(16, context_limit - safety - min_prompt)
    # Chunk sizing: besides the context window, also cap input size relative to
    # max_new_tokens so users don't get truncated translations when it is small.
    prompt_overhead = _count_prompt_tokens(t, "")
    ratio = float((os.environ.get("CHUNK_INPUT_TO_OUTPUT_RATIO") or "0.95").strip() or "0.95")
    ratio = max(0.2, min(1.0, ratio))
    max_prompt_tokens_context = context_limit - safety - max_new_tokens
    max_prompt_tokens_output = prompt_overhead + int(max_new_tokens * ratio)
    max_prompt_tokens = max(64, min(max_prompt_tokens_context, max_prompt_tokens_output))
    para_chunks = _chunk_text(t, text, max_prompt_tokens=max_prompt_tokens)
    if not para_chunks:
        yield "", "No translatable text found."  # yield, not return (generator)
        return
    # Hard limit to avoid runaway multi-minute requests.
    flat_chunks = sum(len(chs) for chs in para_chunks)
    max_chunks = int((os.environ.get("MAX_CHUNKS") or "400").strip() or "400")
    if flat_chunks > max_chunks:
        yield (
            "",
            f"Text is too long ({flat_chunks} chunks). Increase MAX_CHUNKS or split the text.",
        )
        return
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def translate_chunk(chunk_text: str) -> str:
        """Generate the translation for one chunk and decode only the continuation."""
        input_text = _build_input_text(t, chunk_text)
        inputs = t(input_text, return_tensors="pt").to(device)
        do_sample = temperature > 0
        gen_kwargs = {
            "max_new_tokens": max_new_tokens,
            "do_sample": do_sample,
            "pad_token_id": t.eos_token_id,
        }
        if do_sample:
            # Only pass temperature when sampling; greedy decoding with a
            # temperature set triggers transformers warnings.
            gen_kwargs["temperature"] = temperature
        with torch.no_grad():
            outputs = m.generate(**inputs, **gen_kwargs)
        prompt_len = len(inputs["input_ids"][0])
        return t.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()

    def join_paragraphs(paras: list[str]) -> str:
        # Non-empty paragraphs separated by blank lines.
        return "\n\n".join(p for p in paras if p).strip()

    done = 0
    translated_paras: list[str] = []
    for para_index, chunks in enumerate(para_chunks, start=1):
        translated_chunks: list[str] = []
        for chunk in chunks:
            chunk = chunk.strip()
            if not chunk:
                continue
            translated_chunks.append(translate_chunk(chunk))
            done += 1
            # Stream partial output so long texts feel responsive.
            partial = " ".join(c for c in translated_chunks if c).strip()
            out_text = join_paragraphs(translated_paras + [partial])
            info = (
                f"Chunk {done}/{flat_chunks} | Paragraph {para_index}/{len(para_chunks)} | "
                f"Ctx: {context_limit} | Prompt cap: {max_prompt_tokens} | New/chunk: {max_new_tokens}"
            )
            yield out_text, info
        translated_paras.append(" ".join(c for c in translated_chunks if c).strip())
    out_text = join_paragraphs(translated_paras)
    info = (
        f"Done | Chunks: {flat_chunks} | Context cap: {context_limit} | "
        f"Prompt cap: {max_prompt_tokens} | Max new tokens/chunk: {max_new_tokens}"
    )
    yield out_text, info
# Build UI
with gr.Blocks(title="Qwen Sutsilvan Translator") as demo:
    gr.Markdown("# Qwen 2.5 Sutsilvan Translator")
    gr.Markdown("Fine-tuned German -> Sutsilvan Romansh translation.")
    with gr.Row():
        with gr.Column():
            # Input side: source text plus per-chunk generation controls.
            input_text = gr.Textbox(
                label="German",
                lines=12,
                placeholder="Paste German text here (short or long). The app will auto-chunk long inputs.",
            )
            max_tokens = gr.Slider(16, 2048, value=512, step=16, label="Max New Tokens (per chunk)")
            temp = gr.Slider(0.0, 1.0, value=0.1, step=0.05, label="Temperature")
            btn = gr.Button("Translate", variant="primary")
        with gr.Column():
            # Output side: streamed translation and a status line.
            output_text = gr.Textbox(label="Sutsilvan", lines=12, interactive=False)
            status = gr.Markdown()
    # translate is a generator, so each yield streams a partial result;
    # concurrency_limit=1 serializes requests through the single model.
    btn.click(
        fn=translate,
        inputs=[input_text, max_tokens, temp],
        outputs=[output_text, status],
        concurrency_limit=1,
    )
if __name__ == "__main__":
    # Enable request queuing (needed for streamed generator outputs) and
    # launch with a single worker thread.
    demo.queue()
    demo.launch(max_threads=1)