DermalCare / app.py
pixel3user
some changes
103cddb
"""
PetBull‑7B‑VL demo – ZeroGPU‑ready (Qwen2.5‑VL API)
"""
import os
import json
import spaces
import torch
import gradio as gr
import transformers, accelerate, numpy as np
from PIL import Image
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
from peft import PeftModel
from qwen_vl_utils import process_vision_info # pip install qwen-vl-utils
from tools.retriever import search as product_search
print("VERSIONS:", transformers.__version__, accelerate.__version__, torch.__version__, np.__version__)
os.environ["ACCELERATE_USE_SLOW_RETRIEVAL"] = "true"
# ---- Config ----
BASE_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
ADAPTER_REPO = "ColdSlim/PetBull-7B" # your LoRA
ADAPTER_REV = "master"
OFFLOAD_DIR = "offload"
DTYPE = torch.float16
# ---- Processor (no GPU) ----
processor = AutoProcessor.from_pretrained(BASE_MODEL, trust_remote_code=True)
# ---- Base model ON CPU (do NOT touch CUDA here) ----
base = Qwen2_5_VLForConditionalGeneration.from_pretrained(
BASE_MODEL,
torch_dtype=DTYPE,
low_cpu_mem_usage=True,
device_map={"": "cpu"},
offload_folder=OFFLOAD_DIR,
trust_remote_code=True,
)
# ---- Attach LoRA ON CPU ----
model = PeftModel.from_pretrained(
base,
ADAPTER_REPO,
revision=ADAPTER_REV,
device_map={"": "cpu"},
).eval()
_model_on_gpu = False # once-per-session move
def format_candidates_for_llm(cands, budget_twd=None):
filtered = []
for c in cands:
if (
budget_twd
and c.get("price_currency") == "TWD"
and c.get("price_value")
and c["price_value"] > budget_twd
):
continue
filtered.append({
"id": c.get("id"),
"brand_en": c.get("brand_en"),
"brand_zh": c.get("brand_zh"),
"product_name_en": c.get("product_name_en"),
"product_name_zh": c.get("product_name_zh"),
"category_en": c.get("category_en"),
"category_zh": c.get("category_zh"),
"price_value": c.get("price_value"),
"price_currency": c.get("price_currency"),
"source_url": c.get("source_url"),
"image_url": c.get("image_url"),
"score": c.get("score"),
})
return json.dumps(filtered, ensure_ascii=False, indent=2), filtered
DERMA_SAFETY = (
"Safety notes: For broken/infected skin, pregnancy/lactation, infants, "
"or if symptoms worsen—seek a qualified dermatologist. Patch-test first."
)
def recommend_products(query_text: str, budget_twd: int | None = None, k: int = 8):
# 1) Retrieve candidates
cands = product_search(query_text, k=k)
# 2) Build short grounded context
context_json, _ = format_candidates_for_llm(cands, budget_twd=budget_twd)
# 3) Ask your LLM to pick & explain (plug into your existing generation path)
system = (
"You are DermalCare’s assistant. Recommend up to 3 products strictly "
"from the provided list. Include a one-line why-it-helps and a brief how-to-use. "
"Respect budget and do not invent products."
)
user = f"User need: {query_text}\nCandidate products (JSON array):\n{context_json}\n{DERMA_SAFETY}"
# --- if you already have Qwen2-VL loaded as text generator, reuse it.
# Example skeleton (pseudo—replace with your app’s generate() function):
try:
# Replace this with your actual text-generation helper:
answer = f"(LLM picks here)\n\nContext:\n{context_json}"
except Exception as e:
answer = f"❌ Generation error: {e}\n\nHere are candidates:\n{context_json}"
return answer
def _parse_recommendation_json(raw: str):
if not raw:
return None
cleaned = raw.strip()
if cleaned.startswith("```"):
lines = [line for line in cleaned.splitlines() if not line.strip().startswith("```")]
cleaned = "\n".join(lines)
start = cleaned.find('{')
end = cleaned.rfind('}')
if start == -1 or end == -1 or end <= start:
return None
try:
return json.loads(cleaned[start:end + 1])
except Exception:
return None
def _build_recommendation_sections(rec_data, candidate_lookup):
if not rec_data:
return None, None
recommend_flag = rec_data.get("recommend")
if isinstance(recommend_flag, str):
recommend_flag = recommend_flag.strip().lower() in {"yes", "true", "1"}
elif isinstance(recommend_flag, (int, float)):
recommend_flag = bool(recommend_flag)
if not recommend_flag:
return None, None
recommendations = rec_data.get("recommendations", [])
if not isinstance(recommendations, list):
return None, None
lines = ["### Suggested Products", ""]
products_payload = []
for idx, item in enumerate(recommendations[:3], start=1):
if not isinstance(item, dict):
continue
raw_id = item.get("id")
if raw_id is None:
continue
pid = str(raw_id).strip()
if not pid:
continue
candidate = candidate_lookup.get(pid, {})
brand = (
candidate.get("brand_en")
or candidate.get("brand_zh")
or item.get("brand")
or ""
)
name = (
candidate.get("product_name_en")
or candidate.get("product_name_zh")
or item.get("name")
or f"Product {idx}"
)
category = (
candidate.get("category_en")
or candidate.get("category_zh")
or item.get("category")
or None
)
price_value = candidate.get("price_value")
price_currency = candidate.get("price_currency")
why = item.get("why") or "Supports the user’s concern."
how = item.get("how") or "Use as directed on the product label."
url = candidate.get("source_url") or item.get("url")
image_url = candidate.get("image_url") or item.get("image_url")
lines.extend([
f"{idx}. **{name}**",
f"- **Why it helps:** {why}",
f"- **How to use:** {how}",
"",
])
products_payload.append({
"id": pid,
"brand": brand,
"name": name,
"category": category,
"price_value": price_value,
"price_currency": price_currency,
"why": why,
"how": how,
"url": url,
"image_url": image_url,
})
if not products_payload:
return None, None
suggestion_text = "\n".join(lines).strip()
product_json_payload = json.dumps(
{"version": 1, "products": products_payload},
ensure_ascii=False,
)
return suggestion_text, product_json_payload
# ---- Inference on GPU (ZeroGPU pattern) ----
@spaces.GPU(duration=120)
def generate_answer(image, question, temperature=0.7, top_p=0.95, max_tokens=256):
"""
Uses Qwen2.5-VL chat template + qwen_vl_utils to prepare image+text, then generate.
"""
global _model_on_gpu
if image is None:
image = Image.new("RGB", (224, 224), color="white")
if not _model_on_gpu:
model.to("cuda")
_model_on_gpu = True
# Build chat messages in Qwen format
messages = [{
"role": "user",
"content": [
{"type": "image", "image": image},
{"type": "text", "text": question or "Describe this image."},
],
}]
# Processor helpers
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
# Pack tensors on GPU
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = {k: (v.to("cuda") if hasattr(v, "to") else v) for k, v in inputs.items()}
with torch.no_grad():
out = model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
)
# Trim prompt tokens before decode (Qwen style)
trimmed = [o[len(i):] for i, o in zip(inputs["input_ids"], out)]
return processor.batch_decode(trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
# ---- PetCare answer + product suggestions (ONE output) ----
@spaces.GPU(duration=120)
def pet_answer_with_recs(image, question, temperature=0.7, top_p=0.95, max_tokens=256, budget_twd=None):
"""
1) Get the normal PetBull answer (image + text).
2) Run vector search on the user's question.
3) Ask the LLM (text-only) to decide if any candidates are relevant for the user's issue.
If yes, append a 'Suggested products' section (up to 3 items from the list).
If not, append 'No relevant products.'.
"""
# Step 1: normal PetBull answer
base = generate_answer(image, question, temperature, top_p, max_tokens)
# Step 2: retrieve product candidates (humans/skincare; model will decide relevance)
cands = product_search(question, k=8)
cand_block_json, cand_list = format_candidates_for_llm(cands, budget_twd=budget_twd)
candidate_lookup = {
str(c.get("id")).strip(): c for c in cand_list if c.get("id") is not None
}
# Step 3: build a small, text-only prompt for suggestions
# IMPORTANT: we use the same Qwen2.5-VL model in text mode
messages = [{
"role": "user",
"content": [
{"type": "text", "text":
"You are DermalCare's assistant.\n"
"Respond ONLY with valid JSON (no markdown, no explanations).\n"
"Expected schema: {\"recommend\": bool, \"recommendations\": [ {\"id\": str, \"why\": str, \"how\": str } ], \"notes\": str }.\n"
"Use candidate_products as the exclusive source of items. If a product is recommended, its id must exist in candidate_products.\n"
"If no products are relevant, return {\"recommend\": false, \"recommendations\": [], \"notes\": \"No relevant products.\"}."}
]
},{
"role": "user",
"content": [
{"type": "text", "text": f"User message:\n{question}"},
{"type": "text", "text": f"candidate_products = {cand_block_json}"}
]
}]
# Prepare inputs on GPU (text-only)
text_prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
inputs = processor(
text=[text_prompt], images=None, videos=None, padding=True, return_tensors="pt",
)
inputs = {k: (v.to("cuda") if hasattr(v, "to") else v) for k, v in inputs.items()}
with torch.no_grad():
out = model.generate(
**inputs,
max_new_tokens=200,
temperature=0.2, # keep precise/grounded
top_p=0.95,
)
trimmed = [o[len(i):] for i, o in zip(inputs["input_ids"], out)]
raw_response = processor.batch_decode(
trimmed,
skip_special_tokens=True,
clean_up_tokenization_spaces=False,
)[0]
rec_data = _parse_recommendation_json(raw_response)
sections = [base.strip()]
suggestion_text = None
product_json_payload = None
if rec_data:
recommend_flag = rec_data.get("recommend")
if isinstance(recommend_flag, str):
recommend_flag = recommend_flag.strip().lower() in {"yes", "true", "1"}
elif isinstance(recommend_flag, (int, float)):
recommend_flag = bool(recommend_flag)
recs = []
for item in rec_data.get("recommendations", []):
if isinstance(item, dict) and item.get("id"):
recs.append(item)
if recommend_flag and recs:
suggestion_lines = ["### Suggested Products", ""]
products_payload = []
for idx, rec in enumerate(recs[:3], start=1):
pid = rec.get("id")
candidate = candidate_lookup.get(pid, {})
brand = (
candidate.get("brand_en")
or candidate.get("brand_zh")
or rec.get("brand")
or ""
)
name = (
candidate.get("product_name_en")
or candidate.get("product_name_zh")
or rec.get("name")
or f"Product {idx}"
)
category = (
candidate.get("category_en")
or candidate.get("category_zh")
or rec.get("category")
or None
)
price_value = candidate.get("price_value")
price_currency = candidate.get("price_currency")
why = rec.get("why") or "Supports the user’s concern."
how = rec.get("how") or "Use as directed on the product label."
url = candidate.get("source_url") or rec.get("url")
image_url = candidate.get("image_url") or rec.get("image_url")
suggestion_lines.extend([
f"{idx}. **{name}**",
f"- **Why it helps:** {why}",
f"- **How to use:** {how}",
"",
])
products_payload.append({
"id": pid,
"brand": brand,
"name": name,
"category": category,
"price_value": price_value,
"price_currency": price_currency,
"why": why,
"how": how,
"url": url,
"image_url": image_url,
})
if products_payload:
suggestion_text = "\n".join(suggestion_lines).strip()
product_json_payload = json.dumps(
{"version": 1, "products": products_payload},
ensure_ascii=False,
)
if suggestion_text and product_json_payload:
sections.append(
"Suggested products:\n"
f"{suggestion_text}\n\n"
f"<DERMACARE_PRODUCTS_JSON>{product_json_payload}</DERMACARE_PRODUCTS_JSON>"
)
sections.append(DERMA_SAFETY)
return "\n\n".join([s for s in sections if s])
# ---- UI ----
with gr.Blocks(title="DermalCare - Pet & Skincare Assistant") as demo:
gr.Markdown("# DermalCare - Your AI Assistant for Pet Care and Skincare")
with gr.Tabs():
with gr.Tab("Pet Care"):
gr.Markdown("## PetBull‑7B‑VL – Ask a Vet\nUpload a photo and/or type a question.")
with gr.Row():
with gr.Column():
img_in = gr.Image(type="pil", label="Pet photo (optional)")
txt_in = gr.Textbox(lines=3, placeholder="Describe the issue…")
ask = gr.Button("Ask PetBull")
temp = gr.Slider(0.1, 1.5, 0.7, label="Temperature")
topp = gr.Slider(0.1, 1.0, 0.95, label="Top‑p")
max_tok = gr.Slider(32, 512, 256, step=8, label="Max tokens")
with gr.Column():
answer = gr.Textbox(lines=12, label="Assistant", interactive=False)
ask.click(
pet_answer_with_recs,
inputs=[img_in, txt_in, temp, topp, max_tok], # (budget optional: add at the end if you want)
outputs=answer
)
demo.queue().launch(show_api=False, share=True)