Spaces:
Running
Running
| """ | |
| MEXAR Core Engine - Multimodal Input Processing Module | |
| Handles audio, image, and video input conversion to text. | |
| """ | |
| import os | |
| import base64 | |
| import logging | |
| import tempfile | |
| from typing import Dict, List, Any, Optional | |
| from pathlib import Path | |
| from utils.groq_client import get_groq_client, GroqClient | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class MultimodalProcessor:
    """
    Processes multimodal inputs (audio, image, video) and converts them to text.
    Uses Groq Whisper for audio and Groq Vision for images.
    """
    # Supported file types: extension whitelists, compared case-insensitively
    # (callers lower-case the suffix before membership tests).
    # NOTE(review): '.webm' appears in both AUDIO_EXTENSIONS and
    # VIDEO_EXTENSIONS; process_upload checks audio first, so a .webm upload
    # is always routed to audio transcription — confirm that is intended.
    AUDIO_EXTENSIONS = {'.mp3', '.wav', '.m4a', '.ogg', '.flac', '.webm'}
    IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}
    VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.webm'}

    def __init__(self, groq_client: Optional[GroqClient] = None):
        """
        Initialize the multimodal processor.

        Args:
            groq_client: Optional pre-configured Groq client; when omitted,
                a default client is obtained via get_groq_client().
        """
        self.client = groq_client or get_groq_client()
| def process_audio(self, audio_path: str, language: str = "en") -> Dict[str, Any]: | |
| """ | |
| Transcribe audio file using Groq Whisper. | |
| Args: | |
| audio_path: Path to audio file | |
| language: Language code for transcription | |
| Returns: | |
| Dict with transcription results | |
| """ | |
| path = Path(audio_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Audio file not found: {audio_path}") | |
| if path.suffix.lower() not in self.AUDIO_EXTENSIONS: | |
| raise ValueError(f"Unsupported audio format: {path.suffix}") | |
| try: | |
| logger.info(f"Transcribing audio: {path.name}") | |
| transcript = self.client.transcribe_audio(audio_path, language) | |
| return { | |
| "success": True, | |
| "type": "audio", | |
| "file_name": path.name, | |
| "transcript": transcript, | |
| "language": language, | |
| "word_count": len(transcript.split()) | |
| } | |
| except Exception as e: | |
| logger.error(f"Audio transcription failed: {e}") | |
| return { | |
| "success": False, | |
| "type": "audio", | |
| "file_name": path.name, | |
| "error": str(e) | |
| } | |
| def process_image( | |
| self, | |
| image_path: str, | |
| prompt: str = "Describe this image in detail, including all visible text, objects, and relevant information." | |
| ) -> Dict[str, Any]: | |
| """ | |
| Describe image using Groq Vision. | |
| Args: | |
| image_path: Path to image file | |
| prompt: Question or instruction for the vision model | |
| Returns: | |
| Dict with image description | |
| """ | |
| path = Path(image_path) | |
| if not path.exists(): | |
| logger.error(f"Image file not found: {image_path}") | |
| raise FileNotFoundError(f"Image file not found: {image_path}") | |
| if path.suffix.lower() not in self.IMAGE_EXTENSIONS: | |
| logger.error(f"Unsupported image format: {path.suffix}") | |
| raise ValueError(f"Unsupported image format: {path.suffix}") | |
| try: | |
| logger.info(f"Analyzing image: {path.name} (size: {path.stat().st_size} bytes)") | |
| # Call Groq Vision API | |
| description = self.client.describe_image(image_path, prompt) | |
| logger.info(f"Image analysis successful: {len(description)} chars returned") | |
| return { | |
| "success": True, | |
| "type": "image", | |
| "file_name": path.name, | |
| "description": description, | |
| "prompt_used": prompt | |
| } | |
| except Exception as e: | |
| logger.error(f"Image analysis failed for {path.name}: {type(e).__name__}: {e}") | |
| return { | |
| "success": False, | |
| "type": "image", | |
| "file_name": path.name, | |
| "error": str(e), | |
| "error_type": type(e).__name__ | |
| } | |
| def process_video( | |
| self, | |
| video_path: str, | |
| max_frames: int = 5, | |
| extract_audio: bool = True | |
| ) -> Dict[str, Any]: | |
| """ | |
| Process video by extracting keyframes and audio. | |
| Args: | |
| video_path: Path to video file | |
| max_frames: Maximum number of keyframes to extract | |
| extract_audio: Whether to extract and transcribe audio | |
| Returns: | |
| Dict with video analysis results | |
| """ | |
| path = Path(video_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Video file not found: {video_path}") | |
| if path.suffix.lower() not in self.VIDEO_EXTENSIONS: | |
| raise ValueError(f"Unsupported video format: {path.suffix}") | |
| result = { | |
| "success": True, | |
| "type": "video", | |
| "file_name": path.name, | |
| "frames": [], | |
| "audio_transcript": None | |
| } | |
| try: | |
| # Try to import OpenCV | |
| try: | |
| import cv2 | |
| has_opencv = True | |
| except ImportError: | |
| logger.warning("OpenCV not available, skipping video frame extraction") | |
| has_opencv = False | |
| if has_opencv: | |
| # Extract keyframes | |
| frames = self._extract_keyframes(video_path, max_frames) | |
| # Analyze each frame | |
| for i, frame_path in enumerate(frames): | |
| frame_result = self.process_image( | |
| frame_path, | |
| f"This is frame {i+1} from a video. Describe what you see, focusing on actions, objects, and any text visible." | |
| ) | |
| result["frames"].append(frame_result) | |
| # Clean up temp frame | |
| try: | |
| os.remove(frame_path) | |
| except: | |
| pass | |
| # Extract and transcribe audio | |
| if extract_audio: | |
| audio_path = self._extract_audio(video_path) | |
| if audio_path: | |
| audio_result = self.process_audio(audio_path) | |
| result["audio_transcript"] = audio_result.get("transcript", "") | |
| # Clean up temp audio | |
| try: | |
| os.remove(audio_path) | |
| except: | |
| pass | |
| logger.info(f"Video processed: {len(result['frames'])} frames, audio: {result['audio_transcript'] is not None}") | |
| except Exception as e: | |
| logger.error(f"Video processing failed: {e}") | |
| result["success"] = False | |
| result["error"] = str(e) | |
| return result | |
| def _extract_keyframes(self, video_path: str, max_frames: int = 5) -> List[str]: | |
| """Extract keyframes from video using OpenCV.""" | |
| import cv2 | |
| cap = cv2.VideoCapture(video_path) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| if total_frames == 0: | |
| cap.release() | |
| return [] | |
| # Calculate frame intervals | |
| interval = max(1, total_frames // max_frames) | |
| frame_paths = [] | |
| frame_count = 0 | |
| while cap.isOpened() and len(frame_paths) < max_frames: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_count % interval == 0: | |
| # Save frame to temp file | |
| temp_path = tempfile.mktemp(suffix=".jpg") | |
| cv2.imwrite(temp_path, frame) | |
| frame_paths.append(temp_path) | |
| frame_count += 1 | |
| cap.release() | |
| return frame_paths | |
| def _extract_audio(self, video_path: str) -> Optional[str]: | |
| """Extract audio track from video.""" | |
| try: | |
| # Try using ffmpeg via subprocess | |
| import subprocess | |
| temp_audio = tempfile.mktemp(suffix=".mp3") | |
| cmd = [ | |
| "ffmpeg", | |
| "-i", video_path, | |
| "-vn", # No video | |
| "-acodec", "libmp3lame", | |
| "-q:a", "2", | |
| "-y", # Overwrite | |
| temp_audio | |
| ] | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=120 | |
| ) | |
| if os.path.exists(temp_audio) and os.path.getsize(temp_audio) > 0: | |
| return temp_audio | |
| return None | |
| except Exception as e: | |
| logger.warning(f"Audio extraction failed: {e}") | |
| return None | |
| def fuse_inputs( | |
| self, | |
| text: str = "", | |
| audio_result: Optional[Dict] = None, | |
| image_result: Optional[Dict] = None, | |
| video_result: Optional[Dict] = None | |
| ) -> str: | |
| """ | |
| Fuse all multimodal inputs into a unified text context. | |
| Args: | |
| text: Direct text input | |
| audio_result: Result from process_audio | |
| image_result: Result from process_image | |
| video_result: Result from process_video | |
| Returns: | |
| Unified text context | |
| """ | |
| context_parts = [] | |
| # Add text input | |
| if text and text.strip(): | |
| context_parts.append(f"[USER TEXT]\n{text.strip()}") | |
| # Add audio transcript | |
| if audio_result and audio_result.get("success"): | |
| transcript = audio_result.get("transcript", "") | |
| if transcript: | |
| context_parts.append(f"[AUDIO TRANSCRIPT]\n{transcript}") | |
| # Add image description | |
| if image_result and image_result.get("success"): | |
| description = image_result.get("description", "") | |
| if description: | |
| context_parts.append(f"[IMAGE DESCRIPTION]\n{description}") | |
| # Add video content | |
| if video_result and video_result.get("success"): | |
| video_context = [] | |
| # Add frame descriptions | |
| for i, frame in enumerate(video_result.get("frames", [])): | |
| if frame.get("success"): | |
| video_context.append(f"Frame {i+1}: {frame.get('description', '')}") | |
| # Add audio transcript | |
| if video_result.get("audio_transcript"): | |
| video_context.append(f"Audio: {video_result['audio_transcript']}") | |
| if video_context: | |
| context_parts.append(f"[VIDEO ANALYSIS]\n" + "\n".join(video_context)) | |
| # Combine all parts | |
| fused_context = "\n\n".join(context_parts) | |
| logger.info(f"Fused context: {len(fused_context)} characters from {len(context_parts)} sources") | |
| return fused_context | |
| def process_upload( | |
| self, | |
| file_path: str, | |
| additional_text: str = "" | |
| ) -> Dict[str, Any]: | |
| """ | |
| Automatically detect file type and process accordingly. | |
| Args: | |
| file_path: Path to uploaded file | |
| additional_text: Additional text context | |
| Returns: | |
| Processing result with fused context | |
| """ | |
| path = Path(file_path) | |
| ext = path.suffix.lower() | |
| result = { | |
| "success": True, | |
| "file_type": "unknown", | |
| "processing_result": None, | |
| "fused_context": "" | |
| } | |
| try: | |
| if ext in self.AUDIO_EXTENSIONS: | |
| result["file_type"] = "audio" | |
| audio_result = self.process_audio(file_path) | |
| result["processing_result"] = audio_result | |
| result["fused_context"] = self.fuse_inputs( | |
| text=additional_text, | |
| audio_result=audio_result | |
| ) | |
| elif ext in self.IMAGE_EXTENSIONS: | |
| result["file_type"] = "image" | |
| image_result = self.process_image(file_path) | |
| result["processing_result"] = image_result | |
| result["fused_context"] = self.fuse_inputs( | |
| text=additional_text, | |
| image_result=image_result | |
| ) | |
| elif ext in self.VIDEO_EXTENSIONS: | |
| result["file_type"] = "video" | |
| video_result = self.process_video(file_path) | |
| result["processing_result"] = video_result | |
| result["fused_context"] = self.fuse_inputs( | |
| text=additional_text, | |
| video_result=video_result | |
| ) | |
| else: | |
| # Treat as text file | |
| result["file_type"] = "text" | |
| with open(file_path, "r", encoding="utf-8", errors="ignore") as f: | |
| file_text = f.read() | |
| result["fused_context"] = self.fuse_inputs( | |
| text=f"{additional_text}\n\n[FILE CONTENT]\n{file_text}" | |
| ) | |
| except Exception as e: | |
| result["success"] = False | |
| result["error"] = str(e) | |
| logger.error(f"Upload processing failed: {e}") | |
| return result | |
| # Factory function | |
def create_multimodal_processor() -> MultimodalProcessor:
    """Build a MultimodalProcessor wired to the default Groq client."""
    return MultimodalProcessor(groq_client=None)