""" DeepSeek-OCR-2 API — HuggingFace Spaces (CPU) ============================================== POST /ocr – صورة + bbox واحد اختياري POST /ocr/batch – صورة + قائمة boxes دفعة واحدة ← الجديد POST /ocr/base64 – JSON base64 GET /health – فحص الحالة GET /demo – واجهة ويب مدمجة """ import os, io, base64, json, tempfile, logging, time from contextlib import asynccontextmanager from typing import Optional, List import torch from PIL import Image from fastapi import FastAPI, File, UploadFile, Form, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, HTMLResponse from transformers import AutoModel, AutoTokenizer from pydantic import BaseModel logging.basicConfig(level=logging.INFO) log = logging.getLogger("ocr-api") MODEL_NAME = "deepseek-ai/DeepSeek-OCR-2" model = None tokenizer = None # ─── Startup ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): global model, tokenizer log.info("Loading %s ...", MODEL_NAME) tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True) model = AutoModel.from_pretrained( MODEL_NAME, _attn_implementation="eager", trust_remote_code=True, torch_dtype=torch.bfloat16, ) model.eval() log.info("Model ready (cpu, bfloat16)") yield del model, tokenizer # ─── App ────────────────────────────────────────────────────────────────────── app = FastAPI(title="DeepSeek-OCR-2 API", version="2.0.0", lifespan=lifespan) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) # ─── CPU monkey-patch context manager ──────────────────────────────────────── from contextlib import contextmanager @contextmanager def force_cpu(): """ DeepSeek-OCR-2's model.infer() has two CPU-breaking issues: 1. Hardcodes .cuda() calls → patched: .cuda() becomes a no-op 2. Casts tensors to bfloat16 while model weights are float32 → patched: bfloat16 requests are silently changed to float32 3. Uses torch.autocast("cuda") which can still cast internally → patched: autocast is replaced with a no-op context manager All patches are reverted after the 'with' block. """ import contextlib _tensor_cuda = torch.Tensor.cuda _module_cuda = torch.nn.Module.cuda _tensor_to = torch.Tensor.to _module_to = torch.nn.Module.to _tensor_bf16 = torch.Tensor.bfloat16 # model may call .bfloat16() directly _autocast = torch.autocast # 1. .cuda() → stay on CPU (no-op) def _noop_tensor_cuda(self, device=None, *args, **kwargs): return self def _noop_module_cuda(self, device=None): return self # 2a. .to() → strip CUDA device args; keep dtype as-is # (model is loaded in bfloat16 so dtype is already consistent) def _safe_tensor_to(self, *args, **kwargs): new_args = [a for a in args if not (isinstance(a, (str, torch.device)) and "cuda" in str(a))] kwargs.pop("device", None) if not new_args and not kwargs: return self try: return _tensor_to(self, *new_args, **kwargs) except Exception: return self def _safe_module_to(self, *args, **kwargs): new_args = [a for a in args if not (isinstance(a, (str, torch.device)) and "cuda" in str(a))] kwargs.pop("device", None) if not new_args and not kwargs: return self try: return _module_to(self, *new_args, **kwargs) except Exception: return self # 2b. .bfloat16() direct calls → no-op (tensor already in bfloat16) def _noop_tensor_bf16(self): return self # 3. torch.autocast("cuda", ...) → nullcontext (no-op on CPU) def _noop_autocast(*args, **kwargs): return contextlib.nullcontext() torch.Tensor.cuda = _noop_tensor_cuda torch.nn.Module.cuda = _noop_module_cuda torch.Tensor.to = _safe_tensor_to torch.nn.Module.to = _safe_module_to torch.Tensor.bfloat16 = _noop_tensor_bf16 torch.autocast = _noop_autocast try: yield finally: torch.Tensor.cuda = _tensor_cuda torch.nn.Module.cuda = _module_cuda torch.Tensor.to = _tensor_to torch.nn.Module.to = _module_to torch.Tensor.bfloat16 = _tensor_bf16 torch.autocast = _autocast # ─── Core OCR inference ─────────────────────────────────────────────────────── def run_ocr(pil_image: Image.Image, mode: str = "free") -> str: """ Run DeepSeek-OCR-2 on a PIL image and return extracted text. Works on both CPU (HF free tier) and GPU. """ prompt_text = ( "Convert the document to markdown." if mode == "markdown" else "Please OCR the image and return all text exactly." ) with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: tmp_path = tmp.name pil_image.save(tmp_path, format="PNG") try: if hasattr(model, "infer"): # ── Strategy 1: capture stdout ────────────────────────────────── # model.infer() prints the OCR result to stdout instead of returning it. # We capture stdout + also try save_results=True as backup. import io, sys from contextlib import redirect_stdout with tempfile.TemporaryDirectory() as out_dir: stdout_buf = io.StringIO() with force_cpu(), redirect_stdout(stdout_buf): result = model.infer( tokenizer, prompt=f"\n{prompt_text}", image_file=tmp_path, output_path=out_dir, base_size=1024, image_size=768, crop_mode=True, save_results=True, # also write to file as backup ) # Echo captured stdout to real stdout for server logs captured = stdout_buf.getvalue() sys.stdout.write(captured) sys.stdout.flush() # ── Extract text: priority order ───────────────────────────── text = "" # 1) Return value (if model returns text directly) if result: if isinstance(result, dict): text = result.get("text", result.get("output", "")) elif isinstance(result, str): text = result # 2) Captured stdout (most reliable for this model) if not text and captured: # The model prints: "===================== " # Strip the separator and any leading/trailing whitespace cleaned = captured.strip() for sep in ["=====================", "=====", "-----"]: if sep in cleaned: cleaned = cleaned.split(sep, 1)[-1].strip() break text = cleaned # 3) Output files written by save_results=True if not text: import glob for ext in ["*.txt", "*.md", "*.json"]: files = glob.glob(os.path.join(out_dir, "**", ext), recursive=True) for fpath in files: try: with open(fpath, "r", encoding="utf-8") as f: file_text = f.read().strip() if file_text: text = file_text break except Exception: pass if text: break return text # ── Fallback: standard generate() if model.infer() is not available ── messages = [{"role": "user", "content": [ {"type": "image", "image": tmp_path}, {"type": "text", "text": prompt_text}, ]}] text_in = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = tokenizer(text_in, return_tensors="pt") with torch.no_grad(): out = model.generate(**inputs, max_new_tokens=1024, do_sample=False) new_ids = out[:, inputs["input_ids"].shape[1]:] return tokenizer.decode(new_ids[0], skip_special_tokens=True) finally: os.unlink(tmp_path) def crop_img(img: Image.Image, x: int, y: int, w: int, h: int) -> Image.Image: iw, ih = img.size x1, y1 = max(0, x), max(0, y) x2, y2 = min(iw, x + w), min(ih, y + h) if x2 <= x1 or y2 <= y1: raise ValueError(f"Invalid bbox x={x} y={y} w={w} h={h} for {iw}×{ih} image") return img.crop((x1, y1, x2, y2)) # ─── Routes ─────────────────────────────────────────────────────────────────── @app.get("/") async def root(): return {"status": "ok", "model": MODEL_NAME, "device": "cpu", "demo": "/demo", "docs": "/docs"} @app.get("/health") async def health(): return {"status": "ok", "model_loaded": model is not None} # ── /ocr (صورة + bbox واحد اختياري) ───────────────────────────────────────── @app.post("/ocr") async def ocr_single( image: UploadFile = File(...), x: Optional[int] = Form(None), y: Optional[int] = Form(None), w: Optional[int] = Form(None), h: Optional[int] = Form(None), box_id: Optional[int] = Form(None, description="رقم المربع للتعرف عليه في النتيجة"), mode: str = Form("free"), ): if model is None: raise HTTPException(503, "Model not loaded yet — wait a moment and retry") data = await image.read() try: pil = Image.open(io.BytesIO(data)).convert("RGB") except Exception as e: raise HTTPException(400, f"Cannot decode image: {e}") img_w, img_h = pil.size cropped = False if all(v is not None for v in [x, y, w, h]): try: pil = crop_img(pil, x, y, w, h) cropped = True except ValueError as e: raise HTTPException(400, str(e)) t0 = time.time() try: text = run_ocr(pil, mode=mode) except Exception as e: log.exception("OCR error") raise HTTPException(500, f"OCR failed: {e}") return JSONResponse({ "box_id": box_id, "text": text, "mode": mode, "cropped": cropped, "bbox": {"x": x, "y": y, "w": w, "h": h} if cropped else None, "image_size": {"w": img_w, "h": img_h}, "elapsed_sec": round(time.time() - t0, 2), }) # ── /ocr/batch (صورة + قائمة boxes JSON دفعة واحدة) ──────────────────────── @app.post("/ocr/batch") async def ocr_batch( image: UploadFile = File(...), boxes: str = Form(..., description=""" JSON array of box objects, e.g.: [{"id":1,"x":10,"y":20,"w":100,"h":50}, {"id":2,"x":200,"y":30,"w":150,"h":60}] id, x, y, w, h are all required per box. """), mode: str = Form("free"), ): """ استقبال صورة + قائمة مربعات JSON → OCR لكل مربع → نتائج مرتبة بنفس الترتيب. طلب واحد بدلاً من N طلب منفصل. """ if model is None: raise HTTPException(503, "Model not loaded yet") # ── parse image ────────────────────────────────────────────────────────── data = await image.read() try: pil_full = Image.open(io.BytesIO(data)).convert("RGB") except Exception as e: raise HTTPException(400, f"Cannot decode image: {e}") img_w, img_h = pil_full.size # ── parse boxes JSON ───────────────────────────────────────────────────── try: box_list = json.loads(boxes) if not isinstance(box_list, list): raise ValueError("boxes must be a JSON array") for b in box_list: for k in ("id", "x", "y", "w", "h"): if k not in b: raise ValueError(f"Each box must have '{k}' field") except (json.JSONDecodeError, ValueError) as e: raise HTTPException(400, f"Invalid boxes JSON: {e}") # ── process each box ───────────────────────────────────────────────────── t_total = time.time() results = [] for b in box_list: bid = b["id"] t0 = time.time() try: cropped_pil = crop_img(pil_full, b["x"], b["y"], b["w"], b["h"]) text = run_ocr(cropped_pil, mode=mode) status = "ok" error = None except ValueError as e: text = "" status = "invalid_bbox" error = str(e) except Exception as e: log.exception("OCR error box_id=%s", bid) text = "" status = "error" error = str(e) results.append({ "box_id": bid, "x": b["x"], "y": b["y"], "w": b["w"], "h": b["h"], "text": text, "status": status, "error": error, "elapsed_sec": round(time.time() - t0, 2), }) log.info("box %s done in %.1fs — status=%s", bid, results[-1]["elapsed_sec"], status) return JSONResponse({ "mode": mode, "image_size": {"w": img_w, "h": img_h}, "total_boxes": len(results), "total_elapsed_sec": round(time.time() - t_total, 2), "results": results, }) # ── /ocr/base64 (JSON body بدل form-data) ──────────────────────────────────── class BoxItem(BaseModel): id: int x: int y: int w: int h: int class OCRBatchB64Request(BaseModel): image_b64: str boxes: List[BoxItem] mode: str = "free" class OCRSingleB64Request(BaseModel): image_b64: str box_id: Optional[int] = None x: Optional[int] = None y: Optional[int] = None w: Optional[int] = None h: Optional[int] = None mode: str = "free" @app.post("/ocr/base64") async def ocr_base64(req: OCRSingleB64Request): if model is None: raise HTTPException(503, "Model not loaded yet") try: pil = Image.open(io.BytesIO(base64.b64decode(req.image_b64))).convert("RGB") except Exception as e: raise HTTPException(400, f"Bad base64: {e}") img_w, img_h = pil.size cropped = False if all(v is not None for v in [req.x, req.y, req.w, req.h]): try: pil = crop_img(pil, req.x, req.y, req.w, req.h) cropped = True except ValueError as e: raise HTTPException(400, str(e)) t0 = time.time() try: text = run_ocr(pil, mode=req.mode) except Exception as e: raise HTTPException(500, f"OCR failed: {e}") return JSONResponse({ "box_id": req.box_id, "text": text, "mode": req.mode, "cropped": cropped, "bbox": {"x": req.x, "y": req.y, "w": req.w, "h": req.h} if cropped else None, "image_size": {"w": img_w, "h": img_h}, "elapsed_sec": round(time.time() - t0, 2), }) @app.post("/ocr/batch/base64") async def ocr_batch_base64(req: OCRBatchB64Request): if model is None: raise HTTPException(503, "Model not loaded yet") try: pil_full = Image.open(io.BytesIO(base64.b64decode(req.image_b64))).convert("RGB") except Exception as e: raise HTTPException(400, f"Bad base64: {e}") img_w, img_h = pil_full.size t_total = time.time() results = [] for b in req.boxes: t0 = time.time() try: cropped_pil = crop_img(pil_full, b.x, b.y, b.w, b.h) text = run_ocr(cropped_pil, mode=req.mode) status, err = "ok", None except ValueError as e: text, status, err = "", "invalid_bbox", str(e) except Exception as e: text, status, err = "", "error", str(e) results.append({ "box_id": b.id, "x": b.x, "y": b.y, "w": b.w, "h": b.h, "text": text, "status": status, "error": err, "elapsed_sec": round(time.time() - t0, 2), }) return JSONResponse({ "mode": req.mode, "image_size": {"w": img_w, "h": img_h}, "total_boxes": len(results), "total_elapsed_sec": round(time.time() - t_total, 2), "results": results, }) # ─── Embedded Demo Page ─────────────────────────────────────────────────────── @app.get("/demo", response_class=HTMLResponse) async def demo(): return HTMLResponse(content=DEMO_HTML) DEMO_HTML = r""" OCR Batch — استخراج النص

🔍 OCR Batch — تحديد مربعات متعددة

أدخل رابط الـ API ثم ارفع صورة

اسحب الصورة هنا أو اضغط للاختيار

JPG · PNG · WEBP

0 مربع
إرسال:
"""