# Hugging Face Space app (page banner at capture time: "Runtime error")
| import gradio as gr | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import requests, json, uuid, os | |
| from io import BytesIO | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import time | |
| # ================================================================ | |
| # FILE NORMALIZER (IMPORTANT for HuggingFace Spaces) | |
| # ================================================================ | |
def resolve_file(file):
    """
    Normalize a Gradio upload object into a real filesystem path.

    Handles the shapes Gradio may hand us on HF Spaces:
      - str: already a filesystem path (newer Gradio versions)
      - dict {"name", "data"}: in-memory upload (HF Spaces)
      - NamedString: has .name (a path) but no .path attribute
      - tempfile-like object: has .name pointing at a real file

    Returns:
        str: path to a readable file on disk.

    Raises:
        ValueError: if the object matches none of the known shapes.
    """
    # Case 0: plain path string (some Gradio versions pass these directly;
    # the original code raised for this shape).
    if isinstance(file, str):
        return file
    # Case 1: HF dict with raw bytes (or a file-like object) under "data".
    if isinstance(file, dict) and "data" in file:
        raw = file["data"]
        fname = file.get("name", f"{uuid.uuid4().hex}.bin")
        path = f"/tmp/{uuid.uuid4().hex}_{os.path.basename(fname)}"
        with open(path, "wb") as f:
            f.write(raw if isinstance(raw, bytes) else raw.read())
        return path
    # Case 2: NamedString — .name is itself a path; copy to a stable tmp file.
    if hasattr(file, "name") and not hasattr(file, "path"):
        tmp_path = f"/tmp/{uuid.uuid4().hex}_{os.path.basename(file.name)}"
        # Original leaked the source handle: open(file.name).read() was never closed.
        with open(file.name, "rb") as src, open(tmp_path, "wb") as dst:
            dst.write(src.read())
        return tmp_path
    # Case 3: normal tempfile object with a usable path.
    if hasattr(file, "name"):
        return file.name
    raise ValueError(f"Unsupported file format: {file!r}")
| # ================================================================ | |
| # UNIVERSAL RESIZE: max bounds 800Γ1800, 1800Γ800, 1200Γ1200 | |
| # ================================================================ | |
def resize_to_max_bounds(img,
                         max_w1=800, max_h1=1800,
                         max_w2=1800, max_h2=800,
                         max_ws=1200, max_hs=1200):
    """Shrink *img* (preserving aspect ratio) until it fits inside at least
    one of the allowed bounding boxes: 800x1800 (portrait), 1800x800
    (landscape) or 1200x1200 (square).

    The previous implementation took the *minimum* scale across all three
    boxes, which forced every image into their 800x800 intersection and made
    the portrait/landscape boxes unreachable. We instead compute, per box,
    the scale needed to fit it, and keep the most permissive (largest) one.

    Args:
        img: PIL.Image-like object exposing .size and .resize().

    Returns:
        The image unchanged if it already fits a box, otherwise a LANCZOS
        downscaled copy.
    """
    w, h = img.size
    bounds = [
        (max_w1, max_h1),
        (max_w2, max_h2),
        (max_ws, max_hs),
    ]
    # Scale required to fit each box; the best (largest) bound wins.
    scale = max(min(mw / w, mh / h) for mw, mh in bounds)
    if scale >= 1.0:
        return img  # already fits at least one box untouched
    new_size = (int(w * scale), int(h * scale))
    return img.resize(new_size, Image.Resampling.LANCZOS)
| # ================================================================ | |
| # Preview resize | |
| # ================================================================ | |
def resize_preview(img, max_size=400):
    """Return a thumbnail whose longest side is at most *max_size* pixels."""
    width, height = img.size
    longest = max(width, height)
    if longest <= max_size:
        return img
    factor = max_size / longest
    scaled = (int(width * factor), int(height * factor))
    return img.resize(scaled, Image.Resampling.LANCZOS)
| # ================================================================ | |
| # Typhoon OCR API call | |
| # ================================================================ | |
def run_typhoon_ocr(img_bytes, api_key, model, task_type,
                    max_tokens, temperature, top_p, repetition_penalty):
    """Send one JPEG image to the Typhoon OCR endpoint and return its text.

    Args:
        img_bytes: raw JPEG bytes of a single page/image.
        api_key: Typhoon API key, sent as a Bearer token.
        model, task_type: Typhoon model name and task variant.
        max_tokens, temperature, top_p, repetition_penalty: generation
            parameters, forwarded as stringified form fields.

    Returns:
        str: extracted text, or a human-readable "β Error ..." message on
        HTTP/network failure — it never raises, so thread-pool callers in
        extract_text keep processing the remaining pages.
    """
    url = "https://api.opentyphoon.ai/v1/ocr"
    files = {"file": ("page.jpg", img_bytes, "image/jpeg")}
    data = {
        "model": model,
        "task_type": task_type,
        "max_tokens": str(max_tokens),
        "temperature": str(temperature),
        "top_p": str(top_p),
        "repetition_penalty": str(repetition_penalty),
    }
    headers = {"Authorization": f"Bearer {api_key}"}
    try:
        # Timeout guards against a hung connection stalling the whole batch
        # (original had none): 10s to connect, 300s for the OCR response.
        r = requests.post(url, files=files, data=data, headers=headers,
                          timeout=(10, 300))
    except requests.RequestException as e:
        return f"β Error: request failed\n{e}"
    if r.status_code != 200:
        return f"β Error {r.status_code}\n{r.text}"
    result = r.json()
    texts = []
    for page in result.get("results", []):
        if page.get("success") and page.get("message"):
            content = page["message"]["choices"][0]["message"]["content"]
            try:
                # Some task types wrap the text in JSON: {"natural_text": ...}
                parsed = json.loads(content)
                text = parsed.get("natural_text", content)
            except (json.JSONDecodeError, AttributeError):
                # Plain-text content, or JSON that is not an object.
                text = content
            texts.append(text)
        else:
            texts.append(f"β Error: {page.get('error')}")
    return "\n\n".join(texts)
| # ================================================================ | |
| # PDF β Images (PyMuPDF) | |
| # ================================================================ | |
def pdf_to_images_pymupdf(pdf_path, dpi=220):
    """Rasterize every page of a PDF into PIL images.

    Args:
        pdf_path: path to the PDF file on disk.
        dpi: render resolution; PyMuPDF's base unit is 72 dpi, so the zoom
            matrix is dpi / 72 on both axes.

    Returns:
        list: one PIL.Image per page, in page order.
    """
    zoom = dpi / 72
    mat = fitz.Matrix(zoom, zoom)
    images = []
    doc = fitz.open(pdf_path)
    try:
        for page in doc:
            pix = page.get_pixmap(matrix=mat)
            # Round-trip through PNG bytes so PIL owns the pixel buffer.
            images.append(Image.open(BytesIO(pix.tobytes("png"))))
    finally:
        # Original leaked the document handle; always release it.
        doc.close()
    return images
| # ================================================================ | |
| # PREVIEW (GRID) | |
| # ================================================================ | |
def preview_files(files):
    """Build small preview thumbnails for every upload (one per PDF page)."""
    thumbnails = []
    for upload in files:
        path = resolve_file(upload)
        if path.lower().endswith(".pdf"):
            # Lower DPI than the OCR pass: previews only need to be legible.
            pages = pdf_to_images_pymupdf(path, dpi=120)
        else:
            picture = Image.open(path)
            if picture.mode == "RGBA":
                picture = picture.convert("RGB")
            pages = [picture]
        thumbnails.extend(
            resize_preview(resize_to_max_bounds(page)) for page in pages
        )
    return thumbnails
| # ================================================================ | |
| # OCR 1 PAGE (PARALLEL) | |
| # ================================================================ | |
def ocr_single_page(page_img, label,
                    api_key, model, task_type, max_tokens,
                    temperature, top_p, repetition_penalty):
    """OCR one page image; returns (label, text) so futures stay labelled."""
    jpeg_buffer = BytesIO()
    # The API expects JPEG, so force RGB before encoding.
    page_img.convert("RGB").save(jpeg_buffer, format="JPEG")
    jpeg_buffer.seek(0)
    recognized = run_typhoon_ocr(
        jpeg_buffer.getvalue(), api_key, model, task_type,
        max_tokens, temperature, top_p, repetition_penalty
    )
    return label, recognized
| # ================================================================ | |
| # MAIN OCR LOGIC | |
| # ================================================================ | |
def extract_text(files,
                 api_key, model, task_type, max_tokens,
                 temperature, top_p, repetition_penalty,
                 progress=gr.Progress(track_tqdm=True)):
    """Run OCR over every uploaded file (images + PDF pages) in parallel.

    Args:
        files: Gradio uploads (see resolve_file for accepted shapes).
        api_key ... repetition_penalty: forwarded to run_typhoon_ocr.
        progress: Gradio progress tracker (injected by the UI).

    Returns:
        (merged_markdown, txt_file_path) on success, or an error string
        plus None when there is nothing to OCR.
    """
    if not files:
        return "β No files uploaded.", None
    images_to_ocr = []
    labels = []
    # ---- load every file into a flat (image, label) sequence ----
    for file in files:
        real_path = resolve_file(file)
        if real_path.lower().endswith(".pdf"):
            for idx, img in enumerate(pdf_to_images_pymupdf(real_path, dpi=220), start=1):
                images_to_ocr.append(resize_to_max_bounds(img))
                labels.append(f"{os.path.basename(real_path)} - Page {idx}")
        else:
            img = Image.open(real_path)
            if img.mode == "RGBA":
                img = img.convert("RGB")
            images_to_ocr.append(resize_to_max_bounds(img))
            labels.append(os.path.basename(real_path))
    total = len(images_to_ocr)
    if total == 0:
        # e.g. a zero-page PDF; bail out before done/total divides by zero.
        return "β No pages found in the uploaded files.", None
    progress(0.03, desc=f"Loaded {total} pages/images")
    # ---- parallel OCR ----
    # Key results by position rather than label: duplicate filenames no
    # longer overwrite each other, and output keeps upload/page order
    # (the old sorted() merge put "Page 10" before "Page 2").
    results = {}
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as ex:
        future_to_pos = {
            ex.submit(
                ocr_single_page, img, lbl,
                api_key, model, task_type,
                max_tokens, temperature, top_p, repetition_penalty
            ): pos
            for pos, (img, lbl) in enumerate(zip(images_to_ocr, labels))
        }
        done = 0
        for f in as_completed(future_to_pos):
            _, txt = f.result()
            results[future_to_pos[f]] = txt
            done += 1
            elapsed = time.time() - start
            eta = (total - done) * (elapsed / max(done, 1))
            progress(done / total,
                     desc=f"OCR {done}/{total} | ETA {eta:.1f}s")
    progress(1, desc="OCR Completed β")
    # ---- merge in original order ----
    merged = "".join(
        f"## {labels[pos]}\n{results[pos]}\n\n" for pos in range(total)
    )
    out_path = f"/tmp/ocr_{uuid.uuid4().hex}.txt"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(merged)
    return merged, out_path
| # ================================================================ | |
| # UI | |
| # ================================================================ | |
# Top-level Gradio UI: intro markdown, upload + preview widgets, advanced
# generation settings, and the Run button wired to extract_text.
# NOTE(review): several characters in the user-facing strings below look
# mojibake'd (e.g. "π", "β", "β‘") — likely emoji lost in an encoding
# round-trip; confirm against the original file before editing them.
with gr.Blocks() as demo:
    gr.Markdown("""
# π Typhoon OCR v1.5
### Multi-file OCR β’ Parallel Processing β’ ETA β’ PDF/Image Support
β‘ **High-speed OCR powered by Typhoon**
π Upload **multiple images or PDFs**
π Parallel OCR with ETA
π Auto preview grid for all pages
---
## π Get Your API Key
π https://playground.opentyphoon.ai/settings/api-key
After logging in, look at the **top-right corner** β you'll see **API Key** menu.
Click it to generate or copy your key.
""")
    gr.Markdown("### π How to get API Key (step-by-step)")
    # Static three-step screenshot gallery (image files shipped with the Space).
    with gr.Row():
        gr.Gallery(
            [
                ("ocr_login.png", "1) Login"),
                ("ocr_first.png", "2) Find API Key Menu"),
                ("ocr_getkey.png", "3) Copy Your Key"),
            ],
            columns=3,
            height=250,
            show_label=False,
        )
    # Upload + live preview: any change to the files re-renders the grid.
    file_input = gr.Files(label="Upload images or PDFs", file_count="multiple")
    preview_gallery = gr.Gallery(label="Preview", columns=3, height="auto")
    file_input.change(preview_files, inputs=file_input, outputs=preview_gallery)
    # ADVANCED SETTINGS — generation parameters forwarded to the OCR API.
    with gr.Accordion("βοΈ Advanced Settings", open=False):
        model_box = gr.Textbox(value="typhoon-ocr", label="Model")
        task_type_box = gr.Textbox(value="v1.5", label="Task Type")
        max_tokens_box = gr.Number(value=16000, label="Max Tokens")
        temperature_box = gr.Number(value=0.1, label="Temperature")
        top_p_box = gr.Number(value=0.6, label="Top-p")
        repetition_penalty_box = gr.Number(value=1.2, label="Repetition Penalty")
    api_key_box = gr.Textbox(label="API Key", type="password")
    run_btn = gr.Button("π Run OCR")
    output_box = gr.Markdown(label="OCR Output")
    download_btn = gr.File(label="Download (.txt)")
    # Main action: OCR all files, show merged markdown, offer a .txt download.
    run_btn.click(
        extract_text,
        inputs=[
            file_input,
            api_key_box,
            model_box,
            task_type_box,
            max_tokens_box,
            temperature_box,
            top_p_box,
            repetition_penalty_box,
        ],
        outputs=[output_box, download_btn],
    )
demo.launch()