Spaces:
Runtime error
Runtime error
| """ | |
| PetBull‑7B‑VL demo – ZeroGPU‑ready (Qwen2.5‑VL API) | |
| """ | |
| import os | |
| import json | |
| import spaces | |
| import torch | |
| import gradio as gr | |
| import transformers, accelerate, numpy as np | |
| from PIL import Image | |
| from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration | |
| from peft import PeftModel | |
| from qwen_vl_utils import process_vision_info # pip install qwen-vl-utils | |
| from tools.retriever import search as product_search | |
| print("VERSIONS:", transformers.__version__, accelerate.__version__, torch.__version__, np.__version__) | |
| os.environ["ACCELERATE_USE_SLOW_RETRIEVAL"] = "true" | |
| # ---- Config ---- | |
| BASE_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct" | |
| ADAPTER_REPO = "ColdSlim/PetBull-7B" # your LoRA | |
| ADAPTER_REV = "master" | |
| OFFLOAD_DIR = "offload" | |
| DTYPE = torch.float16 | |
| # ---- Processor (no GPU) ---- | |
| processor = AutoProcessor.from_pretrained(BASE_MODEL, trust_remote_code=True) | |
| # ---- Base model ON CPU (do NOT touch CUDA here) ---- | |
| base = Qwen2_5_VLForConditionalGeneration.from_pretrained( | |
| BASE_MODEL, | |
| torch_dtype=DTYPE, | |
| low_cpu_mem_usage=True, | |
| device_map={"": "cpu"}, | |
| offload_folder=OFFLOAD_DIR, | |
| trust_remote_code=True, | |
| ) | |
| # ---- Attach LoRA ON CPU ---- | |
| model = PeftModel.from_pretrained( | |
| base, | |
| ADAPTER_REPO, | |
| revision=ADAPTER_REV, | |
| device_map={"": "cpu"}, | |
| ).eval() | |
| _model_on_gpu = False # once-per-session move | |
| def format_candidates_for_llm(cands, budget_twd=None): | |
| filtered = [] | |
| for c in cands: | |
| if ( | |
| budget_twd | |
| and c.get("price_currency") == "TWD" | |
| and c.get("price_value") | |
| and c["price_value"] > budget_twd | |
| ): | |
| continue | |
| filtered.append({ | |
| "id": c.get("id"), | |
| "brand_en": c.get("brand_en"), | |
| "brand_zh": c.get("brand_zh"), | |
| "product_name_en": c.get("product_name_en"), | |
| "product_name_zh": c.get("product_name_zh"), | |
| "category_en": c.get("category_en"), | |
| "category_zh": c.get("category_zh"), | |
| "price_value": c.get("price_value"), | |
| "price_currency": c.get("price_currency"), | |
| "source_url": c.get("source_url"), | |
| "image_url": c.get("image_url"), | |
| "score": c.get("score"), | |
| }) | |
| return json.dumps(filtered, ensure_ascii=False, indent=2), filtered | |
| DERMA_SAFETY = ( | |
| "Safety notes: For broken/infected skin, pregnancy/lactation, infants, " | |
| "or if symptoms worsen—seek a qualified dermatologist. Patch-test first." | |
| ) | |
| def recommend_products(query_text: str, budget_twd: int | None = None, k: int = 8): | |
| # 1) Retrieve candidates | |
| cands = product_search(query_text, k=k) | |
| # 2) Build short grounded context | |
| context_json, _ = format_candidates_for_llm(cands, budget_twd=budget_twd) | |
| # 3) Ask your LLM to pick & explain (plug into your existing generation path) | |
| system = ( | |
| "You are DermalCare’s assistant. Recommend up to 3 products strictly " | |
| "from the provided list. Include a one-line why-it-helps and a brief how-to-use. " | |
| "Respect budget and do not invent products." | |
| ) | |
| user = f"User need: {query_text}\nCandidate products (JSON array):\n{context_json}\n{DERMA_SAFETY}" | |
| # --- if you already have Qwen2-VL loaded as text generator, reuse it. | |
| # Example skeleton (pseudo—replace with your app’s generate() function): | |
| try: | |
| # Replace this with your actual text-generation helper: | |
| answer = f"(LLM picks here)\n\nContext:\n{context_json}" | |
| except Exception as e: | |
| answer = f"❌ Generation error: {e}\n\nHere are candidates:\n{context_json}" | |
| return answer | |
| def _parse_recommendation_json(raw: str): | |
| if not raw: | |
| return None | |
| cleaned = raw.strip() | |
| if cleaned.startswith("```"): | |
| lines = [line for line in cleaned.splitlines() if not line.strip().startswith("```")] | |
| cleaned = "\n".join(lines) | |
| start = cleaned.find('{') | |
| end = cleaned.rfind('}') | |
| if start == -1 or end == -1 or end <= start: | |
| return None | |
| try: | |
| return json.loads(cleaned[start:end + 1]) | |
| except Exception: | |
| return None | |
| def _build_recommendation_sections(rec_data, candidate_lookup): | |
| if not rec_data: | |
| return None, None | |
| recommend_flag = rec_data.get("recommend") | |
| if isinstance(recommend_flag, str): | |
| recommend_flag = recommend_flag.strip().lower() in {"yes", "true", "1"} | |
| elif isinstance(recommend_flag, (int, float)): | |
| recommend_flag = bool(recommend_flag) | |
| if not recommend_flag: | |
| return None, None | |
| recommendations = rec_data.get("recommendations", []) | |
| if not isinstance(recommendations, list): | |
| return None, None | |
| lines = ["### Suggested Products", ""] | |
| products_payload = [] | |
| for idx, item in enumerate(recommendations[:3], start=1): | |
| if not isinstance(item, dict): | |
| continue | |
| raw_id = item.get("id") | |
| if raw_id is None: | |
| continue | |
| pid = str(raw_id).strip() | |
| if not pid: | |
| continue | |
| candidate = candidate_lookup.get(pid, {}) | |
| brand = ( | |
| candidate.get("brand_en") | |
| or candidate.get("brand_zh") | |
| or item.get("brand") | |
| or "" | |
| ) | |
| name = ( | |
| candidate.get("product_name_en") | |
| or candidate.get("product_name_zh") | |
| or item.get("name") | |
| or f"Product {idx}" | |
| ) | |
| category = ( | |
| candidate.get("category_en") | |
| or candidate.get("category_zh") | |
| or item.get("category") | |
| or None | |
| ) | |
| price_value = candidate.get("price_value") | |
| price_currency = candidate.get("price_currency") | |
| why = item.get("why") or "Supports the user’s concern." | |
| how = item.get("how") or "Use as directed on the product label." | |
| url = candidate.get("source_url") or item.get("url") | |
| image_url = candidate.get("image_url") or item.get("image_url") | |
| lines.extend([ | |
| f"{idx}. **{name}**", | |
| f"- **Why it helps:** {why}", | |
| f"- **How to use:** {how}", | |
| "", | |
| ]) | |
| products_payload.append({ | |
| "id": pid, | |
| "brand": brand, | |
| "name": name, | |
| "category": category, | |
| "price_value": price_value, | |
| "price_currency": price_currency, | |
| "why": why, | |
| "how": how, | |
| "url": url, | |
| "image_url": image_url, | |
| }) | |
| if not products_payload: | |
| return None, None | |
| suggestion_text = "\n".join(lines).strip() | |
| product_json_payload = json.dumps( | |
| {"version": 1, "products": products_payload}, | |
| ensure_ascii=False, | |
| ) | |
| return suggestion_text, product_json_payload | |
| # ---- Inference on GPU (ZeroGPU pattern) ---- | |
| def generate_answer(image, question, temperature=0.7, top_p=0.95, max_tokens=256): | |
| """ | |
| Uses Qwen2.5-VL chat template + qwen_vl_utils to prepare image+text, then generate. | |
| """ | |
| global _model_on_gpu | |
| if image is None: | |
| image = Image.new("RGB", (224, 224), color="white") | |
| if not _model_on_gpu: | |
| model.to("cuda") | |
| _model_on_gpu = True | |
| # Build chat messages in Qwen format | |
| messages = [{ | |
| "role": "user", | |
| "content": [ | |
| {"type": "image", "image": image}, | |
| {"type": "text", "text": question or "Describe this image."}, | |
| ], | |
| }] | |
| # Processor helpers | |
| text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| image_inputs, video_inputs = process_vision_info(messages) | |
| # Pack tensors on GPU | |
| inputs = processor( | |
| text=[text], | |
| images=image_inputs, | |
| videos=video_inputs, | |
| padding=True, | |
| return_tensors="pt", | |
| ) | |
| inputs = {k: (v.to("cuda") if hasattr(v, "to") else v) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| out = model.generate( | |
| **inputs, | |
| max_new_tokens=max_tokens, | |
| temperature=temperature, | |
| top_p=top_p, | |
| ) | |
| # Trim prompt tokens before decode (Qwen style) | |
| trimmed = [o[len(i):] for i, o in zip(inputs["input_ids"], out)] | |
| return processor.batch_decode(trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] | |
| # ---- PetCare answer + product suggestions (ONE output) ---- | |
| def pet_answer_with_recs(image, question, temperature=0.7, top_p=0.95, max_tokens=256, budget_twd=None): | |
| """ | |
| 1) Get the normal PetBull answer (image + text). | |
| 2) Run vector search on the user's question. | |
| 3) Ask the LLM (text-only) to decide if any candidates are relevant for the user's issue. | |
| If yes, append a 'Suggested products' section (up to 3 items from the list). | |
| If not, append 'No relevant products.'. | |
| """ | |
| # Step 1: normal PetBull answer | |
| base = generate_answer(image, question, temperature, top_p, max_tokens) | |
| # Step 2: retrieve product candidates (humans/skincare; model will decide relevance) | |
| cands = product_search(question, k=8) | |
| cand_block_json, cand_list = format_candidates_for_llm(cands, budget_twd=budget_twd) | |
| candidate_lookup = { | |
| str(c.get("id")).strip(): c for c in cand_list if c.get("id") is not None | |
| } | |
| # Step 3: build a small, text-only prompt for suggestions | |
| # IMPORTANT: we use the same Qwen2.5-VL model in text mode | |
| messages = [{ | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": | |
| "You are DermalCare's assistant.\n" | |
| "Respond ONLY with valid JSON (no markdown, no explanations).\n" | |
| "Expected schema: {\"recommend\": bool, \"recommendations\": [ {\"id\": str, \"why\": str, \"how\": str } ], \"notes\": str }.\n" | |
| "Use candidate_products as the exclusive source of items. If a product is recommended, its id must exist in candidate_products.\n" | |
| "If no products are relevant, return {\"recommend\": false, \"recommendations\": [], \"notes\": \"No relevant products.\"}."} | |
| ] | |
| },{ | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": f"User message:\n{question}"}, | |
| {"type": "text", "text": f"candidate_products = {cand_block_json}"} | |
| ] | |
| }] | |
| # Prepare inputs on GPU (text-only) | |
| text_prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| inputs = processor( | |
| text=[text_prompt], images=None, videos=None, padding=True, return_tensors="pt", | |
| ) | |
| inputs = {k: (v.to("cuda") if hasattr(v, "to") else v) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| out = model.generate( | |
| **inputs, | |
| max_new_tokens=200, | |
| temperature=0.2, # keep precise/grounded | |
| top_p=0.95, | |
| ) | |
| trimmed = [o[len(i):] for i, o in zip(inputs["input_ids"], out)] | |
| raw_response = processor.batch_decode( | |
| trimmed, | |
| skip_special_tokens=True, | |
| clean_up_tokenization_spaces=False, | |
| )[0] | |
| rec_data = _parse_recommendation_json(raw_response) | |
| sections = [base.strip()] | |
| suggestion_text = None | |
| product_json_payload = None | |
| if rec_data: | |
| recommend_flag = rec_data.get("recommend") | |
| if isinstance(recommend_flag, str): | |
| recommend_flag = recommend_flag.strip().lower() in {"yes", "true", "1"} | |
| elif isinstance(recommend_flag, (int, float)): | |
| recommend_flag = bool(recommend_flag) | |
| recs = [] | |
| for item in rec_data.get("recommendations", []): | |
| if isinstance(item, dict) and item.get("id"): | |
| recs.append(item) | |
| if recommend_flag and recs: | |
| suggestion_lines = ["### Suggested Products", ""] | |
| products_payload = [] | |
| for idx, rec in enumerate(recs[:3], start=1): | |
| pid = rec.get("id") | |
| candidate = candidate_lookup.get(pid, {}) | |
| brand = ( | |
| candidate.get("brand_en") | |
| or candidate.get("brand_zh") | |
| or rec.get("brand") | |
| or "" | |
| ) | |
| name = ( | |
| candidate.get("product_name_en") | |
| or candidate.get("product_name_zh") | |
| or rec.get("name") | |
| or f"Product {idx}" | |
| ) | |
| category = ( | |
| candidate.get("category_en") | |
| or candidate.get("category_zh") | |
| or rec.get("category") | |
| or None | |
| ) | |
| price_value = candidate.get("price_value") | |
| price_currency = candidate.get("price_currency") | |
| why = rec.get("why") or "Supports the user’s concern." | |
| how = rec.get("how") or "Use as directed on the product label." | |
| url = candidate.get("source_url") or rec.get("url") | |
| image_url = candidate.get("image_url") or rec.get("image_url") | |
| suggestion_lines.extend([ | |
| f"{idx}. **{name}**", | |
| f"- **Why it helps:** {why}", | |
| f"- **How to use:** {how}", | |
| "", | |
| ]) | |
| products_payload.append({ | |
| "id": pid, | |
| "brand": brand, | |
| "name": name, | |
| "category": category, | |
| "price_value": price_value, | |
| "price_currency": price_currency, | |
| "why": why, | |
| "how": how, | |
| "url": url, | |
| "image_url": image_url, | |
| }) | |
| if products_payload: | |
| suggestion_text = "\n".join(suggestion_lines).strip() | |
| product_json_payload = json.dumps( | |
| {"version": 1, "products": products_payload}, | |
| ensure_ascii=False, | |
| ) | |
| if suggestion_text and product_json_payload: | |
| sections.append( | |
| "Suggested products:\n" | |
| f"{suggestion_text}\n\n" | |
| f"<DERMACARE_PRODUCTS_JSON>{product_json_payload}</DERMACARE_PRODUCTS_JSON>" | |
| ) | |
| sections.append(DERMA_SAFETY) | |
| return "\n\n".join([s for s in sections if s]) | |
| # ---- UI ---- | |
| with gr.Blocks(title="DermalCare - Pet & Skincare Assistant") as demo: | |
| gr.Markdown("# DermalCare - Your AI Assistant for Pet Care and Skincare") | |
| with gr.Tabs(): | |
| with gr.Tab("Pet Care"): | |
| gr.Markdown("## PetBull‑7B‑VL – Ask a Vet\nUpload a photo and/or type a question.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| img_in = gr.Image(type="pil", label="Pet photo (optional)") | |
| txt_in = gr.Textbox(lines=3, placeholder="Describe the issue…") | |
| ask = gr.Button("Ask PetBull") | |
| temp = gr.Slider(0.1, 1.5, 0.7, label="Temperature") | |
| topp = gr.Slider(0.1, 1.0, 0.95, label="Top‑p") | |
| max_tok = gr.Slider(32, 512, 256, step=8, label="Max tokens") | |
| with gr.Column(): | |
| answer = gr.Textbox(lines=12, label="Assistant", interactive=False) | |
| ask.click( | |
| pet_answer_with_recs, | |
| inputs=[img_in, txt_in, temp, topp, max_tok], # (budget optional: add at the end if you want) | |
| outputs=answer | |
| ) | |
| demo.queue().launch(show_api=False, share=True) | |