| """ |
| Multilingual Audio Intelligence System - FastAPI Web Application |
| |
| Professional web interface for the complete multilingual audio intelligence pipeline. |
| Built with FastAPI, HTML templates, and modern CSS for production deployment. |
| |
| Features: |
| - Clean, professional UI design |
| - Real-time audio processing |
| - Interactive visualizations |
| - Multiple output formats |
| - RESTful API endpoints |
| - Production-ready architecture |
| |
| Author: Audio Intelligence Team |
| """ |
|
|
| import os |
| import sys |
| import logging |
| import tempfile |
| import json |
| import time |
| from pathlib import Path |
| from typing import Dict, List, Optional, Any |
| import traceback |
| import asyncio |
| from datetime import datetime |
| import requests |
| import hashlib |
| from urllib.parse import urlparse |
| import secrets |
| from collections import defaultdict |
|
|
| |
| from fastapi import FastAPI, UploadFile, File, Form, Request, HTTPException |
| from fastapi.responses import HTMLResponse, FileResponse, JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| import uvicorn |
|
|
| |
| import numpy as np |
| import pandas as pd |
| from dotenv import load_dotenv |
|
|
| |
# Load environment variables from a local .env file (e.g. HUGGINGFACE_TOKEN).
load_dotenv()


# Make the bundled src/ package importable regardless of working directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))


# Application-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
| |
# Optional heavy dependencies are imported under guards so the web app can
# still start (with reduced functionality) when any of them fails to load;
# the *_AVAILABLE flags are checked before use elsewhere in this module.
try:
    from src.main import AudioIntelligencePipeline
    MAIN_AVAILABLE = True
except Exception as e:
    logger.error(f"Failed to import main pipeline: {e}")
    MAIN_AVAILABLE = False


try:
    import plotly.graph_objects as go
    import plotly.utils
    PLOTLY_AVAILABLE = True
except Exception as e:
    logger.error(f"Failed to import Plotly: {e}")
    PLOTLY_AVAILABLE = False


try:
    from utils import validate_audio_file, format_duration, get_system_info
    UTILS_AVAILABLE = True
except Exception as e:
    logger.error(f"Failed to import utils: {e}")
    UTILS_AVAILABLE = False
| |
| |
app = FastAPI(
    title="Multilingual Audio Intelligence System",
    description="Professional AI-powered speaker diarization, transcription, and translation",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)


# Jinja2 templates backing the HTML frontend.
templates = Jinja2Templates(directory="templates")


# Ensure the directories the app reads/writes exist before serving.
os.makedirs("static", exist_ok=True)
os.makedirs("templates", exist_ok=True)
os.makedirs("uploads", exist_ok=True)
os.makedirs("outputs", exist_ok=True)


app.mount("/static", StaticFiles(directory="static"), name="static")
# NOTE(review): demo_audio/ is not created above (DemoManager.__init__ creates
# it); mounting may fail on a fresh checkout if it does not exist yet — confirm
# startup order.
app.mount("/demo_audio", StaticFiles(directory="demo_audio"), name="demo_audio")


# Lazily-created AudioIntelligencePipeline shared by demo processing.
pipeline = None


# In-memory task tracking keyed by task_id (single-process only; lost on restart).
processing_status = {}
processing_results = {}
|
|
| |
# Catalogue of bundled demo clips, keyed by demo_id.  DemoManager uses each
# entry to download ("url" — the value "local" means the file must already be
# present in demo_audio/), process, and cache results.  The expected_text /
# expected_translation fields are reference outputs for display; the remaining
# keys are UI metadata (badges, category, difficulty, duration).
DEMO_FILES = {
    "yuri_kizaki": {
        "name": "Yuri Kizaki",
        "filename": "Yuri_Kizaki.mp3",
        "display_name": "🇯🇵 Japanese Business Communication",
        "language": "ja",
        "description": "Professional audio message about website communication and business enhancement",
        "url": "https://www.mitsue.co.jp/service/audio_and_video/audio_production/media/narrators_sample/yuri_kizaki/03.mp3",
        "expected_text": "音声メッセージが既存のウェブサイトを超えたコミュニケーションを実現。目で見るだけだったウェブサイトに音声情報をインクルードすることで、情報に新しい価値を与え、他者との差別化に効果を発揮します。",
        "expected_translation": "Audio messages enable communication beyond existing websites. By incorporating audio information into visually-driven websites, you can add new value to the information and effectively differentiate your website from others.",
        "category": "business",
        "difficulty": "intermediate",
        "duration": "00:01:45"
    },
    "film_podcast": {
        "name": "Film Podcast",
        "filename": "Film_Podcast.mp3",
        "display_name": "🇫🇷 French Cinema Discussion",
        "language": "fr",
        "description": "In-depth French podcast discussing recent movies including Social Network and Paranormal Activity",
        "url": "https://www.lightbulblanguages.co.uk/resources/audio/film-podcast.mp3",
        "expected_text": "Le film intitulé The Social Network traite de la création du site Facebook par Mark Zuckerberg et des problèmes judiciaires que cela a comporté pour le créateur de ce site.",
        "expected_translation": "The film The Social Network deals with the creation of Facebook by Mark Zuckerberg and the legal problems this caused for the creator of this site.",
        "category": "entertainment",
        "difficulty": "advanced",
        "duration": "00:03:32"
    },
    "tamil_interview": {
        "name": "Tamil Wikipedia Interview",
        "filename": "Tamil_Wikipedia_Interview.ogg",
        "display_name": "🇮🇳 Tamil Wikipedia Interview",
        "language": "ta",
        "description": "NEW: Tamil language interview about Wikipedia and collaborative knowledge sharing in South India",
        "url": "https://upload.wikimedia.org/wikipedia/commons/5/54/Parvathisri-Wikipedia-Interview-Vanavil-fm.ogg",
        "expected_text": "விக்கிபீடியா என்பது ஒரு கூட்டு முயற்சியாகும். இது தமிழ் மொழியில் அறிவைப் பகிர்ந்து கொள்வதற்கான ஒரு சிறந்த தளமாகும்.",
        "expected_translation": "Wikipedia is a collaborative effort. It is an excellent platform for sharing knowledge in the Tamil language.",
        "category": "education",
        "difficulty": "advanced",
        "duration": "00:36:17",
        "featured": True,
        "new": True,
        "indian_language": True
    },
    "car_trouble": {
        "name": "Car Trouble",
        "filename": "Car_Trouble.mp3",
        "display_name": "🇮🇳 Hindi Daily Conversation",
        "language": "hi",
        "description": "NEW: Real-world Hindi conversation about car problems and waiting for a mechanic",
        "url": "https://www.tuttlepublishing.com/content/docs/9780804844383/06-18%20Part2%20Car%20Trouble.mp3",
        "expected_text": "गाड़ी खराब हो गई है। मैकेनिक का इंतज़ार कर रहे हैं। कुछ समय लगेगा।",
        "expected_translation": "The car has broken down. We are waiting for the mechanic. It will take some time.",
        "category": "daily_life",
        "difficulty": "beginner",
        "duration": "00:02:45",
        "featured": True,
        "new": True,
        "indian_language": True
    }
}
|
|
@app.get("/health")
async def health():
    """Liveness/readiness probe.

    Reports "error" (still HTTP 200) when free disk space is critically low
    or when the model-loading flag has not been set on app.state.
    """
    try:
        import shutil

        # Require at least 50 MB of free disk before reporting healthy.
        _, _, free_bytes = shutil.disk_usage(".")
        if free_bytes < 50 * 1024 * 1024:
            return {"status": "error", "detail": "Low disk space"}

        # NOTE(review): nothing in this module sets app.state.models_loaded;
        # presumably a startup hook does — confirm.
        models_ready = getattr(app.state, "models_loaded", False)
        if not models_ready:
            return {"status": "error", "detail": "Models not loaded"}

        return {"status": "ok"}

    except Exception as exc:
        return {"status": "error", "detail": str(exc)}
|
|
| |
# Cache of preprocessed demo results, keyed by demo_id (filled at startup).
demo_results_cache = {}


# Per-client session registries (managed by SessionManager below):
# user_sessions maps session_id -> metadata, session_files maps
# session_id -> list of uploaded-file records.
user_sessions = defaultdict(dict)
session_files = defaultdict(list)
|
|
def transform_to_old_format(results):
    """Normalise pipeline output to the legacy {'segments', 'summary'} shape.

    Results already in the legacy shape are returned unchanged.  On any
    unexpected failure an empty-but-valid structure is returned so the
    frontend never receives a malformed payload.
    """
    try:
        # Pass-through: already in the shape the frontend expects.
        if 'segments' in results and 'summary' in results:
            return results

        raw_segments = []

        # Preferred source: the serialized JSON output block, if present.
        if 'outputs' in results and 'json' in results['outputs']:
            try:
                decoded = json.loads(results['outputs']['json'])
                if 'segments' in decoded:
                    raw_segments = decoded['segments']
            except (json.JSONDecodeError, TypeError):
                pass

        # Fallback source: a top-level segments list.
        if not raw_segments and 'segments' in results:
            raw_segments = results['segments']

        # Build the summary from processing stats when available.
        if 'processing_stats' in results:
            stats = results['processing_stats']
            summary = {
                'total_duration': results.get('audio_metadata', {}).get('duration_seconds', 0),
                'num_speakers': stats.get('num_speakers', 1),
                'num_segments': stats.get('num_segments', len(raw_segments)),
                'languages': stats.get('languages_detected', ['unknown']),
                'processing_time': stats.get('total_time', 0)
            }
        else:
            summary = {
                'total_duration': 0,
                'num_speakers': 1,
                'num_segments': len(raw_segments),
                'languages': ['unknown'],
                'processing_time': 0
            }

        # Map each segment dict onto the legacy field names, tolerating both
        # old and new key spellings; non-dict entries are dropped.
        formatted_segments = [
            {
                'speaker': seg.get('speaker_id', seg.get('speaker', 'SPEAKER_00')),
                'start_time': seg.get('start_time', 0),
                'end_time': seg.get('end_time', 0),
                'text': seg.get('original_text', seg.get('text', '')),
                'translated_text': seg.get('translated_text', ''),
                'language': seg.get('original_language', seg.get('language', 'unknown'))
            }
            for seg in raw_segments
            if isinstance(seg, dict)
        ]

        result = {
            'segments': formatted_segments,
            'summary': summary
        }

        logger.info(f"✅ Transformed results: {len(formatted_segments)} segments, summary keys: {list(summary.keys())}")
        return result

    except Exception as e:
        logger.error(f"❌ Error transforming results to old format: {e}")
        return {
            'segments': [],
            'summary': {
                'total_duration': 0,
                'num_speakers': 0,
                'num_segments': 0,
                'languages': [],
                'processing_time': 0
            }
        }
|
|
class SessionManager:
    """Manages user sessions and cleanup of their uploaded files.

    State lives in the module-level ``user_sessions`` / ``session_files``
    registries so other code can inspect them.
    """

    def __init__(self):
        self.sessions = user_sessions
        self.session_files = session_files
        self.cleanup_interval = 3600  # seconds of inactivity before a session expires

    def generate_session_id(self, request: Request) -> str:
        """Generate a stable session ID from a lightweight client fingerprint.

        The fingerprint (client IP + truncated user-agent + accept-language)
        is hashed so the same client maps to the same session across requests.
        Creates the session record on first sight, otherwise refreshes its
        last-activity timestamp.
        """
        fingerprint_data = [
            request.client.host if request.client else "unknown",
            request.headers.get("user-agent", "")[:100],
            request.headers.get("accept-language", "")[:50],
        ]

        fingerprint = "|".join(fingerprint_data)
        session_id = hashlib.sha256(fingerprint.encode()).hexdigest()[:16]

        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "created_at": time.time(),
                "last_activity": time.time(),
                "ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", "")[:100]
            }
            logger.info(f"🔑 New session created: {session_id}")
        else:
            self.sessions[session_id]["last_activity"] = time.time()

        return session_id

    def add_file_to_session(self, session_id: str, file_path: str):
        """Associate an uploaded file with a session for later cleanup."""
        self.session_files[session_id].append({
            "file_path": file_path,
            "created_at": time.time()
        })
        logger.info(f"📁 Added file to session {session_id}: {file_path}")

    def cleanup_session(self, session_id: str) -> int:
        """Delete all files for a session and drop its registry entries.

        Returns the number of files actually removed.

        Bug fix: this previously returned None (and skipped removing the
        session from the registries) when the session had no recorded files,
        which made cleanup_expired_sessions crash on ``total_cleaned += None``
        and leaked idle session entries forever.  It now always returns an
        int and always removes the session.
        """
        files_cleaned = 0
        # .get avoids materialising a new entry in the defaultdict.
        for file_info in self.session_files.get(session_id, []):
            file_path = Path(file_info["file_path"])
            try:
                if file_path.exists():
                    file_path.unlink()
                    files_cleaned += 1
                    logger.info(f"🗑️ Cleaned up file: {file_path}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to delete {file_path}: {e}")

        # Drop bookkeeping regardless of whether any files were recorded.
        self.sessions.pop(session_id, None)
        self.session_files.pop(session_id, None)

        logger.info(f"✅ Session cleanup completed for {session_id}: {files_cleaned} files removed")
        return files_cleaned

    def cleanup_expired_sessions(self):
        """Clean up sessions idle for longer than cleanup_interval.

        Returns (number_of_expired_sessions, total_files_cleaned).
        """
        current_time = time.time()
        expired_sessions = []

        for session_id, session_data in list(self.sessions.items()):
            if current_time - session_data["last_activity"] > self.cleanup_interval:
                expired_sessions.append(session_id)

        total_cleaned = 0
        for session_id in expired_sessions:
            files_cleaned = self.cleanup_session(session_id)
            total_cleaned += files_cleaned

        if expired_sessions:
            logger.info(f"🕒 Expired session cleanup: {len(expired_sessions)} sessions, {total_cleaned} files")

        return len(expired_sessions), total_cleaned
|
| |
# Module-level singleton used by the upload endpoint for per-client file tracking.
session_manager = SessionManager()
|
|
class DemoManager:
    """Manages demo files and preprocessing.

    Downloads the demo audio listed in DEMO_FILES (unless marked "local"),
    runs the pipeline over each file once, persists the results to
    demo_results/<demo_id>_results.json, and loads them into the
    module-level demo_results_cache for instant serving.
    """

    def __init__(self):
        # demo_audio/ holds the source clips; demo_results/ the cached JSON.
        self.demo_dir = Path("demo_audio")
        self.demo_dir.mkdir(exist_ok=True)
        self.results_dir = Path("demo_results")
        self.results_dir.mkdir(exist_ok=True)

    async def ensure_demo_files(self):
        """Ensure demo files are available and processed.

        Per demo entry: download the audio if missing (skipping absent
        "local" files), process it once if no cached results exist, then
        load the cached JSON into demo_results_cache.  Corrupted caches are
        deleted so the next startup reprocesses them.  Failures skip the
        entry rather than aborting the whole check.
        """
        logger.info("🔄 Checking demo files...")

        for demo_id, config in DEMO_FILES.items():
            logger.info(f"📁 Checking demo file: {config['filename']}")
            file_path = self.demo_dir / config["filename"]
            results_path = self.results_dir / f"{demo_id}_results.json"

            # Step 1: make sure the audio file itself is present.
            if not file_path.exists():
                if config["url"] == "local":
                    # "local" entries are expected to ship with the app.
                    logger.warning(f"❌ Local demo file not found: {config['filename']}")
                    logger.info(f" Expected location: {file_path}")
                    continue
                else:
                    logger.info(f"⬇️ Downloading demo file: {config['filename']}")
                    try:
                        await self.download_demo_file(config["url"], file_path)
                        logger.info(f"✅ Downloaded: {config['filename']}")
                    except Exception as e:
                        logger.error(f"❌ Failed to download {config['filename']}: {e}")
                        continue
            else:
                logger.info(f"✅ Demo file exists: {config['filename']}")

            # Step 2: process once and cache, or reuse the cached results.
            if not results_path.exists():
                logger.info(f"🔄 Processing demo file: {config['filename']} (first time)")
                try:
                    await self.process_demo_file(demo_id, file_path, results_path)
                    logger.info(f"✅ Demo processing completed: {config['filename']}")
                except Exception as e:
                    logger.error(f"❌ Failed to process {config['filename']}: {e}")
                    continue
            else:
                logger.info(f"📋 Using cached results: {demo_id}")

            # Step 3: load the cached JSON into the in-memory cache.
            try:
                if results_path.exists() and results_path.stat().st_size > 0:
                    with open(results_path, 'r', encoding='utf-8') as f:
                        demo_results_cache[demo_id] = json.load(f)
                    logger.info(f"✅ Loaded cached results for {demo_id}")
                else:
                    logger.warning(f"⚠️ Results file empty or missing for {demo_id}")
            except json.JSONDecodeError as e:
                logger.error(f"❌ Invalid JSON in {demo_id} results: {e}")
                # Drop the corrupted cache so it regenerates next startup.
                if results_path.exists():
                    results_path.unlink()
                    logger.info(f"🗑️ Deleted corrupted results for {demo_id}, will reprocess on next startup")
            except Exception as e:
                logger.error(f"❌ Failed to load cached results for {demo_id}: {e}")

        logger.info(f"✅ Demo files check completed. Available: {len(demo_results_cache)}")

    async def download_demo_file(self, url: str, file_path: Path):
        """Download demo file from URL.

        NOTE(review): requests.get is a blocking call inside an async
        method, so the event loop stalls for up to 30s per download —
        presumably acceptable at startup, but confirm.
        """
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        with open(file_path, 'wb') as f:
            f.write(response.content)

        logger.info(f"Downloaded demo file: {file_path.name}")

    async def process_demo_file(self, demo_id: str, file_path: Path, results_path: Path):
        """Process a demo file and cache results.

        Lazily creates the module-level pipeline (small Whisper model,
        English target, CPU) on first use, runs it, persists the results as
        JSON (default=str stringifies non-serializable values), and updates
        demo_results_cache.  Re-raises on failure so the caller can skip
        the entry.
        """
        logger.info(f"🎵 Starting demo processing: {file_path.name}")

        try:
            # The pipeline is shared module-wide to avoid reloading models.
            global pipeline
            if pipeline is None:
                from src.main import AudioIntelligencePipeline
                pipeline = AudioIntelligencePipeline(
                    whisper_model_size="small",
                    target_language="en",
                    device="cpu"
                )

            # NOTE(review): this call signature (audio_file=/output_dir=)
            # differs from AudioProcessor.process_audio_file's positional +
            # save_outputs call — confirm the pipeline supports both.
            results = pipeline.process_audio(
                audio_file=file_path,
                output_dir=Path("outputs")
            )

            with open(results_path, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False, default=str)

            demo_results_cache[demo_id] = results

            logger.info(f"✅ Demo processing completed and cached: {file_path.name}")
            return results

        except Exception as e:
            logger.error(f"❌ Demo processing failed for {file_path.name}: {e}")
            raise

    def format_demo_results(self, results: Dict, demo_id: str) -> Dict:
        """Format pipeline results for demo display.

        Flattens pipeline segment objects into plain dicts plus a derived
        summary; on any formatting error falls back to a single
        explanatory segment.
        """
        formatted_results = {
            "segments": [],
            "summary": {
                "total_duration": 0,
                "num_speakers": 0,
                "num_segments": 0,
                "languages": [],
                "processing_time": 0
            }
        }

        try:
            # hasattr guards tolerate partially-populated segment objects.
            if 'processed_segments' in results:
                for seg in results['processed_segments']:
                    formatted_results["segments"].append({
                        "speaker": seg.speaker_id if hasattr(seg, 'speaker_id') else "Speaker 1",
                        "start_time": seg.start_time if hasattr(seg, 'start_time') else 0,
                        "end_time": seg.end_time if hasattr(seg, 'end_time') else 0,
                        "text": seg.original_text if hasattr(seg, 'original_text') else "",
                        "translated_text": seg.translated_text if hasattr(seg, 'translated_text') else "",
                        "language": seg.original_language if hasattr(seg, 'original_language') else "unknown"
                    })

            if 'audio_metadata' in results:
                metadata = results['audio_metadata']
                formatted_results["summary"]["total_duration"] = metadata.get('duration_seconds', 0)

            if 'processing_stats' in results:
                stats = results['processing_stats']
                formatted_results["summary"]["processing_time"] = stats.get('total_time', 0)

            # Derive counts from the flattened segments themselves.
            formatted_results["summary"]["num_segments"] = len(formatted_results["segments"])
            speakers = set(seg["speaker"] for seg in formatted_results["segments"])
            formatted_results["summary"]["num_speakers"] = len(speakers)
            languages = set(seg["language"] for seg in formatted_results["segments"] if seg["language"] != 'unknown')
            formatted_results["summary"]["languages"] = list(languages) if languages else ["unknown"]

        except Exception as e:
            logger.error(f"Error formatting demo results: {e}")
            # Fallback: a single segment carrying the error message.
            formatted_results["segments"] = [
                {
                    "speaker": "Speaker 1",
                    "start_time": 0.0,
                    "end_time": 5.0,
                    "text": f"Demo processing completed. Error in formatting: {str(e)}",
                    "translated_text": f"Demo processing completed. Error in formatting: {str(e)}",
                    "language": "en"
                }
            ]
            formatted_results["summary"]["total_duration"] = 5.0
            formatted_results["summary"]["num_segments"] = 1
            formatted_results["summary"]["num_speakers"] = 1
            formatted_results["summary"]["languages"] = ["en"]

        return formatted_results

    def create_fallback_results(self, demo_id: str, error_msg: str) -> Dict:
        """Create fallback results when demo processing fails."""
        config = DEMO_FILES[demo_id]
        return {
            "segments": [
                {
                    "speaker": "System",
                    "start_time": 0.0,
                    "end_time": 1.0,
                    "text": f"Demo processing failed: {error_msg}",
                    "translated_text": f"Demo processing failed: {error_msg}",
                    "language": "en"
                }
            ],
            "summary": {
                "total_duration": 1.0,
                "num_speakers": 1,
                "num_segments": 1,
                "languages": ["en"],
                "processing_time": 0.1
            }
        }
|
|
| |
# Module-level singleton handling demo download/processing at startup.
demo_manager = DemoManager()
|
|
|
|
class AudioProcessor:
    """Audio processing class with error handling.

    Wraps AudioIntelligencePipeline with lazy initialisation and publishes
    progress into the module-level processing_status / processing_results
    dicts keyed by task_id.
    """

    def __init__(self):
        # Lazily-created pipeline; reused across requests once built.
        self.pipeline = None

    def initialize_pipeline(self, whisper_model: str = "small",
                           target_language: str = "en",
                           hf_token: Optional[str] = None):
        """Initialize the audio intelligence pipeline.

        Raises if the main pipeline module failed to import.  NOTE: the
        pipeline is built once with the first caller's settings; later
        calls with different model/language values reuse the existing
        instance unchanged.
        """
        if not MAIN_AVAILABLE:
            raise Exception("Main pipeline module not available")

        if self.pipeline is None:
            logger.info("Initializing Audio Intelligence Pipeline...")
            try:
                self.pipeline = AudioIntelligencePipeline(
                    whisper_model_size=whisper_model,
                    target_language=target_language,
                    device="auto",
                    # Explicit token wins; otherwise fall back to the env var.
                    hf_token=hf_token or os.getenv('HUGGINGFACE_TOKEN'),
                    output_dir="./outputs"
                )
                logger.info("Pipeline initialization complete!")
            except Exception as e:
                logger.error(f"Pipeline initialization failed: {e}")
                raise

        return self.pipeline

    async def process_audio_file(self, file_path: str,
                                whisper_model: str = "small",
                                target_language: str = "en",
                                hf_token: Optional[str] = None,
                                task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process audio file and return results.

        When task_id is given, progress and errors are published to the
        module-level processing_status dict and the final results to
        processing_results.  Re-raises on any stage failure after
        recording the error in the status dict.
        """
        try:
            if task_id:
                processing_status[task_id] = {"status": "initializing", "progress": 10}

            # Stage 1: build (or reuse) the pipeline.
            try:
                pipeline = self.initialize_pipeline(whisper_model, target_language, hf_token)
            except Exception as e:
                logger.error(f"Pipeline initialization failed: {e}")
                if task_id:
                    processing_status[task_id] = {"status": "error", "error": f"Pipeline initialization failed: {str(e)}"}
                raise

            if task_id:
                processing_status[task_id] = {"status": "processing", "progress": 30}

            # Stage 2: run the full diarization/transcription/translation pass.
            try:
                logger.info(f"Processing audio file: {file_path}")
                results = pipeline.process_audio(
                    file_path,
                    save_outputs=True,
                    output_formats=['json', 'srt_original', 'srt_translated', 'text', 'summary']
                )
                logger.info("Audio processing completed successfully")
            except Exception as e:
                logger.error(f"Audio processing failed: {e}")
                if task_id:
                    processing_status[task_id] = {"status": "error", "error": f"Audio processing failed: {str(e)}"}
                raise

            if task_id:
                processing_status[task_id] = {"status": "generating_outputs", "progress": 80}

            # Stage 3: visualization is best-effort — failures are recorded
            # in the payload instead of failing the task.
            try:
                viz_data = self.create_visualization_data(results)
                results['visualization'] = viz_data
            except Exception as e:
                logger.warning(f"Visualization generation failed: {e}")
                results['visualization'] = {"error": str(e)}

            if task_id:
                processing_results[task_id] = results
                processing_status[task_id] = {"status": "complete", "progress": 100}

            return results

        except Exception as e:
            logger.error(f"Audio processing failed: {e}")
            if task_id:
                processing_status[task_id] = {"status": "error", "error": str(e)}
            raise

    def create_visualization_data(self, results: Dict) -> Dict:
        """Create visualization data from processing results.

        Returns {'waveform': <plotly figure as dict>} on success or
        {'waveform': None} on failure.  The waveform trace is random
        placeholder noise sized to the clip duration — the real audio
        samples are not available here.
        """
        viz_data = {}

        try:
            if PLOTLY_AVAILABLE and results.get('processed_segments'):
                segments = results['processed_segments']

                duration = results.get('audio_metadata', {}).get('duration_seconds', 30)

                # 50 points/second, capped at 1000 points total.
                time_points = np.linspace(0, duration, min(1000, int(duration * 50)))
                waveform = np.random.randn(len(time_points)) * 0.1

                fig = go.Figure()

                fig.add_trace(go.Scatter(
                    x=time_points,
                    y=waveform,
                    mode='lines',
                    name='Waveform',
                    line=dict(color='#2563eb', width=1)
                ))

                # Shade each speaker segment with a rotating colour.
                colors = ['#dc2626', '#059669', '#7c2d12', '#4338ca', '#be185d']
                for i, seg in enumerate(segments):
                    color = colors[i % len(colors)]
                    fig.add_vrect(
                        x0=seg.start_time,
                        x1=seg.end_time,
                        fillcolor=color,
                        opacity=0.2,
                        line_width=0,
                        annotation_text=f"{seg.speaker_id}",
                        annotation_position="top left"
                    )

                fig.update_layout(
                    title="Audio Waveform with Speaker Segments",
                    xaxis_title="Time (seconds)",
                    yaxis_title="Amplitude",
                    height=400,
                    showlegend=False
                )

                # Round-trip through JSON to get a plain serialisable dict.
                viz_data['waveform'] = json.loads(fig.to_json())

        except Exception as e:
            logger.error(f"Visualization creation failed: {e}")
            viz_data['waveform'] = None

        return viz_data
|
|
|
|
| |
# Module-level singleton wrapping the ML pipeline for upload processing.
audio_processor = AudioProcessor()
|
|
|
|
|
|
|
|
| |
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
| |
| |
@app.post("/api/upload")
async def upload_audio(
    request: Request,
    file: UploadFile = File(...),
    whisper_model: str = Form("small"),
    target_language: str = Form("en"),
    hf_token: Optional[str] = Form(None)
):
    """Upload an audio file and start background processing.

    Returns a task_id the client polls via /api/status/{task_id}.
    Raises 400 for a missing file or unsupported extension, 500 for
    unexpected failures.
    """
    try:
        # Tie the upload to a per-client session so files can be cleaned up.
        session_id = session_manager.generate_session_id(request)
        logger.info(f"🔑 Processing upload for session: {session_id}")

        if not file.filename:
            raise HTTPException(status_code=400, detail="No file provided")

        # Whitelist of audio extensions the pipeline can decode.
        allowed_types = ['.wav', '.mp3', '.ogg', '.flac', '.m4a']
        file_ext = Path(file.filename).suffix.lower()
        if file_ext not in allowed_types:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type. Allowed: {', '.join(allowed_types)}"
            )

        # Persist the upload under a session- and time-stamped name.
        file_path = f"uploads/{session_id}_{int(time.time())}_{file.filename}"
        with open(file_path, "wb") as buffer:
            content = await file.read()
            buffer.write(content)

        session_manager.add_file_to_session(session_id, file_path)

        task_id = f"task_{session_id}_{int(time.time())}"

        # Fire-and-forget background processing; progress is tracked by
        # task_id in the module-level status dicts.
        asyncio.create_task(
            audio_processor.process_audio_file(
                file_path, whisper_model, target_language, hf_token, task_id
            ))

        return JSONResponse({
            "task_id": task_id,
            "message": "Processing started",
            "filename": file.filename
        })

    except HTTPException:
        # Bug fix: the 400s raised above were previously caught by the
        # generic handler below and re-surfaced as misleading 500s.
        raise
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| |
|
|
@app.get("/api/status/{task_id}")
async def get_status(task_id: str):
    """Return the current processing status for a task, or 404 if unknown."""
    try:
        status = processing_status[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task not found")
    return JSONResponse(status)
|
|
|
|
@app.get("/api/results/{task_id}")
async def get_results(task_id: str):
    """Return formatted results for a completed task.

    404 if the task is unknown, 202 while still processing.  Pipeline
    segment objects are flattened into plain dicts with a derived summary.
    NOTE: this duplicates the formatting logic in
    DemoManager.format_demo_results / transform_to_old_format — candidates
    for consolidation.
    """
    if task_id not in processing_status:
        raise HTTPException(status_code=404, detail="Task not found")

    status = processing_status[task_id]
    if status.get("status") != "complete":
        raise HTTPException(status_code=202, detail="Processing not complete")

    if task_id in processing_results:
        results = processing_results[task_id]
        logger.info(f"📊 Found results for task {task_id}: {type(results)}")
        logger.info(f"📊 Results keys: {list(results.keys()) if isinstance(results, dict) else 'Not a dict'}")

        # Skeleton the frontend always receives, even on formatting errors.
        formatted_results = {
            "segments": [],
            "summary": {
                "total_duration": 0,
                "num_speakers": 0,
                "num_segments": 0,
                "languages": [],
                "processing_time": 0
            }
        }

        try:
            # Flatten pipeline segment objects; hasattr guards tolerate
            # partially-populated objects.
            if 'processed_segments' in results:
                for seg in results['processed_segments']:
                    formatted_results["segments"].append({
                        "speaker": seg.speaker_id if hasattr(seg, 'speaker_id') else "Unknown Speaker",
                        "start_time": seg.start_time if hasattr(seg, 'start_time') else 0,
                        "end_time": seg.end_time if hasattr(seg, 'end_time') else 0,
                        "text": seg.original_text if hasattr(seg, 'original_text') else "",
                        "translated_text": seg.translated_text if hasattr(seg, 'translated_text') else "",
                        "language": seg.original_language if hasattr(seg, 'original_language') else "unknown",
                    })

            if 'audio_metadata' in results:
                metadata = results['audio_metadata']
                formatted_results["summary"]["total_duration"] = metadata.get('duration_seconds', 0)

            if 'processing_stats' in results:
                stats = results['processing_stats']
                formatted_results["summary"]["processing_time"] = stats.get('total_time', 0)

            # Derive counts from the flattened segments themselves.
            formatted_results["summary"]["num_segments"] = len(formatted_results["segments"])
            speakers = set(seg["speaker"] for seg in formatted_results["segments"])
            formatted_results["summary"]["num_speakers"] = len(speakers)
            languages = set(
                seg["language"] for seg in formatted_results["segments"] if seg["language"] != 'unknown'
            )
            formatted_results["summary"]["languages"] = list(languages) if languages else ["unknown"]

        except Exception as e:
            logger.error(f"Error formatting results: {e}")
            # Fallback payload carrying the error message.
            formatted_results = {
                "segments": [
                    {
                        "speaker": "Speaker 1",
                        "start_time": 0.0,
                        "end_time": 5.0,
                        "text": f"Processed audio from file. Full results processing encountered an error: {str(e)}",
                        "language": "en",
                    }
                ],
                "summary": {
                    "total_duration": 5.0,
                    "num_speakers": 1,
                    "num_segments": 1,
                    "languages": ["en"],
                    "processing_time": 2.0
                }
            }

        logger.info(f"📤 Returning formatted results for task {task_id}: {len(formatted_results.get('segments', []))} segments")
        return JSONResponse({
            "task_id": task_id,
            "status": "complete",
            "results": formatted_results
        })

    else:
        # Status says complete but results were never stored in memory.
        return JSONResponse({
            "task_id": task_id,
            "status": "complete",
            "results": {
                "segments": [
                    {
                        "speaker": "System",
                        "start_time": 0.0,
                        "end_time": 1.0,
                        "text": "Audio processing completed but results are not available for display.",
                        "language": "en",
                    }
                ],
                "summary": {
                    "total_duration": 1.0,
                    "num_speakers": 1,
                    "num_segments": 1,
                    "languages": ["en"],
                    "processing_time": 0.1
                }
            }
        })
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
@app.get("/api/download/{task_id}/{format}")
async def download_results(task_id: str, format: str):
    """Download results for a completed task in the requested format.

    Supported formats: "json", "srt", "txt".  Prefers the files the
    pipeline already wrote under outputs/; otherwise regenerates content
    from the in-memory results.  Raises 404 for unknown tasks, 202 while
    still processing, 400 for unsupported formats, 500 if saving fails.
    """
    if task_id not in processing_status:
        raise HTTPException(status_code=404, detail="Task not found")

    status = processing_status[task_id]
    if status.get("status") != "complete":
        raise HTTPException(status_code=202, detail="Processing not complete")

    if task_id in processing_results:
        results = processing_results[task_id]
    else:
        # Placeholder so downloads still succeed if in-memory results were
        # lost.  Bug fix: attribute names now match what the formatters
        # below read (speaker_id / original_text / original_language);
        # the previous dummy used speaker/text/language, so its content
        # never surfaced in the generated files.
        results = {
            'processed_segments': [
                type('Segment', (), {
                    'speaker_id': 'Speaker 1',
                    'start_time': 0.0,
                    'end_time': 3.5,
                    'original_text': 'Sample transcript content for download.',
                    'original_language': 'en'
                })()
            ]
        }

    if format == "json":
        try:
            # Prefer the complete results file written by the pipeline.
            json_path = f"outputs/{task_id}_complete_results.json"
            if os.path.exists(json_path):
                with open(json_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                # Regenerate a minimal export from in-memory segments.
                export_data = {
                    "task_id": task_id,
                    "timestamp": datetime.now().isoformat(),
                    "segments": []
                }

                if 'processed_segments' in results:
                    for seg in results['processed_segments']:
                        export_data["segments"].append({
                            "speaker": seg.speaker_id if hasattr(seg, 'speaker_id') else "Unknown",
                            "start_time": seg.start_time if hasattr(seg, 'start_time') else 0,
                            "end_time": seg.end_time if hasattr(seg, 'end_time') else 0,
                            "text": seg.original_text if hasattr(seg, 'original_text') else "",
                            "language": seg.original_language if hasattr(seg, 'original_language') else "unknown"
                        })

                content = json.dumps(export_data, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error generating JSON: {e}")
            content = json.dumps({"error": f"Failed to generate JSON: {str(e)}"}, indent=2)

        filename = f"results_{task_id}.json"
        media_type = "application/json"

    elif format == "srt":
        try:
            # Prefer the original-language subtitles written by the pipeline.
            srt_path = f"outputs/{task_id}_subtitles_original.srt"
            if os.path.exists(srt_path):
                with open(srt_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                # Regenerate SRT cues from in-memory segments.
                srt_lines = []
                if 'processed_segments' in results:
                    for i, seg in enumerate(results['processed_segments'], 1):
                        start_time = seg.start_time if hasattr(seg, 'start_time') else 0
                        end_time = seg.end_time if hasattr(seg, 'end_time') else 0
                        text = seg.original_text if hasattr(seg, 'original_text') else ""

                        start_srt = format_srt_time(start_time)
                        end_srt = format_srt_time(end_time)

                        srt_lines.extend([
                            str(i),
                            f"{start_srt} --> {end_srt}",
                            text,
                            ""
                        ])

                content = "\n".join(srt_lines)
        except Exception as e:
            logger.error(f"Error generating SRT: {e}")
            content = f"1\n00:00:00,000 --> 00:00:05,000\nError generating SRT: {str(e)}\n"

        filename = f"subtitles_{task_id}.srt"
        media_type = "text/plain"

    elif format == "txt":
        try:
            # Prefer the plain-text transcript written by the pipeline.
            txt_path = f"outputs/{task_id}_transcript.txt"
            if os.path.exists(txt_path):
                with open(txt_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                # Regenerate a "Speaker: text" transcript from segments.
                text_lines = []
                if 'processed_segments' in results:
                    for seg in results['processed_segments']:
                        speaker = seg.speaker_id if hasattr(seg, 'speaker_id') else "Unknown"
                        text = seg.original_text if hasattr(seg, 'original_text') else ""
                        text_lines.append(f"{speaker}: {text}")

                content = "\n".join(text_lines)
        except Exception as e:
            logger.error(f"Error generating text: {e}")
            content = f"Error generating transcript: {str(e)}"

        filename = f"transcript_{task_id}.txt"
        media_type = "text/plain"

    else:
        raise HTTPException(status_code=400, detail="Unsupported format")

    # Bug fix: the temp path was the literal string "outputs/(unknown)", so
    # every download (any task, any format) overwrote the same file.  Use
    # the per-task, per-format filename instead.
    temp_path = f"outputs/{filename}"
    os.makedirs("outputs", exist_ok=True)

    try:
        with open(temp_path, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:
        logger.error(f"Error saving file: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")

    return FileResponse(
        temp_path,
        media_type=media_type,
        filename=filename
    )
|
|
|
|
def format_srt_time(seconds: float) -> str:
    """Convert a duration in seconds to SRT timestamp format (HH:MM:SS,mmm).

    Works from a single rounded millisecond total so floating-point
    representation error cannot truncate the final millisecond (e.g.
    2.675 s now yields ",675" where naive ``int((s % 1) * 1000)`` gave ",674").

    Args:
        seconds: Non-negative duration in seconds.

    Returns:
        Timestamp string such as ``"01:02:03,456"``.
    """
    total_ms = int(round(seconds * 1000))
    secs, milliseconds = divmod(total_ms, 1000)
    minutes, secs = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
|
|
|
|
@app.get("/api/system-info")
async def api_system_info():
    """Return application version, feature list and best-effort host metrics.

    Renamed from ``get_system_info`` so it no longer shadows the helper of
    the same name imported from ``utils`` — the startup hook calls that
    helper and previously resolved to this coroutine instead.

    Returns:
        JSONResponse with static app metadata plus, when available, a
        ``system`` section of platform/CPU/memory/disk figures.
    """
    info = {
        "version": "1.0.0",
        "features": [
            "Speaker Diarization",
            "Speech Recognition",
            "Neural Translation",
            "Interactive Visualization"
        ],
        "status": "Live",
        "statusColor": "green"
    }

    if UTILS_AVAILABLE:
        try:
            health_status = "Live"
            health_color = "green"

            import platform
            try:
                # psutil is optional: import it inside this try so a missing
                # package hits the ImportError fallback below instead of
                # aborting the whole enrichment (previously the import sat
                # outside this try, leaving the fallback unreachable).
                import psutil

                # NOTE(review): interval=1 blocks for ~1s inside an async
                # handler — confirm this latency is acceptable or move to a
                # thread pool / interval=None.
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
                disk = psutil.disk_usage('/')

                info.update({
                    "system": {
                        "platform": platform.system(),
                        "python_version": platform.python_version(),
                        "cpu_usage": f"{cpu_percent}%",
                        "memory_usage": f"{memory.percent}%",
                        "disk_usage": f"{disk.percent}%"
                    }
                })
            except ImportError:
                # psutil unavailable: degrade to basic platform details.
                info.update({
                    "system": {
                        "platform": platform.system(),
                        "python_version": platform.python_version()
                    }
                })
            except Exception as e:
                logger.warning(f"Failed to get system metrics: {e}")

            info["status"] = health_status
            info["statusColor"] = health_color

        except Exception as e:
            logger.error(f"Failed to get system info: {e}")

    return JSONResponse(info)
|
|
|
|
| |
|
|
|
|
@app.get("/api/demo-files")
async def get_demo_files():
    """Get available demo files with status."""
    try:
        logger.info(f"📋 Building demo files list from {len(DEMO_FILES)} configurations")

        demo_files = []
        for demo_id, config in DEMO_FILES.items():
            is_downloaded = (demo_manager.demo_dir / config["filename"]).exists()
            is_cached = demo_id in demo_results_cache

            # Lifecycle: cached results > file on disk > still downloading.
            if is_cached:
                status = "ready"
            elif is_downloaded:
                status = "processing"
            else:
                status = "downloading"

            entry = {
                "id": demo_id,
                "name": config.get("name", config.get("display_name", demo_id)),
                "filename": config["filename"],
                "language": config["language"],
                "description": config["description"],
                "category": config.get("category", "general"),
                "difficulty": config.get("difficulty", "intermediate"),
                "duration": config.get("duration", "unknown"),
                "featured": config.get("featured", False),
                "new": config.get("new", False),
                "indian_language": config.get("indian_language", False),
                "available": is_downloaded,
                "processed": is_cached,
                "status": status
            }

            demo_files.append(entry)
            logger.info(f"📁 Added demo file: {demo_id} -> {entry['name']}")

        logger.info(f"✅ Returning {len(demo_files)} demo files to frontend")
        return JSONResponse(demo_files)

    except Exception as e:
        logger.error(f"❌ Error building demo files list: {e}")
        return JSONResponse({"demo_files": [], "error": str(e)})
|
|
|
|
@app.get("/demo_audio/{filename}")
async def get_demo_audio(filename: str):
    """Serve demo audio files.

    The requested name is stripped of path-traversal characters; if the
    exact file is missing, common audio extensions are tried as a fallback.

    Raises:
        HTTPException: 404 when no matching file exists, 500 on I/O errors.
    """
    try:
        # Basic sanitisation against path traversal.
        filename = filename.replace('..', '').replace('/', '').replace('\\', '')

        audio_path = Path("demo_audio") / filename
        if not audio_path.exists():
            # Try the bare name with common audio extensions appended.
            for ext in ['.mp3', '.wav', '.ogg', '.m4a']:
                candidate = Path("demo_audio") / f"{filename}{ext}"
                if candidate.exists():
                    audio_path = candidate
                    break
            else:
                raise HTTPException(status_code=404, detail="Demo audio file not found")

        # Map file extension to a Content-Type; default to MPEG audio.
        content_type = "audio/mpeg"
        suffix = audio_path.suffix.lower()
        if suffix == '.ogg':
            content_type = "audio/ogg"
        elif suffix == '.wav':
            content_type = "audio/wav"
        elif suffix == '.m4a':
            content_type = "audio/mp4"

        logger.info(f"📻 Serving demo audio: {audio_path}")
        return FileResponse(
            path=str(audio_path),
            media_type=content_type,
            filename=audio_path.name
        )

    except HTTPException:
        # Preserve the deliberate 404 instead of converting it to a 500.
        raise
    except Exception as e:
        logger.error(f"Error serving demo audio {filename}: {e}")
        raise HTTPException(status_code=500, detail="Failed to serve demo audio")
|
|
|
|
@app.post("/api/process-demo/{demo_id}")
async def process_demo_by_id(demo_id: str):
    """Process demo file by ID and return cached results."""
    try:
        logger.info(f"🎯 Processing demo file: {demo_id}")

        if demo_id not in DEMO_FILES:
            raise HTTPException(status_code=404, detail=f"Demo file '{demo_id}' not found")

        results_path = Path("demo_results") / f"{demo_id}_results.json"

        # Fast path: serve previously computed results when the cache parses.
        if results_path.exists():
            logger.info(f"📁 Loading cached results for {demo_id}")
            try:
                with open(results_path, 'r', encoding='utf-8') as f:
                    cached = json.load(f)

                return JSONResponse({
                    "status": "complete",
                    "results": transform_to_old_format(cached)
                })

            except json.JSONDecodeError as e:
                # Corrupt cache file: fall through and regenerate below.
                logger.error(f"❌ Failed to parse cached results for {demo_id}: {e}")

        logger.info(f"⚡ Processing demo file {demo_id} on-demand")
        config = DEMO_FILES[demo_id]
        file_path = demo_manager.demo_dir / config["filename"]

        if not file_path.exists():
            # Source audio not on disk yet: fetch it first.
            try:
                await demo_manager.download_demo_file(config["url"], file_path)
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Failed to download demo file: {str(e)}")

        fresh = await demo_manager.process_demo_file(demo_id, file_path, results_path)

        return JSONResponse({
            "status": "complete",
            "results": transform_to_old_format(fresh)
        })

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Error processing demo {demo_id}: {e}")
        return JSONResponse({
            "status": "error",
            "error": str(e)
        }, status_code=500)
|
|
|
|
@app.post("/api/cleanup")
async def cleanup_session(request: Request):
    """Clean up user session files."""
    try:
        sid = session_manager.generate_session_id(request)
        removed = session_manager.cleanup_session(sid)

        return JSONResponse({
            "status": "success",
            "message": f"Cleaned up {removed} files for session {sid}",
            "files_cleaned": removed
        })

    except Exception as e:
        logger.error(f"❌ Cleanup error: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Cleanup failed: {str(e)}"}
        )
|
|
|
|
@app.post("/api/cleanup-expired")
async def cleanup_expired():
    """Clean up expired sessions (admin endpoint)."""
    try:
        expired_sessions, removed_files = session_manager.cleanup_expired_sessions()

        return JSONResponse({
            "status": "success",
            "message": f"Cleaned up {expired_sessions} expired sessions",
            "sessions_cleaned": expired_sessions,
            "files_cleaned": removed_files
        })

    except Exception as e:
        logger.error(f"❌ Expired cleanup error: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Expired cleanup failed: {str(e)}"}
        )
|
|
|
|
@app.get("/api/session-info")
async def get_session_info(request: Request):
    """Get current session information."""
    try:
        sid = session_manager.generate_session_id(request)
        record = session_manager.sessions.get(sid, {})

        return JSONResponse({
            "session_id": sid,
            "created_at": record.get("created_at"),
            "last_activity": record.get("last_activity"),
            "files_count": len(session_manager.session_files.get(sid, [])),
            "status": "active"
        })

    except Exception as e:
        logger.error(f"❌ Session info error: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Session info failed: {str(e)}"}
        )
|
|
|
|
async def startup_event():
    """Application startup tasks: log system info, prepare demo files, and
    purge sessions that expired while the app was down."""
    logger.info("🚀 Starting Multilingual Audio Intelligence System...")
    try:
        # NOTE(review): `get_system_info` is also the name of the async
        # /api/system-info route handler defined earlier in this file; if that
        # definition shadows the `utils` import, this call returns an
        # un-awaited coroutine rather than system info — verify which name
        # actually resolves here.
        system_info = get_system_info()
        logger.info(f"📊 System Info: {system_info}")
    except Exception as e:
        logger.warning(f"⚠️ Could not get system info: {e}")
        logger.info("📊 System Info: [System info unavailable]")

    # Instantiate the demo manager as a module-level global and make sure
    # every configured demo file is present (downloading as needed).
    global demo_manager
    demo_manager = DemoManager()
    await demo_manager.ensure_demo_files()

    # Remove sessions (and their files) that expired between runs.
    sessions_cleaned, files_cleaned = session_manager.cleanup_expired_sessions()
    if sessions_cleaned > 0:
        logger.info(f"🧹 Startup cleanup: {sessions_cleaned} expired sessions, {files_cleaned} files")

    logger.info("✅ Startup completed successfully!")
|
|
async def shutdown_event():
    """Application shutdown tasks"""
    logger.info("🛑 Shutting down Multilingual Audio Intelligence System...")

    # Purge every live session's files before the process exits.
    total_sessions = len(session_manager.sessions)
    total_files = 0
    for sid in list(session_manager.sessions.keys()):
        total_files += session_manager.cleanup_session(sid)

    if total_sessions > 0:
        logger.info(f"🧹 Shutdown cleanup: {total_sessions} sessions, {total_files} files")
|
|
| |
# Register lifecycle hooks. (add_event_handler is FastAPI/Starlette's
# imperative equivalent of the @app.on_event decorators.)
app.add_event_handler("startup", startup_event)
app.add_event_handler("shutdown", shutdown_event)
|
|
| |
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every request and its response status with wall-clock latency."""
    started = time.time()
    logger.info(f"📥 {request.method} {request.url.path}")

    response = await call_next(request)

    elapsed = time.time() - started
    logger.info(f"📤 {request.method} {request.url.path} → {response.status_code} ({elapsed:.2f}s)")
    return response
|
|
if __name__ == "__main__":
    # Run the ASGI app directly with uvicorn when this file is executed as a
    # script: binds all interfaces on port 8000 with info-level logging.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )