|
|
"""
|
|
|
Media processing module for audio, video, and image quizzes.
|
|
|
Handles speech-to-text, video frame extraction, OCR, and more.
|
|
|
"""
|
|
|
import os
|
|
|
import logging
|
|
|
import base64
|
|
|
import io
|
|
|
import re
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
import requests
|
|
|
import httpx
|
|
|
|
|
|
from app.llm import ask_gpt, ocr_image_with_llm
|
|
|
|
|
|
# Module-level logger; inherits handlers/level from the application's root config.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class MediaProcessor:
    """Process audio, video, and image content for quizzes.

    Async helpers download media by URL and extract textual information
    (transcription, OCR, LLM analysis); ``find_media_in_page`` scans page
    content for media URLs without any network access.
    """

    def __init__(self):
        # Known extensions per media family; kept as public attributes for
        # callers, while find_media_in_page embeds them in its regexes.
        self.supported_audio_formats = ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.webm', '.opus']
        self.supported_video_formats = ['.mp4', '.webm', '.ogg', '.mov', '.avi', '.mkv']
        self.supported_image_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']

    async def process_audio_from_url(self, audio_url: str) -> Optional[str]:
        """
        Download and transcribe audio from URL.

        Args:
            audio_url: URL to audio file

        Returns:
            Transcribed text or None
        """
        try:
            logger.info("Processing audio from URL: %s", audio_url)

            response = requests.get(audio_url, timeout=30)
            response.raise_for_status()

            audio_base64 = base64.b64encode(response.content).decode('utf-8')

            transcription = await self._transcribe_audio_with_llm(audio_base64, audio_url)
            if transcription:
                logger.info("Audio transcribed successfully: %s...", transcription[:100])
                return transcription

            return None

        except Exception as e:
            logger.error("Error processing audio: %s", e)
            return None

    async def _transcribe_audio_with_llm(self, audio_base64: str, audio_url: str) -> Optional[str]:
        """
        Transcribe audio using LLM or external service.

        Currently no speech-to-text backend is wired up, so this always
        returns None. The chat LLM used elsewhere in this module cannot
        consume raw audio.

        Args:
            audio_base64: Base64 encoded audio (unused until a backend exists)
            audio_url: Original audio URL (kept for a future backend)

        Returns:
            Transcription or None
        """
        # TODO(review): integrate a real speech-to-text API here (e.g. POST
        # the decoded audio to OpenAI's /v1/audio/transcriptions when
        # OPENAI_API_KEY is set). The previous implementation opened an
        # httpx client under that key check and did nothing with it.
        logger.warning("Cannot transcribe audio directly - audio transcription requires specialized API")
        return None

    async def process_video_from_url(self, video_url: str) -> Optional[Dict[str, Any]]:
        """
        Process video from URL - extract frames, transcribe audio, OCR text.

        The video body is not decoded: headers supply size/content type and
        the LLM is asked to describe likely contents from the URL alone.

        Args:
            video_url: URL to video file

        Returns:
            Dictionary with extracted information
        """
        try:
            logger.info("Processing video from URL: %s", video_url)

            # stream=True avoids pulling the whole body just for headers;
            # the context manager closes the connection (previously leaked).
            with requests.get(video_url, timeout=30, stream=True) as response:
                response.raise_for_status()

                video_info = {
                    'url': video_url,
                    'content_type': response.headers.get('content-type', ''),
                    'size': response.headers.get('content-length', 'unknown'),
                }

            prompt = f"""I have a video file from this URL: {video_url}
Please analyze what might be in this video:
1. Any text visible in frames
2. Any spoken audio content
3. Visual elements
4. Any quiz-related information

Provide a comprehensive description."""

            analysis = await ask_gpt(prompt, max_tokens=2000)

            if analysis:
                video_info['analysis'] = analysis
                logger.info("Video analyzed: %s...", analysis[:100])

            return video_info

        except Exception as e:
            logger.error("Error processing video: %s", e)
            return None

    async def process_image_from_url(self, image_url: str) -> Optional[str]:
        """
        Process image from URL - extract text using OCR.

        Args:
            image_url: URL to image file

        Returns:
            Extracted text or None
        """
        try:
            logger.info("Processing image from URL: %s", image_url)

            response = requests.get(image_url, timeout=30)
            response.raise_for_status()

            image_base64 = base64.b64encode(response.content).decode('utf-8')

            text = await ocr_image_with_llm(image_base64)

            if text:
                logger.info("Image OCR successful: %s...", text[:100])
                return text

            return None

        except Exception as e:
            logger.error("Error processing image: %s", e)
            return None

    @staticmethod
    def _resolve_url(url: str, base_url: str) -> str:
        """Resolve a site-relative URL ('/path') against base_url; pass others through."""
        if url.startswith('/') and base_url:
            from urllib.parse import urljoin
            return urljoin(base_url, url)
        return url

    def _collect_media_urls(self, patterns: List[str], text: str,
                            base_url: str, found: List[str]) -> None:
        """Append unique, resolved URLs matching any of *patterns* to *found* in place."""
        for pattern in patterns:
            for match in re.findall(pattern, text, re.IGNORECASE):
                # findall yields a string for single-group patterns, a tuple otherwise.
                url = match if isinstance(match, str) else (match[0] if match else '')
                if not url:
                    continue
                url = self._resolve_url(url, base_url)
                if url not in found:
                    found.append(url)

    def find_media_in_page(self, page_content: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        Find all media files (audio, video, images) in page content.

        Args:
            page_content: Page content dictionary; 'url', 'text', 'html' and
                'images' (list of {'src': ...} dicts) keys are consulted.

        Returns:
            Dictionary with lists of media URLs by type ('audio', 'video',
            'images'), deduplicated in discovery order.
        """
        media: Dict[str, List[str]] = {
            'audio': [],
            'video': [],
            'images': [],
        }

        base_url = page_content.get('url', '')
        text = page_content.get('text', '') + ' ' + page_content.get('html', '')

        audio_patterns = [
            r'<audio[^>]+src=["\']([^"\']+)["\']',
            r'<source[^>]+src=["\']([^"\']+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))["\']',
            r'(https?://[^\s<>"\'\)]+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))',
            r'(/[^\s<>"\'\)]+\.(?:mp3|wav|ogg|m4a|flac|webm|opus))',
        ]
        self._collect_media_urls(audio_patterns, text, base_url, media['audio'])

        video_patterns = [
            r'<video[^>]+src=["\']([^"\']+)["\']',
            r'<source[^>]+src=["\']([^"\']+\.(?:mp4|webm|ogg|mov|avi|mkv))["\']',
            r'(https?://[^\s<>"\'\)]+\.(?:mp4|webm|ogg|mov|avi|mkv))',
        ]
        self._collect_media_urls(video_patterns, text, base_url, media['video'])

        # Images already extracted by the page scraper come first.
        for img in page_content.get('images', []):
            src = img.get('src', '')
            # NOTE: dedupe intentionally checks the raw src before resolution,
            # matching prior behavior.
            if src and src not in media['images']:
                media['images'].append(self._resolve_url(src, base_url))

        image_patterns = [
            r'<img[^>]+src=["\']([^"\']+)["\']',
            r'(https?://[^\s<>"\'\)]+\.(?:jpg|jpeg|png|gif|bmp|webp))',
        ]
        self._collect_media_urls(image_patterns, text, base_url, media['images'])

        return media
|
|
|
|
|
|
|
|
|
|
|
|
# Lazily-created module-level singleton; access via get_media_processor().
_media_processor: Optional[MediaProcessor] = None
|
|
|
|
|
|
|
|
|
def get_media_processor() -> MediaProcessor:
    """Return the shared MediaProcessor, instantiating it on first use."""
    global _media_processor
    # An instance is always truthy, so `or` lazily creates the singleton once.
    _media_processor = _media_processor or MediaProcessor()
    return _media_processor
|
|
|
|
|
|
|