import gradio as gr
import torch
import cv2
import numpy as np
from PIL import Image
import spaces
import base64
import io
from transformers import BlipProcessor, BlipForConditionalGeneration, AutoTokenizer, AutoModelForCausalLM
import warnings

warnings.filterwarnings("ignore")
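# Note: `spaces` appears to be imported for Hugging Face Spaces GPU support (see the
# comment above analyze_video_with_ai below); `base64` and `io` are currently unused.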
# Global variables
vision_model = None
vision_processor = None
text_model = None
text_tokenizer = None
device = "cuda" if torch.cuda.is_available() else "cpu"
model_loaded = False
def load_models():
    """Load BLIP for vision and a language model for analysis"""
    global vision_model, vision_processor, text_model, text_tokenizer, model_loaded
    try:
        print("🚀 Loading AI models for video analysis...")
        # Load BLIP for image understanding
        print("Loading BLIP vision model...")
        vision_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
        vision_model = BlipForConditionalGeneration.from_pretrained(
            "Salesforce/blip-image-captioning-large",
            torch_dtype=torch.float16,
            device_map="auto"
        )
        # Load a conversational model for analysis
        # (note: text_model/text_tokenizer are loaded here but the analysis
        # pipeline below does not call them yet)
        print("Loading language model...")
        text_tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
        text_model = AutoModelForCausalLM.from_pretrained(
            "microsoft/DialoGPT-medium",
            torch_dtype=torch.float16,
            device_map="auto"
        )
        # Add padding token if needed
        if text_tokenizer.pad_token is None:
            text_tokenizer.pad_token = text_tokenizer.eos_token
        model_loaded = True
        success_msg = "✅ AI models loaded successfully! You can now analyze videos."
        print(success_msg)
        return success_msg
    except Exception as e:
        model_loaded = False
        error_msg = f"❌ Failed to load models: {str(e)}"
        print(error_msg)
        return error_msg
def extract_key_frames(video_path, max_frames=8):
    """Extract key frames from video"""
    try:
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if total_frames == 0:
            cap.release()
            return [], None, []  # keep the three-value contract expected by callers
        # Get evenly spaced frames
        frame_indices = np.linspace(0, total_frames - 1, min(max_frames, total_frames), dtype=int)
        frames = []
        timestamps = []
        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if ret:
                # Convert BGR to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Resize if too large
                if max(width, height) > 512:
                    scale = 512 / max(width, height)
                    new_width = int(width * scale)
                    new_height = int(height * scale)
                    frame_rgb = cv2.resize(frame_rgb, (new_width, new_height))
                frames.append(Image.fromarray(frame_rgb))
                timestamp = frame_idx / fps if fps > 0 else frame_idx
                timestamps.append(timestamp)
        cap.release()
        video_info = {
            "duration": duration,
            "fps": fps,
            "total_frames": total_frames,
            "resolution": f"{width}x{height}",
            "extracted_frames": len(frames)
        }
        return frames, video_info, timestamps
    except Exception as e:
        print(f"Error extracting frames: {e}")
        return [], None, []
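# Note: seeking with cv2.CAP_PROP_POS_FRAMES can be slow or imprecise for some
# codecs; for long videos, reading sequentially and keeping every Nth frame is a
# common alternative.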
def analyze_frame_with_blip(frame, custom_question=None):
    """Analyze a single frame with BLIP"""
    try:
        if custom_question:
            # Use the question as a text prompt for conditional captioning
            inputs = vision_processor(frame, custom_question, return_tensors="pt").to(device)
        else:
            # Use BLIP for image captioning
            inputs = vision_processor(frame, return_tensors="pt").to(device)
        # Cast image tensors to the model's dtype (fp16 here) to avoid dtype mismatches
        inputs["pixel_values"] = inputs["pixel_values"].to(vision_model.dtype)
        with torch.no_grad():
            if custom_question:
                output_ids = vision_model.generate(**inputs, max_new_tokens=100)
            else:
                output_ids = vision_model.generate(**inputs, max_new_tokens=50)
        caption = vision_processor.decode(output_ids[0], skip_special_tokens=True)
        return caption
    except Exception as e:
        return f"Error analyzing frame: {str(e)}"
def synthesize_video_analysis(frame_descriptions, question, video_info):
    """Create comprehensive video analysis from frame descriptions"""
    # Combine all frame descriptions
    all_descriptions = " ".join(frame_descriptions)
    # Create analysis based on question type
    question_lower = question.lower()
    analysis = f"""🎥 **AI Video Analysis**
❓ **Your Question:** {question}
🤖 **Detailed Analysis:**
"""
    if any(word in question_lower for word in ['what', 'happening', 'describe', 'see']):
        analysis += f"Based on my analysis of {len(frame_descriptions)} key frames from the video:\n\n"
        for i, desc in enumerate(frame_descriptions):
            timestamp = i * (video_info['duration'] / len(frame_descriptions))
            analysis += f"• **At {timestamp:.1f}s:** {desc}\n"
        analysis += f"\n**Overall Summary:** This {video_info['duration']:.1f}-second video shows {all_descriptions.lower()}. "
        # Add contextual insights
        if len(set(frame_descriptions)) < len(frame_descriptions) * 0.3:
            analysis += "The scene appears relatively static with consistent elements throughout."
        else:
            analysis += "The video shows dynamic content with changing scenes and activities."
    elif any(word in question_lower for word in ['people', 'person', 'human', 'who']):
        people_mentions = [desc for desc in frame_descriptions if any(word in desc.lower() for word in ['person', 'people', 'man', 'woman', 'child', 'human'])]
        if people_mentions:
            analysis += f"**People in the video:** {' '.join(people_mentions)}\n\n"
        else:
            analysis += "**People analysis:** No clear human figures were detected in the analyzed frames.\n\n"
    elif any(word in question_lower for word in ['object', 'item', 'thing']):
        analysis += "**Objects and items visible:**\n"
        for desc in frame_descriptions:
            analysis += f"• {desc}\n"
    elif any(word in question_lower for word in ['setting', 'location', 'place', 'where']):
        analysis += "**Setting and location analysis:**\n"
        analysis += f"Based on the visual elements: {all_descriptions}\n\n"
    elif any(word in question_lower for word in ['mood', 'emotion', 'feeling', 'atmosphere']):
        analysis += "**Mood and atmosphere:**\n"
        analysis += f"The visual elements suggest: {all_descriptions}\n\n"
    else:
        # General analysis
        analysis += "**Frame-by-frame analysis:**\n"
        for i, desc in enumerate(frame_descriptions):
            analysis += f"{i+1}. {desc}\n"
    return analysis
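# Assumption: on a ZeroGPU Space, the GPU-bound entry point would typically carry the
# @spaces.GPU decorator (the likely reason `spaces` is imported above), e.g.:
#
#     @spaces.GPU
#     def analyze_video_with_ai(video_file, question, progress=gr.Progress()):
#         ...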
def analyze_video_with_ai(video_file, question, progress=gr.Progress()):
    """Main video analysis function"""
    if video_file is None:
        return "❌ Please upload a video file first."
    if not question.strip():
        return "❌ Please enter a question about the video."
    if not model_loaded:
        return "❌ AI models are not loaded. Please click 'Load AI Models' first and wait for completion."
    try:
        progress(0.1, desc="Extracting video frames...")
        # Extract frames (timestamps are currently unused by the synthesis step)
        frames, video_info, timestamps = extract_key_frames(video_file, max_frames=8)
        if not frames or video_info is None:
            return "❌ Could not process video. Please check the video format."
        progress(0.3, desc="Analyzing frames with AI...")
        # Analyze each frame
        frame_descriptions = []
        for i, frame in enumerate(frames):
            progress(0.3 + (i / len(frames)) * 0.5, desc=f"Analyzing frame {i+1}/{len(frames)}...")
            # Create frame-specific question if relevant
            if any(word in question.lower() for word in ['what', 'describe', 'see', 'happening']):
                frame_question = f"What do you see in this image? {question}"
                description = analyze_frame_with_blip(frame, frame_question)
            else:
                description = analyze_frame_with_blip(frame)
            frame_descriptions.append(description)
        progress(0.8, desc="Synthesizing analysis...")
        # Create comprehensive analysis
        analysis = synthesize_video_analysis(frame_descriptions, question, video_info)
        # Add technical information
        analysis += f"""
📊 **Technical Information:**
• Duration: {video_info['duration']:.1f} seconds
• Frame Rate: {video_info['fps']:.1f} FPS
• Total Frames: {video_info['total_frames']:,}
• Analyzed Frames: {video_info['extracted_frames']}
• Resolution: {video_info['resolution']}
⚡ **Powered by:** BLIP Vision AI + Advanced Analysis
"""
        progress(1.0, desc="Analysis complete!")
        return analysis
    except Exception as e:
        error_msg = f"❌ Error during analysis: {str(e)}"
        print(error_msg)
        return error_msg
def create_interface():
    """Create the Gradio interface"""
    with gr.Blocks(title="AI Video Analyzer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎥 AI Video Analysis Tool")
        gr.Markdown("Upload videos and get detailed AI-powered analysis using advanced computer vision!")
        # Model loading section
        with gr.Row():
            with gr.Column(scale=3):
                model_status = gr.Textbox(
                    label="🤖 Model Status",
                    value="Models not loaded - Click the button to load AI models ➡️",
                    interactive=False,
                    lines=2
                )
            with gr.Column(scale=1):
                load_btn = gr.Button("🚀 Load AI Models", variant="primary", size="lg")
        load_btn.click(load_models, outputs=model_status)
        gr.Markdown("---")
        # Main interface
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(
                    label="📹 Upload Video (MP4, AVI, MOV, WebM)",
                    height=350
                )
                question_input = gr.Textbox(
                    label="❓ Ask about the video",
                    placeholder="What is happening in this video? Describe it in detail.",
                    lines=3,
                    max_lines=5
                )
                analyze_btn = gr.Button("🔍 Analyze Video with AI", variant="primary", size="lg")
            with gr.Column(scale=1):
                output = gr.Textbox(
                    label="🎯 AI Analysis Results",
                    lines=25,
                    max_lines=30,
                    show_copy_button=True
                )
        # Example questions
        gr.Markdown("### 💡 Example Questions (click to use):")
        example_questions = [
            "What is happening in this video? Describe the scene in detail.",
            "Who are the people in this video and what are they doing?",
            "Describe the setting, location, and environment shown.",
            "What objects, animals, or items can you see in the video?",
            "What is the mood, atmosphere, or emotion conveyed?",
            "Summarize the key events that occur chronologically."
        ]
        with gr.Row():
            for i in range(0, len(example_questions), 2):
                with gr.Column():
                    # Default-argument binding freezes each question at definition time
                    btn1 = gr.Button(example_questions[i], size="sm")
                    btn1.click(lambda x=example_questions[i]: x, outputs=question_input)
                    if i + 1 < len(example_questions):
                        btn2 = gr.Button(example_questions[i+1], size="sm")
                        btn2.click(lambda x=example_questions[i+1]: x, outputs=question_input)
        # Connect the analyze button
        analyze_btn.click(
            analyze_video_with_ai,
            inputs=[video_input, question_input],
            outputs=output,
            show_progress=True
        )
| gr.Markdown("---") | |
| gr.Markdown(""" | |
| ### π Instructions: | |
| 1. **First:** Click "Load AI Models" and wait for it to complete (~3-5 minutes) | |
| 2. **Then:** Upload your video file (works with most formats) | |
| 3. **Ask:** Type your question about the video content | |
| 4. **Analyze:** Click "Analyze Video with AI" to get detailed insights | |
| π‘ **How it works:** | |
| - Extracts key frames from your video | |
| - Analyzes each frame with BLIP vision AI | |
| - Synthesizes comprehensive analysis based on your question | |
| - Works reliably with standard video formats | |
| """) | |
| return demo | |
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
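    # For local testing, the app could instead be exposed publicly, e.g.:
    # demo.launch(share=True)  # or demo.launch(server_name="0.0.0.0", server_port=7860)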