import json
import os
import shutil
import tempfile
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO

import fitz  # PyMuPDF
import gradio as gr
import requests
from PIL import Image


# ================================================================
# FILE NORMALIZER (IMPORTANT for HuggingFace Spaces)
# ================================================================
def _unique_tmp_path(name):
    """Return a fresh path in the system temp dir whose basename ends in *name*."""
    safe_name = os.path.basename(str(name)) or "upload.bin"
    return os.path.join(tempfile.gettempdir(), f"{uuid.uuid4().hex}_{safe_name}")


def resolve_file(file):
    """Normalize an uploaded-file object into a real filesystem path.

    Handles, in this order:
    - dict carrying inline content under ``"data"`` (bytes or a readable
      object): the content is written to a new temp file;
    - dict without inline content but with a ``"path"``/``"name"`` entry that
      points to an existing file: that path is returned;
    - object with ``.name`` but no ``.path``: the file at ``.name`` is copied
      to a new temp file and the copy's path is returned;
    - object with ``.name`` (and ``.path``): ``.name`` is returned;
    - object with only a string ``.path``: ``.path`` is returned;
    - plain ``str`` / ``os.PathLike``: returned as a string path.

    Args:
        file: The upload object handed over by the UI layer.

    Returns:
        A string path to a file on disk.

    Raises:
        ValueError: If *file* matches none of the supported shapes.
    """
    # Case 1: dict payload
    if isinstance(file, dict):
        raw = file.get("data")
        payload = None
        if isinstance(raw, (bytes, bytearray)):
            payload = raw
        elif hasattr(raw, "read"):
            payload = raw.read()
        if payload is not None:
            fname = file.get("name") or f"{uuid.uuid4().hex}.bin"
            path = _unique_tmp_path(fname)
            with open(path, "wb") as out:
                out.write(payload)
            return path
        # No inline content: fall back to a path stored in the dict.
        for key in ("path", "name"):
            candidate = file.get(key)
            if isinstance(candidate, str) and os.path.isfile(candidate):
                return candidate
        raise ValueError(f"Unsupported file format: {file!r}")

    # Case 2: object exposing only .name -> work on a private copy.
    if hasattr(file, "name") and not hasattr(file, "path"):
        tmp_path = _unique_tmp_path(file.name)
        # copyfile opens and closes both ends; the previous
        # open(...).read() left the source handle to the garbage collector.
        shutil.copyfile(file.name, tmp_path)
        return tmp_path

    # Case 3: tempfile-like object with both .name and .path
    if hasattr(file, "name"):
        return file.name

    # Case 4: object that only exposes a string .path
    path_attr = getattr(file, "path", None)
    if isinstance(path_attr, str):
        return path_attr

    # Case 5: bare path
    if isinstance(file, (str, os.PathLike)):
        return os.fspath(file)

    # A single formatted message; passing two positional args made the
    # exception text render as a tuple.
    raise ValueError(f"Unsupported file format: {file!r}")


# ================================================================
# UNIVERSAL RESIZE: max bounds 800×1800, 1800×800, 1200×1200
# ================================================================
def resize_to_max_bounds(img,
                         max_w1=800, max_h1=1800,
                         max_w2=1800, max_h2=800,
                         max_ws=1200, max_hs=1200):
    """Downscale *img* so it fits the best-matching bounding box.

    Three boxes are considered: ``max_w1 x max_h1`` (portrait),
    ``max_w2 x max_h2`` (landscape) and ``max_ws x max_hs`` (square). The box
    that allows the largest result is used, so the image only has to fit ONE
    of them. Requiring it to fit all three at once (the earlier behavior)
    reduces to a single 800x800 box and makes the separate bounds pointless.

    The aspect ratio is preserved and images are never upscaled.

    Args:
        img: A PIL image.
        max_w1, max_h1: Portrait bound.
        max_w2, max_h2: Landscape bound.
        max_ws, max_hs: Square bound.

    Returns:
        *img* itself when it already fits one of the bounds (or has a zero
        dimension), otherwise a new LANCZOS-resampled image.
    """
    w, h = img.size
    if w <= 0 or h <= 0:
        # Nothing sensible to scale; also avoids dividing by zero below.
        return img

    bounds = (
        (max_w1, max_h1),
        (max_w2, max_h2),
        (max_ws, max_hs),
    )

    # Per box: the largest factor that keeps both sides inside it.
    # Across boxes: the most generous of those factors.
    scale = max(min(max_w / w, max_h / h) for max_w, max_h in bounds)

    if scale >= 1.0:
        return img

    # Clamp to 1 px so extreme aspect ratios cannot round a side down to 0,
    # which Image.resize rejects.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.Resampling.LANCZOS)


# ================================================================
# Preview resize
# ================================================================
def resize_preview(img, max_size=400):
    """Shrink *img* so its longest side is at most *max_size* pixels.

    Returns *img* unchanged when it is already small enough; otherwise a new
    LANCZOS-resampled image with the same aspect ratio.
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_size:
        return img

    scale = max_size / longest
    # Clamp to 1 px so a very thin image cannot round a side down to 0.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.Resampling.LANCZOS)


# ================================================================
# Typhoon OCR API call
# ================================================================
_OCR_URL = "https://api.opentyphoon.ai/v1/ocr"
# (connect, read) seconds. Without a timeout a stalled connection would block
# a worker thread forever.
_OCR_TIMEOUT = (10, 300)


def _page_text(page):
    """Return the text (or an error line) for one entry of the ``results`` list."""
    if not isinstance(page, dict):
        return "❌ Error: unexpected response structure"
    if not (page.get("success") and page.get("message")):
        return f"❌ Error: {page.get('error')}"

    try:
        content = page["message"]["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        return "❌ Error: unexpected response structure"

    # The content is tried as JSON first, taking "natural_text" from an
    # object; anything else is passed through as plain text.
    try:
        parsed = json.loads(content)
    except (TypeError, ValueError):
        parsed = None
    text = parsed.get("natural_text", content) if isinstance(parsed, dict) else content
    return text if isinstance(text, str) else str(text)


def run_typhoon_ocr(img_bytes, api_key, model, task_type,
                    max_tokens, temperature, top_p, repetition_penalty):
    """Send one JPEG image to the Typhoon OCR endpoint and return its text.

    Args:
        img_bytes: JPEG-encoded image bytes.
        api_key: Bearer token for the API.
        model, task_type, max_tokens, temperature, top_p, repetition_penalty:
            Forwarded as multipart form fields.

    Returns:
        The recognized text of all result pages joined by blank lines. Any
        failure (network error, non-200 status, malformed response, per-page
        error) is reported as a string starting with ``❌`` instead of raising,
        so one bad page does not abort a batch.
    """
    # UI number inputs can deliver floats; send "16000", not "16000.0".
    try:
        max_tokens_field = str(int(max_tokens))
    except (TypeError, ValueError):
        max_tokens_field = str(max_tokens)

    files = {"file": ("page.jpg", img_bytes, "image/jpeg")}
    data = {
        "model": model,
        "task_type": task_type,
        "max_tokens": max_tokens_field,
        "temperature": str(temperature),
        "top_p": str(top_p),
        "repetition_penalty": str(repetition_penalty),
    }
    headers = {"Authorization": f"Bearer {api_key}"}

    try:
        r = requests.post(_OCR_URL, files=files, data=data, headers=headers,
                          timeout=_OCR_TIMEOUT)
    except requests.RequestException as exc:
        return f"❌ Error: request failed ({exc})"

    if r.status_code != 200:
        return f"❌ Error {r.status_code}\n{r.text}"

    try:
        result = r.json()
    except ValueError:
        return f"❌ Error: response was not valid JSON\n{r.text}"

    pages = result.get("results", []) if isinstance(result, dict) else []
    return "\n\n".join(_page_text(page) for page in pages)


# ================================================================
# PDF → Images (PyMuPDF)
# ================================================================
def pdf_to_images_pymupdf(pdf_path, dpi=220):
    """Render every page of a PDF to a PIL image.

    Args:
        pdf_path: Path of the PDF file.
        dpi: Render resolution; converted to a zoom factor relative to 72.

    Returns:
        A list of PIL images, one per page, in page order.
    """
    zoom = dpi / 72
    mat = fitz.Matrix(zoom, zoom)

    images = []
    # The context manager closes the document even if rendering fails.
    with fitz.open(pdf_path) as doc:
        for page in doc:
            pix = page.get_pixmap(matrix=mat)
            img = Image.open(BytesIO(pix.tobytes("png")))
            img.load()  # decode now rather than lazily on first use
            images.append(img)
    return images


def _load_pages(file, dpi):
    """Return ``(label, image)`` pairs for one upload, resized to the OCR bounds.

    PDFs yield one pair per page (rendered at *dpi*); any other file is opened
    as a single image.
    """
    real_path = resolve_file(file)
    base = os.path.basename(real_path)

    if real_path.lower().endswith(".pdf"):
        return [
            (f"{base} - Page {idx}", resize_to_max_bounds(img))
            for idx, img in enumerate(pdf_to_images_pymupdf(real_path, dpi=dpi), start=1)
        ]

    # Detach the pixel data from the file so the handle is closed right away.
    with Image.open(real_path) as src:
        img = src.convert("RGB") if src.mode == "RGBA" else src.copy()
    return [(base, resize_to_max_bounds(img))]


# ================================================================
# PREVIEW (GRID)
# ================================================================
def preview_files(files):
    """Build the thumbnail list shown in the preview gallery.

    Args:
        files: Uploaded file objects, or ``None``/empty when the upload was
            cleared.

    Returns:
        A list of PIL thumbnails, one per image or PDF page.
    """
    if not files:
        # The change event also fires when the selection is cleared.
        return []

    previews = []
    for file in files:
        previews.extend(resize_preview(img) for _label, img in _load_pages(file, dpi=120))
    return previews


# ================================================================
# OCR 1 PAGE (PARALLEL)
# ================================================================
def ocr_single_page(page_img, label, api_key, model, task_type,
                    max_tokens, temperature, top_p, repetition_penalty):
    """JPEG-encode one page image, OCR it, and return ``(label, text)``."""
    buf = BytesIO()
    page_img.convert("RGB").save(buf, format="JPEG")

    txt = run_typhoon_ocr(
        buf.getvalue(), api_key, model, task_type,
        max_tokens, temperature, top_p, repetition_penalty
    )
    return label, txt


# ================================================================
# MAIN OCR LOGIC
# ================================================================
def extract_text(files, api_key, model, task_type,
                 max_tokens, temperature, top_p, repetition_penalty,
                 progress=gr.Progress(track_tqdm=True)):
    """OCR all uploaded files in parallel and merge the results.

    Args:
        files: Uploaded file objects (images and/or PDFs).
        api_key: Typhoon API key.
        model, task_type, max_tokens, temperature, top_p, repetition_penalty:
            Forwarded to the OCR API for every page.
        progress: Progress reporter called with a fraction and a description.

    Returns:
        ``(merged_markdown, txt_path)``. ``txt_path`` is ``None`` when there
        is nothing to process or the API key is missing. Pages appear in
        upload/page order, each under a ``## <label>`` heading.
    """
    import tempfile  # local import keeps this block self-contained

    if not files:
        return "❌ No files uploaded.", None

    # A pasted key often carries stray whitespace or a newline, which is not
    # valid inside an HTTP header.
    api_key = (api_key or "").strip()
    if not api_key:
        return "❌ No API key provided.", None

    # LOAD FILES
    pages = []
    for file in files:
        pages.extend(_load_pages(file, dpi=220))

    total = len(pages)
    progress(0.03, desc=f"Loaded {total} pages/images")

    # PARALLEL OCR
    # Results are stored by position, not by label: sorting labels as strings
    # puts "Page 10" before "Page 2", and equal labels would overwrite each
    # other.
    results = [""] * total
    start = time.monotonic()

    with ThreadPoolExecutor(max_workers=4) as ex:
        future_to_index = {
            ex.submit(
                ocr_single_page,
                img, lbl,
                api_key, model, task_type,
                max_tokens, temperature, top_p, repetition_penalty
            ): idx
            for idx, (lbl, img) in enumerate(pages)
        }

        for done, fut in enumerate(as_completed(future_to_index), start=1):
            try:
                _lbl, txt = fut.result()
            except Exception as exc:
                # Boundary handler: one failed page must not throw away the
                # pages that succeeded.
                txt = f"❌ Error: {exc}"
            results[future_to_index[fut]] = txt

            elapsed = time.monotonic() - start
            eta = (total - done) * (elapsed / done)
            progress(done / total, desc=f"OCR {done}/{total} | ETA {eta:.1f}s")

    progress(1, desc="OCR Completed ✔")

    # MERGE RESULT
    merged = "".join(
        f"## {lbl}\n{txt}\n\n" for (lbl, _img), txt in zip(pages, results)
    )

    out_path = os.path.join(tempfile.gettempdir(), f"ocr_{uuid.uuid4().hex}.txt")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(merged)

    return merged, out_path


# ================================================================
# UI
# ================================================================
with gr.Blocks() as demo:

    gr.Markdown("""
    # 🔍 Typhoon OCR v1.5
    ### Multi-file OCR • Parallel Processing • ETA • PDF/Image Support

    ⚡ **High-speed OCR powered by Typhoon**

    📄 Upload **multiple images or PDFs**

    🚀 Parallel OCR with ETA

    🔍 Auto preview grid for all pages

    ---

    ## 🔑 Get Your API Key
    👉 https://playground.opentyphoon.ai/settings/api-key

    After logging in, look at the **top-right corner** → you'll see **API Key** menu.
    Click it to generate or copy your key.
    """)

    gr.Markdown("### 📘 How to get API Key (step-by-step)")

    with gr.Row():
        gr.Gallery(
            [
                ("ocr_login.png", "1) Login"),
                ("ocr_first.png", "2) Find API Key Menu"),
                ("ocr_getkey.png", "3) Copy Your Key"),
            ],
            columns=3,
            height=250,
            show_label=False,
        )

    file_input = gr.Files(label="Upload images or PDFs", file_count="multiple")
    preview_gallery = gr.Gallery(label="Preview", columns=3, height="auto")

    file_input.change(preview_files, inputs=file_input, outputs=preview_gallery)

    # ADVANCED SETTINGS
    with gr.Accordion("⚙️ Advanced Settings", open=False):
        model_box = gr.Textbox(value="typhoon-ocr", label="Model")
        task_type_box = gr.Textbox(value="v1.5", label="Task Type")
        # precision=0 makes the component deliver an integer token count.
        max_tokens_box = gr.Number(value=16000, label="Max Tokens", precision=0)
        temperature_box = gr.Number(value=0.1, label="Temperature")
        top_p_box = gr.Number(value=0.6, label="Top-p")
        repetition_penalty_box = gr.Number(value=1.2, label="Repetition Penalty")

    api_key_box = gr.Textbox(label="API Key", type="password")

    run_btn = gr.Button("🚀 Run OCR")

    output_box = gr.Markdown(label="OCR Output")
    download_btn = gr.File(label="Download (.txt)")

    run_btn.click(
        extract_text,
        inputs=[
            file_input,
            api_key_box,
            model_box,
            task_type_box,
            max_tokens_box,
            temperature_box,
            top_p_box,
            repetition_penalty_box,
        ],
        outputs=[output_box, download_btn],
    )

# Launch only when executed as a script, so importing this module (tests,
# tooling) does not start a server.
if __name__ == "__main__":
    demo.launch()