MedCard / app.py
LLDDWW's picture
feat: add medication analysis with easy explanations
0e6a905
raw
history blame
12.2 kB
import json
import re
from typing import List, Optional, Tuple
import numpy as np
import gradio as gr
import spaces
import torch
from PIL import Image
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
# Qwen2.5-VL ๋ชจ๋ธ ID
MODEL_ID = "Qwen/Qwen2.5-VL-7B-Instruct"
def _extract_assistant_content(decoded: str) -> str:
"""์–ด์‹œ์Šคํ„ดํŠธ ์‘๋‹ต ์ถ”์ถœ"""
if "<|im_start|>assistant" in decoded:
content = decoded.split("<|im_start|>assistant")[-1]
content = content.replace("<|im_end|>", "").strip()
return content
return decoded.strip()
def _extract_json_block(text: str) -> Optional[str]:
"""JSON ๋ธ”๋ก ์ถ”์ถœ"""
match = re.search(r"\{.*\}", text, re.DOTALL)
if not match:
return None
return match.group(0)
@spaces.GPU(duration=180)
def analyze_medication_image(image: Image.Image) -> Tuple[str, str]:
"""์ด๋ฏธ์ง€์—์„œ OCR ์ถ”์ถœ ํ›„ ์•ฝ ์ •๋ณด ๋ถ„์„"""
try:
# Qwen2.5-VL ๋ชจ๋ธ ๋กœ๋“œ
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_ID,
torch_dtype="auto",
device_map="auto"
)
processor = AutoProcessor.from_pretrained(MODEL_ID)
# Step 1: OCR - ์ด๋ฏธ์ง€์—์„œ ํ…์ŠคํŠธ ์ถ”์ถœ
ocr_messages = [
{
"role": "user",
"content": [
{"type": "image", "image": image},
{"type": "text", "text": "์ด ์ด๋ฏธ์ง€์— ์žˆ๋Š” ๋ชจ๋“  ํ…์ŠคํŠธ๋ฅผ ์ •ํ™•ํ•˜๊ฒŒ ์ถ”์ถœํ•ด์ฃผ์„ธ์š”. ํ…์ŠคํŠธ๋งŒ ์ถœ๋ ฅํ•˜๊ณ  ๋‹ค๋ฅธ ์„ค๋ช…์€ ํ•„์š” ์—†์Šต๋‹ˆ๋‹ค."},
],
}
]
text = processor.apply_chat_template(ocr_messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(ocr_messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
with torch.no_grad():
generated_ids = model.generate(**inputs, max_new_tokens=2048)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
ocr_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
if not ocr_text or ocr_text.strip() == "":
return "ํ…์ŠคํŠธ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.", ""
# Step 2: ์•ฝ ์ •๋ณด ๋ถ„์„ - OCR ํ…์ŠคํŠธ๋ฅผ LLM์—๊ฒŒ ์ „๋‹ฌ
analysis_messages = [
{
"role": "user",
"content": [
{"type": "text", "text": f"""๋‹ค์Œ์€ ์•ฝ ๋ด‰ํˆฌ๋‚˜ ์ฒ˜๋ฐฉ์ „์—์„œ ์ถ”์ถœํ•œ ํ…์ŠคํŠธ์ž…๋‹ˆ๋‹ค:
{ocr_text}
์œ„ ํ…์ŠคํŠธ์—์„œ ์•ฝ ์ด๋ฆ„์„ ์ฐพ์•„์„œ, ๊ฐ ์•ฝ์— ๋Œ€ํ•ด ๋‹ค์Œ ์ •๋ณด๋ฅผ **๋…ธ์ธ๊ณผ ์–ด๋ฆฐ์ด ๋ชจ๋‘ ์‰ฝ๊ฒŒ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๋„๋ก** ์žฌ๋ฏธ์žˆ๊ณ  ์นœ๊ทผํ•˜๊ฒŒ ์„ค๋ช…ํ•ด์ฃผ์„ธ์š”:
1. **์•ฝ ์ด๋ฆ„**: ์ •ํ™•ํ•œ ์•ฝ ์ด๋ฆ„
2. **ํšจ๋Šฅ**: ์ด ์•ฝ์ด ๋ฌด์—‡์„ ์น˜๋ฃŒํ•˜๊ณ  ์–ด๋–ป๊ฒŒ ๋„์›€์ด ๋˜๋Š”์ง€
3. **๋ถ€์ž‘์šฉ**: ์ฃผ์˜ํ•ด์•ผ ํ•  ๋ถ€์ž‘์šฉ๋“ค
๊ฐ ์•ฝ๋งˆ๋‹ค ์ด๋ชจ์ง€๋ฅผ ์‚ฌ์šฉํ•˜๊ณ , ์‰ฌ์šด ๋‹จ์–ด๋กœ ์„ค๋ช…ํ•ด์ฃผ์„ธ์š”. ํ• ๋จธ๋‹ˆ ํ• ์•„๋ฒ„์ง€๋‚˜ ์ดˆ๋“ฑํ•™์ƒ๋„ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๊ฒŒ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”.
๋งˆํฌ๋‹ค์šด ํ˜•์‹์œผ๋กœ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”."""},
],
}
]
text = processor.apply_chat_template(analysis_messages, tokenize=False, add_generation_prompt=True)
inputs = processor(
text=[text],
images=None,
videos=None,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
with torch.no_grad():
generated_ids = model.generate(**inputs, max_new_tokens=3072, temperature=0.7)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
analysis_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
return ocr_text.strip(), analysis_text.strip()
except Exception as e:
raise Exception(f"๋ถ„์„ ์˜ค๋ฅ˜: {str(e)}")
def extract_medications_from_text(text: str) -> List[str]:
"""Stage 2: Qwen2.5๋กœ ํ…์ŠคํŠธ์—์„œ ์•ฝ ์ด๋ฆ„๋งŒ ์ถ”์ถœ"""
try:
messages = [
{
"role": "system",
"content": "You are a medical text analyzer. Extract only medication names from the given text and return them as a JSON array. Return ONLY valid JSON format."
},
{
"role": "user",
"content": f"Extract all medication names from this text:\n\n{text}\n\nReturn format: {{\"medications\": [\"name1\", \"name2\"]}}"
}
]
prompt = LLM_TOKENIZER.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
inputs = LLM_TOKENIZER(prompt, return_tensors="pt").to(LLM_MODEL.device)
with torch.no_grad():
outputs = LLM_MODEL.generate(
**inputs,
max_new_tokens=512,
temperature=0.3,
top_p=0.9,
do_sample=True,
pad_token_id=LLM_TOKENIZER.eos_token_id,
)
response = LLM_TOKENIZER.decode(outputs[0], skip_special_tokens=True)
# Extract assistant response (Qwen format)
if "<|im_start|>assistant" in response:
response = response.split("<|im_start|>assistant")[-1]
response = response.replace("<|im_end|>", "").strip()
# Parse JSON
json_match = re.search(r'\{.*?\}', response, re.DOTALL)
if json_match:
data = json.loads(json_match.group(0))
medications = data.get("medications", [])
if isinstance(medications, list) and medications:
return [str(m).strip() for m in medications if str(m).strip()]
return ["์•ฝ ์ด๋ฆ„์„ ์ฐพ์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค."]
except Exception as e:
raise Exception(f"LLM ๋ถ„์„ ์˜ค๋ฅ˜: {str(e)}")
@spaces.GPU(duration=120)
def extract_medication_names(image: Image.Image) -> Tuple[str, List[str]]:
"""2๋‹จ๊ณ„ ํŒŒ์ดํ”„๋ผ์ธ: OCR โ†’ LLM ๋ถ„์„"""
try:
# Stage 1: OCR๋กœ ํ…์ŠคํŠธ ์ถ”์ถœ
extracted_text = extract_text_from_image(image)
if not extracted_text:
return "", ["ํ…์ŠคํŠธ๋ฅผ ์ถ”์ถœํ•˜์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค."]
# Stage 2: LLM์œผ๋กœ ์•ฝ ์ด๋ฆ„ ์ถ”์ถœ
medications = extract_medications_from_text(extracted_text)
return extracted_text, medications
except Exception as e:
return "", [f"์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}"]
def format_results(extracted_text: str, medications: List[str]) -> Tuple[str, str]:
"""๊ฒฐ๊ณผ๋ฅผ ํฌ๋งทํŒ…"""
# ์ถ”์ถœ๋œ ์ „์ฒด ํ…์ŠคํŠธ
text_output = f"### ๐Ÿ“„ ์ถ”์ถœ๋œ ํ…์ŠคํŠธ\n\n```\n{extracted_text}\n```"
# ์•ฝ ์ด๋ฆ„ ๋ฆฌ์ŠคํŠธ
if not medications or medications[0].startswith("์˜ค๋ฅ˜") or medications[0].startswith("์•ฝ ์ด๋ฆ„์„ ์ฐพ์ง€") or medications[0].startswith("ํ…์ŠคํŠธ๋ฅผ"):
med_output = f"### โš ๏ธ {medications[0] if medications else '์•ฝ ์ด๋ฆ„์„ ์ฐพ์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.'}"
else:
med_output = f"### ๐Ÿ’Š ๊ฒ€์ถœ๋œ ์•ฝ๋ฌผ ({len(medications)}๊ฐœ)\n\n"
for idx, med_name in enumerate(medications, 1):
med_output += f"{idx}. **{med_name}**\n"
return text_output, med_output
def run_analysis(image: Optional[Image.Image], progress=gr.Progress()):
"""๋ฉ”์ธ ๋ถ„์„ ํŒŒ์ดํ”„๋ผ์ธ: OCR + ์•ฝ ์ •๋ณด ๋ถ„์„"""
if image is None:
return "๐Ÿ“ท ์•ฝ ๋ด‰ํˆฌ๋‚˜ ์ฒ˜๋ฐฉ์ „ ์‚ฌ์ง„์„ ์—…๋กœ๋“œํ•ด์ฃผ์„ธ์š”.", ""
progress(0.3, desc="๐Ÿ“ธ 1๋‹จ๊ณ„: OCR ํ…์ŠคํŠธ ์ถ”์ถœ ์ค‘...")
progress(0.6, desc="๐Ÿค– 2๋‹จ๊ณ„: ์•ฝ ์ •๋ณด ๋ถ„์„ ์ค‘...")
try:
ocr_text, analysis = analyze_medication_image(image)
progress(1.0, desc="โœ… ์™„๋ฃŒ!")
ocr_output = f"### ๐Ÿ“„ ์ถ”์ถœ๋œ ํ…์ŠคํŠธ\n\n```\n{ocr_text}\n```"
analysis_output = f"### ๐Ÿ’Š ์•ฝ ์ •๋ณด ์„ค๋ช…\n\n{analysis}"
return ocr_output, analysis_output
except Exception as e:
return f"### โš ๏ธ ์˜ค๋ฅ˜ ๋ฐœ์ƒ\n\n{str(e)}", ""
# ์‹ฌํ”Œํ•œ CSS
CUSTOM_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap');
:root {
--primary: #6366f1;
--secondary: #8b5cf6;
}
body {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.gradio-container {
max-width: 900px !important;
margin: auto;
background: rgba(255, 255, 255, 0.98);
border-radius: 24px;
box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.3);
padding: 40px;
}
.hero {
text-align: center;
padding: 30px 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 20px;
color: white;
margin-bottom: 30px;
}
.hero h1 {
font-size: 2.5rem;
font-weight: 700;
margin-bottom: 10px;
}
.hero p {
font-size: 1.1rem;
opacity: 0.95;
}
.upload-section {
background: white;
border-radius: 16px;
padding: 30px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.07);
margin-bottom: 20px;
}
.result-section {
background: white;
border-radius: 16px;
padding: 30px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.07);
min-height: 200px;
}
.analyze-btn button {
background: linear-gradient(135deg, var(--primary), var(--secondary)) !important;
color: white !important;
font-weight: 600 !important;
font-size: 1.1rem !important;
padding: 18px 40px !important;
border-radius: 12px !important;
border: none !important;
box-shadow: 0 10px 20px -5px rgba(99, 102, 241, 0.5) !important;
transition: all 0.3s ease !important;
}
.analyze-btn button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 15px 30px -5px rgba(99, 102, 241, 0.6) !important;
}
.gr-image {
border-radius: 12px !important;
}
"""
HERO_HTML = """
<div class="hero">
<h1>๐Ÿ’Š ์šฐ๋ฆฌ ๊ฐ€์กฑ ์•ฝ ๋„์šฐ๋ฏธ</h1>
<p>์•ฝ๋ด‰ํˆฌ/์ฒ˜๋ฐฉ์ „ ์‚ฌ์ง„์—์„œ ์•ฝ ์ •๋ณด๋ฅผ ์‰ฝ๊ณ  ์žฌ๋ฏธ์žˆ๊ฒŒ ์•Œ๋ ค๋“œ๋ ค์š”!</p>
</div>
"""
# Gradio ์ธํ„ฐํŽ˜์ด์Šค
with gr.Blocks(theme=gr.themes.Soft(), css=CUSTOM_CSS) as demo:
gr.HTML(HERO_HTML)
with gr.Column(elem_classes=["upload-section"]):
gr.Markdown("### ๐Ÿ“ธ ์‚ฌ์ง„ ์—…๋กœ๋“œ")
image_input = gr.Image(type="pil", label="์•ฝ๋ด‰ํˆฌ ๋˜๋Š” ์ฒ˜๋ฐฉ์ „ ์‚ฌ์ง„", height=350)
analyze_button = gr.Button("๐Ÿ” ์•ฝ ์ •๋ณด ๋ถ„์„ํ•˜๊ธฐ", elem_classes=["analyze-btn"], size="lg")
with gr.Row():
with gr.Column(elem_classes=["result-section"]):
gr.Markdown("### ๐Ÿ“‹ 1๋‹จ๊ณ„: ์ถ”์ถœ๋œ ํ…์ŠคํŠธ")
ocr_output = gr.Markdown("OCR๋กœ ์ถ”์ถœ๋œ ํ…์ŠคํŠธ๊ฐ€ ์—ฌ๊ธฐ ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค.")
with gr.Column(elem_classes=["result-section"]):
gr.Markdown("### ๐Ÿ“‹ 2๋‹จ๊ณ„: ์‰ฌ์šด ์•ฝ ์„ค๋ช…")
analysis_output = gr.Markdown("๋…ธ์ธ๊ณผ ์–ด๋ฆฐ์ด๋„ ์ดํ•ดํ•˜๊ธฐ ์‰ฌ์šด ์•ฝ ์ •๋ณด๊ฐ€ ์—ฌ๊ธฐ ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค.")
analyze_button.click(
run_analysis,
inputs=image_input,
outputs=[ocr_output, analysis_output],
)
gr.Markdown("""
---
**โ„น๏ธ ์‚ฌ์šฉ ๋ฐฉ๋ฒ•**
1. ์•ฝ ๋ด‰ํˆฌ๋‚˜ ์ฒ˜๋ฐฉ์ „ ์‚ฌ์ง„์„ ์—…๋กœ๋“œํ•˜์„ธ์š”
2. '์•ฝ ์ •๋ณด ๋ถ„์„ํ•˜๊ธฐ' ๋ฒ„ํŠผ์„ ํด๋ฆญํ•˜์„ธ์š”
3. ์™ผ์ชฝ์—๋Š” ์ถ”์ถœ๋œ ํ…์ŠคํŠธ, ์˜ค๋ฅธ์ชฝ์—๋Š” ์‰ฌ์šด ์„ค๋ช…์ด ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค!
**โš ๏ธ ์ฃผ์˜์‚ฌํ•ญ**
- ์ด ์•ฑ์€ ์ฐธ๊ณ ์šฉ์ด๋ฉฐ, ์‹ค์ œ ๋ณต์•ฝ์€ ๋ฐ˜๋“œ์‹œ ์˜์‚ฌ๋‚˜ ์•ฝ์‚ฌ์˜ ์ง€์‹œ๋ฅผ ๋”ฐ๋ฅด์„ธ์š”
- AI๊ฐ€ ์ƒ์„ฑํ•œ ์ •๋ณด์ด๋ฏ€๋กœ ์ •ํ™•ํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค
**๐Ÿค– ๊ธฐ์ˆ  ์Šคํƒ**
- Qwen2.5-VL-7B-Instruct (OCR + ์•ฝ ์ •๋ณด ๋ถ„์„)
""")
if __name__ == "__main__":
demo.queue().launch()