Spaces:
Sleeping
Sleeping
| """ | |
| Gradio Demo for Media Optimization Pipeline | |
| This app allows users to upload images or videos and see the AI analysis results | |
| in a user-friendly interface. It connects to the Modal-deployed Qwen3 Omni model. | |
| """ | |
| import sys | |
| import traceback | |
| print("=" * 60) | |
| print("π Starting Media Optimization Gradio App...") | |
| print("=" * 60) | |
| try: | |
| import gradio as gr | |
| print("β Gradio imported successfully") | |
| except Exception as e: | |
| print(f"β Failed to import Gradio: {e}") | |
| sys.exit(1) | |
| import json | |
| import uuid | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional | |
| print("β Standard libraries imported") | |
# Import the main analysis function.  If the Modal-backed module is not
# available (typical on a standalone HF Space), install a stub
# run_media_file so the UI still loads and returns sample data.
DEMO_MODE = False
try:
    from main_media import run_media_file
    print("β Successfully imported run_media_file from main_media")
except ImportError as e:
    print(f"β οΈ Could not import main_media: {e}")
    print("β οΈ Running in DEMO MODE - will return sample data")
    DEMO_MODE = True

    def run_media_file(media_file, request_id, segment_id):
        """Demo mode - returns sample analysis"""
        return {
            "mediaSummary": "This is a demo response. The backend Modal service is not accessible from this HF Space. To use the full analysis, you need to add your Modal API credentials to the Space secrets.",
            "viralPotential": {"radScore": 75, "reasoning": "Demo data - backend not connected"},
            "engagementMetrics": {"likelihoodOfShares": 8, "estimatedViewership": 10000},
            "qualityScore": 85,
            "emotions": ["excitement", "curiosity"],
            "platformAnalysis": {"bestPlatforms": ["Instagram", "TikTok"]},
            "videoHighlights": []
        }
except Exception as e:
    print(f"β Unexpected error importing main_media: {e}")
    traceback.print_exc()
    DEMO_MODE = True
    # BUG FIX: `except ... as e` unbinds `e` when the handler exits, so the
    # fallback below used to raise NameError when it was eventually called.
    # Capture the message now, while `e` is still bound.
    _backend_import_error = str(e)

    def run_media_file(media_file, request_id, segment_id):
        """Error mode - reports the import failure captured above."""
        return {
            "error": f"Failed to initialize backend: {_backend_import_error}"
        }
def analyze_media(media_file) -> Dict[str, Any]:
    """
    Analyze uploaded media file (image or video).

    Args:
        media_file: Gradio file-upload object (has ``.name``), a filesystem
            path (str/Path), or an array-like value from a Gradio component.

    Returns:
        Dictionary with analysis results including processing time, or an
        ``{"error": ...}`` dictionary when input is missing or analysis fails.
    """
    # Reject missing/empty uploads.  len() is used instead of plain
    # truthiness because numpy/pandas arrays raise on bool().
    try:
        if media_file is None or (hasattr(media_file, '__len__') and len(media_file) == 0):
            return {"error": "Please upload a media file"}
    except Exception:  # was a bare `except:`; len() may still raise on exotic inputs
        if not media_file:
            return {"error": "Please upload a media file"}

    # Generate unique IDs for this request
    request_id = str(uuid.uuid4())
    segment_id = str(uuid.uuid4())

    try:
        import time
        start_time = time.time()

        if DEMO_MODE:
            print("β οΈ Running in DEMO MODE")

        # Normalize media_file to a string path: Gradio file objects expose
        # .name, str/Path are stringified, anything else is passed through.
        if hasattr(media_file, 'name'):
            media_path = media_file.name
        elif isinstance(media_file, (str, Path)):
            media_path = str(media_file)
        else:
            media_path = media_file

        print(f"π Analyzing media file: {media_path}")

        # Run the analysis (real backend or demo stub, depending on import).
        result = run_media_file(media_path, request_id, segment_id)

        # Calculate processing time
        processing_time = time.time() - start_time

        # Ensure result is a dictionary before annotating it.
        if not isinstance(result, dict):
            return {"error": f"Invalid result type: {type(result).__name__}", "raw": str(result)[:500]}

        # Add processing time (and demo flag) to the result for the UI.
        result['processing_time_seconds'] = round(processing_time, 2)
        if DEMO_MODE:
            result['demo_mode'] = True
        return result

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"β Analysis error: {error_details}")
        return {"error": f"Analysis failed: {str(e)}", "details": error_details[:500]}
def format_analysis_results(result: Dict[str, Any]) -> tuple:
    """
    Format the analysis results for display in Gradio.

    Returns a tuple of formatted strings/data for each output component, in
    this order: (processing-time banner, summary, viral potential,
    engagement, quality & memorability, emotions & moods, platform analysis,
    highlights, raw JSON).  The order must match the ``outputs`` list wired
    to the analyze button.
    """
    # Safety check: everything below assumes a dict; anything else becomes an
    # error tuple so the UI never crashes on a malformed backend response.
    if not isinstance(result, dict):
        error_msg = f"Invalid result format: {type(result)}"
        return ("", error_msg, "", "", "", "", "", "", json.dumps({"error": error_msg}, indent=2))
    # Get processing time first (before error check) so the banner renders
    # even for failed analyses.
    processing_time = result.get('processing_time_seconds', 'N/A')
    if processing_time != 'N/A':
        time_display = f"β±οΈ **Processing Time:** {processing_time} seconds"
    else:
        time_display = "β±οΈ **Processing Time:** Not available"
    if "error" in result:
        # Error responses fill only the summary slot; all other tabs are blank.
        return (
            time_display,  # Processing time
            result["error"],  # Summary
            "",  # Viral Potential
            "",  # Engagement
            "",  # Quality & Memorability
            "",  # Emotions & Moods
            "",  # Platform Analysis
            "",  # Highlights (video only)
            json.dumps(result, indent=2)  # Raw JSON
        )
    # Summary (without processing time - it's displayed separately now)
    summary = result.get("mediaSummary", "N/A")
    # mediaSummary is a string, not a dict
    if not isinstance(summary, str):
        summary = str(summary) if summary else "N/A"
    summary_text = f"**Summary:**\n{summary}"
    # Viral Potential
    # NOTE(review): the demo-mode stub emits "viralPotential" while this reads
    # "mediaViralPotential" — demo responses render as N/A here; confirm the
    # real backend's key names.
    viral = result.get("mediaViralPotential", {})
    # chr(10) is "\n": f-string expressions could not contain backslashes
    # before Python 3.12, hence this idiom for the bulleted lists.
    viral_text = f"""**Viral Potential:** {viral.get('viral_potential', 'N/A')}
**Score:** {viral.get('viral_potential_score', 'N/A')}/100
**Summary:** {viral.get('summary', 'N/A')}
**Key Strengths:**
{chr(10).join('β’ ' + s for s in viral.get('key_strengths', []))}
**Key Weaknesses:**
{chr(10).join('β’ ' + w for w in viral.get('key_weaknesses', []))}
**Rationale:** {viral.get('rationale', 'N/A')}
**Suggestions:**
{chr(10).join('β’ ' + s for s in viral.get('suggestions', []))}
"""
    # Engagement
    engagement = result.get("mediaEngagement", {})
    engagement_text = f"""**Engagement Level:** {engagement.get('engagement_level', 'N/A')}
**Score:** {engagement.get('engagement_score', 'N/A')}/100
**Audience Interests:** {engagement.get('audience_interests', 'N/A')}
**Summary:** {engagement.get('summary', 'N/A')}
**Key Strengths:**
{chr(10).join('β’ ' + s for s in engagement.get('key_strengths', []))}
**Key Weaknesses:**
{chr(10).join('β’ ' + w for w in engagement.get('key_weaknesses', []))}
**Rationale:** {engagement.get('rationale', 'N/A')}
**Suggestions:**
{chr(10).join('β’ ' + s for s in engagement.get('suggestions', []))}
"""
    # Quality & Memorability (two separate result sections, one tab)
    quality = result.get("mediaQuality", {})
    memorability = result.get("mediaMemorability", {})
    quality_mem_text = f"""**Quality:** {quality.get('quality', 'N/A')}
**Rationale:** {quality.get('rationale', 'N/A')}
**Memorability:** {memorability.get('memorability', 'N/A')}
**Rationale:** {memorability.get('rationale', 'N/A')}
"""
    # Emotions & Moods — both are rendered as "name: score" bullet lists.
    emotion = result.get("mediaEmotion", {})
    mood = result.get("mediaMood", {})
    emotion_scores = emotion.get('emotions', {})
    emotion_list = "\n".join(f"β’ {k}: {v}" for k, v in emotion_scores.items()) if emotion_scores else "N/A"
    mood_scores = "\n".join(f"β’ {k}: {v}" for k, v in mood.items()) if mood else "N/A"
    emotion_mood_text = f"""**Dominant Emotion:** {emotion.get('dominant_emotion', 'N/A')}
**Emotion Scores:**
{emotion_list}
**Mood Scores:**
{mood_scores}
"""
    # Platform Analysis: each platform's sub-dict keys its own payload as
    # "<platform>_analysis" / "<platform>_grade" (lowercased platform name).
    platform_analysis = result.get("platformEngageAnalysis", {})
    platform_text = ""
    for platform, data in platform_analysis.items():
        analysis_key = f"{platform.lower()}_analysis"
        grade_key = f"{platform.lower()}_grade"
        analysis = data.get(analysis_key, {})
        grade = data.get(grade_key, "N/A")
        platform_text += f"\n**{platform}** - Grade: {grade}\n"
        if isinstance(analysis, dict):
            strengths = analysis.get('strengths', [])
            weaknesses = analysis.get('weaknesses', [])
            recommendations = analysis.get('recommendations', [])
            comments = analysis.get('comments', '')
            if strengths:
                platform_text += f"Strengths:\n{chr(10).join('β’ ' + s for s in strengths)}\n"
            if weaknesses:
                platform_text += f"Weaknesses:\n{chr(10).join('β’ ' + w for w in weaknesses)}\n"
            if recommendations:
                platform_text += f"Recommendations:\n{chr(10).join('β’ ' + r for r in recommendations)}\n"
            if comments:
                platform_text += f"Comments: {comments}\n"
        # NOTE(review): original indentation was lost in transit; the "---"
        # separator is assumed to be per-platform (inside the loop) — confirm.
        platform_text += "\n---\n"
    # Video Highlights (only for videos)
    highlights_text = ""
    if "videoHighlight" in result:
        highlights = result["videoHighlight"]
        # Four parallel arrays; zip() truncates to the shortest of them.
        timestamps = highlights.get('hl_clips_timestamps', [])
        summaries = highlights.get('hl_clips_summaries', [])
        reasons = highlights.get('hl_clips_reasons', [])
        scores = highlights.get('hl_clips_scores', [])
        if timestamps:
            highlights_text = "**Video Highlights:**\n\n"
            for i, (ts, summ, reason, score) in enumerate(zip(timestamps, summaries, reasons, scores), 1):
                highlights_text += f"**Clip {i}** (Score: {score}/100)\n"
                highlights_text += f"Timestamps: {ts}\n"
                highlights_text += f"Reason: {reason}\n"
                highlights_text += f"Summary: {summ}\n\n"
        else:
            highlights_text = "No highlights detected"
    else:
        highlights_text = "Not applicable for images"
    # Raw JSON
    raw_json = json.dumps(result, indent=2)
    return (
        time_display,  # Processing time display
        summary_text,
        viral_text,
        engagement_text,
        quality_mem_text,
        emotion_mood_text,
        platform_text,
        highlights_text,
        raw_json
    )
def preview_media(media_file):
    """
    Immediately preview an uploaded file, before any analysis runs.

    Returns a pair of component updates for the (image, video) preview
    widgets.  The widget matching the file's extension becomes visible with
    the file as its value; the other is hidden.  No file, or an unrecognized
    extension, hides both.
    """
    def hide():
        # A fresh "hidden" update object for one preview slot.
        return gr.update(visible=False)

    if media_file is None:
        return hide(), hide()

    # Classify by the text after the final dot, case-insensitively.
    extension = media_file.split(".")[-1].lower()
    image_extensions = ["jpg", "jpeg", "png", "webp", "gif"]
    video_extensions = ["mp4", "mov", "avi", "mkv", "webm"]

    if extension in image_extensions:
        return gr.update(value=media_file, visible=True), hide()
    if extension in video_extensions:
        return hide(), gr.update(value=media_file, visible=True)
    return hide(), hide()
def process_media(image_file, video_file):
    """
    Analyze whichever upload slot is populated (the image slot wins when
    both are set, matching the original precedence).

    Returns the tuple produced by format_analysis_results, ready to be wired
    straight to the Gradio output components.
    """
    if image_file is not None:
        selected = image_file
        print(f"π¬ Processing image: {selected}")
    elif video_file is not None:
        selected = video_file
        print(f"π¬ Processing video: {selected}")
    else:
        # Neither slot has a file: render the standard error tuple.
        return format_analysis_results({"error": "Please upload a media file"})

    return format_analysis_results(analyze_media(selected))
# Custom CSS for better styling:
# - .gradio-container caps the width and centers the app
# - #processing_time_display is the gradient banner used by the
#   processing-time Markdown component (matched via elem_id in the layout)
custom_css = """
.gradio-container {
    max-width: 1200px !important;
    margin: auto;
}
.output-markdown {
    font-size: 14px;
}
#processing_time_display {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    padding: 20px;
    border-radius: 12px;
    color: white;
    font-size: 20px;
    font-weight: bold;
    text-align: center;
    margin: 20px 0;
    box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15);
    min-height: 60px;
    display: flex;
    align-items: center;
    justify-content: center;
}
"""
| print("=" * 60) | |
| print("π¨ Building Gradio Interface...") | |
| print("=" * 60) | |
| # Build the Gradio interface | |
| try: | |
| demo = gr.Blocks(css=custom_css, title="Media Optimization AI", theme=gr.themes.Soft()) | |
| except Exception as e: | |
| print(f"β Failed to create Blocks: {e}") | |
| traceback.print_exc() | |
| sys.exit(1) | |
| with demo: | |
| gr.HTML(""" | |
| <div style="text-align: center; margin-bottom: 20px;"> | |
| <h1>π¬ Media Optimization AI</h1> | |
| <p style="font-size: 16px; color: #666;"> | |
| Upload an image or video to get AI-powered analysis including viral potential, | |
| engagement insights, platform-specific recommendations, and more. | |
| </p> | |
| </div> | |
| """) | |
| # Show demo mode warning if applicable | |
| if DEMO_MODE: | |
| gr.Markdown(""" | |
| ### β οΈ Demo Mode Active | |
| Backend Modal service is not connected. The app will return sample data. | |
| To use the full analysis, configure Modal API credentials in Space secrets. | |
| """, elem_classes=["warning-box"]) | |
| # Single large upload area at the top | |
| with gr.Group(): | |
| gr.Markdown("### π Upload Image or Video") | |
| # Separate upload components for images and videos | |
| media_image = gr.Image( | |
| label="Upload Image", | |
| type="filepath", | |
| height=400, | |
| sources=["upload"] | |
| ) | |
| media_video = gr.Video( | |
| label="Upload Video", | |
| height=400, | |
| sources=["upload"] | |
| ) | |
| analyze_btn = gr.Button("π Analyze Media", variant="primary", size="lg", scale=1) | |
| gr.Markdown(""" | |
| ### π€ How to Use | |
| 1. Click above to upload an image or video | |
| 2. Preview will appear automatically | |
| 3. Click "Analyze Media" button to get AI insights | |
| ### Supported Formats | |
| - **Images**: JPG, PNG, WEBP, GIF, etc. | |
| - **Videos**: MP4, MOV, AVI, WEBM, etc. | |
| """) | |
| # Processing time display - always visible and prominent | |
| processing_time_output = gr.Markdown( | |
| value="", | |
| label="", | |
| show_label=False, | |
| elem_id="processing_time_display" | |
| ) | |
| # Results tabs below | |
| with gr.Tabs(): | |
| with gr.Tab("π Overview"): | |
| summary_output = gr.Markdown(label="Summary") | |
| with gr.Tab("π₯ Viral Potential"): | |
| viral_output = gr.Markdown(label="Viral Potential Analysis") | |
| with gr.Tab("π¬ Engagement"): | |
| engagement_output = gr.Markdown(label="Engagement Analysis") | |
| with gr.Tab("β¨ Quality & Memorability"): | |
| quality_output = gr.Markdown(label="Quality & Memorability") | |
| with gr.Tab("π Emotions & Moods"): | |
| emotion_output = gr.Markdown(label="Emotions & Moods") | |
| with gr.Tab("π± Platform Analysis"): | |
| platform_output = gr.Markdown(label="Platform-Specific Insights") | |
| with gr.Tab("π₯ Highlights"): | |
| highlights_output = gr.Markdown(label="Video Highlights") | |
| with gr.Tab("π Raw JSON"): | |
| json_output = gr.Code(label="Full JSON Response", language="json") | |
| try: | |
| # Connect the analyze button to the processing function | |
| analyze_btn.click( | |
| fn=process_media, | |
| inputs=[media_image, media_video], | |
| outputs=[ | |
| processing_time_output, # Processing time (persistent) | |
| summary_output, | |
| viral_output, | |
| engagement_output, | |
| quality_output, | |
| emotion_output, | |
| platform_output, | |
| highlights_output, | |
| json_output | |
| ] | |
| ) | |
| print("β Analyze handler registered") | |
| except Exception as e: | |
| print(f"β Failed to register event handlers: {e}") | |
| traceback.print_exc() | |
| gr.Markdown(""" | |
| --- | |
| ### π‘ Tips | |
| - For best results, upload high-quality media files | |
| - Videos should be under 60 seconds for faster processing | |
| - The analysis considers multiple factors including visual elements, emotions, and platform-specific trends | |
| - RAD Score represents the overall viral potential (0-100) | |
| """) | |
| print("=" * 60) | |
| print("β Gradio interface built successfully!") | |
| print(f" Demo mode: {DEMO_MODE}") | |
| print("=" * 60) | |
if __name__ == "__main__":
    # Pick the launch configuration based on the hosting environment.
    # (The redundant local `import sys` was removed; sys is imported at the
    # top of the file.)
    if os.getenv("SPACE_ID") or os.getenv("SYSTEM") == "spaces":
        # Running on Hugging Face Spaces
        print("π Running on Hugging Face Spaces")
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False
        )
    elif os.getenv("MODAL_IS_REMOTE"):
        # Running on Modal (actual Modal environment, not just MODAL_ENVIRONMENT='dev')
        print("βοΈ Running on Modal")
        demo.launch(
            server_name="0.0.0.0",
            server_port=8080,
            share=False
        )
    else:
        # Running locally - try multiple ports and create share link
        print("π» Running locally with share link")
        for port in range(7860, 7870):
            try:
                demo.launch(
                    server_name="0.0.0.0",
                    server_port=port,
                    share=True  # Create temporary 72-hour share link
                )
                break  # If launch successful, exit the loop
            except OSError as e:
                if "Cannot find empty port" in str(e) or "Address already in use" in str(e):
                    print(f"Port {port} is busy, trying next port...")
                    continue
                raise
        else:
            # Every candidate port was busy: fail loudly instead of silently
            # falling off the end of the loop with no server running.
            print("ERROR: no free port found in range 7860-7869")
            sys.exit(1)