# app.py — DeepSeek-OCR + DeepSeek-R1 Medical Mini (fast local GGUF) — Gradio 5
import os, tempfile, traceback

import gradio as gr
import torch
from PIL import Image
from transformers import AutoModel, AutoTokenizer
import spaces
from huggingface_hub import hf_hub_download
from llama_cpp import Llama

# ===============================================================
# CHAT: DeepSeek-R1 Medical Mini — LOCAL ONLY (GGUF) for maximum speed, no API tokens
# - You can force a specific file via GGUF_REPO / GGUF_FILE
# - If none is specified, we try Q4 (fast) and fall back to f16 if it is missing
# ===============================================================
GGUF_REPO = os.getenv("GGUF_REPO", "mradermacher/DeepSeek-r1-Medical-Mini-GGUF").strip()
GGUF_FILE = os.getenv("GGUF_FILE", "").strip()

# Preference order (fastest -> heaviest). Change the names if your repo uses different ones.
_DEFAULT_CANDIDATES = [
    "DeepSeek-r1-Medical-Mini.Q4_K_M.gguf",
    "DeepSeek-r1-Medical-Mini.Q4_0.gguf",
    "DeepSeek-r1-Medical-Mini.Q5_0.gguf",
    "DeepSeek-r1-Medical-Mini.Q8_0.gguf",
    "DeepSeek-r1-Medical-Mini.f16.gguf",
]
GGUF_CANDIDATES = [GGUF_FILE] if GGUF_FILE else _DEFAULT_CANDIDATES

N_CTX = int(os.getenv("N_CTX", "2048"))
N_THREADS = int(os.getenv("N_THREADS", str(os.cpu_count() or 4)))
N_GPU_LAYERS = int(os.getenv("N_GPU_LAYERS", "0"))  # ZeroGPU / CPU => 0
N_BATCH = int(os.getenv("N_BATCH", "96"))

_llm = None


def _download_gguf():
    # Try each candidate file in order and return the first one that downloads.
    last_err = None
    for fname in GGUF_CANDIDATES:
        try:
            path = hf_hub_download(repo_id=GGUF_REPO, filename=fname)
            return path, fname
        except Exception as e:
            last_err = e
    raise RuntimeError(f"No se pudo descargar GGUF desde {GGUF_REPO}. Último error: {last_err}")


def get_llm():
    # Lazily load the GGUF model once and cache it in the module-level _llm.
    global _llm
    if _llm is not None:
        return _llm
    gguf_path, used = _download_gguf()
    print(f"[R1/llama.cpp] usando: {used}")
    _llm = Llama(
        model_path=gguf_path,
        n_ctx=N_CTX,
        n_threads=N_THREADS,
        n_gpu_layers=N_GPU_LAYERS,
        n_batch=N_BATCH,
        verbose=False,
    )
    return _llm


def _format_chatml(messages):
    # Render an OpenAI-style message list as a ChatML prompt.
    parts = []
    for m in messages:
        parts.append(f"<|im_start|>{m.get('role', 'user')}\n{m.get('content', '')}<|im_end|>\n")
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


def r1_chat_local(messages, temperature=0.2, max_tokens=384):
    # llama.cpp accepts `messages` directly; if your build does not, use
    # prompt=_format_chatml(messages) instead (see the fallback sketch after the warmup block).
    llm = get_llm()
    out = llm.create_chat_completion(messages=messages, temperature=temperature, max_tokens=max_tokens)
    return out["choices"][0]["message"]["content"]


# Optional warmup
if os.getenv("WARMUP", "0") == "1":
    try:
        get_llm()
    except Exception:
        pass
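
# A minimal fallback sketch, not used by the app above: r1_chat_local assumes the installed
# llama-cpp-python build can consume `messages` via create_chat_completion (it needs a chat
# template/handler for the model). If that assumption does not hold, a hypothetical helper
# like the one below renders the ChatML prompt itself with _format_chatml and calls the plain
# completion API instead. The name and parameters here are illustrative only.
def r1_chat_local_prompt(messages, temperature=0.2, max_tokens=384):
    llm = get_llm()
    out = llm.create_completion(
        prompt=_format_chatml(messages),
        temperature=temperature,
        max_tokens=max_tokens,
        stop=["<|im_end|>"],  # end of the assistant turn in ChatML
    )
    return out["choices"][0]["text"].strip()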

# ===============================================================
# DeepSeek-OCR (unchanged) with a fallback when FlashAttention2 is not available
# ===============================================================
def _best_dtype():
    if torch.cuda.is_available():
        return torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
    return torch.float32


def _load_ocr_model():
    model_name = "deepseek-ai/DeepSeek-OCR"
    ocr_tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    attn_impl = os.getenv("OCR_ATTN_IMPL", "flash_attention_2")
    try:
        ocr_model = AutoModel.from_pretrained(
            model_name,
            _attn_implementation=attn_impl,
            trust_remote_code=True,
            use_safetensors=True,
        ).eval()
        return ocr_tokenizer, ocr_model
    except Exception as e:
        # If FlashAttention2 is missing, retry with the eager attention implementation.
        if any(k in str(e).lower() for k in ["flash_attn", "flashattention2", "flash_attention_2"]):
            ocr_model = AutoModel.from_pretrained(
                model_name,
                _attn_implementation="eager",
                trust_remote_code=True,
                use_safetensors=True,
            ).eval()
            return ocr_tokenizer, ocr_model
        raise


tokenizer, model = _load_ocr_model()


@spaces.GPU
def process_image(image, model_size, task_type, is_eval_mode):
    if image is None:
        return None, "Please upload an image first.", "Please upload an image first."

    dtype = _best_dtype()
    model_device = model.cuda().to(dtype) if torch.cuda.is_available() else model.to(dtype)

    with tempfile.TemporaryDirectory() as output_path:
        prompt = "\nFree OCR. " if task_type == "Free OCR" else "\n<|grounding|>Convert the document to markdown. "
        temp_image_path = os.path.join(output_path, "temp_image.jpg")
        image.save(temp_image_path)

        size_configs = {
            "Tiny": {"base_size": 512, "image_size": 512, "crop_mode": False},
            "Small": {"base_size": 640, "image_size": 640, "crop_mode": False},
            "Base": {"base_size": 1024, "image_size": 1024, "crop_mode": False},
            "Large": {"base_size": 1280, "image_size": 1280, "crop_mode": False},
            "Gundam (Recommended)": {"base_size": 1024, "image_size": 640, "crop_mode": True},
        }
        config = size_configs.get(model_size, size_configs["Gundam (Recommended)"])

        plain_text_result = model_device.infer(
            tokenizer,
            prompt=prompt,
            image_file=temp_image_path,
            output_path=output_path,
            base_size=config["base_size"],
            image_size=config["image_size"],
            crop_mode=config["crop_mode"],
            save_results=True,
            test_compress=True,
            eval_mode=is_eval_mode,
        )

        image_result_path = os.path.join(output_path, "result_with_boxes.jpg")
        markdown_result_path = os.path.join(output_path, "result.mmd")

        markdown_content = "Markdown result was not generated. This is expected for 'Free OCR' task."
        if os.path.exists(markdown_result_path):
            with open(markdown_result_path, "r", encoding="utf-8") as f:
                markdown_content = f.read()

        result_image = None
        if os.path.exists(image_result_path):
            # Load the annotated image fully before the temporary directory is removed.
            result_image = Image.open(image_result_path)
            result_image.load()

        text_result = plain_text_result if plain_text_result else markdown_content
        return result_image, markdown_content, text_result


# ===============================================================
# Chat with local R1 (injects the OCR output as context)
# ===============================================================
def _truncate(text, max_chars=3000):
    return (text or "")[:max_chars]


def _system_prompt():
    return ("Eres un asistente clínico educativo. No sustituyes el juicio médico. "
            "Usa CONTEXTO_OCR si existe; si falta, pídelo. Evita diagnósticos definitivos.")


def _ocr_context(ocr_md, ocr_txt):
    return _truncate(ocr_md) or _truncate(ocr_txt) or ""


def to_chat_messages(chat_msgs, ocr_md, ocr_txt):
    # Build the message list: system prompt (plus OCR context, if any) followed by the chat history.
    sys = _system_prompt()
    ctx = _ocr_context(ocr_md, ocr_txt)
    if ctx:
        sys += ("\n\n---\n"
                "CONTEXTO_OCR (fuente principal; si falta un dato, dilo explícitamente):\n"
                f"{ctx}\n---")
    msgs = [{"role": "system", "content": sys}]
    for m in (chat_msgs or []):
        if m.get("role") in ("user", "assistant"):
            msgs.append({"role": m["role"], "content": m.get("content", "")})
    return msgs
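
# Illustrative only: with OCR context available, to_chat_messages yields a list shaped like
#   [{"role": "system", "content": "Eres un asistente clínico educativo. ... CONTEXTO_OCR ..."},
#    {"role": "user", "content": "..."},
#    {"role": "assistant", "content": "..."}]
# r1_reply (below) appends the new user turn to this list before calling r1_chat_local.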

def r1_reply(user_msg, chat_msgs, ocr_md, ocr_txt):
    # An empty message means: analyze the OCR context on its own.
    if not user_msg:
        user_msg = "Analiza el CONTEXTO_OCR anterior y responde a partir de ese contenido."
    try:
        msgs = to_chat_messages(chat_msgs, ocr_md, ocr_txt) + [{"role": "user", "content": user_msg}]
        answer = r1_chat_local(msgs, temperature=0.2, max_tokens=512)
        updated = (chat_msgs or []) + [
            {"role": "user", "content": user_msg},
            {"role": "assistant", "content": answer},
        ]
        return updated, "", gr.update(value="")
    except Exception as e:
        err = f"{e.__class__.__name__}: {str(e) or repr(e)}"
        tb = traceback.format_exc(limit=2)
        updated = (chat_msgs or []) + [
            {"role": "user", "content": user_msg or ""},
            {"role": "assistant", "content": f"⚠️ Error LLM: {err}"},
        ]
        return updated, "", gr.update(value=f"{err}\n{tb}")


def clear_chat():
    return [], "", gr.update(value="")


# ===============================================================
# UI (Gradio 5)
# ===============================================================
with gr.Blocks(title="DeepSeek-OCR + R1 Medical (GGUF rápido)", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# DeepSeek-OCR → Chat Médico con **DeepSeek-R1 Medical Mini (GGUF local rápido)**
1) **Sube una imagen** y corre **OCR** (imagen anotada, Markdown y texto).
2) **Chatea** con **R1 Medical Mini** usando automáticamente el **OCR** como contexto.

*Uso educativo; no reemplaza consejo médico.*
        """
    )

    ocr_md_state = gr.State("")
    ocr_txt_state = gr.State("")

    with gr.Row():
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image", sources=["upload", "clipboard", "webcam"])
            model_size = gr.Dropdown(
                choices=["Tiny", "Small", "Base", "Large", "Gundam (Recommended)"],
                value="Gundam (Recommended)",
                label="Model Size",
            )
            task_type = gr.Dropdown(
                choices=["Free OCR", "Convert to Markdown"],
                value="Convert to Markdown",
                label="Task Type",
            )
            eval_mode_checkbox = gr.Checkbox(
                value=False,
                label="Enable Evaluation Mode",
                info="Solo texto (más rápido). Desmárcalo para ver imagen anotada y markdown.",
            )
            submit_btn = gr.Button("Process Image", variant="primary")
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Annotated Image"):
                    output_image = gr.Image(interactive=False)
                with gr.TabItem("Markdown Preview"):
                    output_markdown = gr.Markdown()
                with gr.TabItem("Markdown Source (or Eval Output)"):
                    output_text = gr.Textbox(lines=18, show_copy_button=True, interactive=False)

    with gr.Row():
        md_preview = gr.Textbox(label="Snapshot Markdown OCR", lines=10, interactive=False)
        txt_preview = gr.Textbox(label="Snapshot Texto OCR", lines=10, interactive=False)

    gr.Markdown("## Chat Clínico (R1 Medical Mini — GGUF local)")
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Asistente OCR (R1 GGUF)", type="messages", height=420)
            user_in = gr.Textbox(
                label="Mensaje",
                placeholder="Escribe tu consulta… (vacío = analiza solo el OCR)",
                lines=2,
            )
            with gr.Row():
                send_btn = gr.Button("Enviar", variant="primary")
                clear_btn = gr.Button("Limpiar")
        with gr.Column(scale=1):
            error_box = gr.Textbox(label="Debug (si hay error)", lines=8, interactive=False)

    submit_btn.click(
        fn=process_image,
        inputs=[image_input, model_size, task_type, eval_mode_checkbox],
        outputs=[output_image, output_markdown, output_text],
    ).then(
        # Mirror the OCR results into the chat context states and the snapshot boxes.
        fn=lambda md, tx: (md, tx, md, tx),
        inputs=[output_markdown, output_text],
        outputs=[ocr_md_state, ocr_txt_state, md_preview, txt_preview],
    )

    send_btn.click(
        fn=r1_reply,
        inputs=[user_in, chatbot, ocr_md_state, ocr_txt_state],
        outputs=[chatbot, user_in, error_box],
    )
    clear_btn.click(fn=clear_chat, outputs=[chatbot, user_in, error_box])


if __name__ == "__main__":
    demo.queue(max_size=20)
    demo.launch()
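
# Runtime configuration sketch. The variable names are the env vars defined at the top of this
# file; the values shown are examples only, and every variable has a working default:
#
#   GGUF_REPO=mradermacher/DeepSeek-r1-Medical-Mini-GGUF \
#   GGUF_FILE=DeepSeek-r1-Medical-Mini.Q4_K_M.gguf \
#   N_CTX=2048 N_THREADS=8 N_GPU_LAYERS=0 N_BATCH=96 WARMUP=1 \
#   OCR_ATTN_IMPL=flash_attention_2 python app.py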