File size: 10,953 Bytes
a4777f8 8d6b09c a4777f8 479ddc9 a4777f8 479ddc9 a4777f8 8d6b09c a4777f8 8d6b09c a4777f8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 |
"""
Media processing module for audio, video, and image quizzes.
Handles speech-to-text, video frame extraction, OCR, and more.
"""
import asyncio
import base64
import io
import logging
import os
import re
from typing import Optional, Dict, Any, List
from urllib.parse import urljoin

import httpx
import requests

from app.llm import ask_gpt, ocr_image_with_llm
logger = logging.getLogger(__name__)
class MediaProcessor:
    """Process audio, video, and image content for quizzes.

    The coroutine methods download media with the blocking ``requests``
    library, so every download is pushed onto the event loop's default
    executor instead of being run inline (which would stall all other
    coroutines for up to the full timeout).
    """

    # Seconds allowed for each media download / header probe.
    _DOWNLOAD_TIMEOUT = 30

    # A URL that already carries a scheme ("https:", "data:", ...) is absolute
    # and must be kept exactly as found in the page.
    _HAS_SCHEME = re.compile(r'^[a-z][a-z0-9+.\-]*:', re.IGNORECASE)

    # Every pattern has exactly one capture group: the media URL.
    _AUDIO_PATTERNS = (
        re.compile(r'<audio[^>]+src=["\']([^"\']+)["\']', re.IGNORECASE),
        re.compile(
            r'<source[^>]+src=["\']([^"\']+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))["\']',
            re.IGNORECASE,
        ),
        re.compile(
            r'(https?://[^\s<>"\'\)]+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))',
            re.IGNORECASE,
        ),
        # Root-relative paths. The lookbehind keeps this from matching in the
        # middle of another URL or path (e.g. the "//host/a.mp3" tail of
        # "https://host/a.mp3", or the "/a.mp3" tail of "dir/a.mp3").
        re.compile(
            r'(?<![\w:/.])(/[^\s<>"\'\)]+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))',
            re.IGNORECASE,
        ),
    )
    _VIDEO_PATTERNS = (
        re.compile(r'<video[^>]+src=["\']([^"\']+)["\']', re.IGNORECASE),
        re.compile(
            r'<source[^>]+src=["\']([^"\']+\.(?:mp4|webm|ogg|mov|avi|mkv))["\']',
            re.IGNORECASE,
        ),
        re.compile(
            r'(https?://[^\s<>"\'\)]+\.(?:mp4|webm|ogg|mov|avi|mkv))',
            re.IGNORECASE,
        ),
    )
    _IMAGE_PATTERNS = (
        re.compile(r'<img[^>]+src=["\']([^"\']+)["\']', re.IGNORECASE),
        re.compile(
            r'(https?://[^\s<>"\'\)]+\.(?:jpg|jpeg|png|gif|bmp|webp))',
            re.IGNORECASE,
        ),
    )

    def __init__(self):
        """Record the file extensions recognised for each media type."""
        self.supported_audio_formats = ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.webm', '.opus']
        self.supported_video_formats = ['.mp4', '.webm', '.ogg', '.mov', '.avi', '.mkv']
        self.supported_image_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']

    # ------------------------------------------------------------------
    # Download helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a blocking callable on the default executor and await its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @staticmethod
    def _download(url: str) -> bytes:
        """Blocking GET of *url*; returns the body, raises on HTTP errors."""
        response = requests.get(url, timeout=MediaProcessor._DOWNLOAD_TIMEOUT)
        response.raise_for_status()
        return response.content

    @staticmethod
    def _probe_headers(url: str) -> Dict[str, str]:
        """Blocking GET of *url* that reads only the response headers.

        The request is streamed so the body is never downloaded, and the
        context manager releases the connection once the headers are read.
        """
        with requests.get(
            url, timeout=MediaProcessor._DOWNLOAD_TIMEOUT, stream=True
        ) as response:
            response.raise_for_status()
            return {
                'content_type': response.headers.get('content-type', ''),
                'size': response.headers.get('content-length', 'unknown'),
            }

    # ------------------------------------------------------------------
    # Audio
    # ------------------------------------------------------------------

    async def process_audio_from_url(self, audio_url: str) -> Optional[str]:
        """
        Download and transcribe audio from URL.

        Args:
            audio_url: URL to audio file

        Returns:
            Transcribed text, or None when the download fails or no
            transcription could be produced.
        """
        try:
            logger.info("Processing audio from URL: %s", audio_url)

            audio_data = await self._run_blocking(self._download, audio_url)
            audio_base64 = base64.b64encode(audio_data).decode('utf-8')

            transcription = await self._transcribe_audio_with_llm(audio_base64, audio_url)
            if transcription:
                logger.info("Audio transcribed successfully: %s...", transcription[:100])
                return transcription
            return None
        except Exception as e:
            # Best-effort: callers treat None as "no transcription available".
            logger.error("Error processing audio: %s", e)
            return None

    async def _transcribe_audio_with_llm(self, audio_base64: str, audio_url: str) -> Optional[str]:
        """
        Transcribe audio using LLM or external service.

        No transcription backend is wired up yet, so this currently logs a
        warning and always returns None; the caller is expected to fall back
        to solving from the question context.

        Args:
            audio_base64: Base64 encoded audio
            audio_url: Original audio URL

        Returns:
            Transcription or None
        """
        # TODO: a speech-to-text service needs a file upload rather than the
        # base64 payload available here; implement once a backend is chosen.
        logger.warning("Cannot transcribe audio directly - audio transcription requires specialized API")
        return None

    # ------------------------------------------------------------------
    # Video
    # ------------------------------------------------------------------

    async def process_video_from_url(self, video_url: str) -> Optional[Dict[str, Any]]:
        """
        Collect what can be learned about a video without decoding it.

        Only the response headers are fetched; no frames or audio are
        extracted (that would need ffmpeg or similar). The LLM is then asked
        to describe the video given nothing but its URL.

        Args:
            video_url: URL to video file

        Returns:
            Dictionary with 'url', 'content_type', 'size' and, when the LLM
            answered, 'analysis'; None if anything failed.
        """
        try:
            logger.info("Processing video from URL: %s", video_url)

            headers = await self._run_blocking(self._probe_headers, video_url)
            video_info = {
                'url': video_url,
                'content_type': headers['content_type'],
                'size': headers['size'],
            }

            # NOTE(review): the model never sees the video itself, only the
            # URL, so treat 'analysis' as a guess rather than extracted data.
            prompt = (
                f"I have a video file from this URL: {video_url}\n"
                "Please analyze what might be in this video:\n"
                "1. Any text visible in frames\n"
                "2. Any spoken audio content\n"
                "3. Visual elements\n"
                "4. Any quiz-related information\n"
                "Provide a comprehensive description."
            )
            analysis = await ask_gpt(prompt, max_tokens=2000)
            if analysis:
                video_info['analysis'] = analysis
                logger.info("Video analyzed: %s...", analysis[:100])
            return video_info
        except Exception as e:
            logger.error("Error processing video: %s", e)
            return None

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------

    async def process_image_from_url(self, image_url: str) -> Optional[str]:
        """
        Process image from URL - extract text using OCR.

        Args:
            image_url: URL to image file

        Returns:
            Extracted text, or None when the download or OCR yields nothing.
        """
        try:
            logger.info("Processing image from URL: %s", image_url)

            image_data = await self._run_blocking(self._download, image_url)
            image_base64 = base64.b64encode(image_data).decode('utf-8')

            text = await ocr_image_with_llm(image_base64)
            if text:
                logger.info("Image OCR successful: %s...", text[:100])
                return text
            return None
        except Exception as e:
            logger.error("Error processing image: %s", e)
            return None

    # ------------------------------------------------------------------
    # Media discovery
    # ------------------------------------------------------------------

    @classmethod
    def _resolve_url(cls, url: str, base_url: str) -> str:
        """Make *url* absolute against *base_url*; absolute URLs pass through."""
        if base_url and not cls._HAS_SCHEME.match(url):
            return urljoin(base_url, url)
        return url

    @classmethod
    def _add_url(cls, url: str, base_url: str, found: List[str]) -> None:
        """Resolve *url* and append it to *found* unless already present."""
        if not url:
            return
        # Resolve first, then de-duplicate, so the same resource referenced
        # by a relative and an absolute URL is only listed once.
        resolved = cls._resolve_url(url, base_url)
        if resolved not in found:
            found.append(resolved)

    @classmethod
    def _scan(cls, patterns, text: str, base_url: str, found: List[str]) -> None:
        """Append every URL captured by *patterns* in *text* to *found*."""
        for pattern in patterns:
            for url in pattern.findall(text):
                cls._add_url(url, base_url, found)

    def find_media_in_page(self, page_content: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        Find all media files (audio, video, images) in page content.

        Reads the 'url', 'text', 'html' and 'images' keys of *page_content*;
        all are optional and may be None. Relative URLs are resolved against
        'url' when it is set. '.webm' and '.ogg' links are listed under both
        'audio' and 'video' because the extension alone is ambiguous.

        Args:
            page_content: Page content dictionary

        Returns:
            Dictionary with de-duplicated, order-preserving lists of media
            URLs under the keys 'audio', 'video' and 'images'.
        """
        media: Dict[str, List[str]] = {
            'audio': [],
            'video': [],
            'images': [],
        }

        base_url = page_content.get('url') or ''
        text = (page_content.get('text') or '') + ' ' + (page_content.get('html') or '')

        self._scan(self._AUDIO_PATTERNS, text, base_url, media['audio'])
        self._scan(self._VIDEO_PATTERNS, text, base_url, media['video'])

        # Images already extracted upstream come first, then anything else
        # found in the raw text/HTML.
        for img in page_content.get('images') or []:
            if isinstance(img, dict):
                src = img.get('src') or ''
            elif isinstance(img, str):
                src = img
            else:
                continue
            self._add_url(src, base_url, media['images'])
        self._scan(self._IMAGE_PATTERNS, text, base_url, media['images'])

        return media
# Shared module-level processor; stays None until first requested.
_media_processor: Optional[MediaProcessor] = None


def get_media_processor() -> MediaProcessor:
    """Return the shared MediaProcessor, building it on the first call."""
    global _media_processor
    if _media_processor is not None:
        return _media_processor
    _media_processor = MediaProcessor()
    return _media_processor
|