"""
Multimodal Processing Framework for GAIA Agent.

Handles Audio, Video, and Image processing for GAIA benchmark questions.
"""
# Standard library
import os
import re
import tempfile
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Third-party
import requests
@dataclass
class MultimodalResult:
    """Result of processing one audio/video/image input.

    Attributes:
        success: True when processing produced usable content.
        content: Extracted text (transcript, description, ...) or None on failure.
        modality: Which handler produced this result: "audio", "video", or "image".
        metadata: Processor-specific details (method, model, source, ...).
        error: Human-readable failure reason; None on success.
    """
    # NOTE: the @dataclass decorator is required — `dataclass` was imported but
    # never applied, so keyword construction (used throughout this file) failed.
    success: bool
    content: Optional[str]
    modality: str
    metadata: Dict[str, Any]
    error: Optional[str] = None
class AudioProcessor:
    """
    Process audio files using OpenAI Whisper API via OpenRouter or local.
    """

    # Audio extensions recognized when inferring a temp-file suffix from a URL.
    # Previously only ".wav" was detected and everything else was saved as ".mp3".
    _KNOWN_EXTENSIONS = (".wav", ".mp3", ".m4a", ".flac", ".ogg", ".webm")

    def __init__(self, openai_client=None):
        # Optional OpenAI-compatible client; the API path is a disabled stub
        # (see _transcribe_with_api), so local faster-whisper is the real path.
        self.client = openai_client

    def transcribe(self, audio_path: Optional[str] = None,
                   audio_url: Optional[str] = None) -> MultimodalResult:
        """
        Transcribe an audio file to text.

        Args:
            audio_path: Local path to audio file.
            audio_url: URL to audio file (downloaded to a temp file first).

        Returns:
            MultimodalResult with transcription, or error details on failure.
        """
        try:
            # If only a URL was provided, download it to a local temp file.
            if audio_url and not audio_path:
                audio_path = self._download_audio(audio_url)
            if not audio_path or not os.path.exists(audio_path):
                return MultimodalResult(
                    success=False,
                    content=None,
                    modality="audio",
                    metadata={},
                    error=f"Audio file not found: {audio_path}"
                )
            # Try using OpenAI Whisper API (currently always returns failure).
            if self.client:
                return self._transcribe_with_api(audio_path)
            # Fallback: local faster-whisper.
            return self._transcribe_local(audio_path)
        except Exception as e:
            return MultimodalResult(
                success=False,
                content=None,
                modality="audio",
                metadata={},
                error=str(e)
            )

    def _download_audio(self, url: str) -> Optional[str]:
        """Download audio from *url* to a temp file; return its path or None."""
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            # Keep a recognized audio extension from the URL so downstream
            # tools see the right format; default to ".mp3" when unknown.
            ext = ".mp3"
            lowered = url.lower()
            for candidate in self._KNOWN_EXTENSIONS:
                if candidate in lowered:
                    ext = candidate
                    break
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as f:
                f.write(response.content)
                return f.name
        except Exception as e:
            # Best-effort download: report and let the caller handle None.
            print(f"❌ Failed to download audio: {e}")
            return None

    def _transcribe_with_api(self, audio_path: str) -> MultimodalResult:
        """Transcribe using OpenAI Whisper API (DISABLED - not free)"""
        # OpenAI Whisper API is NOT free, so we skip this
        return MultimodalResult(
            success=False,
            content=None,
            modality="audio",
            metadata={},
            error="OpenAI Whisper API disabled (not free). Use local whisper instead."
        )

    def _transcribe_local(self, audio_path: str) -> MultimodalResult:
        """Transcribe using local faster-whisper (100% free)"""
        try:
            from faster_whisper import WhisperModel
            # Use base model for better accuracy (74MB, still fast)
            model = WhisperModel("base", device="cpu", compute_type="int8")
            segments, info = model.transcribe(audio_path, beam_size=5)
            # Combine all segments into one transcript string.
            full_text = " ".join(segment.text for segment in segments)
            return MultimodalResult(
                success=True,
                content=full_text,
                modality="audio",
                metadata={
                    "method": "faster-whisper",
                    "model": "base",
                    "file": audio_path,
                    "language": info.language
                }
            )
        except ImportError:
            return MultimodalResult(
                success=False,
                content=None,
                modality="audio",
                metadata={},
                error="faster-whisper not installed. Run: pip install faster-whisper"
            )
        except Exception as e:
            return MultimodalResult(
                success=False,
                content=None,
                modality="audio",
                metadata={},
                error=f"Local whisper error: {e}"
            )
| class VideoProcessor: | |
| """ | |
| Process video files and YouTube links. | |
| Extracts transcripts/subtitles for analysis. | |
| """ | |
| def __init__(self): | |
| pass | |
| def process(self, video_url: str = None, video_path: str = None) -> MultimodalResult: | |
| """ | |
| Process video and extract transcript. | |
| Args: | |
| video_url: YouTube URL or video URL | |
| video_path: Local path to video file | |
| Returns: | |
| MultimodalResult with video transcript/content | |
| """ | |
| try: | |
| # Check for YouTube URL | |
| if video_url and ("youtube.com" in video_url or "youtu.be" in video_url): | |
| return self._process_youtube(video_url) | |
| # Local video file | |
| if video_path: | |
| return self._process_local_video(video_path) | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error="No video URL or path provided" | |
| ) | |
| except Exception as e: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error=str(e) | |
| ) | |
| def _process_youtube(self, url: str) -> MultimodalResult: | |
| """Extract transcript from YouTube video""" | |
| try: | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| # Extract video ID | |
| video_id = self._extract_video_id(url) | |
| if not video_id: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error=f"Could not extract video ID from: {url}" | |
| ) | |
| # Get transcript | |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
| # Combine transcript segments | |
| full_transcript = " ".join([entry["text"] for entry in transcript_list]) | |
| return MultimodalResult( | |
| success=True, | |
| content=full_transcript, | |
| modality="video", | |
| metadata={ | |
| "method": "youtube-transcript", | |
| "video_id": video_id, | |
| "url": url, | |
| "segments": len(transcript_list) | |
| } | |
| ) | |
| except ImportError: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error="youtube-transcript-api not installed. Run: pip install youtube-transcript-api" | |
| ) | |
| except Exception as e: | |
| # Try fallback method | |
| return self._youtube_fallback(url, str(e)) | |
| def _extract_video_id(self, url: str) -> Optional[str]: | |
| """Extract YouTube video ID from URL""" | |
| patterns = [ | |
| r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/)([a-zA-Z0-9_-]{11})', | |
| r'youtube\.com\/watch\?.*v=([a-zA-Z0-9_-]{11})' | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, url) | |
| if match: | |
| return match.group(1) | |
| return None | |
| def _youtube_fallback(self, url: str, original_error: str) -> MultimodalResult: | |
| """Fallback method for YouTube when transcript API fails""" | |
| # Try using yt-dlp to get info | |
| try: | |
| import subprocess | |
| result = subprocess.run( | |
| ["yt-dlp", "--get-title", "--get-description", url], | |
| capture_output=True, | |
| text=True, | |
| timeout=30 | |
| ) | |
| if result.returncode == 0: | |
| content = f"Video Title and Description:\n{result.stdout}" | |
| return MultimodalResult( | |
| success=True, | |
| content=content, | |
| modality="video", | |
| metadata={"method": "yt-dlp-metadata", "url": url} | |
| ) | |
| except: | |
| pass | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error=f"YouTube transcript failed: {original_error}. Install: pip install youtube-transcript-api" | |
| ) | |
| def _process_local_video(self, video_path: str) -> MultimodalResult: | |
| """Process local video file (extract audio and transcribe)""" | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="video", | |
| metadata={}, | |
| error="Local video processing requires ffmpeg + whisper. Not yet implemented." | |
| ) | |
| class ImageProcessor: | |
| """ | |
| Process images using vision-capable LLM. | |
| """ | |
| def __init__(self, openrouter_client=None, model: str = "google/gemma-3-27b:free"): | |
| """ | |
| Initialize image processor. | |
| Args: | |
| openrouter_client: OpenAI client configured for OpenRouter | |
| model: Vision-capable model to use | |
| """ | |
| self.client = openrouter_client | |
| self.model = model | |
| def analyze(self, image_path: str = None, image_url: str = None, | |
| question: str = "Describe this image in detail.") -> MultimodalResult: | |
| """ | |
| Analyze image and answer question about it. | |
| Args: | |
| image_path: Local path to image | |
| image_url: URL to image | |
| question: Question to answer about the image | |
| Returns: | |
| MultimodalResult with analysis | |
| """ | |
| try: | |
| if not self.client: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="image", | |
| metadata={}, | |
| error="No OpenRouter client configured for vision" | |
| ) | |
| # Prepare image for API | |
| if image_path: | |
| image_data = self._encode_image(image_path) | |
| if not image_data: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="image", | |
| metadata={}, | |
| error=f"Failed to encode image: {image_path}" | |
| ) | |
| image_content = { | |
| "type": "image_url", | |
| "image_url": {"url": f"data:image/jpeg;base64,{image_data}"} | |
| } | |
| elif image_url: | |
| image_content = { | |
| "type": "image_url", | |
| "image_url": {"url": image_url} | |
| } | |
| else: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="image", | |
| metadata={}, | |
| error="No image path or URL provided" | |
| ) | |
| # Call vision model | |
| response = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": question}, | |
| image_content | |
| ] | |
| } | |
| ], | |
| max_tokens=500 | |
| ) | |
| content = response.choices[0].message.content | |
| return MultimodalResult( | |
| success=True, | |
| content=content, | |
| modality="image", | |
| metadata={ | |
| "method": "vision-llm", | |
| "model": self.model, | |
| "image_source": image_path or image_url | |
| } | |
| ) | |
| except Exception as e: | |
| return MultimodalResult( | |
| success=False, | |
| content=None, | |
| modality="image", | |
| metadata={}, | |
| error=f"Vision analysis error: {e}" | |
| ) | |
| def _encode_image(self, image_path: str) -> Optional[str]: | |
| """Encode image to base64""" | |
| try: | |
| import base64 | |
| with open(image_path, "rb") as f: | |
| return base64.b64encode(f.read()).decode("utf-8") | |
| except Exception as e: | |
| print(f"❌ Failed to encode image: {e}") | |
| return None | |
class MultimodalProcessor:
    """
    Unified entry point for multimodal processing in the GAIA agent.

    Owns one handler per modality (audio, video, image) and forwards each
    `process_*` call to the matching handler.
    """

    def __init__(self, openrouter_client=None, openai_client=None):
        """
        Build the per-modality handlers.

        Args:
            openrouter_client: Client for vision models (image handler).
            openai_client: Client for Whisper API (audio handler, optional).
        """
        self.image = ImageProcessor(openrouter_client)
        self.video = VideoProcessor()
        self.audio = AudioProcessor(openai_client)

    def process_audio(self, audio_path: str = None, audio_url: str = None) -> MultimodalResult:
        """Transcribe an audio file or URL via the audio handler."""
        print("🎵 Processing audio...")
        return self.audio.transcribe(audio_path, audio_url)

    def process_video(self, video_url: str = None, video_path: str = None) -> MultimodalResult:
        """Extract a transcript from a video file or YouTube URL."""
        print("🎬 Processing video...")
        return self.video.process(video_url, video_path)

    def process_image(self, image_path: str = None, image_url: str = None,
                      question: str = "Describe this image.") -> MultimodalResult:
        """Analyze an image and answer *question* about it."""
        print("🖼️ Processing image...")
        return self.image.analyze(image_path, image_url, question)
if __name__ == "__main__":
    # Manual smoke test for the multimodal processors.
    banner = "=" * 60
    print(banner)
    print("Multimodal Processor Test")
    print(banner)

    processor = MultimodalProcessor()

    # Exercise the YouTube transcript path end to end.
    print("\n📺 Testing YouTube transcript extraction...")
    result = processor.process_video(video_url="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
    print(f"Success: {result.success}")
    if not result.success:
        print(f"Error: {result.error}")
    else:
        print(f"Content preview: {result.content[:200]}...")