# fruk19's picture
# update resize image
# 1dc843e verified
import gradio as gr
import fitz # PyMuPDF
from PIL import Image
import requests, json, uuid, os
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# ================================================================
# FILE NORMALIZER (IMPORTANT for HuggingFace Spaces)
# ================================================================
def resolve_file(file):
    """
    Normalize a Gradio upload object into a real filesystem path.

    Handles:
    - dict containing a "data" entry (and optionally "name"): the payload is
      written to a new, uniquely named file under /tmp
    - object with ``.name`` but no ``.path`` (e.g. NamedString): the file at
      ``.name`` is copied to a new, uniquely named file under /tmp
    - object with both ``.name`` and ``.path``: ``.name`` is returned as-is
    - plain ``str`` path: returned unchanged

    Returns:
        The path of a file on disk, as a string.

    Raises:
        ValueError: if ``file`` matches none of the shapes above.
    """
    # Case 1: HF dict
    if isinstance(file, dict) and "data" in file:
        raw = file["data"]
        # `or` also covers a "name" key that is present but empty/None.
        fname = file.get("name") or f"{uuid.uuid4().hex}.bin"
        path = f"/tmp/{uuid.uuid4().hex}_{os.path.basename(fname)}"
        # Read the payload before creating the target so a failing read does
        # not leave an empty file behind.
        payload = raw if isinstance(raw, (bytes, bytearray)) else raw.read()
        with open(path, "wb") as f:
            f.write(payload)
        return path

    # Case 2: NamedString (file.name only)
    if hasattr(file, "name") and not hasattr(file, "path"):
        tmp_path = f"/tmp/{uuid.uuid4().hex}_{os.path.basename(file.name)}"
        # Both handles are closed deterministically; the source is opened
        # first so a missing source does not create an empty copy.
        with open(file.name, "rb") as src, open(tmp_path, "wb") as dst:
            dst.write(src.read())
        return tmp_path

    # Case 3: normal tempfile with path
    if hasattr(file, "name"):
        return file.name

    # Case 4: a bare string is already a filesystem path
    if isinstance(file, str):
        return file

    raise ValueError(f"Unsupported file format: {file!r}")
# ================================================================
# UNIVERSAL RESIZE: max bounds 800×1800, 1800×800, 1200×1200
# ================================================================
def resize_to_max_bounds(img,
                         max_w1=800, max_h1=1800,
                         max_w2=1800, max_h2=800,
                         max_ws=1200, max_hs=1200):
    """Shrink ``img`` to fit the best-matching size envelope, keeping aspect ratio.

    Three envelopes are considered: ``max_w1 x max_h1``, ``max_w2 x max_h2``
    and ``max_ws x max_hs``. The image is left untouched when it already fits
    inside any one of them; otherwise it is scaled down to the envelope that
    allows the largest result. Images are never enlarged.

    Args:
        img: object exposing ``size`` as ``(width, height)`` and a
            ``resize(size, resample)`` method (a PIL image).
        max_w1, max_h1: first envelope.
        max_w2, max_h2: second envelope.
        max_ws, max_hs: third envelope.

    Returns:
        ``img`` itself when no shrinking is needed, otherwise the resized image.
    """
    w, h = img.size
    # Degenerate image: nothing sensible to scale, and it avoids dividing by 0.
    if w <= 0 or h <= 0:
        return img

    bounds = (
        (max_w1, max_h1),
        (max_w2, max_h2),
        (max_ws, max_hs),
    )
    # Per envelope, the limiting factor is the smaller of the two ratios.
    # Across envelopes the LARGEST factor wins: taking the minimum instead
    # would collapse the three envelopes into their intersection (800x800 with
    # the defaults), making the larger limits unreachable.
    scale = max(min(max_w / w, max_h / h) for max_w, max_h in bounds)
    if scale >= 1.0:
        return img

    # Clamp to 1 px so extreme aspect ratios never request a zero-sized image.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.Resampling.LANCZOS)
# ================================================================
# Preview resize
# ================================================================
def resize_preview(img, max_size=400):
    """Return a thumbnail-sized version of ``img`` for the preview gallery.

    Args:
        img: object exposing ``size`` as ``(width, height)`` and a
            ``resize(size, resample)`` method (a PIL image).
        max_size: maximum length, in pixels, of the longer side.

    Returns:
        ``img`` itself when its longer side is already within ``max_size``,
        otherwise a resized image with the same aspect ratio.
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_size:
        return img
    scale = max_size / longest
    # Clamp to 1 px: for very elongated images the short side would otherwise
    # truncate to 0 and the resize call would fail.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.Resampling.LANCZOS)
# ================================================================
# Typhoon OCR API call
# ================================================================
def run_typhoon_ocr(img_bytes, api_key, model, task_type,
                    max_tokens, temperature, top_p, repetition_penalty,
                    timeout=300):
    """Send one JPEG image to the Typhoon OCR endpoint and return its text.

    Args:
        img_bytes: JPEG-encoded image, uploaded as ``page.jpg``.
        api_key: token sent as ``Authorization: Bearer <api_key>``.
        model, task_type: forwarded as form fields.
        max_tokens, temperature, top_p, repetition_penalty: forwarded as form
            fields after conversion to ``str``.
        timeout: seconds to wait for the HTTP request before giving up.

    Returns:
        The text of every entry in the response's ``results`` list joined by
        blank lines. Failures (network error, non-200 status, malformed
        response, per-page error) are reported as text starting with "❌"
        rather than raised, so one bad page never aborts a batch.
    """
    url = "https://api.opentyphoon.ai/v1/ocr"
    files = {"file": ("page.jpg", img_bytes, "image/jpeg")}
    data = {
        "model": model,
        "task_type": task_type,
        "max_tokens": str(max_tokens),
        "temperature": str(temperature),
        "top_p": str(top_p),
        "repetition_penalty": str(repetition_penalty),
    }
    headers = {"Authorization": f"Bearer {api_key}"}

    # Without a timeout a stalled connection would block its worker forever.
    try:
        r = requests.post(url, files=files, data=data, headers=headers,
                          timeout=timeout)
    except requests.RequestException as exc:
        return f"❌ Error: {exc}"

    if r.status_code != 200:
        return f"❌ Error {r.status_code}\n{r.text}"

    try:
        result = r.json()
    except ValueError:
        return f"❌ Error: invalid JSON response\n{r.text}"

    pages = result.get("results", []) if isinstance(result, dict) else []
    texts = []
    for page in pages:
        if not (isinstance(page, dict) and page.get("success") and page.get("message")):
            error = page.get("error") if isinstance(page, dict) else page
            texts.append(f"❌ Error: {error}")
            continue

        try:
            content = page["message"]["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            texts.append("❌ Error: unexpected response format")
            continue

        # The content may be a JSON document carrying "natural_text"; anything
        # else (plain text, non-object JSON) is used verbatim.
        try:
            parsed = json.loads(content)
        except (TypeError, ValueError):
            text = content
        else:
            text = parsed.get("natural_text", content) if isinstance(parsed, dict) else content

        # Keep the final join safe when the API returns null / non-string text.
        texts.append("" if text is None else str(text))

    return "\n\n".join(texts)
# ================================================================
# PDF → Images (PyMuPDF)
# ================================================================
def pdf_to_images_pymupdf(pdf_path, dpi=220):
    """Render every page of a PDF to a PIL image.

    Args:
        pdf_path: path of the PDF file to open with PyMuPDF.
        dpi: rendering resolution; converted to a zoom factor of ``dpi / 72``
            applied on both axes.

    Returns:
        A list with one PIL image per page, in page order.
    """
    zoom = dpi / 72
    mat = fitz.Matrix(zoom, zoom)
    images = []
    doc = fitz.open(pdf_path)
    try:
        for page in doc:
            pix = page.get_pixmap(matrix=mat)
            # The PNG bytes live in their own buffer, so the images remain
            # usable after the document is closed.
            images.append(Image.open(BytesIO(pix.tobytes("png"))))
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
    return images
# ================================================================
# PREVIEW (GRID)
# ================================================================
def preview_files(files):
    """Build the preview thumbnails for the uploaded files.

    PDFs (detected by a case-insensitive ``.pdf`` suffix) are rendered page by
    page at 120 dpi; every other file is opened as an image. Each picture goes
    through ``resize_to_max_bounds`` and then ``resize_preview``.

    Args:
        files: list of upload objects accepted by ``resolve_file``, or
            ``None`` / empty when nothing is uploaded.

    Returns:
        A list of preview images, one per PDF page or image file; empty when
        there are no files.
    """
    # The change event also fires when the upload is cleared, passing None.
    if not files:
        return []

    previews = []
    for file in files:
        real_path = resolve_file(file)
        if real_path.lower().endswith(".pdf"):
            page_images = pdf_to_images_pymupdf(real_path, dpi=120)
        else:
            img = Image.open(real_path)
            if img.mode == "RGBA":
                img = img.convert("RGB")
            page_images = [img]

        for img in page_images:
            img = resize_to_max_bounds(img)
            previews.append(resize_preview(img))
    return previews
# ================================================================
# OCR 1 PAGE (PARALLEL)
# ================================================================
def ocr_single_page(page_img, label,
                    api_key, model, task_type, max_tokens,
                    temperature, top_p, repetition_penalty):
    """OCR one page image and return ``(label, text)``.

    The page is converted to RGB, encoded as JPEG in memory and passed to
    ``run_typhoon_ocr`` together with the remaining arguments. ``label`` is
    handed back unchanged so the caller can match the result to its page.
    """
    jpeg_stream = BytesIO()
    rgb_page = page_img.convert("RGB")
    rgb_page.save(jpeg_stream, format="JPEG")
    # getvalue() returns the whole buffer regardless of the stream position.
    jpeg_bytes = jpeg_stream.getvalue()

    ocr_text = run_typhoon_ocr(
        jpeg_bytes, api_key, model, task_type,
        max_tokens, temperature, top_p, repetition_penalty
    )
    return label, ocr_text
# ================================================================
# MAIN OCR LOGIC
# ================================================================
def extract_text(files,
                 api_key, model, task_type, max_tokens,
                 temperature, top_p, repetition_penalty,
                 progress=gr.Progress(track_tqdm=True)):
    """Run OCR over every uploaded file and merge the results.

    PDFs (case-insensitive ``.pdf`` suffix) are rendered at 220 dpi, one image
    per page; other files are opened as images. All pictures are resized with
    ``resize_to_max_bounds`` and OCR'd by up to 4 worker threads.

    Args:
        files: list of upload objects accepted by ``resolve_file``.
        api_key, model, task_type, max_tokens, temperature, top_p,
            repetition_penalty: forwarded to ``ocr_single_page``.
        progress: Gradio progress tracker, called with a fraction and a
            description after loading and after each finished page.

    Returns:
        ``(merged_text, txt_path)``: the merged text with one
        ``## <label>`` section per page in upload/page order, and the path of
        a UTF-8 text file under /tmp holding the same text. When no files are
        given, returns an error message and ``None``.
    """
    if not files:
        return "❌ No files uploaded.", None

    images_to_ocr = []
    labels = []

    # LOAD FILES
    for file in files:
        real_path = resolve_file(file)
        base_name = os.path.basename(real_path)
        if real_path.lower().endswith(".pdf"):
            pdf_imgs = pdf_to_images_pymupdf(real_path, dpi=220)
            for idx, img in enumerate(pdf_imgs, start=1):
                images_to_ocr.append(resize_to_max_bounds(img))
                labels.append(f"{base_name} - Page {idx}")
        else:
            img = Image.open(real_path)
            if img.mode == "RGBA":
                img = img.convert("RGB")
            images_to_ocr.append(resize_to_max_bounds(img))
            labels.append(base_name)

    total = len(images_to_ocr)
    progress(0.03, desc=f"Loaded {total} pages/images")

    # PARALLEL OCR
    # Results are stored by position, not by label: two uploads can share a
    # basename, and sorting labels as text would put "Page 10" before "Page 2".
    texts = [None] * total
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as ex:
        future_to_index = {
            ex.submit(
                ocr_single_page, img, lbl,
                api_key, model, task_type,
                max_tokens, temperature, top_p, repetition_penalty
            ): idx
            for idx, (img, lbl) in enumerate(zip(images_to_ocr, labels))
        }
        for done, fut in enumerate(as_completed(future_to_index), start=1):
            try:
                _, txt = fut.result()
            except Exception as exc:
                # One failing page must not discard the pages that succeeded.
                txt = f"❌ Error: {exc}"
            texts[future_to_index[fut]] = txt

            elapsed = time.time() - start
            eta = (total - done) * (elapsed / done)
            progress(done / total,
                     desc=f"OCR {done}/{total} | ETA {eta:.1f}s")
    progress(1, desc="OCR Completed ✔")

    # MERGE RESULT
    merged = "".join(
        f"## {lbl}\n{txt}\n\n" for lbl, txt in zip(labels, texts)
    )
    out_path = f"/tmp/ocr_{uuid.uuid4().hex}.txt"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(merged)
    return merged, out_path
# ================================================================
# UI
# ================================================================
# Build the Gradio page: instructions, upload + preview, settings, run button.
# NOTE(review): the nesting of the `with` blocks and several emoji were
# reconstructed from a damaged copy of this file; confirm against the
# original layout.
with gr.Blocks() as demo:
    gr.Markdown("""
# 🔍 Typhoon OCR v1.5
### Multi-file OCR • Parallel Processing • ETA • PDF/Image Support
⚡ **High-speed OCR powered by Typhoon**
📄 Upload **multiple images or PDFs**
🚀 Parallel OCR with ETA
🔍 Auto preview grid for all pages
---
## 🔑 Get Your API Key
👉 https://playground.opentyphoon.ai/settings/api-key
After logging in, look at the **top-right corner** → you'll see **API Key** menu.
Click it to generate or copy your key.
""")
    gr.Markdown("### 📘 How to get API Key (step-by-step)")

    # Static walkthrough screenshots, referenced by relative path.
    with gr.Row():
        gr.Gallery(
            [
                ("ocr_login.png", "1) Login"),
                ("ocr_first.png", "2) Find API Key Menu"),
                ("ocr_getkey.png", "3) Copy Your Key"),
            ],
            columns=3,
            height=250,
            show_label=False,
        )

    file_input = gr.Files(label="Upload images or PDFs", file_count="multiple")
    preview_gallery = gr.Gallery(label="Preview", columns=3, height="auto")
    # Refresh the preview grid whenever the set of uploaded files changes.
    file_input.change(preview_files, inputs=file_input, outputs=preview_gallery)

    # ADVANCED SETTINGS
    with gr.Accordion("⚙️ Advanced Settings", open=False):
        model_box = gr.Textbox(value="typhoon-ocr", label="Model")
        task_type_box = gr.Textbox(value="v1.5", label="Task Type")
        max_tokens_box = gr.Number(value=16000, label="Max Tokens")
        temperature_box = gr.Number(value=0.1, label="Temperature")
        top_p_box = gr.Number(value=0.6, label="Top-p")
        repetition_penalty_box = gr.Number(value=1.2, label="Repetition Penalty")

    api_key_box = gr.Textbox(label="API Key", type="password")
    run_btn = gr.Button("🚀 Run OCR")
    output_box = gr.Markdown(label="OCR Output")
    download_btn = gr.File(label="Download (.txt)")

    # The input order must match the positional parameters of extract_text.
    run_btn.click(
        extract_text,
        inputs=[
            file_input,
            api_key_box,
            model_box,
            task_type_box,
            max_tokens_box,
            temperature_box,
            top_p_box,
            repetition_penalty_box,
        ],
        outputs=[output_box, download_btn],
    )

demo.launch()