Spaces:
Sleeping
Sleeping
| """ | |
| MCP Video Agent - HF Space with Modal Backend + Security | |
| Connects to Modal backend with authentication and rate limiting | |
| """ | |
| import os | |
| import gradio as gr | |
| import time | |
| import hashlib | |
| import base64 | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict | |
| # ========================================== | |
| # Security: Rate Limiting | |
| # ========================================== | |
class RateLimiter:
    """Sliding-window, in-memory rate limiter keyed by a user identifier.

    Each user may make at most ``max_requests`` requests within any trailing
    one-hour window. Timestamps live in ``self.requests`` (user id -> list of
    ``datetime``), so the state is per-process and is lost on restart. No
    locking is performed.
    """

    def __init__(self, max_requests_per_hour=10):
        """Create a limiter allowing ``max_requests_per_hour`` requests per user."""
        self.max_requests = max_requests_per_hour
        self.requests = defaultdict(list)

    def _recent(self, user_id, now):
        """Return the user's timestamps from the last hour without mutating state."""
        cutoff = now - timedelta(hours=1)
        # .get() (instead of indexing) keeps the defaultdict from creating an
        # entry for users who have never made a request.
        return [stamp for stamp in self.requests.get(user_id, ()) if stamp > cutoff]

    def _store(self, user_id, stamps):
        """Persist ``stamps`` for the user, dropping the entry when it is empty."""
        if stamps:
            self.requests[user_id] = stamps
        else:
            self.requests.pop(user_id, None)

    def is_allowed(self, user_id):
        """Record a request for ``user_id`` if it fits within the hourly limit.

        Returns:
            True when the request was accepted (and recorded), False when the
            user already has ``max_requests`` requests in the last hour.
        """
        now = datetime.now()
        recent = self._recent(user_id, now)

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)

        # Always write back the pruned list so expired timestamps are released.
        self._store(user_id, recent)
        return allowed

    def get_remaining(self, user_id):
        """Return how many more requests ``user_id`` may make this hour.

        This is a pure read: it neither records a request nor creates an
        entry for an unknown user.
        """
        recent = self._recent(user_id, datetime.now())
        return max(0, self.max_requests - len(recent))
# Hourly per-user request budget; override it with the MAX_REQUESTS_PER_HOUR
# environment variable (defaults to 10).
MAX_REQUESTS_PER_HOUR = int(os.getenv("MAX_REQUESTS_PER_HOUR", "10"))

# Single module-level limiter instance used by the chat handler.
rate_limiter = RateLimiter(MAX_REQUESTS_PER_HOUR)
| # ========================================== | |
| # Modal Connection | |
| # ========================================== | |
| import modal | |
def get_modal_function(function_name):
    """Look up ``function_name`` in the "mcp-video-agent" Modal app.

    Returns:
        The handle produced by ``modal.Function.from_name``, or None if the
        lookup raised (the error is printed).
    """
    try:
        return modal.Function.from_name("mcp-video-agent", function_name)
    except Exception as exc:
        print(f"β Failed to connect to Modal: {exc}")
    return None
def get_modal_volume():
    """Look up the "video-storage" Modal Volume used for file transfer.

    Returns:
        The handle produced by ``modal.Volume.from_name``, or None if the
        lookup raised (the error is printed).
    """
    try:
        return modal.Volume.from_name("video-storage")
    except Exception as exc:
        print(f"β Failed to connect to Modal Volume: {exc}")
    return None
def upload_to_modal_volume(local_path, remote_filename):
    """Copy a local file to the root of the Modal Volume.

    Args:
        local_path: Path of the file on this machine.
        remote_filename: Name to store it under; it is written to
            ``/<remote_filename>`` on the volume.

    Returns:
        A ``(success, message)`` tuple: ``(True, "Success")`` on success,
        otherwise ``(False, <reason>)``. Exceptions are caught and reported
        through the tuple rather than raised.
    """
    try:
        volume = get_modal_volume()
        if volume is None:
            return False, "Failed to connect to Modal Volume"

        remote_path = f"/{remote_filename}"
        # The batch is submitted when the context manager exits.
        with volume.batch_upload() as uploader:
            uploader.put_file(local_path, remote_path)

        print(f"β Uploaded to Modal Volume: {remote_filename}")
        return True, "Success"
    except Exception as exc:
        print(f"β Upload error: {exc}")
        return False, str(exc)
def download_from_modal_volume(remote_filename, local_path):
    """Download ``/<remote_filename>`` from the Modal Volume to ``local_path``.

    Any existing file at ``local_path`` is replaced. If the download fails
    part-way, the partially written file is removed so a False return never
    leaves truncated data behind for callers to pick up.

    Args:
        remote_filename: Name of the file at the root of the volume.
        local_path: Destination path on this machine.

    Returns:
        True when the file was fully written, False otherwise (the error is
        printed, not raised).
    """
    try:
        vol = get_modal_volume()
        if vol is None:
            return False

        # Start from a clean slate so stale content is never mistaken for
        # the new download.
        if os.path.exists(local_path):
            os.remove(local_path)

        # read_file yields the file content as byte chunks.
        with open(local_path, 'wb') as f:
            for chunk in vol.read_file(f"/{remote_filename}"):
                f.write(chunk)

        print(f"β Downloaded from Modal Volume: {remote_filename}")
        return True
    except Exception as e:
        print(f"β Download error: {e}")
        # Best-effort cleanup of a partial download; the file may not exist
        # at all if the failure happened before it was opened.
        try:
            os.remove(local_path)
        except OSError:
            pass
        return False
| # ========================================== | |
| # Gradio Interface Logic | |
| # ========================================== | |
# Maps "<local path>_<content hash>" to the filename already uploaded to the
# Modal Volume, so follow-up questions about the same video skip the upload.
uploaded_videos_cache = {}


def _resolve_rate_limit_id(username, request):
    """Pick the identifier used for rate limiting.

    An authenticated username wins. Without one (auth disabled, so the
    username is empty/None/"anonymous"), fall back to the client host and then
    the Gradio session hash so that unauthenticated visitors do not all share
    a single bucket.
    """
    if username and username != "anonymous":
        return username

    # NOTE(review): behind a reverse proxy the client host may be the proxy's
    # address rather than the visitor's - confirm for the deployment target.
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    if host:
        return f"ip:{host}"

    session_hash = getattr(request, "session_hash", None)
    if session_hash:
        return f"session:{session_hash}"

    return "anonymous"


def _short_file_hash(path, chunk_size=1024 * 1024):
    """Return the first 8 hex chars of the file's MD5, read in chunks.

    MD5 is used only as a content fingerprint for naming/caching, not for
    security.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()[:8]


def _terminal_box(text):
    """Wrap ``text`` in the green-on-black HTML box used for text-only replies."""
    return (
        "<div style='background: black; color: lime; padding: 20px; "
        "border-radius: 10px; white-space: normal; word-wrap: break-word;'>"
        f"{text}</div>"
    )


def process_interaction(user_message, history, video_file, username, request: gr.Request):
    """Handle one chat turn: rate-limit, upload the video, analyze it, add audio.

    This is a generator; every ``yield`` pushes the updated chat history to
    the UI so the user sees progress messages while the backend works.

    Args:
        user_message: The question typed by the user.
        history: Chat history as a list of ``{"role", "content"}`` dicts, or
            None for an empty conversation.
        video_file: Local path of the uploaded video, or None if none was
            uploaded.
        username: Username captured at page load (may be empty or None when
            authentication is disabled).
        request: The Gradio request, used as a fallback source of a
            per-client identifier for rate limiting.

    Yields:
        The chat history list, whose last entry is the current assistant
        status or final answer.
    """
    if history is None:
        history = []

    user_id = _resolve_rate_limit_id(username, request)

    # Show the user's message and a placeholder immediately.
    history = history + [{"role": "user", "content": user_message}]
    history = history + [{"role": "assistant", "content": "β³ Processing your request..."}]
    yield history

    # Rate limit (is_allowed also records the request when it is accepted).
    if not rate_limiter.is_allowed(user_id):
        remaining = rate_limiter.get_remaining(user_id)
        history[-1] = {"role": "assistant", "content": f"β οΈ Rate limit exceeded. You have {remaining} requests remaining this hour. Please try again later."}
        yield history
        return

    remaining = rate_limiter.get_remaining(user_id)
    print(f"π‘ User {user_id}: {remaining} requests remaining this hour")

    # 1. Check video upload
    if video_file is None:
        history[-1] = {"role": "assistant", "content": "β οΈ Please upload a video first!"}
        yield history
        return

    local_path = video_file

    # Enforce the 100MB size limit before doing any expensive work.
    file_size_mb = os.path.getsize(local_path) / (1024 * 1024)
    if file_size_mb > 100:
        history[-1] = {"role": "assistant", "content": f"β Video too large! Size: {file_size_mb:.1f}MB. Please upload a video smaller than 100MB."}
        yield history
        return

    # Unique remote name: timestamp plus a short content hash.
    file_hash = _short_file_hash(local_path)
    timestamp = int(time.time())
    unique_filename = f"video_{timestamp}_{file_hash}.mp4"
    cache_key = f"{local_path}_{file_hash}"

    # 2. Upload to Modal Volume if needed
    if cache_key not in uploaded_videos_cache:
        history[-1] = {"role": "assistant", "content": f"π€ Uploading video ({file_size_mb:.1f}MB)... This may take a moment."}
        yield history
        try:
            success, error_msg = upload_to_modal_volume(local_path, unique_filename)
            if not success:
                history[-1] = {"role": "assistant", "content": f"β Upload failed: {error_msg}"}
                yield history
                return
            uploaded_videos_cache[cache_key] = unique_filename
            print(f"β Video uploaded: {unique_filename}")
            # Brief pause to ensure volume sync
            time.sleep(1)
        except Exception as e:
            history[-1] = {"role": "assistant", "content": f"β Upload error: {str(e)}"}
            yield history
            return
    else:
        # Reuse the remote file uploaded for an earlier question.
        unique_filename = uploaded_videos_cache[cache_key]
        history[-1] = {"role": "assistant", "content": "β»οΈ Using cached video..."}
        yield history

    # 3. Analyze video via Modal
    history[-1] = {"role": "assistant", "content": "π€ Analyzing video with Gemini..."}
    yield history
    try:
        analyze_fn = get_modal_function("_internal_analyze_video")
        if analyze_fn is None:
            history[-1] = {"role": "assistant", "content": "β Failed to connect to Modal backend. Please check deployment."}
            yield history
            return
        text_response = analyze_fn.remote(user_message, video_filename=unique_filename)
    except Exception as e:
        text_response = f"β Analysis error: {str(e)}"
    full_text_response = text_response

    # 4. Generate audio only when the analysis text carries no error/warning
    #    marker; otherwise show the text as-is.
    if "β" not in text_response and "β οΈ" not in text_response:
        history[-1] = {"role": "assistant", "content": "π£οΈ Generating audio response..."}
        yield history
        try:
            speak_fn = get_modal_function("_internal_speak_text")
            if speak_fn is None:
                history[-1] = {"role": "assistant", "content": f"β οΈ TTS unavailable.\n\n{_terminal_box(full_text_response)}"}
                yield history
                return

            audio_filename = f"audio_{unique_filename.replace('.mp4', '.mp3')}"
            speak_fn.remote(text_response, audio_filename=audio_filename)

            # Download audio using SDK
            time.sleep(3)  # Wait for TTS to complete
            local_audio = f"/tmp/{audio_filename}"

            # Remove old file if exists
            if os.path.exists(local_audio):
                os.remove(local_audio)

            max_retries = 3
            audio_ready = False
            for attempt in range(max_retries):
                downloaded = download_from_modal_volume(audio_filename, local_audio)
                # Files of 1000 bytes or less are treated as incomplete.
                if downloaded and os.path.exists(local_audio) and os.path.getsize(local_audio) > 1000:
                    audio_ready = True
                    break
                # Clean up partial file
                if os.path.exists(local_audio):
                    os.remove(local_audio)
                # Back off before the next attempt; pointless after the last.
                if attempt < max_retries - 1:
                    time.sleep(2)

            if audio_ready:
                with open(local_audio, 'rb') as f:
                    audio_bytes = f.read()
                audio_base64 = base64.b64encode(audio_bytes).decode()
                # The audio is inlined as a base64 data URI so the browser can
                # play it without a separate file route.
                # NOTE(review): blank lines inside this template may have been
                # lost in extraction - confirm the intended Markdown spacing.
                response_content = f"""ποΈ **Audio Response** ({remaining} requests remaining this hour)
<audio controls autoplay style="width: 100%; margin: 10px 0; background: #f0f0f0; border-radius: 5px;">
<source src="data:audio/mpeg;base64,{audio_base64}" type="audio/mpeg">
</audio>
**π Full Text Response:**
<div style="background-color: #000000; color: #00ff00; padding: 25px; border-radius: 10px; font-family: 'Courier New', monospace; line-height: 1.8; font-size: 14px; white-space: normal; word-wrap: break-word; overflow-wrap: break-word; max-width: 100%;">
{full_text_response}
</div>"""
                history[-1] = {"role": "assistant", "content": response_content}
                yield history
            else:
                history[-1] = {"role": "assistant", "content": f"β οΈ Audio generation incomplete.\n\n{_terminal_box(full_text_response)}"}
                yield history
        except Exception as e:
            history[-1] = {"role": "assistant", "content": f"β Audio error: {str(e)}\n\n{_terminal_box(full_text_response)}"}
            yield history
    else:
        history[-1] = {"role": "assistant", "content": text_response}
        yield history
| # ========================================== | |
| # Gradio Interface with Authentication | |
| # ========================================== | |
# Credentials come from the environment. The username defaults to "admin";
# leaving GRADIO_PASSWORD unset (or empty) disables authentication.
GRADIO_USERNAME = os.environ.get("GRADIO_USERNAME", "admin")
GRADIO_PASSWORD = os.environ.get("GRADIO_PASSWORD")


def authenticate(username, password):
    """Check login credentials against the configured username/password.

    Args:
        username: Username submitted on the login form.
        password: Password submitted on the login form.

    Returns:
        True if no password is configured (open access, intended for
        hackathon/demo use) or if both values match the configured
        credentials; False otherwise.
    """
    import hmac  # local import keeps this block self-contained

    # Same truthiness rule as the launch code, so an empty password means
    # "no authentication" in both places.
    if not GRADIO_PASSWORD:
        return True

    if not isinstance(username, str) or not isinstance(password, str):
        return False

    # Constant-time comparison avoids leaking how much of a guess matched.
    # Compare as UTF-8 bytes because compare_digest rejects non-ASCII str.
    user_ok = hmac.compare_digest(
        username.encode("utf-8"), GRADIO_USERNAME.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), GRADIO_PASSWORD.encode("utf-8")
    )
    return user_ok and password_ok
# Build the UI: an intro panel, a video uploader next to a chat window, example
# prompts, and the event wiring that routes questions to process_interaction.
with gr.Blocks(title="π₯ MCP Video Agent") as demo:
    gr.Markdown("# π₯ MCP Video Agent")
    gr.Markdown("**π MCP 1st Birthday Hackathon** | Track: MCP in Action (Consumer & Creative)")
    # NOTE(review): blank lines inside this Markdown may have been lost in
    # extraction - confirm the intended paragraph/heading spacing.
    gr.Markdown(f"""
### β‘ Key Innovation: Smart Frame Caching
**First Query**: Video is analyzed deeply and cached (8-12 seconds)
**Follow-up Queries**: Instant responses using cached context (2-3 seconds, 90% cost reduction!)
**Cache Duration**: 1 hour - ask multiple questions without reprocessing
---
### π How to Use
1. **Upload** a video (MP4, max 100MB)
2. **Ask** your first question - video will be analyzed and cached
3. **Continue** asking follow-up questions - experience the speed boost!
4. **Listen** to voice responses (powered by ElevenLabs TTS)
**Pro Tip**: After your first question, try asking 2-3 more to see how fast cached responses are!
---
### π‘οΈ Fair Usage Policy
- **Rate Limit**: {MAX_REQUESTS_PER_HOUR} requests per hour per user
- **Video Size**: Max 100MB
- **Shared Resources**: This is a Hackathon demo - please use responsibly
---
### π§ Tech Stack
- **Gemini 2.5 Flash**: Multimodal video analysis + Context Caching
- **Modal**: Serverless backend + Persistent storage
- **ElevenLabs**: Neural text-to-speech
- **Gradio 6.0**: Interactive UI
**Sponsor Tech Used**: β Modal | β Google Gemini | β ElevenLabs
""")

    # Per-session slot holding the username captured at page load.
    username_state = gr.State("")

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(label="πΉ Upload Video (MP4)", sources=["upload"])
            gr.Markdown("**Supported:** MP4, max 100MB")
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="π¬ Conversation", height=500)
            msg = gr.Textbox(
                label="Your question...",
                placeholder="What is this video about?",
                lines=2
            )
            submit_btn = gr.Button("π Send", variant="primary")

    # Examples
    gr.Examples(
        examples=[
            ["What is happening in this video?"],
            ["Describe the main content of this video."],
            ["What are the key visual elements?"],
        ],
        inputs=msg
    )

    def set_username(request: gr.Request):
        """Return the logged-in username, or "anonymous" when there is none."""
        # gr.Request always has a `username` attribute (None when nobody is
        # logged in), so test its value rather than its presence.
        return getattr(request, "username", None) or "anonymous"

    demo.load(set_username, None, username_state)

    # Event handlers: the Send button and pressing Enter in the textbox both
    # run the same chat handler with the same inputs.
    interaction_inputs = [msg, chatbot, video_input, username_state]
    submit_btn.click(
        process_interaction,
        inputs=interaction_inputs,
        outputs=[chatbot]
    )
    msg.submit(
        process_interaction,
        inputs=interaction_inputs,
        outputs=[chatbot]
    )
| # ========================================== | |
| # Launch with Authentication | |
| # ========================================== | |
if __name__ == "__main__":
    # Authentication is opt-in: the login callback is only passed to Gradio
    # when a password has been configured; otherwise the app is public.
    auth_config = authenticate if GRADIO_PASSWORD else None

    if auth_config is None:
        print("π Public access enabled (no authentication required)")
        print(" Rate limiting active to prevent abuse")
        print(f" Limit: {MAX_REQUESTS_PER_HOUR} requests/hour per user")
    else:
        print(f"π Authentication enabled. Username: {GRADIO_USERNAME}")

    launch_options = {
        "auth": auth_config,
        "show_error": True,
        "share": False,
    }
    demo.launch(**launch_options)