# NOTE: removed "Spaces: / Running / Running" Hugging Face Spaces page-scrape
# artifact that preceded the module docstring — it is not Python code.
| """ | |
| Local AI Media Scout - Privacy-First Semantic Media Search | |
| MCP Server runs on your local machine | |
| Features: | |
| - Web UI for testing (http://localhost:7860) | |
| - MCP server for Claude Desktop (http://localhost:7860/gradio_api/mcp/sse) | |
| - All AI processing happens locally | |
| - No data sent to cloud | |
| - Powered by SigLIP so400m (higher accuracy than CLIP) | |
| For MCP 1st Birthday Hackathon | |
| Track: Building MCP - Productivity | |
| """ | |
| import gradio as gr | |
| import json | |
| import os | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from ai_indexer import LocalMediaIndexer | |
| import logging | |
| from huggingface_hub import snapshot_download | |
| # Load environment variables (before AppSignal so .env keys are available locally) | |
| load_dotenv() | |
| # --- AppSignal APM --- | |
| from appsignal import Appsignal, set_category, send_error, send_error_with_context, set_params, set_gauge, increment_counter | |
| import time | |
| from opentelemetry.instrumentation.starlette import StarletteInstrumentor | |
| from opentelemetry import trace | |
# --- AppSignal APM client -------------------------------------------------
# Configuration is assembled once so the instrumentation knobs are visible
# in a single place; the push API key / environment come from the .env file
# loaded above (empty key keeps the client inert locally).
_appsignal_settings = {
    "active": True,
    "name": "MediaSearchMCP",
    "push_api_key": os.getenv("APPSIGNAL_PUSH_API_KEY", ""),
    "environment": os.getenv("APPSIGNAL_APP_ENV", "development"),
    "enable_host_metrics": True,
}
appsignal_client = Appsignal(**_appsignal_settings)
appsignal_client.start()

# Module-level OpenTelemetry tracer used by every MCP tool function below.
tracer = trace.get_tracer(__name__)
# True when running on Hugging Face Spaces (HF injects SPACE_ID there).
IS_HUGGINGFACE_SPACE = os.getenv("SPACE_ID") is not None

# Configuration: where the media lives and where the search index is stored.
MEDIA_DIR = os.getenv('MEDIA_DIR', 'media')
INDEX_DIR = os.getenv('INDEX_DIR', 'index')

# Module-wide logging setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Startup banner (same output as sequential print calls).
_RULE = "=" * 60
print("\n".join((
    _RULE,
    "Local AI Media Scout",
    "Privacy-First Semantic Media Search",
    "Powered by SigLIP so400m (higher accuracy than CLIP)",
    _RULE,
    f"Media Directory: {MEDIA_DIR}",
    f"Index Directory: {INDEX_DIR}",
    _RULE,
)))
# Pull demo media from the HF dataset when on Spaces, or when the local
# media directory is missing or empty (short-circuit avoids listdir on a
# nonexistent path).
_needs_demo_media = (
    IS_HUGGINGFACE_SPACE
    or not os.path.exists(MEDIA_DIR)
    or not os.listdir(MEDIA_DIR)
)
if _needs_demo_media:
    logger.info("Downloading demo media files from Hugging Face dataset...")
    try:
        snapshot_download(
            repo_id="mugdhav/media-search-demo-files",
            repo_type="dataset",
            local_dir=MEDIA_DIR,
            allow_patterns=["images/**", "videos/**"],
            ignore_patterns=["*.md", ".gitattributes"],
        )
        logger.info("Media files downloaded successfully")
    except Exception as e:
        # Report to AppSignal with context, then continue — demo media is
        # optional, so a failed download is not fatal.
        with send_error_with_context(e):
            set_params({"severity": "warning", "source": "media_download"})
        logger.error(f"Failed to download media files: {e}")
        logger.warning("Continuing without demo media files")
# Initialize the local AI indexer and build/load its index at startup.
logger.info("Initializing Local AI Media Indexer...")
try:
    indexer = LocalMediaIndexer(media_dir=MEDIA_DIR, index_dir=INDEX_DIR)
    # Build/load index on startup; an existing index is reused rather
    # than rebuilt (force_reindex=False).
    indexer.index_local_directory(force_reindex=False)
except Exception as e:
    # A broken indexer makes the whole server useless — report and re-raise.
    with send_error_with_context(e):
        set_params({"severity": "critical", "source": "indexer_init"})
    logger.error(f"β Failed to initialize indexer: {e}")
    raise
| # ============================================================================ | |
| # MCP Tool Functions | |
| # These functions are exposed as MCP tools to Claude Desktop | |
| # ============================================================================ | |
def semantic_search(query: str, media_type: str = "all", top_k: int = 5) -> str:
    """
    Search LOCAL media using natural language descriptions.
    All processing happens on your machine - no data leaves your computer.

    Args:
        query: Natural language search (e.g., "ducks in lily pond", "graduation ceremony")
        media_type: Filter by type - "image", "video", or "all"
        top_k: Number of results to return (1-20)

    Returns:
        JSON with ranked search results and similarity scores
    """
    with tracer.start_as_current_span("semantic_search") as span:
        set_category("mcp_tool.semantic_search")
        span.set_attribute("search.query", query or "")
        span.set_attribute("search.media_type", media_type)
        span.set_attribute("search.top_k", top_k)
        started_at = time.time()
        try:
            # Reject empty/whitespace-only queries up front.
            if not (query and query.strip()):
                with send_error_with_context(ValueError("Query cannot be empty")):
                    set_params({"severity": "warning", "source": "semantic_search"})
                increment_counter("semantic_search.errors", 1)
                return json.dumps({"error": "Query cannot be empty"}, indent=2)

            # Clamp the requested result count to the supported 1..20 range.
            top_k = min(20, max(1, int(top_k)))

            # Perform semantic search (locally).
            logger.info(f"Searching for: '{query}'")
            matches = indexer.search(query, top_k=top_k)

            # Optionally narrow the result set to a single media type.
            if media_type != "all":
                matches = [m for m in matches if m.get('media_type') == media_type]

            response = {
                "query": query,
                "media_type": media_type,
                "count": len(matches),
                "results": matches,
                "note": "All processing done locally on your machine"
            }

            # Custom metrics for AppSignal triggers.
            set_gauge("semantic_search.duration_ms", (time.time() - started_at) * 1000)
            set_gauge("semantic_search.result_count", float(len(matches)))
            increment_counter("semantic_search.calls", 1)
            if matches:
                scores = [m["similarity_score"] for m in matches]
                set_gauge("semantic_search.top_similarity", scores[0])
                set_gauge("semantic_search.avg_similarity", sum(scores) / len(scores))
            else:
                increment_counter("semantic_search.zero_results", 1)

            span.set_attribute("search.result_count", len(matches))
            return json.dumps(response, indent=2)
        except Exception as e:
            increment_counter("semantic_search.errors", 1)
            with send_error_with_context(e):
                set_params({"severity": "error", "source": "semantic_search"})
            logger.error(f"[ERROR] Search error: {e}")
            return json.dumps({"error": str(e)}, indent=2)
def get_media_details(file_path: str) -> str:
    """
    Get detailed information about a LOCAL media file.

    Args:
        file_path: Path to the local media file

    Returns:
        JSON with file details and metadata
    """
    with tracer.start_as_current_span("get_media_details") as span:
        set_category("mcp_tool.get_media_details")
        span.set_attribute("media.file_path", file_path or "")
        try:
            if not os.path.exists(file_path):
                with send_error_with_context(FileNotFoundError(f"File not found: {file_path}")):
                    set_params({"severity": "warning", "source": "get_media_details", "file_path": file_path})
                return json.dumps({"error": f"File not found: {file_path}"}, indent=2)

            file_stat = os.stat(file_path)
            suffix = Path(file_path).suffix.lower()

            # Classify by file extension.
            if suffix in {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}:
                media_type = "image"
            elif suffix in {'.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv'}:
                media_type = "video"
            else:
                media_type = "unknown"

            details = {
                "file_path": file_path,
                "file_name": os.path.basename(file_path),
                "media_type": media_type,
                "file_size_mb": round(file_stat.st_size / (1024 * 1024), 2),
                "extension": suffix,
                "exists": True,
                "in_index": file_path in indexer.file_paths
            }
            span.set_attribute("media.type", media_type)
            return json.dumps(details, indent=2)
        except Exception as e:
            with send_error_with_context(e):
                set_params({"severity": "error", "source": "get_media_details"})
            logger.error(f"[ERROR] Error getting details: {e}")
            return json.dumps({"error": str(e)}, indent=2)
def reindex_media(force: bool = False) -> str:
    """
    Rebuild the LOCAL media index.
    Scans your media directory and re-indexes all files.

    Args:
        force: Force complete reindex even if index exists

    Returns:
        Status message
    """
    with tracer.start_as_current_span("reindex_media") as span:
        set_category("mcp_tool.reindex_media")
        span.set_attribute("reindex.force", force)
        started_at = time.time()
        try:
            logger.info("Starting reindex...")
            indexer.index_local_directory(force_reindex=force)

            indexed_count = len(indexer.file_paths)
            response = {
                "status": "success",
                "message": f"Reindexed {indexed_count} files",
                "media_directory": MEDIA_DIR,
                "index_directory": INDEX_DIR
            }

            # Custom metrics for AppSignal triggers.
            set_gauge("reindex.duration_ms", (time.time() - started_at) * 1000)
            set_gauge("reindex.file_count", float(indexed_count))
            increment_counter("reindex.calls", 1)
            span.set_attribute("reindex.file_count", indexed_count)
            return json.dumps(response, indent=2)
        except Exception as e:
            increment_counter("reindex.errors", 1)
            with send_error_with_context(e):
                set_params({"severity": "error", "source": "reindex_media"})
            logger.error(f"[ERROR] Reindex error: {e}")
            return json.dumps({"error": str(e)}, indent=2)
def get_index_stats() -> str:
    """
    Get statistics about the LOCAL media index.

    Returns:
        JSON with index statistics
    """
    with tracer.start_as_current_span("get_index_stats") as span:
        set_category("mcp_tool.get_index_stats")
        try:
            # Single pass over the index: tally files by recorded media type.
            type_counts = {"image": 0, "video": 0}
            for fp in indexer.file_paths:
                kind = indexer.file_metadata.get(fp, {}).get('type')
                if kind in type_counts:
                    type_counts[kind] += 1
            image_count = type_counts["image"]
            video_count = type_counts["video"]

            # Total indexed size in bytes (missing 'size' counts as 0).
            total_size = sum(meta.get('size', 0) for meta in indexer.file_metadata.values())

            stats = {
                "total_files": len(indexer.file_paths),
                "images": image_count,
                "videos": video_count,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "media_directory": MEDIA_DIR,
                "index_directory": INDEX_DIR,
                "model_device": indexer.device,
                "model_used": "google/siglip-so400m-patch14-384",
                "privacy_note": "All data processed locally - nothing sent to cloud"
            }

            set_gauge("index.total_files", float(len(indexer.file_paths)))
            set_gauge("index.images", float(image_count))
            set_gauge("index.videos", float(video_count))
            increment_counter("get_index_stats.calls", 1)
            span.set_attribute("stats.total_files", len(indexer.file_paths))
            span.set_attribute("stats.images", image_count)
            span.set_attribute("stats.videos", video_count)
            return json.dumps(stats, indent=2)
        except Exception as e:
            with send_error_with_context(e):
                set_params({"severity": "error", "source": "get_index_stats"})
            logger.error(f"[ERROR] Error getting stats: {e}")
            return json.dumps({"error": str(e)}, indent=2)
| # ============================================================================ | |
| # UI Helper Functions | |
| # ============================================================================ | |
def format_table_view(results_json) -> str:
    """Format search results as an HTML table with thumbnails.

    Args:
        results_json: Either the JSON string produced by semantic_search()
            or an already-parsed dict (e.g. from gr.JSON / gr.State).

    Returns:
        An HTML fragment (style + table) for gr.HTML, or a short error /
        "no results" paragraph.

    File names and paths are HTML-escaped before interpolation so that
    unusual characters in filenames (e.g. '<', '&', '"') cannot break the
    markup or inject HTML into the rendered table.
    """
    import base64
    import html as _html
    import io
    try:
        # Handle both dict (from gr.JSON) and string (from semantic_search)
        data = json.loads(results_json) if isinstance(results_json, str) else results_json
        if "error" in data:
            return f"<p style='color: red;'>Error: {_html.escape(str(data['error']))}</p>"
        results = data.get("results", [])
        if not results:
            return "<p>No results found</p>"
        html = """
        <style>
        .results-table { width: 100%; border-collapse: collapse; margin-top: 1rem; }
        .results-table th { background: #f3f4f6; padding: 0.75rem; text-align: left; border-bottom: 2px solid #e5e7eb; }
        .results-table td { padding: 0.75rem; border-bottom: 1px solid #e5e7eb; vertical-align: middle; }
        .results-table tr:hover { background: #f9fafb; }
        .thumbnail { width: 100px; height: 100px; object-fit: cover; border-radius: 0.5rem; }
        .thumbnail-placeholder { width: 100px; height: 100px; border-radius: 0.5rem; display: flex; align-items: center; justify-content: center; font-weight: bold; }
        .view-link { color: #2563eb; text-decoration: none; font-size: 0.875rem; }
        .view-link:hover { text-decoration: underline; }
        </style>
        <table class="results-table">
        <thead>
        <tr>
        <th>Thumbnail</th>
        <th>File Name</th>
        <th>Size (MB)</th>
        <th>Type</th>
        </tr>
        </thead>
        <tbody>
        """
        for result in results:
            file_path = result.get("file_path", "")
            file_name = result.get("file_name", "Unknown")
            media_type = result.get("media_type", "unknown")
            size_mb = result.get("file_size_mb", 0)
            # Escaped variants are used everywhere the values enter markup.
            safe_name = _html.escape(str(file_name), quote=True)
            safe_path = _html.escape(str(file_path), quote=True)
            # Create thumbnail and view link
            thumbnail_html = ""
            view_link = ""
            if media_type == "image" and os.path.exists(file_path):
                try:
                    from PIL import Image
                    img = Image.open(file_path)
                    img.thumbnail((100, 100))
                    buffer = io.BytesIO()
                    img.save(buffer, format="PNG")
                    img_str = base64.b64encode(buffer.getvalue()).decode()
                    thumbnail_html = f'<img src="data:image/png;base64,{img_str}" class="thumbnail" alt="{safe_name}">'
                    view_link = f'<a href="/file={safe_path}" target="_blank" class="view-link">[view]</a>'
                except Exception:
                    thumbnail_html = '<div class="thumbnail-placeholder" style="background: #e5e7eb;">IMG</div>'
            elif media_type == "video" and os.path.exists(file_path):
                try:
                    import cv2 as _cv2
                    from PIL import Image
                    # Grab the middle frame of the video as a thumbnail.
                    cap = _cv2.VideoCapture(file_path)
                    total = int(cap.get(_cv2.CAP_PROP_FRAME_COUNT))
                    if total > 0:
                        cap.set(_cv2.CAP_PROP_POS_FRAMES, total // 2)
                        ret, frame = cap.read()
                        if ret:
                            frame_rgb = _cv2.cvtColor(frame, _cv2.COLOR_BGR2RGB)
                            vimg = Image.fromarray(frame_rgb)
                            vimg.thumbnail((100, 100))
                            vbuf = io.BytesIO()
                            vimg.save(vbuf, format="PNG")
                            vstr = base64.b64encode(vbuf.getvalue()).decode()
                            thumbnail_html = f'<img src="data:image/png;base64,{vstr}" class="thumbnail" alt="{safe_name}">'
                        else:
                            thumbnail_html = '<div class="thumbnail-placeholder" style="background: #dbeafe; color: #1e40af;">VIDEO</div>'
                    else:
                        thumbnail_html = '<div class="thumbnail-placeholder" style="background: #dbeafe; color: #1e40af;">VIDEO</div>'
                    cap.release()
                except Exception:
                    thumbnail_html = '<div class="thumbnail-placeholder" style="background: #dbeafe; color: #1e40af;">VIDEO</div>'
                view_link = f'<a href="/file={safe_path}" target="_blank" class="view-link">[view]</a>'
            else:
                thumbnail_html = '<div class="thumbnail-placeholder" style="background: #e5e7eb;">?</div>'
            file_info_html = f'<strong>{safe_name}</strong><br><small style="color: #6b7280;">{safe_path} {view_link}</small>'
            html += f"""
            <tr>
            <td>{thumbnail_html}</td>
            <td>{file_info_html}</td>
            <td>{size_mb}</td>
            <td><span style="text-transform: uppercase; font-weight: 500; color: #6b7280;">{media_type}</span></td>
            </tr>
            """
        html += "</tbody></table>"
        return html
    except Exception as e:
        logger.error(f"[GRADIO-ERROR] format_table_view: {e}")
        return f"<p style='color: red;'>Error formatting table: {str(e)}</p>"
| # ============================================================================ | |
| # Search and Display Helper Functions | |
| # ============================================================================ | |
def search_and_display(query: str, media_type: str, top_k: int, current_view: str):
    """
    Perform a search and return results formatted for the current view.

    Args:
        query: Natural language search string.
        media_type: "all", "image" or "video".
        top_k: Number of results to return.
        current_view: "JSON" or "Table" — which output component is active.

    Returns:
        Tuple of (state_data, json_output_update, table_output_update):
        the parsed results dict for gr.State plus gr.update() objects for
        the JSON and HTML components; only the active view is visible.
    """
    # Perform search (semantic_search returns a JSON string)
    results_json_str = semantic_search(query, media_type, top_k)

    # Parse to dict for state storage; tolerate malformed/empty payloads
    # only — a bare `except:` here used to swallow everything, including
    # KeyboardInterrupt.
    try:
        results_dict = json.loads(results_json_str) if results_json_str else None
    except (json.JSONDecodeError, TypeError):
        results_dict = None

    # Render based on current view
    if current_view == "JSON":
        return (
            results_dict,
            gr.update(value=results_dict, visible=True),
            gr.update(value="", visible=False),
        )
    # Table view
    table_html = format_table_view(results_dict) if results_dict else "<p>No results</p>"
    return (
        results_dict,
        gr.update(value=results_dict, visible=False),
        gr.update(value=table_html, visible=True),
    )
def toggle_view(view_type: str, stored_results):
    """
    Toggle between JSON and Table view using stored results.

    Args:
        view_type: "JSON" or "Table".
        stored_results: Parsed results dict saved in gr.State by
            search_and_display(), or None if no search has run yet.

    Returns:
        Tuple of (json_output_update, table_output_update) gr.update()
        objects; only the selected view is made visible.  (The previous
        docstring incorrectly described a 4-tuple.)
    """
    if not stored_results:
        # Nothing searched yet: keep the JSON panel visible with a hint.
        return (
            gr.update(value=None, visible=True),
            gr.update(value="<p>No search results yet. Please search first.</p>", visible=False),
        )
    if view_type == "JSON":
        return (
            gr.update(value=stored_results, visible=True),
            gr.update(value="", visible=False),
        )
    # Table view
    table_html = format_table_view(stored_results)
    return (
        gr.update(value=stored_results, visible=False),
        gr.update(value=table_html, visible=True),
    )
| # ============================================================================ | |
| # Gradio Interface | |
| # Creates both Web UI and MCP server | |
| # ============================================================================ | |
# Gradio Blocks app: serves both the Web UI and (via launch(mcp_server=True))
# the MCP endpoint.  NOTE(review): the nesting below is reconstructed from a
# whitespace-mangled source — confirm the Row/Tab grouping against the
# original layout.
with gr.Blocks(title="Local AI Media Scout") as demo:
    # Privacy banner - conditional based on environment
    if IS_HUGGINGFACE_SPACE:
        banner_text = """πΊ DEMO MODE: Running on HF Spaces with <a href="https://huggingface.co/datasets/mugdhav/media-search-demo-files"
        style="color: white; text-decoration: underline;" target="_blank">sample media</a>.
        <a href="https://github.com/mugdhav/local_media_search_mcp_server"
        style="color: white; text-decoration: underline;" target="_blank">
        Install locally</a> for use with your media files."""
    else:
        banner_text = """π 100% Local & Private: Your media stays on your machine.
        AI runs locally. Zero cloud uploads."""
    gr.HTML(f"""
    <div style="background: linear-gradient(90deg, #10b981 0%, #059669 100%); color: white; padding: 1rem; border-radius: 0.5rem; margin-bottom: 1rem; text-align: center; font-weight: bold;">
    {banner_text}
    </div>
    """)
    # Header
    gr.Markdown("""
    # π¬ Local AI Media Scout
    ### MCP Server for AI-Powered Local Media Indexing and Search
    """)
    # Three cards in a row with equal height
    with gr.Row(equal_height=True):
        # Card 1: What is this + Use it with
        with gr.Column(scale=1):
            # Conditional text for demo vs local mode
            if IS_HUGGINGFACE_SPACE:
                web_ui_text = "π This Web UI (demo with sample media)"
            else:
                web_ui_text = "π This Web UI on your machine"
            gr.HTML(f"""
            <div style="background: #f9fafb; border: 1px solid #e5e7eb; border-radius: 0.5rem; padding: 1.5rem; min-height: 320px; display: flex; flex-direction: column;">
            <div>
            <h3 style="margin-top: 0; color: #1f2937; font-size: 1.125rem;">π‘ What is this?</h3>
            <p style="color: #4b5563; line-height: 1.6;">
            An MCP (Model Context Protocol) server that gives AI agents access to your local photos and videos through indexing and natural language search β all processing happens on your machine.
            </p>
            <h3 style="color: #1f2937; font-size: 1.125rem; margin-top: 1rem;">π Use it with:</h3>
            <ul style="color: #4b5563; line-height: 1.8; margin-bottom: 0;">
            <li>{web_ui_text}</li>
            <li>π€ Claude Desktop (via MCP integration)</li>
            <li>π» Coding agents and AI assistants (local or cloud)</li>
            </ul>
            </div>
            </div>
            """)
        # Card 2: How it works
        with gr.Column(scale=1):
            gr.HTML("""
            <div style="background: #f0fdf4; border: 1px solid #bbf7d0; border-radius: 0.5rem; padding: 1.5rem; min-height: 320px; display: flex; flex-direction: column;">
            <div>
            <h3 style="margin-top: 0; color: #166534; font-size: 1.125rem;">βοΈ How it works</h3>
            <ol style="color: #15803d; line-height: 1.8; margin-bottom: 0; padding-left: 1.25rem;">
            <li>Indexes your local media using <a href="https://huggingface.co/google/siglip-so400m-patch14-384" target="_blank" style="color: #16a34a; text-decoration: underline;">SigLIP</a> (local AI vision model)</li>
            <li>Exposes semantic search via MCP protocol</li>
            <li>Enables AI agents to look up your local media using natural language queries</li>
            </ol>
            </div>
            </div>
            """)
        # Card 3: Privacy-First Architecture
        with gr.Column(scale=1):
            gr.HTML("""
            <div style="background: #eff6ff; border: 1px solid #bfdbfe; border-radius: 0.5rem; padding: 1.5rem; min-height: 320px; display: flex; flex-direction: column;">
            <div>
            <h3 style="margin-top: 0; color: #1e40af; font-size: 1.125rem;">π Privacy-First Architecture</h3>
            <ul style="color: #1e3a8a; line-height: 1.8; margin-bottom: 0; list-style: none; padding-left: 0;">
            <li>β SigLIP AI model runs entirely on your CPU/GPU</li>
            <li>β All indexing and search happens locally</li>
            <li>β Media files never uploaded or transmitted</li>
            <li>β MCP connections are local-only (localhost)</li>
            <!-- li>β No external API calls or cloud dependencies</li -->
            </ul>
            </div>
            </div>
            """)
    # Available MCP Tools section
    gr.Markdown("""
    **Try these MCP Tools:**
    """)
    # Tab 1: Semantic Search
    with gr.Tab("Semantic Search"):
        gr.Markdown(f"""
        Search Your Media with Natural Language
        AI-powered semantic search finds photos and videos by description, not just filenames.
        **Try queries like:**
        - "sunset at the beach" β’ "people celebrating" β’ "food on a table" β’ "outdoor activities" β’ "technology or computers" β’ "natural scenery"
        **Currently indexed:** `{MEDIA_DIR}`
        """)
        search_query = gr.Textbox(
            label="What are you looking for?",
            placeholder="e.g., 'sunset at the beach', 'people celebrating', 'natural scenery'"
        )
        with gr.Row():
            search_type = gr.Dropdown(
                choices=["all", "image", "video"],
                value="all",
                label="Media Type",
                scale=1
            )
            search_count = gr.Slider(
                minimum=1,
                maximum=20,
                value=5,
                step=1,
                label="Number of Results",
                scale=2
            )
            view_toggle = gr.Radio(
                choices=["Table", "JSON"],
                value="JSON",
                label="Result View",
                scale=1
            )
        search_btn = gr.Button("Search Locally", variant="primary", size="lg")
        # State to store latest search results
        search_results_state = gr.State(value=None)
        # Output components: only one of the two is visible at a time.
        with gr.Group():
            table_output = gr.HTML(label="Results", visible=False)
            json_output = gr.JSON(label="Results - JSON", visible=True)
        # Event: Search button click - update state and current view
        search_btn.click(
            fn=search_and_display,
            inputs=[search_query, search_type, search_count, view_toggle],
            outputs=[search_results_state, json_output, table_output]
        )
        # Event: Toggle between views - re-render from state
        view_toggle.change(
            fn=toggle_view,
            inputs=[view_toggle, search_results_state],
            outputs=[json_output, table_output]
        )
    # Tab 2: File Details
    with gr.Tab("π File Details"):
        gr.Markdown("Get details about a specific file in your local media library")
        file_path_input = gr.Textbox(
            label="File Path",
            placeholder=f"e.g., {MEDIA_DIR}\\images\\example.jpg"
        )
        details_btn = gr.Button("Get Details", variant="primary")
        details_output = gr.JSON(label="File Information")
        details_btn.click(
            fn=get_media_details,
            inputs=file_path_input,
            outputs=details_output
        )
    # Tab 3: Index Statistics
    with gr.Tab("π Index Statistics"):
        gr.Markdown("View statistics about your local media index")
        stats_btn = gr.Button("π Get Statistics", variant="primary", size="lg")
        stats_output = gr.JSON(label="Index Statistics")
        stats_btn.click(
            fn=get_index_stats,
            outputs=stats_output
        )
    # Tab 4: Reindex
    with gr.Tab("π Reindex"):
        gr.Markdown(f"""
        ### Rebuild LOCAL Media Index
        Scans `{MEDIA_DIR}` for new files and rebuilds the AI index.
        **When to reindex:**
        - After adding new media files
        - After deleting files
        - If search results seem outdated
        **Note:** Reindexing may take several minutes depending on number of files.
        """)
        force_reindex = gr.Checkbox(
            label="Force complete reindex (slower but thorough)",
            value=False
        )
        reindex_btn = gr.Button("π Rebuild Local Index", variant="primary", size="lg")
        reindex_output = gr.JSON(label="Reindex Status")
        reindex_btn.click(
            fn=reindex_media,
            inputs=force_reindex,
            outputs=reindex_output
        )
    # Footer: tool catalogue and connection instructions ({{ }} escape literal
    # braces inside this f-string).
    gr.Markdown(f"""
    ---
    **Available MCP Tools:**
    AI assistants connected to this server can use these tools:
    - **`semantic_search(query, media_type, top_k)`**
    Search your media using natural language (for example, "sunset at beach", "people celebrating")
    - **`get_media_details(file_path)`**
    Get metadata about a specific media file (size, type, indexed status)
    - **`get_index_stats()`**
    View statistics about indexed media (file counts, total size, media types)
    - **`reindex_media(force)`**
    Rebuild the search index after adding/removing media files
    **Connection Details:**
    - π Web UI: `http://localhost:7860`
    - π MCP Endpoint: `http://localhost:7860/gradio_api/mcp/sse`
    - π Media Directory: `{MEDIA_DIR}`
    - π Index Storage: `{INDEX_DIR}`
    **How to Connect Claude Desktop:**
    Add this to your Claude Desktop MCP config:
    ```json
    {{
    "mcpServers": {{
    "media-search": {{
    "url": "http://localhost:7860/gradio_api/mcp/sse"
    }}
    }}
    }}
    ```
    **Technology Stack:**
    Gradio 6 (MCP Server) β’ SigLIP (Local AI) β’ FAISS (Vector Search) β’ PyTorch
    Built for MCP 1st Birthday Hackathon | Track: Building MCP - Productivity
    """)
| # ============================================================================ | |
| # Launch Server | |
| # Runs both Web UI and MCP server | |
| # ============================================================================ | |
if __name__ == "__main__":
    # Startup banner for the console.
    print("\n" + "=" * 60)
    print("Starting Local AI Media Scout")
    print("=" * 60)
    print("Web UI: http://localhost:7860")
    print("MCP Endpoint: http://localhost:7860/gradio_api/mcp/sse")
    print("=" * 60)
    print("\nPress Ctrl+C to stop\n")

    # Launch options shared by both environments.  allowed_paths lets
    # Gradio's /file= route serve media/thumbnails from MEDIA_DIR.
    launch_kwargs = {
        "mcp_server": True,
        "show_error": True,
        "quiet": False,
        "allowed_paths": [os.path.abspath(MEDIA_DIR)]
    }

    # Use the module-level Spaces flag instead of re-reading SPACE_ID here
    # (the old local `is_space` duplicated IS_HUGGINGFACE_SPACE and could
    # drift from the banner/download logic above).
    if not IS_HUGGINGFACE_SPACE:
        # Local settings: bind to loopback only, fixed MCP port, no tunnel.
        launch_kwargs["server_name"] = "127.0.0.1"
        launch_kwargs["server_port"] = 7860
        launch_kwargs["share"] = False

    # Instrument Gradio's internal Starlette/FastAPI app (per AppSignal docs).
    # NOTE(review): this assumes `demo.app` is available before launch() —
    # confirm against the installed Gradio version.
    StarletteInstrumentor().instrument_app(demo.app)
    demo.launch(**launch_kwargs)