"""🎙️🤖 Azure-Powered AI Conference Service

Advanced AI-powered conference analysis with transcription, computer vision,
and intelligent summarization using Azure AI Foundry.
"""
import gradio as gr import time import json import os import subprocess from datetime import datetime, timedelta from typing import List, Tuple, Optional from backend import ( ALLOWED_LANGS, AUDIO_FORMATS, transcription_manager, allowed_file, User ) from ai_summary import ai_summary_manager def format_status(status): """Convert status to user-friendly format""" status_map = { 'pending': '⏳ Queued', 'processing': '🔄 Processing', 'completed': '✅ Done', 'failed': '❌ Failed' } return status_map.get(status, status) def format_processing_time(created_at, completed_at=None): """Calculate and format processing time""" try: start_time = datetime.fromisoformat(created_at) if completed_at: end_time = datetime.fromisoformat(completed_at) duration = end_time - start_time else: duration = datetime.now() - start_time total_seconds = int(duration.total_seconds()) if total_seconds < 60: return f"{total_seconds}s" elif total_seconds < 3600: minutes = total_seconds // 60 seconds = total_seconds % 60 return f"{minutes}m {seconds}s" else: hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 return f"{hours}h {minutes}m" except: return "Unknown" def get_user_stats_display(user: User): """Get comprehensive user statistics for display""" if not user: return "👤 Please log in to view statistics" try: # Get transcript stats transcript_stats = transcription_manager.get_user_stats(user.user_id) # Get AI summary stats summary_stats = transcription_manager.get_user_summary_stats(user.user_id) total_transcripts = transcript_stats.get('total_jobs', 0) total_summaries = summary_stats.get('total_jobs', 0) stats_text = f"👤 {user.username} | 🎙️ Transcripts: {total_transcripts} | 🤖 AI Summaries: {total_summaries}" # Add processing status processing_transcripts = transcript_stats.get('by_status', {}).get('processing', 0) processing_summaries = summary_stats.get('by_status', {}).get('processing', 0) if processing_transcripts > 0: stats_text += f" | 🔄 Transcribing: 
{processing_transcripts}" if processing_summaries > 0: stats_text += f" | 🔄 Summarizing: {processing_summaries}" return stats_text except Exception as e: return f"👤 {user.username} | Stats error: {str(e)}" # Authentication Functions (same as before) def register_user(email, username, password, confirm_password, gdpr_consent, data_retention_consent, marketing_consent): """Register new user account""" try: print(f"📝 Registration attempt for: {username} ({email})") # Validate inputs if not email or not username or not password: return "❌ All fields are required", gr.update(visible=False) if password != confirm_password: return "❌ Passwords do not match", gr.update(visible=False) if not gdpr_consent: return "❌ GDPR consent is required to create an account", gr.update(visible=False) if not data_retention_consent: return "❌ Data retention agreement is required", gr.update(visible=False) # Attempt registration success, message, user_id = transcription_manager.register_user( email, username, password, gdpr_consent, data_retention_consent, marketing_consent ) print(f"📝 Registration result: success={success}, message={message}") if success: print(f"✅ User registered successfully: {username}") return f"✅ {message}! Please log in with your credentials.", gr.update(visible=True) else: print(f"❌ Registration failed: {message}") return f"❌ {message}", gr.update(visible=False) except Exception as e: print(f"❌ Registration error: {str(e)}") return f"❌ Registration error: {str(e)}", gr.update(visible=False) def login_user(login, password): """Login user""" try: print(f"🔐 Login attempt for: {login}") if not login or not password: return "❌ Please enter both username/email and password", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." 
success, message, user = transcription_manager.login_user(login, password) print(f"🔐 Login result: success={success}, message={message}") if success and user: print(f"✅ User logged in successfully: {user.username}") stats_display = get_user_stats_display(user) return f"✅ Welcome back, {user.username}!", user, gr.update(visible=False), gr.update(visible=True), stats_display else: print(f"❌ Login failed: {message}") return f"❌ {message}", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." except Exception as e: print(f"❌ Login error: {str(e)}") return f"❌ Login error: {str(e)}", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." def logout_user(): """Logout user""" print("👋 User logged out") return None, "👋 You have been logged out. Please log in to continue.", gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." # Transcription Functions def submit_transcription(file, language, audio_format, diarization_enabled, speakers, profanity, punctuation, timestamps, lexical, user): """Submit transcription job - requires authenticated user""" if not user: return ( "❌ Please log in to submit transcriptions", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) if file is None: return ( "Please upload an audio or video file first.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) try: # Get file data try: if isinstance(file, str): if os.path.exists(file): with open(file, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file) else: return ( "File not found. Please try uploading again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) else: file_path = str(file) if os.path.exists(file_path): with open(file_path, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file_path) else: return ( "Unable to process file. 
Please try again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) except Exception as e: return ( f"Error reading file: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Validate file file_extension = original_filename.split('.')[-1].lower() if '.' in original_filename else "" supported_extensions = set(AUDIO_FORMATS) | { 'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v', 'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob' } if file_extension not in supported_extensions and file_extension != "": return ( f"Unsupported file format: .{file_extension}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Basic file size check if len(file_bytes) > 500 * 1024 * 1024: # 500MB limit return ( "File too large. Please upload files smaller than 500MB.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Prepare settings settings = { 'audio_format': audio_format, 'diarization_enabled': diarization_enabled, 'speakers': speakers, 'profanity': profanity, 'punctuation': punctuation, 'timestamps': timestamps, 'lexical': lexical } # Submit job job_id = transcription_manager.submit_transcription( file_bytes, original_filename, user.user_id, language, settings ) # Update job state job_state = { 'current_job_id': job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'pending' } # Get updated user stats stats_display = get_user_stats_display(user) return ( f"🚀 Transcription started for: {original_filename}\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Job ID: {job_id}", job_state, gr.update(visible=True, value="🔄 Auto-refresh active"), stats_display ) except Exception as e: print(f"❌ Error submitting transcription: {str(e)}") return ( f"Error: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) def check_current_job_status(job_state, user): 
"""Check status of current job with improved transcript handling""" if not user: return ( "❌ Please log in to check status", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) if not job_state or 'current_job_id' not in job_state: return ( "No active job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return ( "Job not found or access denied", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Calculate processing time processing_time = format_processing_time(job.created_at, job.completed_at) # Enhanced status change logging last_status = job_state.get('last_status', '') if job.status != last_status: print(f"🔄 [{user.username}] Job status change: {last_status} → {job.status} ({job.original_filename})") job_state['last_status'] = job.status # Get updated user stats stats_display = get_user_stats_display(user) # Handle completed status with better transcript detection if job.status == 'completed' and job.transcript_text and job.transcript_text.strip(): # Job is complete and transcript is available, stop auto-refresh job_state['auto_refresh_active'] = False # Create downloadable file try: transcript_file = create_transcript_file(job.transcript_text, job_id) print(f"✅ [{user.username}] Transcription ready: {len(job.transcript_text)} characters") except Exception as e: print(f"⚠️ [{user.username}] Error creating transcript file: {str(e)}") transcript_file = None return ( f"✅ Transcription completed in {processing_time}", job.transcript_text, gr.update(visible=True, value=transcript_file) if transcript_file else gr.update(visible=False), f"Processed: {job.original_filename}", gr.update(visible=False), # Hide auto-refresh status stats_display ) elif job.status == 'failed': # Job failed, stop auto-refresh job_state['auto_refresh_active'] = False error_msg = 
job.error_message[:100] + "..." if job.error_message and len(job.error_message) > 100 else job.error_message or "Unknown error" return ( f"❌ Transcription failed after {processing_time}", "", gr.update(visible=False), f"Error: {error_msg}", gr.update(visible=False), stats_display ) elif job.status == 'processing': # Still processing, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"🔄 Processing... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Converting and analyzing: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) elif job.status == 'completed' and (not job.transcript_text or not job.transcript_text.strip()): # Job marked as completed but transcript not yet available - keep refreshing auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"🔄 Finalizing transcript... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Retrieving results: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) else: # pending # Still pending, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"⏳ Queued for processing... 
({processing_time} waiting)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Waiting: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) except Exception as e: print(f"❌ Error checking job status: {str(e)}") return ( f"Error checking status: {str(e)}", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # AI Summary Functions def get_available_transcripts(user): """Get list of available transcripts for AI summarization""" if not user: return gr.update(choices=[], value=[]) try: # Get completed transcripts completed_jobs = transcription_manager.get_user_history(user.user_id, limit=50) completed_transcripts = [ job for job in completed_jobs if job.status == 'completed' and job.transcript_text ] # Create choices list choices = [] for job in completed_transcripts[:20]: # Limit to recent 20 label = f"{job.original_filename} ({job.created_at[:16]})" choices.append((label, job.job_id)) return gr.update(choices=choices, value=[]) except Exception as e: print(f"❌ Error getting available transcripts: {str(e)}") return gr.update(choices=[], value=[]) def submit_ai_summary_enhanced(existing_transcripts, new_audio_video_file, document_image_files, ai_instructions, summary_format, output_language, focus_areas, include_timestamps, include_action_items, user): """Enhanced AI summary submission with immediate transcript processing""" if not user: return ( "❌ Please log in to generate AI summaries", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Determine content type and validate inputs has_existing_transcripts = existing_transcripts and len(existing_transcripts) > 0 has_new_audio_video = new_audio_video_file is not None has_document_images = document_image_files and len(document_image_files) > 0 if not has_existing_transcripts and not has_new_audio_video and not has_document_images: return ( "❌ Please 
provide content: select existing transcripts, upload audio/video file, or upload documents/images", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) if not ai_instructions.strip(): return ( "❌ Please provide AI instructions for the summary", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) try: # Handle new audio/video file - submit for transcription but don't create AI job yet transcription_job_id = None if has_new_audio_video: # Submit audio/video file to transcription service first try: # Read the uploaded file if isinstance(new_audio_video_file, str): file_path = new_audio_video_file else: file_path = str(new_audio_video_file) with open(file_path, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file_path) # Use default transcription settings optimized for AI Summary transcription_settings = { 'audio_format': 'wav', 'diarization_enabled': True, 'speakers': 5, # Allow more speakers for conferences 'profanity': 'masked', 'punctuation': 'automatic', 'timestamps': True, 'lexical': False } # Submit to transcription service with Thai as default language transcription_job_id = transcription_manager.submit_transcription( file_bytes, original_filename, user.user_id, "th-TH", # Default to Thai transcription_settings ) print(f"🎙️ [{user.username}] Audio/video submitted for transcription: {transcription_job_id[:8]}...") # Create a special job state that will trigger AI summary when transcription completes summary_job_state = { 'waiting_for_transcription': True, 'transcription_job_id': transcription_job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'waiting_for_transcription', 'ai_instructions': ai_instructions, 'summary_format': summary_format, 'output_language': output_language, 'focus_areas': focus_areas, 'include_timestamps': include_timestamps, 'include_action_items': include_action_items, 'existing_transcripts': existing_transcripts if 
existing_transcripts else [], 'document_image_files': document_image_files if document_image_files else [], 'user_id': user.user_id } # Get updated user stats stats_display = get_user_stats_display(user) return ( f"🎙️ Audio/video submitted for transcription\n⏳ AI Summary will start automatically when transcription completes\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcription Job: {transcription_job_id[:8]}... → Will auto-trigger AI Summary", summary_job_state, gr.update(visible=True, value="🔄 Waiting for transcription"), stats_display ) except Exception as e: print(f"❌ Error submitting audio/video for transcription: {str(e)}") return ( f"❌ Error processing audio/video file: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # For existing transcripts or documents only (no audio/video transcription needed) else: transcript_ids = existing_transcripts if existing_transcripts else [] document_files = document_image_files if document_image_files else [] # Determine content mode if has_existing_transcripts and not has_document_images: content_mode = "Existing Transcripts" elif has_document_images and not has_existing_transcripts: content_mode = "Text Documents" else: content_mode = "Mixed Content" # Prepare settings settings = { 'content_mode': content_mode, 'format': summary_format, 'output_language': output_language, 'focus_areas': focus_areas, 'include_timestamps': include_timestamps, 'include_action_items': include_action_items, 'language': "th-TH" } # Submit AI summary job immediately (no transcription needed) job_id = ai_summary_manager.submit_summary_job_enhanced( user_id=user.user_id, content_mode=content_mode, summary_type=summary_format, user_prompt=ai_instructions, existing_transcript_ids=transcript_ids, audio_video_files=[], document_files=document_files, settings=settings ) # Update job state summary_job_state = { 'current_summary_job_id': job_id, 'start_time': 
datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'pending' } # Get updated user stats stats_display = get_user_stats_display(user) # Create source description source_parts = [] if has_existing_transcripts: source_parts.append(f"{len(transcript_ids)} existing transcripts") if has_document_images: source_parts.append(f"{len(document_files)} document/image files") source_info = " + ".join(source_parts) return ( f"🤖 AI Summary started with {source_info}\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Job ID: {job_id}", summary_job_state, gr.update(visible=True, value="🔄 AI Auto-refresh active"), stats_display ) except Exception as e: print(f"❌ Error submitting enhanced AI summary: {str(e)}") return ( f"❌ Error: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) def check_ai_summary_status(summary_job_state, user): """Check status of AI summary job with auto-trigger logic for completed transcriptions""" if not user: return ( "❌ Please log in to check AI summary status", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) if not summary_job_state: return ( "No active AI summary job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) try: # Handle special case: waiting for transcription to complete if summary_job_state.get('waiting_for_transcription'): transcription_job_id = summary_job_state.get('transcription_job_id') if not transcription_job_id: return ( "❌ Error: Missing transcription job ID", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Check transcription status transcription_job = transcription_manager.get_job_status(transcription_job_id) if not transcription_job: return ( "❌ Transcription job not found", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) processing_time = format_processing_time(summary_job_state['start_time']) if transcription_job.status == 'pending': return ( 
f"⏳ Transcription queued... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcription: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Waiting for transcription"), get_user_stats_display(user) ) elif transcription_job.status == 'processing': transcription_time = format_processing_time(transcription_job.created_at) return ( f"🎙️ Transcribing... ({transcription_time} transcribing, {processing_time} total)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcribing: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Transcription in progress"), get_user_stats_display(user) ) elif transcription_job.status == 'failed': summary_job_state['auto_refresh_active'] = False return ( f"❌ Transcription failed - Cannot proceed\nError: {transcription_job.error_message or 'Unknown error'}", "", gr.update(visible=False), f"Failed: {transcription_job.original_filename}", gr.update(visible=False), get_user_stats_display(user) ) elif transcription_job.status == 'completed': if not transcription_job.transcript_text or not transcription_job.transcript_text.strip(): return ( f"🔄 Transcription completed, retrieving text... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Getting transcript: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Getting transcript"), get_user_stats_display(user) ) # TRANSCRIPTION IS COMPLETE! 
NOW TRIGGER AI SUMMARY IMMEDIATELY print(f"✅ Transcription completed, triggering AI summary immediately...") try: # Prepare transcript IDs including the newly completed one transcript_ids = summary_job_state.get('existing_transcripts', []) transcript_ids.append(transcription_job_id) # Prepare settings settings = { 'content_mode': "New Audio/Video Files", 'format': summary_job_state.get('summary_format', 'บทสรุปผู้บริหาร'), 'output_language': summary_job_state.get('output_language', 'Thai'), 'focus_areas': summary_job_state.get('focus_areas', ''), 'include_timestamps': summary_job_state.get('include_timestamps', True), 'include_action_items': summary_job_state.get('include_action_items', True), 'language': "th-TH" } # Submit AI summary job NOW with completed transcript job_id = ai_summary_manager.submit_summary_job_enhanced( user_id=summary_job_state['user_id'], content_mode="New Audio/Video Files", summary_type=summary_job_state.get('summary_format', 'บทสรุปผู้บริหาร'), user_prompt=summary_job_state.get('ai_instructions', ''), existing_transcript_ids=transcript_ids, audio_video_files=[], document_files=summary_job_state.get('document_image_files', []), settings=settings ) print(f"🤖 AI Summary job created immediately: {job_id}") # Update job state to track AI summary instead of transcription summary_job_state.update({ 'waiting_for_transcription': False, 'current_summary_job_id': job_id, 'transcription_completed_at': datetime.now().isoformat(), 'last_status': 'ai_started' }) return ( f"✅ Transcription done! 
🤖 AI Summary started immediately\n📊 Using transcript: {len(transcription_job.transcript_text):,} characters\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Processing: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 AI Summary active"), get_user_stats_display(user) ) except Exception as e: print(f"❌ Error triggering AI summary: {str(e)}") return ( f"❌ Transcription completed but AI summary failed to start: {str(e)}", "", gr.update(visible=False), "AI Summary creation failed", gr.update(visible=False), get_user_stats_display(user) ) # Normal AI summary job monitoring if 'current_summary_job_id' not in summary_job_state: return ( "No active AI summary job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) job_id = summary_job_state['current_summary_job_id'] job = ai_summary_manager.get_summary_status(job_id) if not job or job.user_id != user.user_id: return ( "AI summary job not found or access denied", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Calculate processing time processing_time = format_processing_time(job.created_at, job.completed_at) # Enhanced status change logging last_status = summary_job_state.get('last_status', '') if job.status != last_status: print(f"🔄 [{user.username}] AI Summary status: {last_status} → {job.status}") summary_job_state['last_status'] = job.status # Get updated user stats stats_display = get_user_stats_display(user) # Handle completed status if job.status == 'completed' and job.summary_text and job.summary_text.strip(): # Job is complete, stop auto-refresh summary_job_state['auto_refresh_active'] = False # Create downloadable file try: summary_file = create_summary_file(job.summary_text, job_id) print(f"✅ [{user.username}] AI Summary ready: {len(job.summary_text)} characters") except Exception as e: print(f"⚠️ [{user.username}] Error creating summary file: {str(e)}") summary_file = None total_time = 
format_processing_time(summary_job_state['start_time']) return ( f"✅ AI Summary completed! Total time: {total_time}\n📊 Generated: {len(job.summary_text):,} characters", job.summary_text, gr.update(visible=True, value=summary_file) if summary_file else gr.update(visible=False), f"Completed: {', '.join(job.original_files)}", gr.update(visible=False), stats_display ) elif job.status == 'failed': # Job failed, stop auto-refresh summary_job_state['auto_refresh_active'] = False error_msg = job.error_message[:100] + "..." if job.error_message else "Unknown error" total_time = format_processing_time(summary_job_state['start_time']) return ( f"❌ AI Summary failed after {total_time}", "", gr.update(visible=False), f"Error: {error_msg}", gr.update(visible=False), stats_display ) elif job.status == 'processing': # Still processing, continue auto-refresh return ( f"🤖 AI analyzing and generating summary... ({processing_time} AI processing)\n📊 Creating comprehensive analysis\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Processing: {', '.join(job.original_files[:2])}{'...' if len(job.original_files) > 2 else ''}", gr.update(visible=True, value="🔄 AI generating summary"), stats_display ) else: # pending # Still pending, continue auto-refresh return ( f"⏳ AI Summary queued... ({processing_time} waiting)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Queued: {', '.join(job.original_files[:2])}{'...' 
if len(job.original_files) > 2 else ''}", gr.update(visible=True, value="🔄 AI queued"), stats_display ) except Exception as e: print(f"❌ Error checking AI summary status: {str(e)}") return ( f"Error checking AI summary status: {str(e)}", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) def should_auto_refresh(job_state, user): """Check if auto-refresh should be active""" if not user or not job_state or not job_state.get('auto_refresh_active', False): return False if 'current_job_id' not in job_state: return False job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return False if job.status == 'failed': return False elif job.status == 'completed': if job.transcript_text and job.transcript_text.strip(): return False else: return True else: return True except Exception as e: print(f"❌ Error in should_auto_refresh: {str(e)}") return True def should_auto_refresh_summary(summary_job_state, user): """Check if AI summary auto-refresh should be active""" if not user or not summary_job_state or not summary_job_state.get('auto_refresh_active', False): return False if 'current_summary_job_id' not in summary_job_state: return False job_id = summary_job_state['current_summary_job_id'] try: job = ai_summary_manager.get_summary_status(job_id) if not job or job.user_id != user.user_id: return False if job.status in ['failed', 'completed']: return False else: return True except Exception as e: print(f"❌ Error in should_auto_refresh_summary: {str(e)}") return True def auto_refresh_status(job_state, user): """Auto-refresh function for transcription""" if not user: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) if should_auto_refresh(job_state, user): return check_current_job_status(job_state, user) else: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) def 
auto_refresh_ai_summary(summary_job_state, user): """Auto-refresh function for AI summary""" if not user: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) if should_auto_refresh_summary(summary_job_state, user): return check_ai_summary_status(summary_job_state, user) else: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) # History Functions def get_transcription_history_table(user, show_all=False): """Get transcription history table""" if not user: return [] try: limit = 100 if show_all else 20 transcript_jobs = transcription_manager.get_user_history(user.user_id, limit=limit) table_data = [] for job in transcript_jobs: try: created_time = datetime.fromisoformat(job.created_at) formatted_date = created_time.strftime("%Y-%m-%d %H:%M") except: formatted_date = job.created_at[:16] status_display = format_status(job.status) time_display = format_processing_time(job.created_at, job.completed_at) job_id_display = job.job_id[:8] + "..." 
if len(job.job_id) > 8 else job.job_id language_display = ALLOWED_LANGS.get(job.language, job.language) if job.status == 'completed' and job.transcript_text: download_status = "Available" else: download_status = status_display table_data.append([ formatted_date, job.original_filename, language_display, status_display, time_display, job_id_display, download_status ]) return table_data except Exception as e: print(f"❌ Error loading transcription history: {str(e)}") return [] def get_ai_summary_history_table(user, show_all=False): """Get AI summary history table""" if not user: return [] try: limit = 100 if show_all else 20 summary_jobs = ai_summary_manager.get_user_summary_history(user.user_id, limit=limit) table_data = [] for job in summary_jobs: try: created_time = datetime.fromisoformat(job.created_at) formatted_date = created_time.strftime("%Y-%m-%d %H:%M") except: formatted_date = job.created_at[:16] status_display = format_status(job.status) time_display = format_processing_time(job.created_at, job.completed_at) job_id_display = job.job_id[:8] + "..." if len(job.job_id) > 8 else job.job_id # Get source summary source_summary = f"{len(job.original_files)} sources" if len(job.original_files) <= 2: source_summary = ", ".join([f[:20] + "..." 
if len(f) > 20 else f for f in job.original_files]) if job.status == 'completed' and job.summary_text: download_status = "Available" else: download_status = status_display table_data.append([ formatted_date, source_summary, job.settings.get('output_language', 'Thai') if job.settings else 'Thai', status_display, time_display, job_id_display, download_status ]) return table_data except Exception as e: print(f"❌ Error loading AI summary history: {str(e)}") return [] def refresh_transcription_history(user, show_all=False): """Refresh transcription history and create download files""" if not user: return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) try: table_data = get_transcription_history_table(user, show_all) stats_display = get_user_stats_display(user) # Get completed transcription jobs for downloads completed_jobs = transcription_manager.get_user_history(user.user_id, limit=50) completed_transcripts = [ job for job in completed_jobs if job.status == 'completed' and job.transcript_text ] # Create download files download_updates = [] for i in range(5): if i < len(completed_transcripts): job = completed_transcripts[i] try: file_path = create_transcript_file(job.transcript_text, job.job_id) label = f"📄 {job.original_filename} ({job.created_at[:10]})" download_updates.append(gr.update(visible=True, value=file_path, label=label)) except Exception as e: print(f"Error creating transcript download: {e}") download_updates.append(gr.update(visible=False)) else: download_updates.append(gr.update(visible=False)) return [table_data, stats_display] + download_updates except Exception as e: print(f"❌ Error refreshing transcription history: {str(e)}") return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) def refresh_ai_summary_history(user, show_all=False): """Refresh AI summary 
history and create download files""" if not user: return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) try: table_data = get_ai_summary_history_table(user, show_all) stats_display = get_user_stats_display(user) # Get completed AI summary jobs for downloads completed_jobs = ai_summary_manager.get_user_summary_history(user.user_id, limit=50) completed_summaries = [ job for job in completed_jobs if job.status == 'completed' and job.summary_text ] # Create download files download_updates = [] for i in range(5): if i < len(completed_summaries): job = completed_summaries[i] try: file_path = create_summary_file(job.summary_text, job.job_id) source_name = job.original_files[0][:30] if job.original_files else "AI Summary" label = f"🤖 {source_name} ({job.created_at[:10]})" download_updates.append(gr.update(visible=True, value=file_path, label=label)) except Exception as e: print(f"Error creating summary download: {e}") download_updates.append(gr.update(visible=False)) else: download_updates.append(gr.update(visible=False)) return [table_data, stats_display] + download_updates except Exception as e: print(f"❌ Error refreshing AI summary history: {str(e)}") return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) # PDPA Compliance Functions (same as before, but updated for summaries too) def export_user_data(user): """Export comprehensive user data including summaries""" if not user: return "❌ Please log in to export your data", gr.update(visible=False) try: # Export transcript data transcript_export = transcription_manager.export_user_data(user.user_id) # Export AI summary data (if available) try: summary_export = { 'ai_summaries': [job.__dict__ for job in ai_summary_manager.get_user_summary_history(user.user_id, limit=1000)], 'summary_stats': 
transcription_manager.get_user_summary_stats(user.user_id) } except: summary_export = {'ai_summaries': [], 'summary_stats': {}} # Combine exports combined_export = { **transcript_export, **summary_export, 'export_type': 'comprehensive_azure_ai_service', 'services': ['transcription', 'ai_summarization'] } # Create export file os.makedirs("temp", exist_ok=True) filename = f"temp/user_data_export_{user.user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(combined_export, f, indent=2, ensure_ascii=False, default=str) print(f"📦 [{user.username}] Comprehensive data export created") return "✅ Your complete data (transcripts + AI summaries) has been exported successfully", gr.update(visible=True, value=filename, label="Download Your Complete Data Export") except Exception as e: print(f"❌ Error exporting comprehensive user data: {str(e)}") return f"❌ Export failed: {str(e)}", gr.update(visible=False) def update_marketing_consent(user, marketing_consent): """Update user's marketing consent""" if not user: return "❌ Please log in to update consent" try: success = transcription_manager.update_user_consent(user.user_id, marketing_consent) if success: user.marketing_consent = marketing_consent print(f"📧 [{user.username}] Marketing consent updated: {marketing_consent}") return f"✅ Marketing consent updated successfully" else: return "❌ Failed to update consent" except Exception as e: return f"❌ Error: {str(e)}" def delete_user_account(user, confirmation_text): """Delete user account and all data (transcripts + summaries)""" if not user: return "❌ Please log in to delete account", None, gr.update(visible=True), gr.update(visible=False) if confirmation_text != "DELETE MY ACCOUNT": return "❌ Please type 'DELETE MY ACCOUNT' to confirm", user, gr.update(visible=False), gr.update(visible=True) try: # Delete transcript data success = transcription_manager.delete_user_account(user.user_id) # Delete AI summary data (if 
backend supports it) try: transcription_manager.delete_user_summary_data(user.user_id) except Exception as e: print(f"⚠️ Warning: Could not delete AI summary data: {e}") if success: print(f"🗑️ [{user.username}] Complete account deleted (transcripts + AI summaries)") return "✅ Your account and all data (transcripts + AI summaries) have been permanently deleted", None, gr.update(visible=True), gr.update(visible=False) else: return "❌ Failed to delete account", user, gr.update(visible=False), gr.update(visible=True) except Exception as e: return f"❌ Error: {str(e)}", user, gr.update(visible=False), gr.update(visible=True) def on_user_login(user): """Update UI components when user logs in""" if user: return gr.update(value=user.marketing_consent) else: return gr.update(value=False) def create_transcript_file(transcript_text, job_id): """Create a downloadable transcript file""" os.makedirs("temp", exist_ok=True) filename = f"temp/transcript_{job_id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(transcript_text) return filename def create_summary_file(summary_text, job_id): """Create a downloadable AI summary file""" os.makedirs("temp", exist_ok=True) filename = f"temp/ai_summary_{job_id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(summary_text) return filename # Enhanced CSS with AI Summary styling enhanced_css = """ /* Main container styling */ .gradio-container { background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); font-family: 'Segoe UI', system-ui, sans-serif; color: #212529; } /* Enhanced header styling */ .main-header { background: linear-gradient(135deg, #007bff, #0056b3); color: white; border: none; border-radius: 12px; padding: 30px; text-align: center; margin-bottom: 20px; box-shadow: 0 4px 12px rgba(0,123,255,0.3); } .main-header h1 { color: white; margin-bottom: 10px; font-size: 2.5em; font-weight: 700; text-shadow: 0 2px 4px rgba(0,0,0,0.3); } .main-header p { color: rgba(255,255,255,0.9); font-size: 1.2em; 
margin: 0; } /* Card styling with enhanced AI theme */ .gr-box { background: white; border: 1px solid #dee2e6; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.08); padding: 25px; margin: 10px 0; transition: all 0.3s ease; } .gr-box:hover { box-shadow: 0 6px 16px rgba(0,0,0,0.12); transform: translateY(-2px); } /* AI-specific card styling */ .ai-summary-card { background: linear-gradient(135deg, #f8f9ff, #e8f2ff); border: 2px solid #007bff; border-radius: 12px; padding: 25px; margin: 15px 0; } /* Button styling with AI enhancements */ .gr-button { background: linear-gradient(135deg, #007bff, #0056b3); border: none; border-radius: 8px; color: white; font-weight: 600; padding: 14px 28px; transition: all 0.3s ease; box-shadow: 0 3px 6px rgba(0,123,255,0.3); text-transform: uppercase; letter-spacing: 0.5px; } .gr-button:hover { background: linear-gradient(135deg, #0056b3, #004085); transform: translateY(-2px); box-shadow: 0 5px 10px rgba(0,123,255,0.4); } /* AI Summary specific buttons */ .ai-button { background: linear-gradient(135deg, #28a745, #1e7e34); } .ai-button:hover { background: linear-gradient(135deg, #1e7e34, #155724); } /* Status displays with enhanced styling */ .status-display { background: linear-gradient(135deg, #e3f2fd, #bbdefb); border-left: 4px solid #2196f3; padding: 20px; border-radius: 0 12px 12px 0; margin: 15px 0; font-family: 'Monaco', 'Consolas', monospace; font-size: 14px; line-height: 1.6; } .ai-status-display { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border-left: 4px solid #28a745; } /* Auto-refresh indicators */ .auto-refresh-indicator { background: linear-gradient(135deg, #fff3cd, #ffeaa7); border: 2px solid #ffc107; border-radius: 8px; padding: 10px 16px; font-size: 12px; color: #856404; text-align: center; animation: pulse 2s infinite; font-weight: 600; } @keyframes pulse { 0%, 100% { opacity: 1; box-shadow: 0 0 0 0 rgba(255, 193, 7, 0.4); } 50% { opacity: 0.8; box-shadow: 0 0 0 10px rgba(255, 193, 7, 0); } } /* 
User stats with enhanced design */ .user-stats { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border: 2px solid #28a745; border-radius: 8px; padding: 12px 16px; font-size: 13px; color: #155724; text-align: center; font-weight: 600; box-shadow: 0 2px 8px rgba(40,167,69,0.2); } /* Enhanced input styling */ .gr-textbox, .gr-dropdown, .gr-file { border: 2px solid #e9ecef; border-radius: 10px; background: white; color: #212529; transition: all 0.3s ease; padding: 12px; font-size: 14px; } .gr-textbox:focus, .gr-dropdown:focus { border-color: #007bff; box-shadow: 0 0 0 4px rgba(0,123,255,0.1); background: #f8f9ff; } /* Tab styling with AI theme */ .tab-nav { background: white; border-bottom: 3px solid #007bff; border-radius: 12px 12px 0 0; box-shadow: 0 2px 8px rgba(0,0,0,0.1); } .tab-nav .tab-button { padding: 15px 25px; font-weight: 600; transition: all 0.3s ease; } .tab-nav .tab-button.selected { background: linear-gradient(135deg, #007bff, #0056b3); color: white; } /* History table with enhanced design and black text */ .history-table { background: white; border: 2px solid #dee2e6; border-radius: 12px; font-size: 14px; overflow: hidden; color: #000000; /* Force black text */ } .history-table thead th { background: linear-gradient(135deg, #343a40, #495057); color: white; font-weight: 700; padding: 16px 12px; text-align: center; border: none; } .history-table tbody tr { transition: all 0.2s ease; border-bottom: 1px solid #dee2e6; color: #000000; /* Force black text for rows */ } .history-table tbody tr:hover { background: linear-gradient(135deg, #f8f9ff, #e8f2ff); transform: scale(1.01); color: #000000; /* Ensure text stays black on hover */ } .history-table tbody td { padding: 12px; vertical-align: middle; text-align: center; border-right: 1px solid #dee2e6; color: #000000 !important; /* Force black text with !important */ font-weight: 500; } .history-table tbody td:last-child { border-right: none; } /* AI Summary specific elements */ .ai-content-sources { 
background: linear-gradient(135deg, #fff8e1, #ffecb3); border: 2px solid #ffa000; border-radius: 12px; padding: 20px; margin: 15px 0; } .ai-instructions { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border: 2px solid #4caf50; border-radius: 12px; padding: 20px; margin: 15px 0; } .ai-results { background: linear-gradient(135deg, #f3e5f5, #e1bee7); border: 2px solid #9c27b0; border-radius: 12px; padding: 20px; margin: 15px 0; } /* Enhanced file upload areas */ .file-upload-area { border: 3px dashed #007bff; border-radius: 12px; padding: 30px; text-align: center; background: linear-gradient(135deg, #f8f9ff, #e8f2ff); transition: all 0.3s ease; } .file-upload-area:hover { border-color: #0056b3; background: linear-gradient(135deg, #e8f2ff, #d3e8ff); } /* Progress indicators */ .progress-indicator { background: linear-gradient(90deg, #007bff, #28a745, #ffc107); height: 4px; border-radius: 2px; animation: progress 2s linear infinite; } @keyframes progress { 0% { width: 0%; } 50% { width: 70%; } 100% { width: 100%; } } /* Enhanced tooltips and help text */ .help-text { color: #6c757d; font-style: italic; font-size: 12px; margin-top: 5px; } /* Responsive design improvements */ @media (max-width: 768px) { .main-header h1 { font-size: 2em; } .gr-button { padding: 10px 20px; font-size: 14px; } .gr-box { padding: 15px; margin: 5px 0; } } """ # Create the main interface with gr.Blocks( theme=gr.themes.Soft( primary_hue="blue", secondary_hue="green", neutral_hue="gray", font=["system-ui", "sans-serif"] ), css=enhanced_css, title="🎙️🤖 Azure-Powered AI Conference Service - Advanced Transcription & Intelligent Summarization" ) as demo: # Global state current_user = gr.State(None) job_state = gr.State({}) summary_job_state = gr.State({}) # Header with gr.Row(): gr.HTML("""
Advanced AI-powered conference analysis with transcription, computer vision, and intelligent summarization using Azure AI Foundry
Documents: PDF, DOCX, DOC, PPTX, PPT, XLSX, XLS, TXT, CSV, JSON
Images: JPG, JPEG, PNG, BMP, GIF, TIFF, WebP
OCR Process: Extract text from documents and images for AI analysis
📜 Existing Transcripts: Select from your completed transcriptions
🎥 New Audio/Video: Upload files for transcription + AI analysis
🖼️ OCR Images: Extract text from images using computer vision
🇹🇭 Thai Default: Optimized for Thai language processing
📚 Separate History: Independent tracking for each service
🔄 Auto-Refresh: Real-time status updates for all processes
🎥 Video: MP4, MOV, AVI, MKV, WMV, FLV, 3GP, WebM
🎵 Audio: WAV, MP3, OGG, OPUS, FLAC, WMA, AAC, M4A
📄 Documents: PDF, DOCX, DOC, PPTX, PPT, XLSX, XLS, CSV, TXT
🖼️ Images: JPG, JPEG, PNG, BMP, GIF, TIFF, WebP (with OCR)
🚀 Powered by Azure AI Foundry | 🔒 Enterprise-Grade Security | 🌐 GDPR Compliant | 🇹🇭 Thai Language Optimized