| | import gradio as gr |
| | import os |
| | import re |
| | import torch |
| | import gc |
| | from PIL import Image |
| | from transformers import pipeline |
| | from langchain_chroma import Chroma |
| | from langchain_community.document_loaders import PyPDFLoader, TextLoader |
| | from langchain_text_splitters import RecursiveCharacterTextSplitter |
| | from langchain_core.documents import Document |
| | from langchain_huggingface import HuggingFaceEmbeddings |
| | from ultralytics import YOLO |
| |
|
| | |
# Directory in which the persistent Chroma vector store is kept.
CHROMA_PATH = "/tmp/chroma_db"
# Hugging Face model id of the vision-language model that reads bottle labels.
VISION_MODEL = "HuggingFaceTB/SmolVLM-Instruct"

# Both engines are built once, at import time, and reused by every request.
print("โ๏ธ Loading Stable Vision Engine...")
vision_pipe = pipeline(
    task="image-text-to-text",
    model=VISION_MODEL,
    # Full-precision weights on the CPU.
    model_kwargs=dict(dtype=torch.float32),
    device="cpu",
)

print("๐ Loading Embedding Engine...")
embed_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)
| |
|
| | |
def get_bottle_crops(image_path, *, conf=0.1, pad_ratio=0.25):
    """Detect bottle-like objects in a photo and return padded crops of them.

    Args:
        image_path: Path of the photo to analyse.
        conf: Minimum detection confidence handed to YOLO.
        pad_ratio: Fraction of each box's width/height added as margin on
            every side before cropping (clamped to the image bounds).

    Returns:
        A list of RGB ``PIL.Image`` objects:

        * one padded crop per detected bottle / wine glass / cup,
        * ``[full_image]`` when nothing was detected or the detector failed,
        * ``[]`` when the image itself could not be opened.
    """
    print(f"๐ DEBUG: Starting YOLO on {image_path}")

    # COCO class ids of the default YOLOv8 weights: bottle, wine glass, cup.
    drink_class_ids = (39, 40, 41)

    try:
        # Local import: only this function needs ImageOps.
        from PIL import ImageOps

        # The context manager releases the file handle; exif_transpose() and
        # convert() produce an independent in-memory image that outlives it.
        # Applying the EXIF orientation here means the detector and the
        # cropper below see exactly the same pixels.
        with Image.open(image_path) as raw_img:
            original_img = ImageOps.exif_transpose(raw_img).convert("RGB")
    except Exception as e:
        print(f"โ YOLO Error: {e}")
        return []

    img_w, img_h = original_img.size
    found_crops = []
    yolo_model = None
    try:
        # The model is loaded per call and dropped afterwards so it does not
        # stay resident next to the vision-language model.
        yolo_model = YOLO("yolov8n.pt")
        # Run on the decoded image (not the path) so that the returned box
        # coordinates always refer to the image that is cropped.
        results = yolo_model(original_img, verbose=False, conf=conf)

        for r in results:
            for box in r.boxes:
                if int(box.cls) not in drink_class_ids:
                    continue
                x1, y1, x2, y2 = box.xyxy[0].tolist()

                # Widen the box so the whole label survives a tight detection.
                pad_x = int((x2 - x1) * pad_ratio)
                pad_y = int((y2 - y1) * pad_ratio)
                crop_box = (
                    max(0, x1 - pad_x),
                    max(0, y1 - pad_y),
                    min(img_w, x2 + pad_x),
                    min(img_h, y2 + pad_y),
                )
                found_crops.append(original_img.crop(crop_box))
    except Exception as e:
        # A detector failure should not throw away a perfectly readable
        # photo: fall through and hand back whatever we have.
        print(f"โ YOLO Error: {e}")
    finally:
        del yolo_model
        gc.collect()

    return found_crops if found_crops else [original_img]
| |
|
| | |
def ingest_recipes(files):
    """Load uploaded recipe files, split them per recipe and store them in Chroma.

    The text of all readable ``.txt`` / ``.pdf`` uploads is concatenated and
    split at every line that starts with ``Recipe:``; each piece longer than
    20 characters becomes one document in the vector store at ``CHROMA_PATH``.

    Args:
        files: Uploaded files. Each item may be a plain path string or an
            object exposing its path as ``.name``.

    Returns:
        A human-readable status message for the UI.
    """
    if not files:
        return "โ No files uploaded."

    docs = []
    for f in files:
        # Accept both path strings and file-like wrappers with a ``.name``.
        path = str(getattr(f, "name", f))
        # Compare the extension case-insensitively so "MENU.PDF" is accepted.
        suffix = os.path.splitext(path)[1].lower()
        try:
            if suffix == ".txt":
                docs.extend(TextLoader(path).load())
            elif suffix == ".pdf":
                docs.extend(PyPDFLoader(path).load())
            else:
                print(f"Skipping unsupported file: {path}")
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole batch.
            print(f"Error: {e}")

    if not docs:
        return "โ Could not extract text."

    full_text = "\n".join(d.page_content for d in docs)
    # Zero-width split: every chunk starts with its own "Recipe:" heading.
    raw_chunks = re.split(r'(?m)^(?=Recipe:)', full_text)

    split_docs = []
    for chunk in raw_chunks:
        # Drop divider rules between recipes. The first alternative is the
        # three-em dash (U+2E3B); the second is the mis-encoded form that the
        # pattern previously contained, kept so nothing that used to match
        # stops matching.
        clean_chunk = re.sub(r'⸻+|โธป+', '', chunk).strip()
        if len(clean_chunk) > 20:
            split_docs.append(Document(page_content=clean_chunk))

    # Nothing substantial survived the split (e.g. image-only PDF pages);
    # do not hand an empty batch to the vector store.
    if not split_docs:
        return "โ Could not extract text."

    # NOTE(review): every call writes into the same persist directory, so
    # re-uploading the same files presumably stores duplicates - confirm.
    try:
        Chroma.from_documents(split_docs, embed_model, persist_directory=CHROMA_PATH)
        return f"✅ Bar library updated. Strictly split into {len(split_docs)} individual recipes."
    except Exception as e:
        return f"โ Database Error: {e}"
| |
|
| | |
# Answers that only name a category (or the object itself) and therefore
# trigger a second identification pass on the full photo.
_GENERIC_TERMS = frozenset({
    "vodka", "gin", "rum", "tequila", "whiskey", "whisky", "bourbon",
    "brandy", "alcohol", "liquor", "spirit", "bottle", "drink",
})

# Inventory values that mean "no bottle has been identified".
_NO_INVENTORY = ("Empty Shelf", "Unknown Spirit", "")


def _clean_label(raw_text):
    """Remove ``<...>`` tokens from model output and keep the first sentence."""
    return re.sub(r'<.*?>', '', raw_text).strip().split('.')[0]


def _identify_spirit(image_input):
    """Ask the vision model for the brand and type shown on a bottle image."""
    # thumbnail() resizes in place, so work on a copy of the caller's image.
    fast_img = image_input.copy()
    if fast_img.mode != "RGB":
        fast_img = fast_img.convert("RGB")
    fast_img.thumbnail((384, 384))

    prompt = "User: <image>\nRead the label. What is the specific brand and type of alcohol? Be precise.\nAssistant:"

    out = vision_pipe(fast_img, prompt, generate_kwargs={"max_new_tokens": 15})
    text = out[0]['generated_text']
    # The generated text may echo the prompt; keep only the assistant's part.
    if "Assistant:" in text:
        return text.split("Assistant:")[-1].strip()
    return text.replace("User: <image>", "").strip()


def bartend(message, history, img_path, inventory):
    """Identify the bottle in the photo and answer with matching recipes.

    Args:
        message: The user's chat text. It is recorded in the history only;
            the recipe search is driven by the identified spirit.
        history: Chat history as a list of ``{"role", "content"}`` dicts
            (``None`` is treated as empty). The list passed in is not mutated.
        img_path: Path of the bottle photo, or a falsy value when none is set.
        inventory: Spirit remembered from the previous call; replaced whenever
            a photo is analysed.

    Returns:
        ``(history, inventory, debug_images)``: the extended history, the
        current spirit name (``"Unknown Spirit"`` when reading failed) and the
        crops to show in the debug gallery.
    """
    history = list(history or [])
    debug_images = []

    if img_path:
        crops = get_bottle_crops(img_path)
        debug_images = crops

        try:
            # Prefer the first crop; fall back to the whole photo.
            if crops:
                target_img = crops[0]
            else:
                with Image.open(img_path) as raw_img:
                    target_img = raw_img.convert("RGB")

            inventory = _clean_label(_identify_spirit(target_img))
            print(f"๐ Pass 1 Result: {inventory}")

            if inventory.lower() in _GENERIC_TERMS or len(inventory) < 4:
                print("โ ๏ธ Result too generic. Trying FULL IMAGE...")
                # Close the file handle once the model has seen the image.
                with Image.open(img_path) as full_img:
                    full_img_result = _clean_label(_identify_spirit(full_img))
                # Only accept the second answer when it carries more detail.
                if len(full_img_result) > len(inventory):
                    inventory = full_img_result
                print(f"✅ Pass 2 Result: {inventory}")

            # An empty answer is a failed read, not a spirit called "".
            if not inventory.strip():
                inventory = "Unknown Spirit"

        except Exception as e:
            print(f"โ Vision Failed: {e}")
            inventory = "Unknown Spirit"

    has_inventory = bool(inventory) and inventory not in _NO_INVENTORY

    recipe_context = ""
    if has_inventory:
        try:
            # Skip the search until recipes have been ingested at least once.
            if os.path.exists(CHROMA_PATH):
                vs = Chroma(persist_directory=CHROMA_PATH, embedding_function=embed_model)
                search_query = f"Cocktail recipe using {inventory}"
                results = vs.similarity_search(search_query, k=4)
                recipe_context = "\n\n---\n\n".join(d.page_content for d in results)
        except Exception as e:
            print(f"Search error: {e}")

    if inventory == "Unknown Spirit":
        response = "I'm having trouble reading that label. Check the 'Vision Debug' gallery belowโis the crop clear?"
    elif not has_inventory:
        # Nothing identified yet: ask for a photo instead of announcing a
        # placeholder value as if it were a bottle.
        response = "I don't see a bottle yet. Upload a photo of your bottle and I'll look for a recipe."
    elif recipe_context:
        response = f"I see you have **{inventory}**. Here are a few options from your collection:\n\n{recipe_context}"
    else:
        response = f"I see you have **{inventory}**! I don't have a specific recipe for that in the current library. Should I suggest a classic drink?"

    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": response})

    return history, inventory, debug_images
| |
|
| | |
with gr.Blocks() as demo:
    gr.Markdown("# ๐ธ LocalAGI: The AI Sommelier")
    # Carries the last identified spirit between calls to bartend().
    inv_state = gr.State("Empty Shelf")

    with gr.Row():
        # Left column: recipe ingestion, bottle photo and the crop gallery.
        with gr.Column(scale=1):
            file_up = gr.File(
                label="1. Upload Recipe PDFs/TXTs",
                file_count="multiple",
            )
            ingest_btn = gr.Button("๐ฅ Load Recipes into Memory")
            status = gr.Textbox(label="System Status", value="Ready")

            gr.Markdown("---")
            img = gr.Image(type="filepath", label="2. Photo of your Bottle")

            with gr.Accordion("๐ Vision Debug", open=False):
                debug_gallery = gr.Gallery(
                    label="YOLO Crops",
                    columns=2,
                    height="auto",
                )

        # Right column: the conversation itself.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(height=500, label="Bartender Chat")
            msg = gr.Textbox(
                label="3. Your Message",
                placeholder="Ask for a drink suggestion...",
            )
            send_btn = gr.Button("Mix It Up", variant="primary")

    ingest_btn.click(fn=ingest_recipes, inputs=file_up, outputs=status)

    # Pressing Enter in the textbox and clicking the button run the same handler.
    chat_inputs = [msg, chatbot, img, inv_state]
    chat_outputs = [chatbot, inv_state, debug_gallery]
    for trigger in (msg.submit, send_btn.click):
        trigger(fn=bartend, inputs=chat_inputs, outputs=chat_outputs)


if __name__ == "__main__":
    demo.launch(theme=gr.themes.Soft())