# Hugging Face Spaces page header (scrape residue; Space status was "Sleeping")
import os

# Disable CUDA paths before importing torch
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"

import numpy as np  # IMPORTANT: must be before torch in some environments
import torch
import gradio as gr
from transformers import AutoModel, AutoTokenizer
import tempfile
import shutil
from PIL import Image, ImageDraw, ImageFont, ImageOps
import fitz  # PyMuPDF
import re
import base64
from io import StringIO, BytesIO
| """ | |
| DeepSeek-OCR (CPU-only) Space app | |
| - No FlashAttention / no CUDA required. | |
| - Designed to run on Hugging Face CPU spaces (VERY SLOW). | |
| """ | |
| MODEL_NAME = "deepseek-ai/DeepSeek-OCR" | |
| # Keep CPU threads reasonable (optional) | |
| try: | |
| torch.set_num_threads(max(1, min(8, os.cpu_count() or 1))) | |
| except Exception: | |
| pass | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) | |
| model = AutoModel.from_pretrained( | |
| MODEL_NAME, | |
| torch_dtype=torch.float32, | |
| trust_remote_code=True, | |
| use_safetensors=True, | |
| ) | |
| model = model.eval() # stays on CPU | |
| MODEL_CONFIGS = { | |
| "Tiny": {"base_size": 512, "image_size": 512, "crop_mode": False}, | |
| "Small": {"base_size": 640, "image_size": 640, "crop_mode": False}, | |
| "Base": {"base_size": 1024, "image_size": 1024, "crop_mode": False}, | |
| "Gundam": {"base_size": 1024, "image_size": 640, "crop_mode": True}, | |
| "Large": {"base_size": 1280, "image_size": 1280, "crop_mode": False}, | |
| } | |
| TASK_PROMPTS = { | |
| "๐ Markdown": {"prompt": "<image>\n<|grounding|>Convert the document to markdown.", "has_grounding": True}, | |
| "๐ Free OCR": {"prompt": "<image>\nFree OCR.", "has_grounding": False}, | |
| "๐ Locate": {"prompt": "<image>\nLocate <|ref|>text<|/ref|> in the image.", "has_grounding": True}, | |
| "๐ Describe": {"prompt": "<image>\nDescribe this image in detail.", "has_grounding": False}, | |
| "โ๏ธ Custom": {"prompt": "", "has_grounding": False}, | |
| } | |
def extract_grounding_references(text: str):
    """Return every grounding annotation in *text*.

    Each element is a 3-tuple: (full matched markup, label between the
    <|ref|> tags, raw coordinate string between the <|det|> tags).
    """
    grounding = re.compile(
        r'(<\|ref\|>(.*?)<\|/ref\|><\|det\|>(.*?)<\|/det\|>)',
        re.DOTALL,
    )
    return grounding.findall(text)
def draw_bounding_boxes(image: Image.Image, refs, extract_images: bool = False):
    """Draw labelled grounding boxes on a copy of *image*.

    Args:
        image: source page image (left untouched; a copy is annotated).
        refs: grounding triples from extract_grounding_references().
        extract_images: when True, regions labelled "image" are also cropped
            out of the original and returned.

    Returns:
        (annotated_image, crops) where crops is a list of PIL images.
    """
    import ast  # stdlib; imported locally to leave the module import block untouched

    img_w, img_h = image.size
    img_draw = image.copy()
    draw = ImageDraw.Draw(img_draw)
    # Semi-transparent fill is drawn on a separate RGBA overlay and pasted last.
    overlay = Image.new("RGBA", img_draw.size, (0, 0, 0, 0))
    draw2 = ImageDraw.Draw(overlay)
    font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
    try:
        font = ImageFont.truetype(font_path, 30)
    except Exception:
        font = ImageFont.load_default()
    crops = []
    color_map = {}
    np.random.seed(42)  # deterministic label colors across runs
    for ref in refs:
        label = ref[1]
        if label not in color_map:
            color_map[label] = (
                int(np.random.randint(50, 255)),
                int(np.random.randint(50, 255)),
                int(np.random.randint(50, 255)),
            )
        color = color_map[label]
        # SECURITY FIX: coordinates come straight from model output; parse
        # them as Python literals only — never eval() generated text.
        try:
            coords = ast.literal_eval(ref[2])
        except Exception:
            continue
        color_a = color + (60,)
        for box in coords:
            # Model coordinates are normalized to a 0-999 grid; scale to pixels.
            x1, y1, x2, y2 = (
                int(box[0] / 999 * img_w),
                int(box[1] / 999 * img_h),
                int(box[2] / 999 * img_w),
                int(box[3] / 999 * img_h),
            )
            if extract_images and label == "image":
                crops.append(image.crop((x1, y1, x2, y2)))
            width = 5 if label == "title" else 3
            draw.rectangle([x1, y1, x2, y2], outline=color, width=width)
            draw2.rectangle([x1, y1, x2, y2], fill=color_a)
            try:
                text_bbox = draw.textbbox((0, 0), label, font=font)
                tw, th = text_bbox[2] - text_bbox[0], text_bbox[3] - text_bbox[1]
            except Exception:
                tw, th = (len(label) * 10, 20)  # rough fallback size
            ty = max(0, y1 - 20)  # keep the label tag inside the canvas
            draw.rectangle([x1, ty, x1 + tw + 4, ty + th + 4], fill=color)
            draw.text((x1 + 2, ty + 2), label, font=font, fill=(255, 255, 255))
    img_draw.paste(overlay, (0, 0), overlay)
    return img_draw, crops
def clean_output(text: str, include_images: bool = False) -> str:
    """Strip grounding markup from raw model output.

    Annotations labelled "image" are replaced by "**[Figure n]**"
    placeholders when *include_images* is True (later swapped for embedded
    pictures), or silently removed otherwise. Any other grounded annotation
    takes its whole line with it.
    """
    if not text:
        return ""
    triples = re.findall(
        r'(<\|ref\|>(.*?)<\|/ref\|><\|det\|>(.*?)<\|/det\|>)',
        text,
        re.DOTALL,
    )
    figure_count = 0
    for full, _label, _coords in triples:
        if "<|ref|>image<|/ref|>" in full:
            if include_images:
                figure_count += 1
                placeholder = f"\n\n**[Figure {figure_count}]**\n\n"
            else:
                placeholder = ""
            text = text.replace(full, placeholder, 1)
        else:
            # Drop the entire line containing this annotation.
            text = re.sub(rf"(?m)^[^\n]*{re.escape(full)}[^\n]*\n?", "", text)
    return text.strip()
def embed_images(markdown: str, crops):
    """Replace "**[Figure n]**" placeholders with inline base64 PNG images.

    *crops* are the cropped figure regions (in document order) produced by
    draw_bounding_boxes(); each is serialized into a data: URI so the
    markdown preview renders standalone.
    """
    if not crops:
        return markdown
    for i, img in enumerate(crops):
        buf = BytesIO()
        img.save(buf, format="PNG")
        b64 = base64.b64encode(buf.getvalue()).decode()
        # BUGFIX: the replacement was an empty f-string ("\n\n\n\n") that
        # discarded the encoded image entirely; embed it as a markdown
        # inline image instead.
        markdown = markdown.replace(
            f"**[Figure {i + 1}]**",
            f"\n\n![Figure {i + 1}](data:image/png;base64,{b64})\n\n",
            1,
        )
    return markdown
def infer_with_model(prompt: str, jpg_path: str, out_dir: str, base_size: int, image_size: int, crop_mode: bool) -> str:
    """Run model.infer() and return everything it printed to stdout.

    The DeepSeek-OCR remote code prints its recognition result (plus progress
    noise) to stdout rather than returning it, so the call is wrapped in a
    capture context. Exceptions from model.infer() propagate to the caller.

    IMPROVEMENT: use contextlib.redirect_stdout instead of manually swapping
    sys.stdout — same behavior, but the restore is handled by the context
    manager and the intent is explicit.
    """
    from contextlib import redirect_stdout  # stdlib; local to keep module imports untouched

    captured = StringIO()
    with redirect_stdout(captured):
        model.infer(
            tokenizer=tokenizer,
            prompt=prompt,
            image_file=jpg_path,
            output_path=out_dir,
            base_size=base_size,
            image_size=image_size,
            crop_mode=crop_mode,
        )
    return captured.getvalue()
def process_image(image: Image.Image, mode: str, task: str, custom_prompt: str):
    """OCR a single PIL image with the selected preset and task.

    Returns a 5-tuple (cleaned_text, markdown, raw_text, boxed_image, crops);
    error states are reported as a message in the first slot rather than
    raised, so the Gradio UI can display them.
    """
    # Guard clauses: missing image, or a task that requires a prompt.
    if image is None:
        return "Error: Upload image", "", "", None, []
    if task in ["โ๏ธ Custom", "๐ Locate"] and not custom_prompt.strip():
        return "Error: Enter prompt", "", "", None, []

    # Normalize to RGB and honor EXIF orientation before inference.
    if image.mode in ("RGBA", "LA", "P"):
        image = image.convert("RGB")
    image = ImageOps.exif_transpose(image)

    config = MODEL_CONFIGS[mode]

    # Build the prompt for the selected task.
    if task == "โ๏ธ Custom":
        prompt = f"<image>\n{custom_prompt.strip()}"
        has_grounding = "<|grounding|>" in custom_prompt
    elif task == "๐ Locate":
        prompt = f"<image>\nLocate <|ref|>{custom_prompt.strip()}<|/ref|> in the image."
        has_grounding = True
    else:
        task_spec = TASK_PROMPTS[task]
        prompt = task_spec["prompt"]
        has_grounding = task_spec["has_grounding"]

    # model.infer() wants a file path, so stage the image as a temp JPEG.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
    image.save(tmp.name, "JPEG", quality=95)
    tmp.close()
    out_dir = tempfile.mkdtemp()

    # Substrings identifying progress/debug lines the model prints to stdout.
    noise_markers = (
        "image:",
        "other:",
        "PATCHES",
        "====",
        "BASE:",
        "%|",
        "torch.Size",
    )
    try:
        raw_stdout = infer_with_model(
            prompt=prompt,
            jpg_path=tmp.name,
            out_dir=out_dir,
            base_size=config["base_size"],
            image_size=config["image_size"],
            crop_mode=config["crop_mode"],
        )
        kept = [
            line
            for line in raw_stdout.split("\n")
            if not any(marker in line for marker in noise_markers)
        ]
        result = "\n".join(kept).strip()
        if not result:
            return "No text", "", "", None, []

        cleaned = clean_output(result, False)
        markdown = clean_output(result, True)
        img_out, crops = None, []
        # Only grounded tasks produce <|ref|> annotations worth drawing.
        if has_grounding and "<|ref|>" in result:
            refs = extract_grounding_references(result)
            if refs:
                img_out, crops = draw_bounding_boxes(image, refs, True)
                markdown = embed_images(markdown, crops)
        return cleaned, markdown, result, img_out, crops
    except Exception as e:
        return f"Runtime error: {type(e).__name__}: {e}", "", "", None, []
    finally:
        # Best-effort cleanup of the staged JPEG and the model's output dir.
        try:
            os.unlink(tmp.name)
        except Exception:
            pass
        shutil.rmtree(out_dir, ignore_errors=True)
def process_pdf(path: str, mode: str, task: str, custom_prompt: str):
    """OCR every page of a PDF and join the per-page results.

    Pages are rendered at 300 DPI via PyMuPDF, then fed through
    process_image(). Returns the same 5-tuple shape as process_image();
    only the last page's annotated image is kept for display, while crops
    accumulate across all pages.
    """
    doc = fitz.open(path)
    cleaned_pages, markdown_pages, raw_pages = [], [], []
    crops_all = []
    last_boxed = None
    try:
        render_matrix = fitz.Matrix(300 / 72, 300 / 72)  # 72 DPI base -> 300 DPI
        for page_idx in range(len(doc)):
            pix = doc.load_page(page_idx).get_pixmap(matrix=render_matrix, alpha=False)
            page_image = Image.open(BytesIO(pix.tobytes("png")))
            cleaned, markdown, raw, boxed, crops = process_image(
                page_image, mode, task, custom_prompt
            )
            cleaned_pages.append(cleaned)
            markdown_pages.append(markdown)
            raw_pages.append(raw)
            crops_all.extend(crops)
            if boxed is not None:
                last_boxed = boxed
        separator = "\n\n--- Page Break ---\n\n"
        return (
            separator.join(cleaned_pages),
            separator.join(markdown_pages),
            separator.join(raw_pages),
            last_boxed,
            crops_all,
        )
    finally:
        doc.close()
def run(image, file_path, mode, task, custom_prompt):
    """Dispatch a request to PDF or image processing.

    An uploaded file takes precedence over the pasted image widget; PDFs
    are detected by extension. Returns the 5-tuple shared by all handlers.
    """
    if file_path:
        is_pdf = file_path.lower().endswith(".pdf")
        if is_pdf:
            return process_pdf(file_path, mode, task, custom_prompt)
        return process_image(Image.open(file_path), mode, task, custom_prompt)
    if image is None:
        return "Error: upload file or image", "", "", None, []
    return process_image(image, mode, task, custom_prompt)
def toggle_prompt(task):
    """Show or hide the auxiliary prompt textbox for the selected task."""
    prompt_settings = {
        "โ๏ธ Custom": {"label": "Custom Prompt", "placeholder": "Add <|grounding|> for boxes"},
        "๐ Locate": {"label": "Text to Locate", "placeholder": "Enter text"},
    }
    settings = prompt_settings.get(task)
    if settings is None:
        return gr.update(visible=False)
    return gr.update(visible=True, **settings)
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft(), title="DeepSeek-OCR (CPU)") as demo:
    gr.Markdown(
        """
# ๐ข DeepSeek-OCR (CPU)
โ ๏ธ CPU is **very slow** and may fail on large images/PDFs due to RAM/time limits.
Prefer **Tiny/Small** mode on CPU.
"""
    )
    with gr.Row():
        with gr.Column(scale=1):
            # Inputs: an uploaded file (image or PDF) and/or a pasted image.
            file_in = gr.File(label="Upload Image or PDF", file_types=["image", ".pdf"], type="filepath")
            input_img = gr.Image(label="Input Image", type="pil", height=300)
            mode = gr.Dropdown(list(MODEL_CONFIGS.keys()), value="Tiny", label="Mode")
            task = gr.Dropdown(list(TASK_PROMPTS.keys()), value="๐ Free OCR", label="Task")
            # Hidden by default; toggle_prompt() reveals it for Custom/Locate.
            prompt = gr.Textbox(label="Prompt", lines=2, visible=False)
            btn = gr.Button("Extract", variant="primary", size="lg")
        with gr.Column(scale=2):
            # Outputs, one tab per view of the same run() result tuple.
            with gr.Tabs():
                with gr.Tab("Text"):
                    text_out = gr.Textbox(lines=20, show_copy_button=True, show_label=False)
                with gr.Tab("Markdown Preview"):
                    md_out = gr.Markdown("")
                with gr.Tab("Boxes"):
                    img_out = gr.Image(type="pil", height=500, show_label=False)
                with gr.Tab("Cropped Images"):
                    gallery = gr.Gallery(show_label=False, columns=3, height=400)
                with gr.Tab("Raw Text"):
                    raw_out = gr.Textbox(lines=20, show_copy_button=True, show_label=False)
    # Wiring: task selection toggles the prompt box; the button runs OCR.
    task.change(toggle_prompt, [task], [prompt])
    btn.click(
        run,
        [input_img, file_in, mode, task, prompt],
        [text_out, md_out, raw_out, img_out, gallery],
    )
if __name__ == "__main__":
    # queue() bounds concurrent requests on the shared CPU Space;
    # ssr_mode=False disables server-side rendering.
    demo.queue(max_size=10).launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)