# Hugging Face Spaces page header (scrape residue) — Space status: Sleeping.
| import re | |
| import time | |
| import os | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer | |
| from ultralytics import YOLO | |
# Lazy globals so the UI can start even when model downloads are flaky.
# DETECTOR: cached ultralytics YOLO instance, populated by _get_detector().
DETECTOR = None
# EMBEDDER: cached SentenceTransformer, populated by _get_embedder().
EMBEDDER = None
# EMBEDDER_NAME: the model id that actually loaded ("" until one succeeds).
EMBEDDER_NAME = ""
# Candidate embedding models, tried in order; the first one that loads wins.
EMBEDDING_MODEL_CANDIDATES = [
    "sentence-transformers/msmarco-MiniLM-L6-v3",
    "sentence-transformers/all-MiniLM-L6-v2",
]
def _get_detector():
    """Return the cached YOLO detector, loading the weights on first use."""
    global DETECTOR
    if DETECTOR is not None:
        return DETECTOR
    # First call pays the weight download/load cost; later calls reuse it.
    DETECTOR = YOLO("yolov5nu.pt")
    return DETECTOR
def _try_load_embedder(model_name: str, local_files_only: bool = False):
    """Construct a CPU SentenceTransformer for *model_name*.

    With local_files_only=True, loading is restricted to the local
    Hugging Face cache (no network access).
    """
    model = SentenceTransformer(
        model_name,
        device="cpu",
        local_files_only=local_files_only,
    )
    return model
def _get_embedder():
    """Return a cached CPU sentence-embedding model, loading it on first use.

    Tries each candidate in EMBEDDING_MODEL_CANDIDATES up to three times
    (download flakiness), then falls back to cache-only loading before giving
    up.

    Returns:
        The loaded SentenceTransformer (also cached in the EMBEDDER global,
        with its id recorded in EMBEDDER_NAME).

    Raises:
        RuntimeError: if no candidate could be loaded; the message includes
            the last underlying error.
    """
    global EMBEDDER, EMBEDDER_NAME
    if EMBEDDER is not None:
        return EMBEDDER
    last_error = None
    for model_name in EMBEDDING_MODEL_CANDIDATES:
        for attempt in range(3):
            try:
                EMBEDDER = _try_load_embedder(model_name, local_files_only=False)
                EMBEDDER_NAME = model_name
                return EMBEDDER
            except Exception as exc:
                last_error = exc
                # Back off only when another retry follows; sleeping after the
                # final attempt just delays moving on to the next candidate.
                if attempt < 2:
                    time.sleep(1.0 + attempt)
    # Final attempt in local-only mode in case files already exist in cache.
    for model_name in EMBEDDING_MODEL_CANDIDATES:
        try:
            EMBEDDER = _try_load_embedder(model_name, local_files_only=True)
            EMBEDDER_NAME = model_name
            return EMBEDDER
        except Exception as exc:
            last_error = exc
    raise RuntimeError(f"Could not load embedding model. Last error: {last_error}")
| def _normalize(v: np.ndarray) -> np.ndarray: | |
| norm = np.linalg.norm(v) | |
| if norm == 0: | |
| return v | |
| return v / norm | |
| def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: | |
| return float(np.dot(_normalize(a), _normalize(b))) | |
| def _tokenize(text: str): | |
| return set(re.findall(r"[a-z0-9]+", text.lower())) | |
| def _fallback_similarity(prompt: str, label: str) -> float: | |
| prompt_tokens = _tokenize(prompt) | |
| label_tokens = _tokenize(label) | |
| if not prompt_tokens or not label_tokens: | |
| return 0.0 | |
| inter = len(prompt_tokens & label_tokens) | |
| union = len(prompt_tokens | label_tokens) | |
| return inter / union | |
def _collect_detections(result):
    """Flatten one YOLO result into a list of detection dicts.

    Each dict holds: index, label, confidence (rounded), and integer bbox.
    """
    boxes = result.boxes
    names = result.names
    detections = []
    for i in range(len(boxes)):
        cls_id = int(boxes.cls[i].item())
        conf = float(boxes.conf[i].item())
        x1, y1, x2, y2 = boxes.xyxy[i].tolist()
        # `names` is usually a dict {class_id: label} but may be a list.
        label = names.get(cls_id, str(cls_id)) if isinstance(names, dict) else names[cls_id]
        detections.append(
            {
                "index": i,
                "label": label,
                "confidence": round(conf, 4),
                "bbox": [int(x1), int(y1), int(x2), int(y2)],
            }
        )
    return detections


def _score_detections(prompt, detections):
    """Attach a 'similarity' score to every detection; return (best_idx, mode).

    Prefers sentence-embedding cosine similarity; falls back to keyword
    Jaccard overlap when the embedder is unavailable.  Scores for the chosen
    mode are computed from scratch, so a partial embedding failure can never
    leave stale embedding-scale scores competing against keyword scores
    (the original tracked best_score across both branches, which could keep a
    wrong winner after a mid-loop exception).
    """
    labels = [d["label"] for d in detections]
    try:
        embedder = _get_embedder()
        prompt_emb = embedder.encode(prompt, convert_to_numpy=True)
        label_embs = embedder.encode(labels, convert_to_numpy=True)
        scores = [_cosine_similarity(prompt_emb, emb) for emb in label_embs]
        match_mode = "embedding"
    except Exception:
        scores = [_fallback_similarity(prompt, label) for label in labels]
        match_mode = "keyword-fallback"
    best_idx = -1
    best_score = -1.0
    for i, score in enumerate(scores):
        detections[i]["similarity"] = round(score, 4)
        if score > best_score:
            best_score = score
            best_idx = i
    return best_idx, match_mode


def _annotate_image(image, detections, best_idx):
    """Return an RGB copy of *image* with all boxes drawn; best match in red."""
    annotated = image.convert("RGB").copy()
    draw = ImageDraw.Draw(annotated)
    for i, d in enumerate(detections):
        x1, y1, x2, y2 = d["bbox"]
        if i == best_idx:
            color, width = (255, 0, 0), 5
        else:
            color, width = (0, 200, 0), 2
        draw.rectangle([x1, y1, x2, y2], outline=color, width=width)
        text = f"{d['label']} conf={d['confidence']} sim={d['similarity']}"
        # Keep the caption on-canvas when a box touches the top edge.
        text_y = max(0, y1 - 14)
        draw.text((x1, text_y), text, fill=color)
    return annotated


def detect_and_match(image: Image.Image, task_prompt: str):
    """Detect objects in *image* and highlight the one best matching the prompt.

    Args:
        image: PIL image from the Gradio input (may be None).
        task_prompt: free-text task description, e.g. "I need to sit".

    Returns:
        (annotated_image, summary_text, detections_table) matching the three
        Gradio outputs.  On any failure the incoming image (or None) is
        returned with an explanatory message and an empty table.
    """
    if image is None:
        return None, "No image provided.", []
    prompt = (task_prompt or "").strip()
    if not prompt:
        return image, "Please enter a task prompt.", []
    try:
        detector = _get_detector()
    except Exception as exc:
        return image, f"Detector load failed: {exc}", []
    results = detector.predict(image, verbose=False, device="cpu")
    result = results[0]
    if result.boxes is None or len(result.boxes) == 0:
        return image, "No objects detected.", []
    detections = _collect_detections(result)
    best_idx, match_mode = _score_detections(prompt, detections)
    if best_idx < 0:
        return image, "Could not compute a match for detected objects.", []
    annotated = _annotate_image(image, detections, best_idx)
    best = detections[best_idx]
    best_summary = (
        f"Prompt: {prompt}\n"
        f"Mode: {match_mode}\n"
        f"Embedding model: {EMBEDDER_NAME if EMBEDDER_NAME else 'unavailable'}\n"
        f"Best match: {best['label']}\n"
        f"Confidence: {best['confidence']}\n"
        f"Similarity: {best['similarity']}\n"
        f"BBox: {best['bbox']}"
    )
    detections_table = [
        [d["index"], d["label"], d["confidence"], d["similarity"], str(d["bbox"])]
        for d in detections
    ]
    return annotated, best_summary, detections_table
# --- Gradio UI: two-column layout wired to detect_and_match -----------------
with gr.Blocks(title="DV CON design contest") as demo:
    gr.Markdown("# DV CON design contest")
    gr.Markdown(
        "Upload/capture an image, give a task prompt (example: 'I need to sit'), "
        "and the app highlights the closest detected object."
    )
    with gr.Row():
        with gr.Column():
            # Left column: image + prompt inputs and the run button.
            image_input = gr.Image(type="pil", sources=["upload", "webcam"], label="Input Image")
            prompt_input = gr.Textbox(
                label="Task Prompt",
                placeholder="Example: I need to sit",
                lines=2,
            )
            run_button = gr.Button("Run Detection + Task Match", variant="primary")
        with gr.Column():
            # Right column: annotated image, best-match summary, full table.
            annotated_output = gr.Image(type="pil", label="Annotated Output")
            best_output = gr.Textbox(label="Best Match")
            table_output = gr.Dataframe(
                headers=["idx", "label", "confidence", "similarity", "bbox"],
                datatype=["number", "str", "number", "number", "str"],
                label="All Detections",
            )
    # Button click runs the full detect + match pipeline.
    run_button.click(
        fn=detect_and_match,
        inputs=[image_input, prompt_input],
        outputs=[annotated_output, best_output, table_output],
    )
if __name__ == "__main__":
    # On Hugging Face Spaces (SPACE_ID set) bind to all interfaces so the
    # proxy can reach the app; locally bind to loopback only.
    on_space = bool(os.getenv("SPACE_ID"))
    host = "0.0.0.0" if on_space else "127.0.0.1"
    demo.launch(server_name=host, server_port=7860)