| | import os |
| | import tempfile |
| | import uuid |
| | import concurrent.futures |
| | from typing import List, Tuple |
| |
|
| | import fitz |
| | import torch |
| | import gradio as gr |
| | import spaces |
| | import easyocr |
| | import warnings |
| |
|
| | |
# Silence EasyOCR/PyTorch's harmless warning about RNN weights not being in
# one contiguous memory chunk (emitted when the recognizer runs on GPU).
warnings.filterwarnings("ignore", "RNN module weights are not part")


# File extensions accepted by the upload widget (PDFs plus common image formats).
SUPPORTED_FILE_TYPES = [
    ".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif"
]
# EasyOCR language codes offered in the UI dropdown.
LANGUAGES = ["en", "nl", "de", "fr", "es", "it", "pt", "ru", "zh_cn", "ja", "ar"]

# Worker threads for page-level OCR; overridable via the OCR_THREADS env var
# but hard-capped at 2 to keep memory/GPU pressure bounded.
OCR_THREADS = min(int(os.getenv("OCR_THREADS", "2")), 2)


# Cache of EasyOCR Reader instances keyed by sorted language tuple, so each
# language combination loads its models only once per process.
_READERS = {}
| |
|
def get_reader(lang_codes: Tuple[str, ...]):
    """
    Return a cached EasyOCR Reader for the given language combination,
    creating and caching it on first use.

    The cache key is the sorted language tuple, so ("en", "nl") and
    ("nl", "en") share a single reader. Whether the reader runs on GPU is
    decided once, at creation time, via spaces.is_gpu_enabled().
    """
    cache_key = tuple(sorted(lang_codes))
    reader = _READERS.get(cache_key)
    if reader is None:
        use_gpu = spaces.is_gpu_enabled()
        reader = easyocr.Reader(list(cache_key), gpu=use_gpu)
        _READERS[cache_key] = reader
        print(f"[Init] EasyOCR reader for {cache_key} (GPU={'yes' if use_gpu else 'no'})")
    return reader
| |
|
| | |
| | |
| | |
@spaces.GPU(duration=600)
def run_ocr_pages(pdf_path: str, page_ids: List[int], lang_codes: Tuple[str, ...]) -> List[Tuple[int, str]]:
    """
    OCR the specified (1-based) pages of a PDF.

    Returns a list of (page_number, text) pairs in completion order (callers
    sort by page number). Runs under ZeroGPU allocation when available;
    EasyOCR falls back to CPU otherwise.

    Page rasterization is done serially on the calling thread because a
    PyMuPDF Document is not thread-safe; only the OCR calls are run in
    parallel worker threads. Each page is handled independently, so one
    failing page yields an error message for that page instead of aborting
    the whole batch.
    """
    if not page_ids:
        # Nothing to do — also avoids ThreadPoolExecutor(max_workers=0),
        # which raises ValueError.
        return []

    reader = get_reader(lang_codes)
    results: List[Tuple[int, str]] = []

    # Phase 1: render each requested page to a temp PNG, serially (fitz is
    # not thread-safe, so the Document must not be shared across threads).
    rendered: List[Tuple[int, str]] = []  # (page_number, image_path)
    with fitz.open(pdf_path) as doc:
        for idx in page_ids:
            try:
                page = doc[idx - 1]
                # Upscale small pages more aggressively so OCR has enough pixels.
                scale = 2 if max(page.rect.width, page.rect.height) <= 600 else 1.5
                pix = page.get_pixmap(matrix=fitz.Matrix(scale, scale))
                img_path = os.path.join(tempfile.gettempdir(), f"ocr_{uuid.uuid4().hex}.png")
                pix.save(img_path)
                rendered.append((idx, img_path))
            except Exception as e:
                msg = f"⚠️ OCR error on page {idx}: {e}"
                print(msg)
                results.append((idx, msg))

    # Phase 2: OCR the rendered images in parallel threads.
    def ocr_image(idx: int, img_path: str) -> Tuple[int, str]:
        try:
            if len(lang_codes) == 1:
                # Single language: request confidences and drop weak detections.
                items = reader.readtext(img_path, detail=1)
                lines = [t for _, t, conf in items if conf > 0.2]
            else:
                # Multiple languages: detail=0 returns plain text lines.
                lines = reader.readtext(img_path, detail=0)
            return idx, "\n".join(lines)
        except Exception as e:
            msg = f"⚠️ OCR error on page {idx}: {e}"
            print(msg)
            return idx, msg
        finally:
            # Always remove the temp image, even when readtext raised —
            # the original only deleted it on the success path.
            try:
                os.remove(img_path)
            except OSError:
                pass

    if rendered:
        workers = min(OCR_THREADS, len(rendered))
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(ocr_image, idx, path) for idx, path in rendered]
            for fut in concurrent.futures.as_completed(futures):
                results.append(fut.result())

    return results
| |
|
def run_ocr_image(image_path: str, lang_codes: Tuple[str, ...]) -> str:
    """
    Run OCR on a single image file and return the extracted text.

    Mirrors the recognition logic of run_ocr_pages for one-shot inputs:
    with exactly one language we keep per-detection confidences and filter
    out weak hits; with several languages we take EasyOCR's plain-text
    output unchanged. Errors are reported as a message string rather than
    raised, matching the page-level behavior.
    """
    reader = get_reader(lang_codes)
    try:
        if len(lang_codes) != 1:
            text_lines = reader.readtext(image_path, detail=0)
        else:
            detections = reader.readtext(image_path, detail=1)
            text_lines = [text for _, text, score in detections if score > 0.2]
        return "\n".join(text_lines)
    except Exception as e:
        msg = f"⚠️ OCR error on image: {e}"
        print(msg)
        return msg
| |
|
| | |
| | |
| | |
def emit_chunk(chunk: str, combined: str, tmp_file) -> Tuple[str, None]:
    """
    Persist *chunk* to the open temp file (UTF-8) and append it to the
    running transcript.

    Returns (updated_combined, None); the trailing None matches the
    (text, download_file) output pair the streaming pipeline yields.
    """
    tmp_file.write(chunk.encode("utf-8"))
    return combined + chunk, None
| |
|
| | |
| | |
| | |
def pipeline(upload, langs, mode):
    """
    Gradio streaming handler: extract text from an uploaded PDF or image.

    Yields (combined_text, download_path) pairs incrementally as pages are
    processed; download_path is None until the final yield, which points at
    a .txt copy of the full transcript.

    Raises gr.Error for a missing upload or a file over 200 MB.
    """
    if upload is None:
        raise gr.Error("Please upload a file.")
    if os.path.getsize(upload.name) > 200 * 1024 * 1024:
        raise gr.Error("File larger than 200 MB; please split it.")

    # The dropdown may return a bare string when only one language is chosen.
    langs = langs if isinstance(langs, list) else [langs]
    lang_tuple = tuple(langs)
    # delete=False: the file must outlive this handler so Gradio can serve it
    # as a download.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".txt")
    combined = ""

    ext = os.path.splitext(upload.name)[1].lower()
    try:
        if ext == ".pdf":
            progress = gr.Progress(track_tqdm=False)
            ocr_pages = []
            # Single pass over the document (the original opened it twice:
            # once just for the page count): emit native text where present,
            # queue text-less pages for OCR.
            with fitz.open(upload.name) as doc:
                total_pages = doc.page_count
                for i, page in enumerate(doc, start=1):
                    text = page.get_text("text") if mode in ("native", "auto") else ""
                    if text.strip():
                        chunk = f"--- Page {i} (native) ---\n{text}\n"
                        combined, _ = emit_chunk(chunk, combined, tmp)
                        yield combined, None
                    elif mode in ("ocr", "auto"):
                        ocr_pages.append(i)
                    progress(i / total_pages)

            if ocr_pages:
                ocr_results = run_ocr_pages(upload.name, ocr_pages, lang_tuple)
                # Results arrive in completion order; restore page order.
                for idx, txt in sorted(ocr_results, key=lambda x: x[0]):
                    chunk = f"--- Page {idx} (OCR) ---\n{txt}\n"
                    combined, _ = emit_chunk(chunk, combined, tmp)
                    yield combined, None
        else:
            # Non-PDF uploads are treated as images and OCR'd directly,
            # regardless of mode.
            txt = run_ocr_image(upload.name, lang_tuple)
            chunk = f"--- Image OCR ---\n{txt}\n"
            combined, _ = emit_chunk(chunk, combined, tmp)
            yield combined, None
    finally:
        # Close (but keep) the temp file even if extraction failed mid-way;
        # the original leaked the handle on any exception.
        tmp.close()

    yield combined or "⚠️ No text detected.", tmp.name
| |
|
| | |
| | |
| | |
# --- Gradio UI wiring ------------------------------------------------------
theme = gr.themes.Base(primary_hue="purple")
with gr.Blocks(theme=theme, title="ZeroGPU OCR PDF & Image Extractor") as demo:
    gr.Markdown("## 📚 ZeroGPU Multilingual OCR Extractor")
    with gr.Row():
        with gr.Column(scale=1):
            # Input controls: upload, OCR language selection, extraction mode.
            file_in = gr.File(label="Upload PDF or image",
                              file_types=SUPPORTED_FILE_TYPES)
            lang_in = gr.Dropdown(LANGUAGES, multiselect=True, value=["en"],
                                  label="OCR language(s)")
            mode_in = gr.Radio(["native", "ocr", "auto"], value="auto",
                               label="Mode",
                               info="native=text · ocr=image · auto=mix")
            btn = gr.Button("Extract", variant="primary")
        with gr.Column(scale=2):
            # Outputs: streaming text area plus a download slot for the .txt.
            out_txt = gr.Textbox(label="Extracted Text", lines=18,
                                 show_copy_button=True)
            dl = gr.File(label="Download .txt")

    # pipeline is a generator, so out_txt updates incrementally per yield;
    # dl is populated only by the final yield.
    btn.click(
        fn=pipeline,
        inputs=[file_in, lang_in, mode_in],
        outputs=[out_txt, dl]
    )
# Enable the request queue — needed for streaming generator handlers.
demo.queue()

if __name__ == "__main__":
    demo.launch()
| |
|