Spaces:
Runtime error
Runtime error
| # app.py | |
| import os | |
| import uuid | |
| import io | |
| import base64 | |
| from PIL import Image | |
| import gradio as gr | |
| import numpy as np | |
| # CLIP via Sentence-Transformers | |
| from sentence_transformers import SentenceTransformer | |
| # Gemini (Google) client | |
| from google import genai | |
| # Qdrant client & helpers | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import VectorParams, Distance, PointStruct | |
# -------------------------
# CONFIG (reads env vars)
# -------------------------
# Each setting is read once at import time. Surrounding whitespace is
# stripped and an unset variable becomes the empty string.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "").strip()
QDRANT_URL = os.environ.get("QDRANT_URL", "").strip()
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "").strip()

# -------------------------
# Initialize clients/models
# -------------------------
# Fail fast on missing configuration, before paying for the model load.
if not QDRANT_URL:
    raise RuntimeError("Please set QDRANT_URL environment variable")

print("Loading CLIP model (this may take 20-60s the first time)...")
MODEL_ID = "sentence-transformers/clip-ViT-B-32-multilingual-v1"
clip_model = SentenceTransformer(MODEL_ID)

# Gemini is optional: without a key the client stays None and the tagging
# code paths return empty tags.
genai_client = genai.Client(api_key=GEMINI_API_KEY) if GEMINI_API_KEY else None

# An unset key arrives here as ""; pass None so the client is built without
# a key rather than with an empty one.
qclient = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY or None)

COLLECTION = "lost_found_items"
VECTOR_SIZE = clip_model.get_sentence_embedding_dimension()

# Create collection if missing
try:
    if not qclient.collection_exists(COLLECTION):
        qclient.create_collection(
            collection_name=COLLECTION,
            vectors_config=VectorParams(size=VECTOR_SIZE, distance=Distance.COSINE),
        )
except Exception as e:
    # Best-effort: the app still starts, and later requests report their own
    # Qdrant errors to the user.
    print("Error initializing Qdrant collection:", e)
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def embed_text(text: str):
    """Encode *text* with the module-level ``clip_model``.

    Args:
        text: The string to embed.

    Returns:
        Whatever ``clip_model.encode`` produces when asked for NumPy output.
    """
    embedding = clip_model.encode(text, convert_to_numpy=True)
    return embedding
# The multilingual checkpoint loaded as ``clip_model`` is the text-side
# encoder; its model card pairs it with the original ``clip-ViT-B-32``
# checkpoint for images, which maps into the same vector space.
IMAGE_MODEL_ID = "clip-ViT-B-32"
_image_model = None


def _get_image_model():
    """Load the CLIP image encoder on first use and reuse it afterwards."""
    global _image_model
    if _image_model is None:
        print("Loading CLIP image model (first image request only)...")
        _image_model = SentenceTransformer(IMAGE_MODEL_ID)
    return _image_model


def embed_image_pil(pil_img: Image.Image):
    """Encode a PIL image into the embedding space shared with ``embed_text``.

    Args:
        pil_img: The image to embed; it is converted to RGB before encoding.

    Returns:
        The encoder output, requested as a NumPy array.
    """
    rgb_img = pil_img.convert("RGB")
    return _get_image_model().encode(rgb_img, convert_to_numpy=True)
def gen_tags_from_image_file(image_bytes: io.BytesIO) -> str:
    """Ask Gemini for four short, comma-separated tags describing an image.

    Tagging is best-effort: any failure is logged and reported as an empty
    string so the caller can still save the item.

    Args:
        image_bytes: In-memory PNG data (the caller in this file serialises
            uploads as PNG before calling).

    Returns:
        The stripped model reply, or "" when Gemini is not configured, the
        request fails, or the reply contains no text.
    """
    if genai_client is None:
        return ""

    prompt_text = (
        "Give 4 short tags (comma-separated) describing this item in the image. "
        "Tags should be short single words or two-word phrases (e.g. 'black backpack', 'water bottle'). "
        "Respond only with tags, no extra explanation."
    )
    try:
        # Make sure the upload starts from the first byte.
        image_bytes.seek(0)
        # An in-memory stream has no filename to infer a type from, so the
        # MIME type is stated explicitly.
        file_obj = genai_client.files.upload(
            file=image_bytes,
            config={"mime_type": "image/png"},
        )
        response = genai_client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[prompt_text, file_obj],
        )
    except Exception as exc:
        # Surface the reason instead of hiding it; tags stay optional.
        print("Gemini image tagging failed:", exc)
        return ""
    # ``text`` can be None when the reply carries no text part.
    return (response.text or "").strip()
| # ------------------------- | |
| # App logic: add item | |
| # ------------------------- | |
def _gen_tags_from_text(text_description: str) -> str:
    """Return Gemini-generated tags for a text-only item, or "" if unavailable."""
    if genai_client is None:
        return ""
    try:
        resp = genai_client.models.generate_content(
            model="gemini-2.5-flash",
            contents=f"Give 4 short, comma-separated tags for this item described as: {text_description}. Reply only with tags.",
        )
    except Exception as exc:
        # Tagging is best-effort: the item is still saved, just without tags.
        print("Gemini text tagging failed:", exc)
        return ""
    # ``text`` can be None when the reply carries no text part.
    return (resp.text or "").strip()


def add_item(mode: str, uploaded_image, text_description: str, finder_name: str, finder_phone: str):
    """Embed a lost/found item and store it as one point in Qdrant.

    When an image is supplied, the image is what gets embedded and the
    description is stored in the payload only. Otherwise the description
    itself is embedded.

    Args:
        mode: "lost" or "found"; finder details are stored only for "found".
        uploaded_image: PIL image from the upload widget, or None.
        text_description: Free-text description; may be empty if an image is
            given.
        finder_name: Name of the finder (stored for "found" items).
        finder_phone: Phone number of the finder (stored for "found" items).

    Returns:
        A human-readable status message for the UI.
    """
    description = text_description or ""
    # Without an image or any text there is nothing meaningful to embed, and
    # storing the vector of an empty string would only pollute search results.
    if uploaded_image is None and not description.strip():
        return "⚠️ Please provide an item photo or a description."

    item_id = str(uuid.uuid4())
    payload = {"mode": mode, "text": description}

    # Found item extra details
    if mode == "found":
        payload["finder_name"] = finder_name
        payload["finder_phone"] = finder_phone

    if uploaded_image is not None:
        # Convert once and reuse the result for both the PNG and the embedding.
        rgb_image = uploaded_image.convert("RGB")
        img_bytes = io.BytesIO()
        rgb_image.save(img_bytes, format="PNG")
        img_bytes.seek(0)

        vec = embed_image_pil(rgb_image).tolist()
        payload["has_image"] = True
        payload["tags"] = gen_tags_from_image_file(img_bytes)
        # getvalue() returns the whole buffer regardless of stream position.
        payload["image_b64"] = base64.b64encode(img_bytes.getvalue()).decode("utf-8")
    else:
        vec = embed_text(description).tolist()
        payload["has_image"] = False
        payload["tags"] = _gen_tags_from_text(description)

    try:
        point = PointStruct(id=item_id, vector=vec, payload=payload)
        qclient.upsert(collection_name=COLLECTION, points=[point], wait=True)
    except Exception as e:
        return f"Error saving to Qdrant: {e}"
    return f"✅ Saved item id: {item_id}\nTags: {payload.get('tags','')}"
| # ------------------------- | |
| # App logic: search | |
| # ------------------------- | |
def search_items(query_image, query_text, limit: int = 5, min_score: float = 0.90):
    """Search stored items by image or text similarity.

    The image takes precedence when both an image and text are supplied.

    Args:
        query_image: PIL image to search with, or None.
        query_text: Text to search with when no image is given.
        limit: Maximum number of nearest neighbours requested from Qdrant.
        min_score: Hits scoring below this value are dropped.

    Returns:
        A ``(text, images)`` tuple: a status message or the formatted hits
        joined by blank lines, and the list of decoded PIL images for the
        hits that carry one.
    """
    if query_image is not None:
        qvec = embed_image_pil(query_image).tolist()
    elif query_text and query_text.strip():
        qvec = embed_text(query_text).tolist()
    else:
        return "⚠️ Please provide a query image or some query text.", []

    # The UI feeds this from a Number component, which may deliver a float or
    # None; Qdrant needs a positive integer.
    top_k = max(1, int(limit)) if limit else 5

    try:
        # Newer qdrant-client releases deprecate ``search`` in favour of
        # ``query_points``; use it when present and fall back otherwise.
        if hasattr(qclient, "query_points"):
            hits = qclient.query_points(
                collection_name=COLLECTION,
                query=qvec,
                limit=top_k,
                with_payload=True,
            ).points
        else:
            hits = qclient.search(collection_name=COLLECTION, query_vector=qvec, limit=top_k)
    except Exception as e:
        return f"❌ Error querying Qdrant: {e}", []

    if not hits:
        return "No results found.", []

    results_text = []
    results_imgs = []
    for h in hits:
        score = getattr(h, "score", None)
        if score is None or score < min_score:
            continue
        payload = h.payload or {}
        results_text.append(
            f"id:{h.id} | score:{score:.4f} | mode:{payload.get('mode','')} | tags:{payload.get('tags','')} "
            f"| text:{payload.get('text','')} | finder:{payload.get('finder_name','-')} | phone:{payload.get('finder_phone','-')}"
        )
        if payload.get("has_image") and "image_b64" in payload:
            try:
                img = Image.open(io.BytesIO(base64.b64decode(payload["image_b64"])))
            except (ValueError, OSError) as exc:
                # A corrupt stored image should not hide the textual hit.
                print(f"Could not decode stored image for item {h.id}: {exc}")
            else:
                results_imgs.append(img)

    if not results_text:
        return f"No results above similarity threshold {min_score}", []
    return "\n\n".join(results_text), results_imgs
| # ------------------------- | |
| # App logic: clear images | |
| # ------------------------- | |
def clear_all_images():
    """Delete every stored item whose payload has ``has_image`` set to True.

    Returns:
        A human-readable status message for the UI.
    """
    # Imported here so this function brings its own selector types into scope.
    from qdrant_client.http.models import (
        FieldCondition,
        Filter,
        FilterSelector,
        MatchValue,
    )

    # Use the client's typed selector models rather than a raw dict, which is
    # not one of the selector types the client accepts.
    selector = FilterSelector(
        filter=Filter(
            must=[FieldCondition(key="has_image", match=MatchValue(value=True))]
        )
    )
    try:
        qclient.delete(collection_name=COLLECTION, points_selector=selector)
    except Exception as e:
        return f"❌ Error while clearing images: {e}"
    return "🗑️ All items with images have been cleared!"
| # ------------------------- | |
| # Gradio UI | |
| # ------------------------- | |
# Two-column layout: the left column adds items (and can clear the ones with
# images), the right column searches by image or text.
with gr.Blocks(title="Lost & Found — Simple Helper") as demo:
    gr.Markdown("## Lost & Found Helper (image/text search) — upload items, then search by image or text.")
    with gr.Row():
        with gr.Column():
            mode = gr.Radio(choices=["lost", "found"], value="lost", label="Add as")
            upload_img = gr.Image(type="pil", label="Item photo (optional)")
            text_desc = gr.Textbox(lines=2, placeholder="Short description (e.g. 'black backpack with blue zipper')", label="Description (optional)")
            finder_name = gr.Textbox(label="Finder Name (only if found)", placeholder="e.g. John Doe")
            finder_phone = gr.Textbox(label="Finder Phone (only if found)", placeholder="e.g. +1234567890")
            add_btn = gr.Button("Add item")
            add_out = gr.Textbox(label="Add result", interactive=False)
            clear_btn = gr.Button("Clear All Images")
            clear_out = gr.Textbox(label="Clear Result", interactive=False)
        with gr.Column():
            gr.Markdown("### Search")
            query_img = gr.Image(type="pil", label="Search by image (optional)")
            query_text = gr.Textbox(lines=2, label="Search by text (optional)")
            # NOTE(review): text-to-image CLIP cosine scores are usually well
            # below 0.5, so the slider now reaches 0.0 to make cross-modal
            # search usable; the default is unchanged. Confirm the range.
            score_slider = gr.Slider(0.0, 1.0, value=0.90, step=0.01, label="Min similarity threshold")
            search_btn = gr.Button("Search")
            search_out = gr.Textbox(label="Search results (text)", interactive=False)
            gallery = gr.Gallery(label="Search Results", show_label=True, elem_id="gallery", columns=2, height="auto")

    # Hidden input that supplies search_items' ``limit``; precision=0 makes the
    # component deliver an integer.
    result_limit = gr.Number(value=5, precision=0, visible=False)

    add_btn.click(add_item, inputs=[mode, upload_img, text_desc, finder_name, finder_phone], outputs=[add_out])
    search_btn.click(search_items, inputs=[query_img, query_text, result_limit, score_slider], outputs=[search_out, gallery])
    clear_btn.click(clear_all_images, outputs=[clear_out])

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)