""" MonSub LLM Editor — Self-bootstrapping RunPod Serverless Handler Loads: Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled (base) + Tsedee/mongol-editor-llm-v1 (LoRA adapter) [swap to -v2 after v2 training] Accepts batches of raw Whisper-style text segments and returns edited Mongolian subtitle text with post-processing: - Brand name correction (chita→GTA, аифон→iPhone, etc.) - Hallucination guard (rejects outputs that are too different from input) - Chain-of-thought stripping (keeps only "Засварласан хувилбар:" content) - tag cleanup API: Input (JSON): { "texts": ["text 1", "text 2", ...], # required "mode": "edit" | "summarize" | "rewrite", # default: "edit" "instruction": "optional custom prompt", # optional "skip_post_processing": false # optional } Output: { "edited": ["edited 1", "edited 2", ...], "stats": { "count": N, "time_s": T, "tokens_per_s": X }, "fallback_used": [idx1, idx2, ...] # indices where hallucination guard fired } """ import os, sys, subprocess, time # ═══════════════════════════════════════════════════════════════════════ # BOOTSTRAP # ═══════════════════════════════════════════════════════════════════════ def ensure(pkg_import, pip_name=None): try: __import__(pkg_import) except ImportError: name = pip_name or pkg_import print(f"[BOOT] installing {name}...", flush=True) subprocess.run([sys.executable, "-m", "pip", "install", "--quiet", "--no-cache-dir", name], check=True) print("[BOOT] LLM editor handler starting...", flush=True) t0 = time.time() ensure("runpod") ensure("transformers", "transformers==5.5.0") ensure("peft", "peft==0.18.1") ensure("accelerate", "accelerate>=1.0.0") ensure("huggingface_hub") print(f"[BOOT] deps ready in {time.time()-t0:.1f}s", flush=True) # ── Module-level: only stdlib + runpod ────────────────────────────── os.environ.setdefault("CUBLAS_WORKSPACE_CONFIG", ":4096:8") import re import traceback import runpod HF_TOKEN = os.environ.get("HF_TOKEN", "") BASE_MODEL = os.environ.get("BASE_MODEL", "Jackrong/Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled") ADAPTER_REPO = os.environ.get("ADAPTER_REPO", "Tsedee/mongol-editor-llm-v1") MODEL = None TOKENIZER = None torch = None # lazy-loaded # ═══════════════════════════════════════════════════════════════════════ # BRAND CORRECTION DICT — post-processing safety net # ═══════════════════════════════════════════════════════════════════════ # Applied AFTER model output to catch brand names the model missed. # Case-insensitive substring match with word boundaries where possible. BRAND_FIXES = [ # (pattern_regex, replacement) # Games (r"\bчита\s*5\b", "GTA 5"), (r"\bжита\s*5\b", "GTA 5"), (r"\bгта\s*5\b", "GTA 5"), (r"\bчита\s*6\b", "GTA 6"), (r"\bжита\s*6\b", "GTA 6"), (r"\bгта\s*6\b", "GTA 6"), (r"\bфифа\b", "FIFA"), (r"\bкол\s*оф\s*дюти\b", "Call of Duty"), (r"\bкалл\s*оф\s*дюти\b", "Call of Duty"), (r"\bмайнкрафт\b", "Minecraft"), (r"\bмайн\s*крафт\b", "Minecraft"), (r"\bроблокс\b", "Roblox"), (r"\bфортнайт\b", "Fortnite"), (r"\bвальорант\b", "Valorant"), (r"\bвалорант\b", "Valorant"), (r"\bбагстари\b", "Rockstar Games"), (r"\bбагстар\b", "Rockstar Games"), (r"\bпубг\b", "PUBG"), (r"\bкс\s*го\b", "CS:GO"), (r"\bдота\s*2\b", "Dota 2"), (r"\bюбисофт\b", "Ubisoft"), (r"\bстим\b", "Steam"), # Tech (r"\bаифон\b", "iPhone"), (r"\bайфон\b", "iPhone"), (r"\bипад\b", "iPad"), (r"\bайпад\b", "iPad"), (r"\bмакбүүк\b", "MacBook"), (r"\bмакбук\b", "MacBook"), (r"\bэйрподс\b", "AirPods"), (r"\bсамсунг\b", "Samsung"), (r"\bгугл\b", "Google"), (r"\bгүүгэл\b", "Google"), (r"\bхуавей\b", "Huawei"), (r"\bшаоми\b", "Xiaomi"), (r"\bсяоми\b", "Xiaomi"), (r"\bредми\b", "Redmi"), (r"\bэпл\b", "Apple"), # Apps / Social (r"\bютуб\b", "YouTube"), (r"\bютүүб\b", "YouTube"), (r"\bтик\s*ток\b", "TikTok"), (r"\bтикток\b", "TikTok"), (r"\bинстаграм\b", "Instagram"), (r"\bфэйсбүүк\b", "Facebook"), (r"\bфейсбук\b", "Facebook"), (r"\bвацап\b", "WhatsApp"), (r"\bватсап\b", "WhatsApp"), (r"\bтелеграм\b", "Telegram"), (r"\bдискорд\b", "Discord"), (r"\bтвиттер\b", "Twitter"), (r"\bспотифай\b", "Spotify"), (r"\bнетфликс\b", "Netflix"), (r"\bубер\b", "Uber"), (r"\bчат\s*жпт\b", "ChatGPT"), (r"\bчатгпт\b", "ChatGPT"), (r"\bмиджорни\b", "Midjourney"), # Music / celebs (r"\bбтс\b", "BTS"), (r"\bбтэс\b", "BTS"), (r"\bблэкпинк\b", "BLACKPINK"), (r"\bблэк\s*пинк\b", "BLACKPINK"), # Common proper nouns (r"\bулаанбаатар\b", "Улаанбаатар"), (r"\bмонгол\s+улс\b", "Монгол Улс"), (r"\bзасгийн\s+газар\b", "Засгийн газар"), (r"\bуих\b", "УИХ"), (r"\bмуис\b", "МУИС"), ] COMPILED_BRAND_FIXES = [(re.compile(pat, re.IGNORECASE), rep) for pat, rep in BRAND_FIXES] def apply_brand_fixes(text: str) -> str: """Apply brand name corrections. Case-insensitive substitution.""" if not text: return text for pattern, replacement in COMPILED_BRAND_FIXES: text = pattern.sub(replacement, text) return text # ═══════════════════════════════════════════════════════════════════════ # OUTPUT PARSING & HALLUCINATION GUARD # ═══════════════════════════════════════════════════════════════════════ def strip_reasoning(raw_output: str) -> str: """ Extract the final edited version from model output. The training format is: Энэ өгүүлбэрт дараах зүйлс засах хэрэгтэй: 1. ... 2. ... Засварласан хувилбар: We want just . Strategy: 1. Split on "Засварласан хувилбар:" — take everything after 2. Split on "" — take first half (before tag) 3. Strip whitespace 4. If step 1 fails, return input as-is (assume model output was direct) """ if not raw_output: return "" text = raw_output # Prefer content after "Засварласан хувилбар:" marker = "Засварласан хувилбар:" if marker in text: text = text.split(marker, 1)[1] else: # Fallback markers for alt in ("Засварласан өгүүлбэр:", "Эцсийн хувилбар:", "Зөв хувилбар:"): if alt in text: text = text.split(alt, 1)[1] break # Cut at — anything after is a duplicate if "" in text: text = text.split("", 1)[0] if "" in text: # take content after ... block OR before it parts = text.split("", 1) text = parts[0] if parts[0].strip() else parts[1].split("", 1)[-1] # Sometimes the chain-of-thought bleeds in — cut at first blank line # AFTER a colon list ("1. ..." or similar) lines = [ln.rstrip() for ln in text.strip().split("\n")] # If first line is a list item, drop lines until we hit blank cleaned = [] skip_list = False for ln in lines: stripped = ln.strip() if re.match(r"^\d+\.\s", stripped): skip_list = True continue if skip_list and stripped == "": skip_list = False continue if skip_list: continue cleaned.append(ln) out = "\n".join(cleaned).strip() return out or text.strip() def hallucination_guard(original: str, edited: str, max_ratio: float = 1.6) -> tuple[str, bool]: """ Guard against hallucination: if the edited text is drastically longer than the original OR introduces too many new tokens, fall back to the original (optionally with light cleanup). Returns (text, fallback_used). """ if not edited: return original, True orig_len = max(len(original), 1) edit_len = len(edited) # Rule 1: too much longer (model invented content) if edit_len > orig_len * max_ratio and edit_len > orig_len + 40: return original, True # Rule 2: too much shorter (model truncated unexpectedly) if edit_len < orig_len * 0.4 and orig_len > 20: return original, True # Rule 3: zero overlap with original words (wrong topic) orig_words = set(re.findall(r"\w+", original.lower())) edit_words = set(re.findall(r"\w+", edited.lower())) if orig_words and len(orig_words & edit_words) / len(orig_words) < 0.3: return original, True return edited, False # ═══════════════════════════════════════════════════════════════════════ # MODEL LOADING (lazy, fork-safe) # ═══════════════════════════════════════════════════════════════════════ def load_model(): global MODEL, TOKENIZER, torch if MODEL is not None: return t = time.time() print("[LOAD] importing torch...", flush=True) import torch as _torch torch = _torch print("[LOAD] importing transformers + peft...", flush=True) from transformers import AutoTokenizer, AutoModelForCausalLM from peft import PeftModel print(f"[LOAD] CUDA available: {torch.cuda.is_available()}", flush=True) if torch.cuda.is_available(): print(f"[LOAD] device: {torch.cuda.get_device_name(0)}", flush=True) torch.cuda.init() torch.backends.cuda.matmul.allow_tf32 = True print(f"[LOAD] tokenizer from {ADAPTER_REPO}...", flush=True) TOKENIZER = AutoTokenizer.from_pretrained( ADAPTER_REPO, token=HF_TOKEN, trust_remote_code=True ) if TOKENIZER.pad_token is None: TOKENIZER.pad_token = TOKENIZER.eos_token print(f"[LOAD] base model {BASE_MODEL}...", flush=True) base = AutoModelForCausalLM.from_pretrained( BASE_MODEL, dtype=torch.bfloat16, device_map="auto", trust_remote_code=True, token=HF_TOKEN, attn_implementation="eager", ) print(f"[LOAD] adapter {ADAPTER_REPO}...", flush=True) MODEL = PeftModel.from_pretrained(base, ADAPTER_REPO, token=HF_TOKEN) MODEL.eval() print(f"[LOAD] ready in {time.time()-t:.1f}s · " f"VRAM {torch.cuda.memory_allocated()/1e9:.2f}GB", flush=True) # ═══════════════════════════════════════════════════════════════════════ # INFERENCE # ═══════════════════════════════════════════════════════════════════════ INSTRUCTIONS = { "edit": "Дараах ASR-ээс гарсан текстийг засварлаж, зөв subtitle болгоно уу.", "summarize": "Дараах бичлэгийн агуулгыг товчилно уу.", "rewrite": "Дараах өгүүлбэрийг уран бичлэгтэй болгон засна уу.", } def generate_one(text: str, instruction: str, max_new_tokens: int = 256) -> str: """Run the model on a single text with the given instruction.""" user_msg = f"{instruction}\n\n{text}" prompt = TOKENIZER.apply_chat_template( [{"role": "user", "content": user_msg}], tokenize=False, add_generation_prompt=True, ) inputs = TOKENIZER(prompt, return_tensors="pt", truncation=True, max_length=1024).to(MODEL.device) with torch.no_grad(): out_ids = MODEL.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=False, temperature=1.0, repetition_penalty=1.05, pad_token_id=TOKENIZER.pad_token_id, ) new_tokens = out_ids[0][inputs["input_ids"].shape[1]:] raw = TOKENIZER.decode(new_tokens, skip_special_tokens=True).strip() return raw def handler(event): """RunPod serverless entry point.""" try: t_total = time.time() load_model() inp = event.get("input", {}) or {} texts = inp.get("texts") if not texts or not isinstance(texts, list): return {"error": "Missing 'texts' list in input"} mode = inp.get("mode", "edit") custom_instruction = inp.get("instruction") skip_post = bool(inp.get("skip_post_processing", False)) max_new_tokens = int(inp.get("max_new_tokens", 256)) instruction = custom_instruction or INSTRUCTIONS.get(mode, INSTRUCTIONS["edit"]) edited = [] fallback_used = [] total_tokens = 0 for i, text in enumerate(texts): if not text or not text.strip(): edited.append(text) continue try: raw = generate_one(text, instruction, max_new_tokens=max_new_tokens) parsed = strip_reasoning(raw) if mode == "edit" and not skip_post: # Hallucination guard guarded, is_fallback = hallucination_guard(text, parsed) # Brand fixes (applied to both fallback and edit) guarded = apply_brand_fixes(guarded) if is_fallback: fallback_used.append(i) edited.append(guarded) else: edited.append(parsed) total_tokens += len(raw.split()) except Exception as e: print(f"[ERR] segment {i}: {e}", flush=True) traceback.print_exc() # On any failure, return the original text unchanged edited.append(text) fallback_used.append(i) elapsed = time.time() - t_total return { "edited": edited, "stats": { "count": len(texts), "time_s": round(elapsed, 2), "tokens_per_s": round(total_tokens / elapsed, 1) if elapsed > 0 else 0, }, "fallback_used": fallback_used, "mode": mode, "model": ADAPTER_REPO, } except Exception as e: traceback.print_exc() return {"error": str(e)} # ═══════════════════════════════════════════════════════════════════════ # ENTRY POINT # ═══════════════════════════════════════════════════════════════════════ if __name__ == "__main__": print(f"[BOOT] total bootstrap time: {time.time()-t0:.1f}s", flush=True) runpod.serverless.start({"handler": handler})